二次开发
RHILEX 是一款使用 Go 语言开发的商业级高级边缘网关系统,RULEX 是 RHILEX 抽离出来的一个边缘网关开发框架,为了回馈社区以促进社区的参与和贡献,同时帮助开发者更好地理解和使用 RHILEX,我们开放了 RULEX 的源码。
RHILEX 是基于 RULEX 框架开发的高级商业边缘计算系统。
我们将通过一系列具体的示例,详细展示如何对 RULEX 进行二次开发。每个示例都将包含详细的步骤说明和代码示例,以便开发者可以轻松地跟随并理解每个操作的含义。我们将涵盖不同的开发场景,包括扩展功能、添加新模块、优化性能等,以展示 Go 语言和 RULEX 的灵活性和强大功能。 此外,我们还将提供一些实用的技巧和最佳实践,帮助开发者编写高效的 Go 代码。我们将介绍如何使用 Go 的并发特性、优化代码结构、避免常见的编程错误以及如何利用RULEX 的特性来提高开发效率。我们相信本实例教程将极大地帮助开发者更好地利用 RULEX 的二次开发能力,发挥其强大的功能。
开源协议
RULEX 基于 AGPL 协议开源。AGPL(Affero General Public License)是一种自由软件许可证,它是 GPL(GNU General Public License)的变体,主要目的是加强GPL 在网络应用和服务中的版权规定。AGPL 要求,如果您使用 AGPL 许可的软件与用户通过网络进行交互,那么您必须提供软件的源代码,并且所有对软件的修改也必须遵循同样的开源条件。这是为了防止软件提供者利用网络服务的形式来规避 GPL 要求的源代码公开义务.
AGPL 具有污染性,如果您修改了 RULEX 的代码必须要开源,否则是违反开源法律规定的行为。AGPL 详细规范:https://www.gnu.org/licenses/agpl-3.0.en.html (opens in a new tab)
RULEX架构
快速开始
环境搭建
推荐在 Linux 下开发,最好是 Ubuntu 系统。Windows 下可能环境配置比较麻烦。如果你需要交叉编译,请看这篇文章:https://jensd.be/1126/linux/cross-compiling-for-arm-or-aarch64-on-debian-or-ubuntu (opens in a new tab) , 当环境搭建好以后就可以开发了。
git clone https://github.com/hootrhino/rulex.git
cd rulex
make
# make arm32
# make arm64
启动程序
启动需要带 2 个参数,db 是保存配置数据的位置,该参数指定的路径最后会生成一个 sqlite3 数据库文件,config 参数是 ini 的路径
./rulex run -config=conf/rulex.ini
开发指南
Visual Studio Code
推荐使用 Visual Studio Code 编辑器,下面给出一个 Vscode 编辑器调试用的配置,也就是 .vscode/launch.json
。
{
"version": "0.2.0",
"configurations": [
{
"name": "Debug",
"type": "go",
"request": "launch",
"mode": "auto",
"env": {
"GODEBUG": "'gctrace=1'"
},
"program": "${workspaceFolder}/main.go",
"args": ["run", "-config=test/conf/rulex.ini"]
}
]
}
Jetbrain GoLand
GoLand 直接导入项目即可。
强烈推荐使用 Ubuntu 开发,能少走很多弯路,或者用 Windows 下 WSL2 来开发。
常见问题
Go 环境
有时候你的系统要是没带 Go,则需要装一个,Go 的版本要求是 1.8+。安装步骤可以参考此处:https://go.dev/doc/install (opens in a new tab) 。
GCC 编译器
因为部分功能需要依赖 cgo(Sqlite),因此需要安装 GCC 编译器,此时 Windows 下就很麻烦了,这个过程困难程度因本地环境而异,Linux 下可以看这篇文章:https://jensd.be/1126/linux/cross-compiling-for-arm-or-aarch64-on-debian-or-ubuntu (opens in a new tab) 。
OpenCV
因为有视频流相关模块,依赖了 GoLang 绑定的 OpenCV,因此需要搭建好环境,具体搭建方法可以看此处:https://gocv.io (opens in a new tab) 。
Lua 脚本
如果不熟悉 Lua 可能需要简单学习一下,Lua 语法简单,半小时即可入门,入门教程可以参考此处:https://www.runoob.com/lua/lua-tutorial.html (opens in a new tab) 。
关键类型
南向
资源主要用来和外部资源对接,比如 MQTT 等。具备双向通信的功能。
接口
// XSource 接口代表了一个终端资源,例如实际的 MQTT 客户端。
// 它定义了与资源交互所需的一系列方法,包括测试资源可用性、初始化、启动、数据传输等。
type XSource interface {
// Test 方法用于测试资源是否可用。
// inEndId 是资源的标识符。
// 返回测试结果,如果资源可用则返回 true,否则返回 false。
Test(inEndId string) bool
// Init 方法用于初始化资源,传递资源配置信息。
// inEndId 是资源的标识符,configMap 是资源配置的映射。
// 返回初始化是否成功的错误信息。
Init(inEndId string, configMap map[string]interface{}) error
// Start 方法用于启动资源。
// CCTX 是上下文,具体作用取决于资源的实现。
// 返回启动是否成功的错误信息。
Start(CCTX context.Context) error
// DataModels 方法用于获取资源支持的数据模型列表。
// 这些模型对应于云平台的物模型。
DataModels() []XDataModel
// Status 方法用于获取资源的当前状态。
Status() SourceState
// Details 方法用于获取资源绑定的详细信息。
Details() *InEnd
// Stop 方法用于停止资源并释放相关资源。
Stop()
// DownStream 方法用于处理下行数据,即从云平台发送到本地资源的数据。
// 接收一个字节切片作为数据。
// 返回实际处理的数据长度和错误信息。
DownStream([]byte) (int, error)
// UpStream 方法用于处理上行数据,即从本地资源发送到云平台的数据。
// 接收一个字节切片作为数据。
// 返回实际处理的数据长度和错误信息。
UpStream([]byte) (int, error)
}
北向
主要用于实现将数据推向目的地。
接口
type XTarget interface {
//
// 用来初始化传递资源配置
//
Init(outEndId string, configMap map[string]interface{}) error
//
// 启动资源
//
Start(CCTX) error
//
// 获取资源状态
//
Status() SourceState
//
// 获取资源绑定的的详情
//
Details() *OutEnd
//
// 数据出口
//
To(data interface{}) (interface{}, error)
//
// 停止资源, 用来释放资源
//
Stop()
}
设备
主要用来接入外部设备。
接口
type XDevice interface {
// 初始化 通常用来获取设备的配置
Init(devId string, configMap map[string]interface{}) error
// 启动, 设备的工作进程
Start(CCTX) error
// 从设备里面读数据出来, 第一个参数一般作 flag 用, 也就是常说的指令类型
OnRead(cmd []byte, data []byte) (int, error)
// 把数据写入设备, 第一个参数一般作 flag 用, 也就是常说的指令类型
OnWrite(cmd []byte, data []byte) (int, error)
// 新特性, 适用于自定义协议读写
OnCtrl(cmd []byte, args []byte) ([]byte, error)
// 设备当前状态
Status() DeviceState
// 停止设备, 在这里释放资源,一般是先置状态为 STOP,然后 CancelContext()
Stop()
// 链接指向真实设备,保存在内存里面,和 SQLite 里的数据是对应关系
Details() *Device
// 状态
SetState(DeviceState)
// 外部调用, 该接口是个高级功能, 准备为了设计分布式部署设备的时候用, 但是相当长时间内都不会开启
// 默认情况下该接口没有用
OnDCACall(UUID string, Command string, Args interface{}) DCAResult
}
插件
插件用于扩展 RHILEX 本身的功能。
接口
type XPlugin interface {
Init(*ini.Section) error // 参数为外部配置
Start(Rhilex) error
Service(ServiceArg) ServiceResult // 对外提供一些服务
Stop() error
PluginMetaInfo() XPluginMetaInfo
}
开发案例
设备:HTTP 数据采集器
package device
import (
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
"github.com/hootrhino/rhilex/common"
"github.com/hootrhino/rhilex/glogger"
"github.com/hootrhino/rhilex/typex"
"github.com/hootrhino/rhilex/utils"
)
type __HttpCommonConfig struct {
Timeout *int `json:"timeout" validate:"required"`
AutoRequest *bool `json:"autoRequest" validate:"required"`
Frequency *int64 `json:"frequency" validate:"required"`
}
type __HttpMainConfig struct {
CommonConfig __HttpCommonConfig `json:"commonConfig" validate:"required"`
HttpConfig common.HTTPConfig `json:"httpConfig" validate:"required"`
}
type GenericHttpDevice struct {
typex.XStatus
client http.Client
status typex.DeviceState
RuleEngine typex.Rhilex
mainConfig __HttpMainConfig
locker sync.Locker
}
/*
*
* 通用串口透传
*
*/
func NewGenericHttpDevice(e typex.Rhilex) typex.XDevice {
hd := new(GenericHttpDevice)
hd.locker = &sync.Mutex{}
hd.client = *http.DefaultClient
hd.mainConfig = __HttpMainConfig{
CommonConfig: __HttpCommonConfig{
AutoRequest: func() *bool {
b := false
return &b
}(),
Timeout: func() *int {
b := 3000
return &b
}(),
},
}
hd.RuleEngine = e
return hd
}
// 初始化
func (hd *GenericHttpDevice) Init(devId string, configMap map[string]interface{}) error {
hd.PointId = devId
if err := utils.BindSourceConfig(configMap, &hd.mainConfig); err != nil {
glogger.GLogger.Error(err)
return err
}
if _, err := isValidHTTP_URL(hd.mainConfig.HttpConfig.Url); err != nil {
return fmt.Errorf("Invalid url format:%s, %s", hd.mainConfig.HttpConfig.Url, err)
}
return nil
}
// 启动
func (hd *GenericHttpDevice) Start(cctx typex.CCTX) error {
hd.Ctx = cctx.Ctx
hd.CancelCTX = cctx.CancelCTX
if *hd.mainConfig.CommonConfig.AutoRequest {
ticker := time.NewTicker(
time.Duration(*hd.mainConfig.CommonConfig.Frequency) * time.Millisecond)
go func() {
for {
select {
case <-hd.Ctx.Done():
{
ticker.Stop()
return
}
default:
{
}
}
body := httpGet(hd.client, hd.mainConfig.HttpConfig.Url)
if body != "" {
hd.RuleEngine.WorkDevice(hd.Details(), body)
}
<-ticker.C
}
}()
}
hd.status = typex.DEV_UP
return nil
}
func (hd *GenericHttpDevice) OnRead(cmd []byte, data []byte) (int, error) {
return 0, nil
}
// 把数据写入设备
func (hd *GenericHttpDevice) OnWrite(cmd []byte, b []byte) (int, error) {
return 0, nil
}
// 设备当前状态
func (hd *GenericHttpDevice) Status() typex.DeviceState {
return typex.DEV_UP
}
// 停止设备
func (hd *GenericHttpDevice) Stop() {
hd.status = typex.DEV_DOWN
if hd.CancelCTX != nil {
hd.CancelCTX()
}
}
// 真实设备
func (hd *GenericHttpDevice) Details() *typex.Device {
return hd.RuleEngine.GetDevice(hd.PointId)
}
// 状态
func (hd *GenericHttpDevice) SetState(status typex.DeviceState) {
hd.status = status
}
// --------------------------------------------------------------------------------------------------
//
// --------------------------------------------------------------------------------------------------
func (hd *GenericHttpDevice) OnDCACall(UUID string, Command string, Args interface{}) typex.DCAResult {
return typex.DCAResult{}
}
func (hd *GenericHttpDevice) OnCtrl(cmd []byte, args []byte) ([]byte, error) {
return []byte{}, nil
}
/*
*
* HTTP GET
*
*/
func httpGet(client http.Client, url string) string {
var err error
client.Timeout = 2 * time.Second
request, err := http.NewRequest("GET", url, nil)
if err != nil {
glogger.GLogger.Warn(err)
return ""
}
response, err := client.Do(request)
if err != nil {
glogger.GLogger.Warn(err)
return ""
}
defer response.Body.Close()
body, err := io.ReadAll(response.Body)
if err != nil {
glogger.GLogger.Warn(err)
return ""
}
return string(body)
}
/*
*
* 验证URL语法
*
*/
func isValidHTTP_URL(urlStr string) (bool, error) {
r, err := url.Parse(urlStr)
if err != nil {
return false, fmt.Errorf("error parsing URL: %w", err)
}
if r.Scheme != "http" && r.Scheme != "https" {
return false, fmt.Errorf("Invalid scheme; must be http or https")
}
return true, nil
}
北向:数据推送到HTTP
package target
import (
"net"
"net/http"
"net/url"
"time"
"github.com/hootrhino/rhilex/common"
"github.com/hootrhino/rhilex/glogger"
"github.com/hootrhino/rhilex/typex"
"github.com/hootrhino/rhilex/utils"
)
type HTTPTarget struct {
typex.XStatus
client http.Client
mainConfig common.HTTPConfig
status typex.SourceState
}
func NewHTTPTarget(e typex.Rhilex) typex.XTarget {
ht := new(HTTPTarget)
ht.RuleEngine = e
ht.mainConfig = common.HTTPConfig{}
ht.status = typex.SOURCE_DOWN
return ht
}
func (ht *HTTPTarget) Init(outEndId string, configMap map[string]interface{}) error {
ht.PointId = outEndId
if err := utils.BindSourceConfig(configMap, &ht.mainConfig); err != nil {
return err
}
return nil
}
func (ht *HTTPTarget) Start(cctx typex.CCTX) error {
ht.Ctx = cctx.Ctx
ht.CancelCTX = cctx.CancelCTX
ht.client = http.Client{}
ht.status = typex.SOURCE_UP
glogger.GLogger.Info("HTTP Target started")
return nil
}
func (ht *HTTPTarget) Status() typex.SourceState {
if err := ht.prob(); err != nil {
glogger.GLogger.Error(err)
return typex.SOURCE_DOWN
}
return ht.status
}
func (ht *HTTPTarget) To(data interface{}) (interface{}, error) {
r, err := utils.Post(ht.client, data, ht.mainConfig.Url, ht.mainConfig.Headers)
return r, err
}
func (ht *HTTPTarget) Stop() {
ht.status = typex.SOURCE_DOWN
if ht.CancelCTX != nil {
ht.CancelCTX()
}
}
func (ht *HTTPTarget) Details() *typex.OutEnd {
return ht.RuleEngine.GetOutEnd(ht.PointId)
}
func (ht *HTTPTarget) prob() error {
d := net.Dialer{
Timeout: 3 * time.Second,
}
Url, err := url.Parse(ht.mainConfig.Url)
if err != nil {
return err
}
conn, err := d.Dial("tcp", Url.Host)
if err != nil {
return err
}
conn.Close()
return nil
}
插件:Demo 插件
package demo_plugin
import (
"github.com/hootrhino/rhilex/typex"
"gopkg.in/ini.v1"
)
type DemoPlugin struct {
}
func NewDemoPlugin() *DemoPlugin {
return &DemoPlugin{}
}
func (dm *DemoPlugin) Init(config *ini.Section) error {
return nil
}
func (dm *DemoPlugin) Start(typex.Rhilex) error {
return nil
}
func (dm *DemoPlugin) Stop() error {
return nil
}
func (dm *DemoPlugin) PluginMetaInfo() typex.XPluginMetaInfo {
return typex.XPluginMetaInfo{
UUID: "DEMO01",
Name: "DemoPlugin",
Version: "v0.0.1",
Description: "DEMO01",
}
}
/*
*
* 服务调用接口
*
*/
func (dm *DemoPlugin) Service(arg typex.ServiceArg) typex.ServiceResult {
return typex.ServiceResult{}
}
综合案例
Modbus 温湿度传感器网关
这是一个 Modbus 温湿度传感器网关示例。
package test
import (
"context"
"os"
"os/signal"
"syscall"
"testing"
"time"
httpserver "github.com/hootrhino/rhilex/component/apiserver"
"github.com/hootrhino/rhilex/component/rhilexrpc"
"github.com/hootrhino/rhilex/config"
"github.com/hootrhino/rhilex/engine"
"github.com/hootrhino/rhilex/glogger"
"github.com/hootrhino/rhilex/plugin/demo_plugin"
"github.com/hootrhino/rhilex/typex"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func Test_Modbus_LUA_Parse(t *testing.T) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGABRT)
engine := engine.InitRuleEngine(core.InitGlobalConfig("config/rhilex.ini"))
engine.Start()
hh := httpserver.NewHttpApiServer(engine)
// HttpApiServer loaded default
if err := engine.LoadPlugin("plugin.http_server", hh); err != nil {
glogger.GLogger.Fatal("Rule load failed:", err)
}
// Load a demo plugin
if err := engine.LoadPlugin("plugin.demo", demo_plugin.NewDemoPlugin()); err != nil {
glogger.GLogger.Error("Rule load failed:", err)
}
// Grpc Inend
grpcInend := typex.NewInEnd("GRPC", "rhilex Grpc InEnd", "rhilex Grpc InEnd", map[string]interface{}{
"port": 2581,
})
ctx, cancelF := typex.NewCCTX() // ,ctx, cancelF
if err := engine.LoadInEndWithCtx(grpcInend, ctx, cancelF); err != nil {
glogger.GLogger.Error("Rule load failed:", err)
}
rule := typex.NewRule(engine,
"uuid",
"Just a test",
"Just a test",
[]string{grpcInend.UUID}[0],
"",
`function Success() print("[LUA Success Callback]=> OK") end`,
`
Actions = {
function(args)
Debug(args)
return true, args
end
}`,
`function Failed(error) print("[LUA Failed Callback]", error) end`)
if err := engine.LoadRule(rule); err != nil {
glogger.GLogger.Error(err)
}
conn, err := grpc.Dial("127.0.0.1:2581", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
glogger.GLogger.Error(err)
}
defer conn.Close()
client := rhilexrpc.NewRhilexRpcClient(conn)
resp, err := client.Work(context.Background(), &rhilexrpc.Data{
// lua 输出 {"a":"0000000000000001","b":"00000000","c":"00000001"}
Value: string([]byte{0, 1, 0, 1}),
})
if err != nil {
glogger.GLogger.Error(err)
}
glogger.GLogger.Infof("rhilex Rpc Call Result ====>>: %v", resp.GetMessage())
time.Sleep(1 * time.Second)
engine.Stop()
}
总结
RULEX 框架整体来说设计上很简单,只需阅读本指南后分析代码结构即可很快上手开发,因此本文档不赘述一些基础细节。希望用户能基于 RULEX 开发出各种强大的应用。同时希望开发者们能攻坚 RULEX 社区。