南向资源接入规范
RHILEX 提供了很多南向接入资源,外部设备可以通过一定的规范直接把数据推入到 RHILEX 的规则引擎,然后交给 RHILEX 来处理。本文主要讲解南向资源接入规范。
TCP Server
RHILEX 可以充当一个 TCP Server 的角色,供给外部设备来推送数据。
参数
名称 | 类型 | 说明 |
---|---|---|
服务地址 | 字符串 | TCP 服务监听的地址 |
端口 | 数字(标准端口范围) | TCP 服务监听的端口 |
数据接入规范
- 长度字段(2字节):表示数据的长度,不包括头部的 4 字节。
- CRC 字段(4字节):使用 CRC32-Koopman 算法计算的校验值,用于验证数据的完整性。
- 数据字段(不定长):实际的数据内容,长度由长度字段指定。
数据包结构如下:
+-------------+-------------+-------------------+
| 2 字节长度 | 4 字节 CRC | 数据 (n 字节) |
+-------------+-------------+-------------------+
返回
当推送完成后,RHILEX 会返回 ok
字符给客户端。
注意
需要及时发送数据激活,当超过 5 秒没有发送数据时,RHILEX 将断开连接。
UDP Server
RHILEX 可以充当一个 UDP Server 的角色,供给外部设备来推送数据。
参数
名称 | 类型 | 说明 |
---|---|---|
服务地址 | 字符串 | TCP 服务监听的地址 |
端口 | 数字(标准端口范围) | TCP 服务监听的端口 |
数据接入规范
UDP 是数据报格式,一次全部发上来即可,但是不能超过 1024 个字节!
数据处理
RHILEX 会把客户端传上来的数据封装成 JSON 格式推向规则引擎:
{
"clientAddr": "192.168.1.100:5000",
"data": "0102030405....."
}
返回
当推送完成后,RHILEX 会返回 ok
字符给客户端。
HTTP Server
RHILEX 可以充当一个 HTTP Server 的角色,供给外部设备来推送数据。
参数
名称 | 类型 | 说明 |
---|---|---|
服务地址 | 字符串 | HTTP 服务监听的地址 |
端口 | 数字(标准端口范围) | HTTP 服务监听的端口 |
数据接入规范
HTTP Server 通过 JSON 的形式进行交互。数据需要遵守一定格式,必须要有一个 data
字段,其内容也必须是 JSON。
数据接入示例
curl -X POST http://your-api-endpoint.com/your-api-path \
-H "Content-Type: application/json" \
-d '{"data": {"k": "v"}}'
成功返回
{
"message": "success",
"code": 200
}
失败返回
{
"message": "error-msg.....",
"code": 500
}
gRPC Server
本文档描述了一个基于 gRPC 的服务接口规范,使用了 Protocol Buffers(protobuf)版本 3 进行定义。此服务接口提供了一个 RhilexRpc
服务,其中包含一个名为 Work
的方法,用于处理客户端发送的数据并返回响应结果。
Protobuf 定义
以下是 Protobuf 文件的定义,主要包括服务接口、请求和响应消息格式的定义。
syntax = "proto3";
option go_package = "./;rhilexrpc";
option java_multiple_files = false;
option java_package = "rhilexrpc";
option java_outer_classname = "RhilexRpcService";
package rhilexrpc;
service RhilexRpc {
rpc Work (Data) returns (Response) {}
}
message Data {
string value = 1;
}
message Response {
int32 code = 1;
string message = 2;
}
Protocol Buffers 版本
- 指定使用 Protocol Buffers 版本 3。
选项设置
option go_package = "./;rhilexrpc";
:指定生成的 Go 代码的包路径和名称。option java_multiple_files = false;
:指定在 Java 中生成的所有代码是否在一个文件中。option java_package = "rhilexrpc";
:指定生成的 Java 代码的包名。option java_outer_classname = "RhilexRpcService";
:指定生成的 Java 代码的外部类名称。
包定义
package rhilexrpc;
:定义了这个 proto 文件的包名。
服务定义
service RhilexRpc { ... }
:定义了一个 gRPC 服务,服务名为RhilexRpc
。rpc Work (Data) returns (Response);
:定义了服务中的Work
方法,接收Data
类型的请求,并返回Response
类型的响应。
消息定义
message Data { string value = 1; }
:定义了一个名为Data
的消息类型,包含一个字符串字段value
,标识号为 1。message Response { int32 code = 1; string message = 2; }
:定义了一个名为Response
的消息类型,包含一个整数字段code
和一个字符串字段message
,分别标识号为 1 和 2。
Python 示例
下面的 Python 示例展示了如何使用 grpcio
和 grpcio-tools
库来实现和调用这个 RhilexRpc
服务。
安装依赖
首先,确保已安装必要的 Python 包:
pip install grpcio grpcio-tools
生成 Python 代码
使用 grpcio-tools
生成 Python 代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. rhilexrpc.proto
这将生成两个文件:
rhilexrpc_pb2.py
:包含消息类定义。rhilexrpc_pb2_grpc.py
:包含服务的客户端和服务端的接口代码。
客户端实现
import grpc
import rhilexrpc_pb2
import rhilexrpc_pb2_grpc
def run():
with grpc.insecure_channel('localhost:50051') as channel:
stub = rhilexrpc_pb2_grpc.RhilexRpcStub(channel)
response = stub.Work(rhilexrpc_pb2.Data(value="Hello, Rhilex!"))
print(f"Response received: code={response.code}, message={response.message}")
if __name__ == '__main__':
run()
MQTT Client
此时 RHILEX 作为 MQTT 客户端,和其他设备或者资源同时订阅某些 Topic,就可以接管和代理设备发上来的数据。
参数
名称 | 类型 | 说明 |
---|---|---|
clientId | 字符串 | 客户端 ID,用于唯一标识客户端 |
host | 字符串 | 服务地址,通常为 MQTT 代理的 IP 地址 |
port | 整数 | 端口,MQTT 服务的监听端口,通常为 1883 |
username | 字符串 | 用户名,用于身份验证 |
password | 字符串 | 密码,用于身份验证 |
qos | 整数 | 数据质量等级,0(最多一次)、1(至少一次)、2(仅一次) |
subTopics | 数组 | 所需订阅的主题,支持通配符 |
数据接入规范
MQTT 数据最终会被封装成 JSON 格式:
{"topic":"/topic1/dev","payload":"........"}
MQTT Broker
此时 RHILEX 作为 MQTT Broker,子设备可以连接 RHILEX 进行数据推送。
参数
名称 | 类型 | 说明 |
---|---|---|
服务地址 | 字符串 | HTTP 服务监听的地址 |
端口 | 数字(标准端口范围) | HTTP 服务监听的端口 |
客户端示例
import paho.mqtt.client as mqtt
# MQTT 回调函数
def on_connect(client, userdata, flags, rc):
print(f"连接成功,返回码: {rc}")
# 连接成功后订阅主题
client.subscribe("/topic1/#")
def on_message(client, userdata, msg):
print(f"收到消息: {msg.topic} {msg.payload.decode()}")
# 创建 MQTT 客户端
client = mqtt.Client(client_id="rhilexg19791")
client.username_pw_set("hootrhino", "12345678") # 设置用户名和密码
# 绑定回调函数
client.on_connect = on_connect
client.on_message = on_message
# 连接到 MQTT 代理
client.connect("127.0.0.1", 1883, 60)
# 开始循环,等待消息
client.loop_start()
# 发布消息示例
client.publish("/topic1/test", "Hello MQTT!")
try:
while True:
pass # 保持脚本运行
except KeyboardInterrupt:
print("退出程序")
finally:
client.loop_stop()
client.disconnect()
CoAP Server
CoAP(Constrained Application Protocol)是一种轻量级的网络协议,用于在资源受限的网络环境中传输数据。它主要用于物联网(IoT)设备之间的通信,特别适用于传感器网络和移动设备等资源受限的设备。
参数
名称 | 类型 | 说明 |
---|---|---|
服务地址 | 字符串 | CoAP 服务监听的地址 |
端口 | 数字(标准端口范围) | CoAP 服务监听的端口 |
数据接入规范
数据为字节格式,请求 path 为 /
,数据会被 RHILEX 处理成 JSON 推向规则引擎:
{
"ts": 1713691696641,
"type": "POST",
"payload": "012"
}
接入示例
package main
import (
"bytes"
"context"
"io"
"log"
"time"
"github.com/plgd-dev/go-coap/v3/message"
"github.com/plgd-dev/go-coap/v3/udp"
)
type ReadSeeker struct {
io.ReadSeeker
}
func main() {
co, err := udp.Dial("localhost:2582")
if err != nil {
log.Fatalf("Error dialing: %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := co.Post(ctx, "/", message.AppOctets, bytes.NewReader([]byte{48, 49, 50}), message.Option{})
if err != nil {
log.Fatalf("Error sending request: %v", err)
}
log.Printf("Response payload: %v", resp.Body())
}
内部事件源
RHILEX 运行时内部会产生很多事件,目前有下面这些类型:
- SOURCE:南向事件
- DEVICE:设备事件
- TARGET:北向事件
- SYSTEM:系统内部事件
- HARDWARE:硬件事件
借助该功能可以辅助运维 RHILEX 设备。
事件示例
设备上线
{
"type" :"DEVICE",
"event":"event.connected",
"ts":121312431432,
"device_info":{
"uuid":"UUID1234567",
"name":"温湿度计"
}
}
设备离线
{
"type" :"DEVICE",
"event":"event.disconnected",
"ts":121312431432,
"device_info":{
"uuid":"UUID1234567",
"name":"温湿度计"
}
}
通信模组
RHILEX 提供了软件层的插件兼容支持,常见的一些串口设备可以很容易集成进来。比如串口 Lora 模块,串口 4G 模块、串口蓝牙模块等等。
数据示例
{ "comName":"COM3", "data": "0001020304"}
注意
数据格式是十六进制。
自定义数据解析服务
自定义数据解析服务旨在为设备提供一种灵活的方式,通过 TCP 协议上传二进制字节数据并进行解析。该服务根据特定格式的表达式提取关键信息,以支持后续的数据处理和存储。
协议格式
上传数据格式
设备上传的数据格式如下:
"ID:32:int:BE; Name:40:string:BE; Age:16:int:LE"
解析表达式说明
- 字段名称:标识数据的意义。
- 字段长度:以位为单位,表示该字段的数据长度。
- 数据类型:字段数据的类型(如整数
int
、字符串string
)。 - 字节顺序:数据的字节顺序(大端
BE
或小端LE
)。
示例解析
- ID:格式:
ID:32:int:BE
(32 位大端整数) - Name:格式:
Name:40:string:BE
(最大 40 字符的大端字符串) - Age:格式:
Age:16:int:LE
(16 位小端整数)
数据解析流程
接收数据
- TCP 连接:服务通过 TCP 协议接收设备上传的二进制数据流。
- 数据格式检查:检查接收到的数据是否符合预期的解析表达式。
解析数据
- 解析表达式:根据解析表达式解析接收到的二进制数据。
- 数据提取:
- 根据字段长度和数据类型提取对应的数据。
- 处理字节顺序,以确保数据的准确性。
数据验证
- 完整性检查:确保提取的数据与定义的长度一致。
- 类型验证:检查提取后的数据类型是否符合预期(例如,确保 ID 和 Age 为整数,Name 为字符串)。
TCP 数据传输
TCP 连接说明
- 连接建立:设备通过 TCP 建立连接,连接成功后可以进行数据传输。
- 数据格式:上传的数据必须符合预定义的格式。
数据上传流程
- 设备发送数据:设备通过 TCP 将二进制数据包发送到解析服务。
- 服务接收数据:解析服务接收数据包并进行解析。
示例数据
上传数据示例
假设设备上传的数据如下:
- ID: 32(4 字节,表示为
0x00 0x00 0x00 0x20
) - Name: "Example"(40 字节,后补零)
- Age: 25(2 字节,表示为
0x19
)
生成的二进制数据流可能如下:
[0x00, 0x00, 0x00, 0x20, 'E', 'x', 'a', 'm', 'p', 'l', 'e', 0x00, ... (后补零) ..., 0x19]
解析结果
解析服务将提取以下信息:
- ID: 32
- Name: "Example"
- Age: 25
注意事项
- 确保上传的二进制数据符合预定义格式。
- 字段长度与实际上传的数据长度必须一致。
- 字节顺序的设置必须正确,以确保数据准确解析。