二次开发
二次开发

二次开发

devel

RHILEX 是一款使用 Go 语言开发的商业级高级边缘网关系统,RULEXRHILEX 抽离出来的一个边缘网关开发框架,为了回馈社区以促进社区的参与和贡献,同时帮助开发者更好地理解和使用 RHILEX,我们开放了 RULEX 的源码。

⚠️

RHILEX 是基于 RULEX 框架开发的高级商业边缘计算系统。

我们将通过一系列具体的示例,详细展示如何对 RULEX 进行二次开发。每个示例都将包含详细的步骤说明和代码示例,以便开发者可以轻松地跟随并理解每个操作的含义。我们将涵盖不同的开发场景,包括扩展功能、添加新模块、优化性能等,以展示 Go 语言和 RULEX 的灵活性和强大功能。 此外,我们还将提供一些实用的技巧和最佳实践,帮助开发者编写高效的 Go 代码。我们将介绍如何使用 Go 的并发特性、优化代码结构、避免常见的编程错误以及如何利用RULEX 的特性来提高开发效率。我们相信本实例教程将极大地帮助开发者更好地利用 RULEX 的二次开发能力,发挥其强大的功能。

开源协议

RULEX 基于 AGPL 协议开源。AGPL(Affero General Public License)是一种自由软件许可证,它是 GPL(GNU General Public License)的变体,主要目的是加强GPL 在网络应用和服务中的版权规定。AGPL 要求,如果您使用 AGPL 许可的软件与用户通过网络进行交互,那么您必须提供软件的源代码,并且所有对软件的修改也必须遵循同样的开源条件。这是为了防止软件提供者利用网络服务的形式来规避 GPL 要求的源代码公开义务.

🚫

AGPL 具有污染性,如果您修改了 RULEX 的代码必须要开源,否则是违反开源法律规定的行为。AGPL 详细规范:https://www.gnu.org/licenses/agpl-3.0.en.html (opens in a new tab)

RULEX架构

devel

快速开始

环境搭建

推荐在 Linux 下开发,最好是 Ubuntu 系统。Windows 下可能环境配置比较麻烦。如果你需要交叉编译,请看这篇文章:https://jensd.be/1126/linux/cross-compiling-for-arm-or-aarch64-on-debian-or-ubuntu (opens in a new tab) , 当环境搭建好以后就可以开发了。

git clone https://github.com/hootrhino/rulex.git
cd rulex
make
# make arm32
# make arm64

启动程序

启动需要带 2 个参数,db 是保存配置数据的位置,该参数指定的路径最后会生成一个 sqlite3 数据库文件,config 参数是 ini 的路径

./rulex run -config=conf/rulex.ini

开发指南

Visual Studio Code

推荐使用 Visual Studio Code 编辑器,下面给出一个 Vscode 编辑器调试用的配置,也就是 .vscode/launch.json

launch.json
{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "Debug",
      "type": "go",
      "request": "launch",
      "mode": "auto",
      "env": {
        "GODEBUG": "'gctrace=1'"
      },
      "program": "${workspaceFolder}/main.go",
      "args": ["run", "-config=test/conf/rulex.ini"]
    }
  ]
}

Jetbrain GoLand

GoLand 直接导入项目即可。

⚠️

强烈推荐使用 Ubuntu 开发,能少走很多弯路,或者用 Windows 下 WSL2 来开发。

常见问题

Go 环境

有时候你的系统要是没带 Go,则需要装一个,Go 的版本要求是 1.8+。安装步骤可以参考此处:https://go.dev/doc/install (opens in a new tab)

GCC 编译器

因为部分功能需要依赖 cgo(Sqlite),因此需要安装 GCC 编译器,此时 Windows 下就很麻烦了,这个过程困难程度因本地环境而异,Linux 下可以看这篇文章:https://jensd.be/1126/linux/cross-compiling-for-arm-or-aarch64-on-debian-or-ubuntu (opens in a new tab)

OpenCV

因为有视频流相关模块,依赖了 GoLang 绑定的 OpenCV,因此需要搭建好环境,具体搭建方法可以看此处:https://gocv.io (opens in a new tab)

Lua 脚本

如果不熟悉 Lua 可能需要简单学习一下,Lua 语法简单,半小时即可入门,入门教程可以参考此处:https://www.runoob.com/lua/lua-tutorial.html (opens in a new tab)

关键类型

南向

资源主要用来和外部资源对接,比如 MQTT 等。具备双向通信的功能。

devel

接口

Go
// XSource 接口代表了一个终端资源,例如实际的 MQTT 客户端。
// 它定义了与资源交互所需的一系列方法,包括测试资源可用性、初始化、启动、数据传输等。
type XSource interface {
   // Test 方法用于测试资源是否可用。
   // inEndId 是资源的标识符。
   // 返回测试结果,如果资源可用则返回 true,否则返回 false。
   Test(inEndId string) bool
 
   // Init 方法用于初始化资源,传递资源配置信息。
   // inEndId 是资源的标识符,configMap 是资源配置的映射。
   // 返回初始化是否成功的错误信息。
   Init(inEndId string, configMap map[string]interface{}) error
 
   // Start 方法用于启动资源。
   // CCTX 是上下文,具体作用取决于资源的实现。
   // 返回启动是否成功的错误信息。
   Start(CCTX context.Context) error
 
   // DataModels 方法用于获取资源支持的数据模型列表。
   // 这些模型对应于云平台的物模型。
   DataModels() []XDataModel
 
   // Status 方法用于获取资源的当前状态。
   Status() SourceState
 
   // Details 方法用于获取资源绑定的详细信息。
   Details() *InEnd
 
   // Stop 方法用于停止资源并释放相关资源。
   Stop()
 
   // DownStream 方法用于处理下行数据,即从云平台发送到本地资源的数据。
   // 接收一个字节切片作为数据。
   // 返回实际处理的数据长度和错误信息。
   DownStream([]byte) (int, error)
 
   // UpStream 方法用于处理上行数据,即从本地资源发送到云平台的数据。
   // 接收一个字节切片作为数据。
   // 返回实际处理的数据长度和错误信息。
   UpStream([]byte) (int, error)
}

北向

主要用于实现将数据推向目的地。

devel

接口

Go
type XTarget interface {
 //
 // 用来初始化传递资源配置
 //
 Init(outEndId string, configMap map[string]interface{}) error
 //
 // 启动资源
 //
 Start(CCTX) error
 //
 // 获取资源状态
 //
 Status() SourceState
 //
 // 获取资源绑定的的详情
 //
 Details() *OutEnd
 //
 // 数据出口
 //
 To(data interface{}) (interface{}, error)
 //
 // 停止资源, 用来释放资源
 //
 Stop()
}

设备

主要用来接入外部设备。

devel

接口

Go
type XDevice interface {
 // 初始化 通常用来获取设备的配置
 Init(devId string, configMap map[string]interface{}) error
 // 启动, 设备的工作进程
 Start(CCTX) error
 // 从设备里面读数据出来, 第一个参数一般作 flag 用, 也就是常说的指令类型
 OnRead(cmd []byte, data []byte) (int, error)
 // 把数据写入设备, 第一个参数一般作 flag 用, 也就是常说的指令类型
 OnWrite(cmd []byte, data []byte) (int, error)
 // 新特性, 适用于自定义协议读写
 OnCtrl(cmd []byte, args []byte) ([]byte, error)
 // 设备当前状态
 Status() DeviceState
 // 停止设备, 在这里释放资源,一般是先置状态为 STOP,然后 CancelContext()
 Stop()
 // 链接指向真实设备,保存在内存里面,和 SQLite 里的数据是对应关系
 Details() *Device
 // 状态
 SetState(DeviceState)
 // 外部调用, 该接口是个高级功能, 准备为了设计分布式部署设备的时候用, 但是相当长时间内都不会开启
 // 默认情况下该接口没有用
 OnDCACall(UUID string, Command string, Args interface{}) DCAResult
}

插件

插件用于扩展 RHILEX 本身的功能。

devel

接口

Go
type XPlugin interface {
 Init(*ini.Section) error // 参数为外部配置
 Start(Rhilex) error
 Service(ServiceArg) ServiceResult // 对外提供一些服务
 Stop() error
 PluginMetaInfo() XPluginMetaInfo
}

开发案例

设备:HTTP 数据采集器

Go
package device
 
import (
 "fmt"
 "io"
 "net/http"
 "net/url"
 "sync"
 "time"
 
 "github.com/hootrhino/rhilex/common"
 
 "github.com/hootrhino/rhilex/glogger"
 "github.com/hootrhino/rhilex/typex"
 "github.com/hootrhino/rhilex/utils"
)
 
type __HttpCommonConfig struct {
 Timeout     *int   `json:"timeout" validate:"required"`
 AutoRequest *bool  `json:"autoRequest" validate:"required"`
 Frequency   *int64 `json:"frequency" validate:"required"`
}
type __HttpMainConfig struct {
 CommonConfig __HttpCommonConfig `json:"commonConfig" validate:"required"`
 HttpConfig   common.HTTPConfig  `json:"httpConfig" validate:"required"`
}
 
type GenericHttpDevice struct {
 typex.XStatus
 client     http.Client
 status     typex.DeviceState
 RuleEngine typex.Rhilex
 mainConfig __HttpMainConfig
 locker     sync.Locker
}
 
/*
*
* 通用串口透传
*
 */
func NewGenericHttpDevice(e typex.Rhilex) typex.XDevice {
 hd := new(GenericHttpDevice)
 hd.locker = &sync.Mutex{}
 hd.client = *http.DefaultClient
 hd.mainConfig = __HttpMainConfig{
  CommonConfig: __HttpCommonConfig{
   AutoRequest: func() *bool {
    b := false
    return &b
   }(),
   Timeout: func() *int {
    b := 3000
    return &b
   }(),
  },
 }
 hd.RuleEngine = e
 return hd
}
 
//  初始化
func (hd *GenericHttpDevice) Init(devId string, configMap map[string]interface{}) error {
 hd.PointId = devId
 if err := utils.BindSourceConfig(configMap, &hd.mainConfig); err != nil {
  glogger.GLogger.Error(err)
  return err
 }
 if _, err := isValidHTTP_URL(hd.mainConfig.HttpConfig.Url); err != nil {
  return fmt.Errorf("Invalid url format:%s, %s", hd.mainConfig.HttpConfig.Url, err)
 }
 return nil
}
 
// 启动
func (hd *GenericHttpDevice) Start(cctx typex.CCTX) error {
 hd.Ctx = cctx.Ctx
 hd.CancelCTX = cctx.CancelCTX
 
 if *hd.mainConfig.CommonConfig.AutoRequest {
  ticker := time.NewTicker(
   time.Duration(*hd.mainConfig.CommonConfig.Frequency) * time.Millisecond)
  go func() {
   for {
    select {
    case <-hd.Ctx.Done():
     {
      ticker.Stop()
      return
     }
    default:
     {
     }
    }
    body := httpGet(hd.client, hd.mainConfig.HttpConfig.Url)
    if body != "" {
     hd.RuleEngine.WorkDevice(hd.Details(), body)
    }
    <-ticker.C
   }
  }()
 
 }
 hd.status = typex.DEV_UP
 return nil
}
 
func (hd *GenericHttpDevice) OnRead(cmd []byte, data []byte) (int, error) {
 
 return 0, nil
}
 
// 把数据写入设备
func (hd *GenericHttpDevice) OnWrite(cmd []byte, b []byte) (int, error) {
 return 0, nil
}
 
// 设备当前状态
func (hd *GenericHttpDevice) Status() typex.DeviceState {
 return typex.DEV_UP
}
 
// 停止设备
func (hd *GenericHttpDevice) Stop() {
 hd.status = typex.DEV_DOWN
 if hd.CancelCTX != nil {
  hd.CancelCTX()
 }
}
 
// 真实设备
func (hd *GenericHttpDevice) Details() *typex.Device {
 return hd.RuleEngine.GetDevice(hd.PointId)
}
 
// 状态
func (hd *GenericHttpDevice) SetState(status typex.DeviceState) {
 hd.status = status
 
}
 
// --------------------------------------------------------------------------------------------------
//
// --------------------------------------------------------------------------------------------------
 
func (hd *GenericHttpDevice) OnDCACall(UUID string, Command string, Args interface{}) typex.DCAResult {
 return typex.DCAResult{}
}
func (hd *GenericHttpDevice) OnCtrl(cmd []byte, args []byte) ([]byte, error) {
 return []byte{}, nil
}
 
/*
*
* HTTP GET
*
 */
func httpGet(client http.Client, url string) string {
 var err error
 client.Timeout = 2 * time.Second
 request, err := http.NewRequest("GET", url, nil)
 if err != nil {
  glogger.GLogger.Warn(err)
  return ""
 }
 
 response, err := client.Do(request)
 if err != nil {
  glogger.GLogger.Warn(err)
  return ""
 }
 defer response.Body.Close()
 body, err := io.ReadAll(response.Body)
 if err != nil {
  glogger.GLogger.Warn(err)
  return ""
 }
 return string(body)
}
 
/*
*
* 验证URL语法
*
 */
func isValidHTTP_URL(urlStr string) (bool, error) {
 r, err := url.Parse(urlStr)
 if err != nil {
  return false, fmt.Errorf("error parsing URL: %w", err)
 }
 if r.Scheme != "http" && r.Scheme != "https" {
  return false, fmt.Errorf("Invalid scheme; must be http or https")
 }
 return true, nil
}
 

北向:数据推送到HTTP

Go
 
package target
 
import (
 "net"
 "net/http"
 "net/url"
 "time"
 
 "github.com/hootrhino/rhilex/common"
 "github.com/hootrhino/rhilex/glogger"
 "github.com/hootrhino/rhilex/typex"
 "github.com/hootrhino/rhilex/utils"
)
 
type HTTPTarget struct {
 typex.XStatus
 client     http.Client
 mainConfig common.HTTPConfig
 status     typex.SourceState
}
 
func NewHTTPTarget(e typex.Rhilex) typex.XTarget {
 ht := new(HTTPTarget)
 ht.RuleEngine = e
 ht.mainConfig = common.HTTPConfig{}
 ht.status = typex.SOURCE_DOWN
 return ht
}
 
func (ht *HTTPTarget) Init(outEndId string, configMap map[string]interface{}) error {
 ht.PointId = outEndId
 
 if err := utils.BindSourceConfig(configMap, &ht.mainConfig); err != nil {
  return err
 }
 
 return nil
 
}
func (ht *HTTPTarget) Start(cctx typex.CCTX) error {
 ht.Ctx = cctx.Ctx
 ht.CancelCTX = cctx.CancelCTX
 ht.client = http.Client{}
 ht.status = typex.SOURCE_UP
 glogger.GLogger.Info("HTTP Target started")
 return nil
}
 
func (ht *HTTPTarget) Status() typex.SourceState {
 if err := ht.prob(); err != nil {
  glogger.GLogger.Error(err)
  return typex.SOURCE_DOWN
 }
 return ht.status
 
}
func (ht *HTTPTarget) To(data interface{}) (interface{}, error) {
 r, err := utils.Post(ht.client, data, ht.mainConfig.Url, ht.mainConfig.Headers)
 return r, err
}
 
func (ht *HTTPTarget) Stop() {
 ht.status = typex.SOURCE_DOWN
 if ht.CancelCTX != nil {
  ht.CancelCTX()
 }
}
func (ht *HTTPTarget) Details() *typex.OutEnd {
 return ht.RuleEngine.GetOutEnd(ht.PointId)
}
func (ht *HTTPTarget) prob() error {
 d := net.Dialer{
  Timeout: 3 * time.Second,
 }
 Url, err := url.Parse(ht.mainConfig.Url)
 if err != nil {
  return err
 }
 conn, err := d.Dial("tcp", Url.Host)
 if err != nil {
  return err
 }
 conn.Close()
 return nil
}
 

插件:Demo 插件

Go
package demo_plugin
 
import (
 "github.com/hootrhino/rhilex/typex"
 "gopkg.in/ini.v1"
)
 
type DemoPlugin struct {
}
 
func NewDemoPlugin() *DemoPlugin {
 return &DemoPlugin{}
}
 
func (dm *DemoPlugin) Init(config *ini.Section) error {
 return nil
}
 
func (dm *DemoPlugin) Start(typex.Rhilex) error {
 return nil
}
func (dm *DemoPlugin) Stop() error {
 return nil
}
 
func (dm *DemoPlugin) PluginMetaInfo() typex.XPluginMetaInfo {
 return typex.XPluginMetaInfo{
  UUID:        "DEMO01",
  Name:        "DemoPlugin",
  Version:     "v0.0.1",
  Description: "DEMO01",
 }
}
 
/*
*
* 服务调用接口
*
 */
func (dm *DemoPlugin) Service(arg typex.ServiceArg) typex.ServiceResult {
 return typex.ServiceResult{}
}
 

综合案例

Modbus 温湿度传感器网关

这是一个 Modbus 温湿度传感器网关示例。

Go
package test
 
import (
 "context"
 "os"
 "os/signal"
 "syscall"
 "testing"
 "time"
 
 httpserver "github.com/hootrhino/rhilex/component/apiserver"
 "github.com/hootrhino/rhilex/component/rhilexrpc"
 "github.com/hootrhino/rhilex/config"
 "github.com/hootrhino/rhilex/engine"
 "github.com/hootrhino/rhilex/glogger"
 "github.com/hootrhino/rhilex/plugin/demo_plugin"
 "github.com/hootrhino/rhilex/typex"
 
 "google.golang.org/grpc"
 "google.golang.org/grpc/credentials/insecure"
)
 
func Test_Modbus_LUA_Parse(t *testing.T) {
 c := make(chan os.Signal, 1)
 signal.Notify(c, syscall.SIGINT, syscall.SIGABRT)
 engine := engine.InitRuleEngine(core.InitGlobalConfig("config/rhilex.ini"))
 engine.Start()
 hh := httpserver.NewHttpApiServer(engine)
 // HttpApiServer loaded default
 if err := engine.LoadPlugin("plugin.http_server", hh); err != nil {
  glogger.GLogger.Fatal("Rule load failed:", err)
 }
 // Load a demo plugin
 if err := engine.LoadPlugin("plugin.demo", demo_plugin.NewDemoPlugin()); err != nil {
  glogger.GLogger.Error("Rule load failed:", err)
 }
 // Grpc Inend
 grpcInend := typex.NewInEnd("GRPC", "rhilex Grpc InEnd", "rhilex Grpc InEnd", map[string]interface{}{
  "port": 2581,
 })
 ctx, cancelF := typex.NewCCTX() // ,ctx, cancelF
 if err := engine.LoadInEndWithCtx(grpcInend, ctx, cancelF); err != nil {
  glogger.GLogger.Error("Rule load failed:", err)
 }
 
 rule := typex.NewRule(engine,
  "uuid",
  "Just a test",
  "Just a test",
  []string{grpcInend.UUID}[0],
  "",
  `function Success() print("[LUA Success Callback]=> OK") end`,
  `
  Actions = {
   function(args)
       Debug(args)
    return true, args
   end
  }`,
  `function Failed(error) print("[LUA Failed Callback]", error) end`)
 if err := engine.LoadRule(rule); err != nil {
  glogger.GLogger.Error(err)
 }
 conn, err := grpc.Dial("127.0.0.1:2581", grpc.WithTransportCredentials(insecure.NewCredentials()))
 if err != nil {
  glogger.GLogger.Error(err)
 }
 defer conn.Close()
 client := rhilexrpc.NewRhilexRpcClient(conn)
 
 resp, err := client.Work(context.Background(), &rhilexrpc.Data{
  // lua 输出 {"a":"0000000000000001","b":"00000000","c":"00000001"}
  Value: string([]byte{0, 1, 0, 1}),
 })
 if err != nil {
  glogger.GLogger.Error(err)
 }
 glogger.GLogger.Infof("rhilex Rpc Call Result ====>>: %v", resp.GetMessage())
 time.Sleep(1 * time.Second)
 engine.Stop()
}
 

总结

RULEX 框架整体来说设计上很简单,只需阅读本指南后分析代码结构即可很快上手开发,因此本文档不赘述一些基础细节。希望用户能基于 RULEX 开发出各种强大的应用。同时希望开发者们能攻坚 RULEX 社区。

© 2023-2025 RHILEX Technologies Inc. All rights reserved.