欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

容器化环境内的数据流

程序员文章站 2022-07-13 14:56:55
...

关于容器化环境内的数据流

利用数据总线,可以在容器化环境内实现Predix机器与其他应用程序间的交换机器数据流和交换。

数据总线便于:
• 通过数据总线River向其他应用程序发布机器适配器消化获取的机器数据。
• 获取消化数据。由其他应用程序发布,并通过HTTPS、WebSocket或其他river向云端传输的数据通过数据总线适配器消化获取。
• 使用数据总线API在应用程序间交换数据。经数据线传输的数据采用Predix机器使用的PDataValue串行格式。

数据流使用情景

机器数据从传感器传输至 Predix机器 容器,再经MQTT代理(broker)传输至容器化应用程序,最终到达Predix云的示例如下:
• 在典型数据流中,经 OPC-UA适配器消化获取的Predix机器 数据通过WebSocket River传输至Time Series设备。
下图展示了该数据流:
容器化环境内的数据流

• 在容器化环境中:
1. 经OPC-UA适配器消化获取的温度数据通过数据总线River在 Temperature_Subscription 主题上发布。
2. Temperature_Subscription 主题被应用程序消费并处理。应用程序将处理后的数据发布为 Temperature_Analyzed 。
3. 数据总线适配器消费 Temperature_Analyzed 并通过WebSocket River将其传输至Time Series服务。

下图展示了该数据流:
容器化环境内的数据流

• 在另一个容器化环境示例中:
1. 经OPC-UA适配器消化获取的温度数据通过数据总线River在 Temperature_Subscription 主题上发布。
2. Temperature_Subscription 主题被应用程序消费并处理。处理后的数据被发布为 Temperature_Analyzed 。
3. 另一个应用程序消费 Temperature_Analyzed,执行进一步分析,并将结果发布为 Temperature_Aggregated。
4. 数据总线适配器消费 Temperature_Aggregated 并通过WebSocket River将其传输至Time Series服务。

下图展示了该数据流:
容器化环境内的数据流

数据总线概述

数据总线是一个MQTT客户端,方便在边缘设备内的容器化应用程序间进行通信。

应用程序可以使用Java或C++语言编写,通过统一的主题和数据模式(data schema)进行通信。
数据总线支持以下数据类型。

PDataValue/Data
从传感器采集到的数据。更多信息参见数据通信

数据通信

利用DataComm API,可以实现 Predix机器与Docker化应用程序间的通信和数据传输。

数据

数据可以是:
• 通过机器网关从传感器流采集到的入口数据。
• 通过HTTP、MQTT和WebSocket等出口信道的数据。
• 应用程序至应用程序数据。

主题

所有数据均在根主题内通信。关于 topicTag,请注意以下信息和限制:
容器化环境内的数据流

使用情景

提供入口和出口数据流示例。
• 入口数据流示例
1. 一个分析应用程序订阅了一个名为 data/Temperature_Subscription的主题。
2. Predix机器使用Spillway配置,将数据从 Temperature_Subscription 主题转发至数据总线River。
3. Predix机器通过Modbus适配器接收数据。
4. Spillways将数据发送给数据总线River。
5. 数据总线River将数据发布至 data/Temperature_Subscription 主题。
• 出口数据流示例
1. Predix机器使用Spillway配置,将数据从数据总线适配器转发至WebSocket River服务。
2. 数据总线适配器订阅 data/Temperature_Analyzed 主题。
3. 一个分析应用程序将数据发布至 data/Temperature_Analyzed 主题。
4. 数据总线适配器接收数据。
5. Spillway将数据转发给WebSocket River。

获取数据总线服务

要获取OSGi应用程序的Data API,使用声明式服务注入 IDataComm 服务。
如何注入 IDataComm 服务的示例:

private IDataComm dataComm; /** * Dependency injection of IDataComm.* * @param dataComm Communication handler for data plane messages.*/ @Reference public void setDataComm(IDataComm dataComm) { this.dataComm = dataComm; } /** * Clears dependency injection of IDataComm.* * @param dataComm Communication handler for data plane messages.*/ public void unsetDataComm(@SuppressWarnings("hiding") IDataComm dataComm) { this.dataComm = null; } 

数据总线消费者配置

数据总线服务需要某些Maven依赖和OSGI导入。

Maven依赖

消费此服务需要以下Maven依赖:
对于Databus API

<!-- Databus API --> <dependency>     <groupId>com.ge.dspmicro</groupId>     <artifactId>databus-api</artifactId>     <version>17.1.0</version> </dependency>

对于EdgeDataComm API和EdgeData:

<!-- If using EdgeDataComm API and EdgeData --> <dependency>     <groupId>com.ge.predixmachine</groupId>     <artifactId>protobuf-models</artifactId>     <version>17.1.0</version> </dependency> <dependency>     <groupId>com.google.protobuf</groupId>     <artifactId>protobuf-java</artifactId>     <version>3.0.0</version> </dependency>

对于DataComm API和PDataValue:

<!-- If using DataComm API and PDataValue --> <dependency>     <groupId>com.ge.dspmicro</groupId>     <artifactId>machinegateway-api</artifactId>     <version>17.1.0</version> </dependency> <dependency>     <groupId>com.ge.dspmicro</groupId>     <artifactId>device-common</artifactId>     <version>17.1.0</version> </dependency>

OSGI导入

消费此捆绑束组件需要以下OSGI导入:

Import-Packages:com.ge.dspmicro.databus.api.common;version="[2.0,3.0)", com.ge.dspmicro.databus.api.data;version="[2.0,3.0)", com.ge.dspmicro.machinegateway.types;version="[1.4,2.0)", com.ge.predixmachine.datamodel.datacomm;version="[1.0,2.0)", com.google.protobuf;version="[3.0,4.0)", com.google.protobuf.util;version="[3.0,4.0)",

配置数据总线

对于 OSGI应用程序,可以配置数据总线,指定Docker应用程序的ID和处理设备内消息的MQTT代理的URI。
安装Mosquitto(一个MQTT代理)

1.  导航至<Predix Machine runtime container location>/configuration/machine。
2.  打开com.ge.dspmicro.databus.mqtt.config 文件。
3.  设置下列属性:

容器化环境内的数据流

使用数据总线示例应用程序

为了示范如何使用此服务,特提供了Java和C++示例应用程序。Java应用程序包括一个使用OSGi框架的应用程序和一个不使用 OSGi框架的独立应用程序。C++示例应用程序也是一个不使用OSGi框架的独立应用程序。

1.  要访问OSGi Java示例应用程序:
a.  导航至<SDK installation location>/samples/sample-apps.zip,解压文件。
b.  在sample-apps/samples文件夹内,打开sample-databus应用程序。
2.  要访问独立Java示例应用程序:
a.  导航至<Predix Machine SDK download location>/edgesdk/predixmachine-edgesdk-java-17.1.0.zip/,解压文件。
b.  在samples/sample-edge-apps/sample/sample-edge-databus文件夹内,打开sample-edge-databus应用程序。
3.  要访问C++示例应用程序:
a.  导航至<Predix Machine SDK download location>/edgesdk/predixmachine-edgesdk-cpp-17.1.0.zip/,解压文件。
b.  在samples/sample-edge-apps文件夹内,打开sample-edge-databus应用程序。

使用Data C++ API

查看C++ API,了解如何在数据总线内利用C++实现数据通信。

1.  导航至<Predix Machine SDK download location>/edgesdk/predixmachine-edgesdk-cpp-17.1.0.zip/文件夹,解压文件。
2.  打开doc文件夹,查看C++ API。

使用Data Java API

查看com.ge.dspmicro.databus.api,了解如何在数据总线内利用Java实现数据通信。

1.  导航至<Predix Machine SDK download location>/edgesdk/predixmachine-edgesdk-java-17.1.0.zip/ /docs/apidocs/index.html
2.  打开com.ge.dspmicro.databus.api

数据总线机器适配器

数据总线机器适配器使用现有的机器网关适配器基础设施从数据总线获取数据,机器网关适配器接口允许数据总线机器适配器通过Spillway连接至river服务。
使用此适配器,Docker化应用程序可对数据进行处理,然后将数据推送回WebSocket River,以便上传至云端。例如,如果Docker应用程序接收和处理OPC-UA数据,对数据进行处理后,将把数据推送回主题内的数据总线。数据总线适配器提取数据,并将其提供给订阅应用程序。一个Spillway将监听此数据的订阅情况,并使用River将其发送给Time Series,进而传输至云端。
重要:
虽然数据总线机器适配器和数据总线River都是数据总线的组件,但前两者不可通过spillway连接到一起。否则会导致无限循环。

依赖

消费此服务需要Maven依赖和OSGi导入包:
• 需要以下Maven依赖:

<dependency> <groupId>com.ge.dspmicro</groupId> <artifactId>machineadapter-databus</artifactId> <version>17.1.0</version> </dependency>

• 消费捆绑束组件需要以下OSGi导入,但仅限在使用机器适配器数据总线常量时:

Import-Package: com.ge.dspmicro.machineadapter.databus;version="[1.0,2)"

配置数据总线机器适配器

数据总线机器适配器配置为工厂服务组件。服务ID的配置文件com.ge.dspmicro.machineadapter.databus-[n].config 必须存在,其中 n 是每个实例的唯一整数。

数据总线River
1. 找到并打开

<Predix Machine runtime container location>/configuration/machine/ com.ge.dspmicro.machineadapter.databus-[n].config

2.置下列属性:

容器化环境内的数据流

数据总线River

可以使用数据总线River将数据从 Predix机器 传输至同一设备上的其他应用程序。例如,将任何机器适配器消化获取的数据发送至同一设备上运行的其他应用程序,如分析应用程序等。

功能与数据流

数据总线River服务从机器适配器接收数据,将其作为PDataValues的序列化的列表串行清单,用数据总线API发布到数据总线上。数据的订阅名称为发布主题。
下图展示了从数据源到适配器,到Hoover spillway和处理器,最后到达数据总线API的数据流。
容器化环境内的数据流

依赖

消费此数据总线River需要Maven依赖和OSGi导入。
• 需要以下Maven依赖:

<dependency> <groupId>com.ge.dspmicro</groupId> <artifactId>databusriver-send</artifactId> <version>16.4.0</version> </dependency>

• 消费捆绑束组件需要以下OSGi导入:

Import-Package: com.ge.dspmicro.databusriver.send.api;version="[1.0,2)"

获取数据总线River服务

使用声明式服务注入服务。
注入服务示例:

import com.ge.dspmicro.mqttriver.send.api.IMqttRiverSend; ...@Reference public void setService(IDatabusRiverSend service) { //set service here }

配置数据总线River

1.  导航至<Predix Machine runtime container location>/configuration/machine。
2.  打开 com.ge.dspmicro.databusriver.send.config 文件。
3.  设置下列属性:

容器化环境内的数据流

管理总线

使用管理总线和容器化应用程序,以发挥Predix机器的强大功能,向EdgeManager传递管理事件。
利用Predix机器管理总线服务,可以开发应用程序,使其在应用程序自己的容器内运行。可以添加自己的业务逻辑,使用任何语言开发应用程序(只要遵守消息协议即可)。这样可以允许自定义应用程序与 Predix 机器交换命令和配置,而不必局限于使用某种特定语言。

命令通信

下图展示了命令通信的执行流程。
容器化环境内的数据流

启动

下面描述了启动的命令通信流程:

1.  Predix Agent启动并订阅所有应用程序发布的命令状态(topic edgeCommandStatus/<appid>)。
2.  MyApp启动并订阅相关命令(topic edgeCommand/<appId>

从EdgeManager推送新命令

下面描述了从EdgeManager推送新命令的命令通信流程:

1.  Agent接收到EdgeManager发布给MyApp的Upload Configuration 命令。
a.  Agent检查MyApp配置的缓冲配置,并将其上传至EdgeManager。
2.  其他命令:
o   
a.  Predix Agent向edgeCommand/<appId>主题发布命令。
b.  Predix Agent向EdgeManager回复状态”delivered”。
c.  MyApp收到命令并进行处理。
d.  MyApp将命令状态发布至edgeCommandStatus/<appId>主题。
e.  Predix Agent收到命令状态并进行记录。
3.  Predix Agent将命令状态提交给EdgeManager。

关机

下面描述了关机时的命令通信流程:
1. MyApp取消订阅边缘设备命令。
2. 管理总线根据需要进行清除。
3. Predix Agent取消订阅所有边缘设备命令状态。
4. 管理总线根据需要进行清除。
注:
命令为单向传输,也就是说只从Predix Agent向应用程序传输。命令不从应用程序向agent或应用程序向应用程序传输。
命令状态为单向传输,也就是说只从应用程序向Predix Agent传输。命令状态不从agent向应用程序或应用程序向应用程序传输。

主题

容器化环境内的数据流
下面展示了命令和命令状态的payload格式:

/* * Copyright (c) 2016 General Electric Company.All rights reserved.* * The copyright to the computer software herein is the property of * General Electric Company.The software may be used and/or copied only * with the written permission of General Electric Company or in accordance * with the terms and conditions stipulated in the agreement/contract * under which the software has been supplied.*/ /* * Describes the structures of Predix Machine Data downloaded from the cloud.*/ syntax = "proto3"; package com.ge.predixmachine.protobuf; import "google/protobuf/timestamp.proto"; import "task_info.proto"; option java_multiple_files = true; option java_package = "com.ge.predixmachine.datamodel.commandcomm"; option java_generate_equals_and_hash = true; // // Represents the Command that the edge device will process // message EdgeCommand { reserved 3; reserved "app_name"; // The Command ID string id = 1; // The Command Handler Type string command_handler_type = 2; // The Command Name string command = 4; // URL to upload output string url = 5; // The parameters for this command map<string, string> params = 6; } // // Represents the Command Status for a previous executed task from Cloud Edge Gateway Service // Ensure that the TaskStatus is set.If not set, the default TaskStatus will be used, // which may cause misinterpretation of the status.// message CommandStatus { // The ID of the Command sent down from the cloud string task_id = 1; // A Status defined in the TaskStatus TaskStatus status = 2; // A Status Message string status_message = 3; // A Detailed Status Message string status_detailed_message = 4; // An output for the Edge Command string output = 5; // The command start time google.protobuf.Timestamp start_time = 6; // The command end time google.protobuf.Timestamp end_time = 7; }

使用命令通信API

查看命令通信Javadoc API(CommandComm和CommandListener),了解如何实现命令通信。
找到并打开下列API: <Predix_Machine_SDK installation location>/docs/apidocs/index.html/com.ge.dspmicro.managementbus.api.command

配置通信

启动

下面描述了启动的配置通信流程:

1.  Predix Agent启动并订阅所有应用程序发布的配置状态(topic edgeConfigStatus/<appid>)。
2.  MyApp启动并订阅相关配置(topic edgeConfig/<appId>)。
3.  MyApp向Predix Agent发布启动配置(topic edgeConfigStatus/<appId>)。
4.  Predix Agent收到启动配置,将其缓存,以备随后使用。

从EdgeManager推送新配置

下面描述了从EdgeManager推送新配置的配置通信流程:

1.  Predix Agent从EdgeManager收到新配置。
2.  Predix Agent将新配置发布至主题edgeConfig/<appId>3.  MyApp收到新配置。
4.  MyApp更新配置,根据需要重启。
5.  MyApp将更新的配置发布至主题edgeConfigStatus/<appId>6.  Predix Agent收到配置,更新缓存。

关机

下面描述了关机时的配置通信流程:
1. MyApp取消订阅边缘设备配置。
2. 管理总线根据需要进行清除。
3. Predix Agent取消订阅所有边缘设备配置状态。
4. 管理总线根据需要进行清除。

主题

容器化环境内的数据流
下面展示了配置和配置状态的payload格式:

/* * Copyright (c) 2016 General Electric Company.All rights reserved.* * The copyright to the computer software herein is the property of * General Electric Company.The software may be used and/or copied only * with the written permission of General Electric Company or in accordance * with the terms and conditions stipulated in the agreement/contract * under which the software has been supplied.*/ /* * Describes the structures used in communication of Predix Machine Configurations.*/ syntax = "proto3"; package com.ge.predixmachine.protobuf; import "edge_package.proto"; option java_multiple_files = true; option java_package = "com.ge.predixmachine.datamodel.configcomm"; option java_generate_equals_and_hash = true; // // Represents the configuration of the application.// This is typically new configuration deployed from Edge Manager for the application.// message EdgeConfig { // Identifier for the configuration update task.// This is used to tie the task to the ConfigStatus. string task_id = 1; // Binary archive of application configuration. bytes configuration = 5; } // // Represents the status of a configuration update operation.// This is typically published by containerized applications to acknowledge a configuration update operation // and notify Predix Machine agent with its latest configuration.// message ConfigStatus { // Status of configuration update operation.PackageStatus package_status = 1; // Latest configuration of the application.// This may or may not be the same as the configuration received by the application. bytes configuration = 5; }

使用配置通信API

查看命令通信Javadoc API(CommandComm和CommandListener),了解如何实现命令通信。

找到并打开下列API: <Predix_Machine_SDK installation location>/docs/apidocs/index.html/com.ge.dspmicro.managementbus.api.command

访问管理总线示例应用程序

为了示范如何使用管理总线服务,特提供一个独立Java应用程序。

1.  导航至<Predix Machine SDK download location>/edgesdk/predixmachine-edgesdk-java-17.1.0.zip/,解压文件。 
2.  导航至samples/sample-edge-apps.zip,解压文件。 
3.  Java应用程序示例位于sample-edge-managementbus文件夹。