使用 MQTT X 验证 Kuiper 流处理功能

本篇文章将使用 MQTT X 的脚本及定时功能模拟温湿度数据上报,EMQ X Edge 作为消息中间件进行消息转发,EMQ X Kuiper 进行消息接收并进行规则处理,最终将处理过的数据通过 EMQ X Edge 下发到 MQTT X。

mqttxedgekuiper.png

介绍及安装

本文中所演示的所有运行环境都将通过 Docker 搭建,如有其它安装需求,也可参考下文中提供的下载链接和安装文档进行构建。

EMQ X Kuiper

EMQ X Kuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。Kuiper 设计的一个主要目标就是将在云端运行的实时流式计算框架(比如 Apache SparkApache StormApache Flink 等)迁移到边缘端。Kuiper 参考了上述云端流式处理项目的架构与实现,结合边缘流式数据处理的特点,采用了编写基于源 (Source)SQL (业务逻辑处理), 目标 (Sink) 的规则引擎来实现边缘端的流式数据处理。项目地址:https://github.com/emqx/kuiper

版本:v1.0.2

下载链接安装文档

# 获取 Docker 镜像
$ docker pull emqx/kuiper:1.0.2

# 启动 Docker 容器
$ docker run -p 9081:9081 -d --name kuiper emqx/kuiper:1.0.2

Kuiper-manager

本文将使用 Kuiper-manager 对 EMQ X Kuiper 进行可视化管理和使用,Kuiper-manager 是一款可用于管理 Kuiper 节点,流,规则和插件等的 Web 管理控制台。

版本:v1.0.2

目前仅支持使用 Docker 镜像

# 获取 Docker 镜像
$ docker pull emqx/kuiper-manager:1.0.2

# 启动 Docker 容器
$ docker run -p 9082:9082 -d emqx/kuiper-manager:1.0.2

EMQ X Edge

EMQ X Edge 是轻量级多协议物联网边缘消息中间件,支持部署在资源受限的物联网边缘硬件。项目地址:https://github.com/emqx/emqx

版本:v4.2.4

下载链接 | 安装文档

# 获取 Docker 镜像
$ docker pull emqx/emqx-edge:4.2.4

# 启动 Docker 容器
$ docker run -d --name emqx -p 1883:1883 emqx/emqx-edge:4.2.4

MQTT X

MQTT X 是由一款跨平台 MQTT 5.0 桌面测试客户端,它支持 macOS,Linux,Windows。用户可以快速创建多个同时在线的 MQTT 客户端,方便测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的连接/发布/订阅功能及其他 MQTT 协议 特性。项目地址:https://github.com/emqx/MQTTX

版本:v1.4.2

下载链接 | GitHub

用户可到 MQTT X 官网或 GitHub 下载页下载所对应操作系统的安装包进行安装使用。

Mac 用户可在 App Store 中进行下载:https://apps.apple.com/cn/app/mqttx/id1514074565?mt=12

Linux 用户可在 Snapcraft 中进行下载:https://snapcraft.io/mqttx

使用教程

在搭建完环境后,我们就可以搭配各模块间的功能来使用,进行功能测试和验证。

Kuiper-manager 使用

首先我们对 Kuiper 进行流和规则的创建和配置。在安装完并运行 Kuiper-manager 成功后,我们打开浏览器,然后输入 http://localhost:9082。 如果您从其他计算机访问 kuiper-manager,请将 localhost 更改为运行 kuiper-manager 的 IP 地址。首次打开后需要输入的用户名密码为:admin / public,建议首次登录后,进行密码修改。

kuipermanagerlogin.png

节点

登录成功之后会进入到一个节点管理界面,点击 添加节点 按钮,会有一个弹出框,需要添加一个 Kuiper 的实例节点。这里我们使用的是普通节点,就选择第一项 直连节点。除直连节点外,目前还支持添加华为 IEF 节点,本文将不做阐述。然后输入需要操控的 Kuiper 实例的 端点地址 ,并输入 节点名称 用来标识节点。

注意:如果使用 Docker 启动的话,端点地址需要输入 Docker 容器内的 IP 地址

kuiperaddnodes.png

添加成功后,我们在节点列表中,点击节点名称后可进入到该节点实例中。进入后,接下来我们将配置创建该 Kuiper 实例下的流和规则。

进入到 Kuiper 实例页面后,会进入到流的 Tab 页面下,我们点击右边 创建流 按钮,进入到创建流的页面,此时可按以下步骤来进行操作:

  1. 输入一个 流名称 用来标识,这里我们输入流名称为 demo

  2. 输入结构定义,比如该条流需要接收到的流数据中有哪些字段类型,我们可以提前定义。添加时仅需输入字段名称,和选择类型添加即可,类型包含了 bigint, float, string, boolean, array, struct 等。结构定义为可选,可以勾选结构列表上方的 是否为带结构的流 来取消或开启结构定义,当取消结构定义时,将接收任何结构类型的数据。本文中我们已经规定好了需要处理的数据结构,所以我们分别添加两个字段:temperaturehumidity,类型都为 bigint

  3. 输入 数据源,文中我们将使用 MQTT 作为消息源,因此该配置可输入用于接收消息的 Topic, 这里我们输入 /kuiper/stream

  4. 选择 流类型,这里将选择为 MQTT;

  5. 选择 配置组,配置组即为流类型下定义的配置信息,比如 MQTT 默认配置组下 servers 信息为 ['tcp://127.0.0.1:1883']。用户可自定义该配置信息,点击上方的 源配置 按钮,进入到页面中配置,也可到 etc 目录下修改配置文件。这里我们选择重新配置过的 demo_conf 配置组;

    注意:如果使用的 MQTT Broker 为 Docker 启动的 EMQ X Edge 话,Servers 地址需要修改为 Docker 容器内的 IP 地址

  6. 选择 流格式,最后我们选择流数据格式为 json

kuipercreatestream.png

除以上可视化创建方式外,我们还可以点击页面中最右上角的切换按钮,切换到文本模式。可直接输入创建流的 SQL 语句进行创建,SQL 示例:

CREATE STREAM demo (
  temperature bigint,
  humidity bigint,
) WITH (DATASOURCE="/kuiper/stream", FORMAT="json", CONF_KEY="demo_conf", TYPE="mqtt");

点击 提交 按钮后,我们就成功创建了一条流。接下来将为创建好的流设置规则。

规则

点击规则的 Tab 项,进入到规则列表页面,我们点击右边的 创建规则 按钮,进入到创建规则的页面,此时可按以下步骤来进行操作:

  1. 输入 规则 ID 用来标识该规则,这里我们输入 demoRule

  2. 输入 SQL 语句,为规则运行的 SQL 查询。这里将定义一条查询数据流中的温湿度数据,并设置过滤条件为温度大于 30 时的 SQL 语句。SQL 示例:

    SELECT temperature, humidity FROM demo WHERE temperature > 30
    
  3. 选择添加规则的 动作,即为 Sink 动作组,数据可多选,Sink 为当规则执行后输出的目标。这里我们依然使用 MQTT,通过 MQTT 转发规则执行后的数据。选择完成后,可输入 MQTT Sink 的配置信息,本文就只配置 MQTT Broker 的地址和 Topic 信息,Topic 即为接收消息的主题。

    注意:如果使用的 MQTT Broker 为 Docker 启动的 EMQ X Edge 话,Broker 地址需要填写为 Docker 容器内的 IP 地址

  4. 设置 选项,选项部分为可选,均有默认值,如需修改可参考 Kuiper 文档 进行设置。

kuipercreaterule.png

除以上可视化创建方式外,我们还可以点击页面中最右上角的切换按钮,切换到文本模式。可直接输入创建规则的 JSON 数据进行创建,JSON 示例:

{
  "id": "demoRule",
  "sql": "SELECT temperature, humidity FROM demo WHERE temperature > 30",
  "actions": [
    {
      "mqtt": {
        "server": "tcp://172.17.0.2:1883",
        "topic": "/kuiper/rule"
      }
    }
  ]
}

点击 提交 按钮后,我们就成功创建了一条规则。至此,我们就已经完成了 Kuiper 数据流和规则配置,接下来我们将使用 MQTT X 来测试和验证 Kuiper 的流处理功能。

MQTT X 使用

下载安装完成后,打开 MQTT X,我们新建一个名为 edge1 的连接,连接到和 Kuiper Source 配置相同的 EMQ X Edge 上。测试连接成功后,我们进入到 脚本 页面,使用以下提供的示例脚本,来生成模拟数据。

/**
 * Simulated temperature and humidity reporting
 * @return Return a simulated temperature and humidity JSON data - { "temperature": 23, "humidity": 40 }
 * @param value, MQTT Payload - {}
 */

function random(min, max) {
  return Math.round(Math.random() * (max - min)) + min
}

function handlePayload(value) {
  let _value = value
  if (typeof value === 'string') {
    _value = JSON.parse(value)
  }
  _value.temperature = random(10, 40)
  _value.humidity = random(20, 40)
  return JSON.stringify(_value, null, 2)
}

execute(handlePayload)

mqttxscript.png

测试发现模拟数据成功,我们到连接页面中,打开脚本使用功能(使用脚本功能本文不做详细描述,可参考 MQTT X 文档),输入发送的 Payload 数据模版为 {} ,输入 Topic 为流定义中的 Data Source,这里就填写 /kuiper/stream,然后设置定时消息,设置发送频率为 1 秒,然后点击发送一条消息成功后,MQTT X 将每秒自动发送一条模拟测试数据。

mqttxtimed.png

此时再新建一个名为 edge2 的连接,连接到和 Kuiper Sink 配置相同的 EMQ X Edge 上,然后订阅 MQTT Sink 中配置的 Topic,这里就订阅 /kuiper/rule 主题,用来接收 Kuiper 处理的过的数据。

mqttxrule.png

验证结果

当我们发送了模拟数据后,可以通过在规则列表中点击 状态 按钮查看是否有消息流入流出。我们通过以下截图可以看到,Kuiper 总共收到了 40 条消息,过滤流出了 14 条消息。

kuiperrulestatus.png

然后继续查看 MQTT X 内的信息,edge1 一共定时发送了 40 条模拟消息,切换到 edge2 可以查看到一共收到 14 条消息。发送和接收数据和 Kuiper 内统计流入流出数据一致,且查看接收到的消息中的 temperature 都完全大于 30,满足了我们在 Kuiper 中设置的过滤条件。说明我们的 Kuiper 流处理功能已经成功完成了我们所设置的数据处理需求,测试和验证成功。

mqttxsend.png

mqttxres.png

除通过状态按钮查看 Kuiper 规则处理的数据信息外,还可点击 拓扑 按钮,进入到规则的拓扑图页面,通过规则拓扑图完整的将数据流向与规则状态展示出来,并且可以查看到具体处理数据模块的实时动态信息。

kuiperruletopo.png

总结

至此,本文就完成了一个使用 MQTT X 客户端验证 Kuiper 流处理的功能的简易教程。Kuiper 可以运行在各类物联网的边缘使用场景中,通过 Kuiper 在边缘端的处理,可以提升系统响应速度,节省网络带宽费用和存储成本,以及提高系统安全性等。

除文章中所示例的 MQTT Source 和 MQTT Sink 外,Kuiper 还内置了许多多样化的 Source 和 Sink 配置,并且包含了与 EdgeX Foundry、KubeEdge、EMQ X Edge 等的集成能力。规则 SQL 内还支持 60+ 常见的函数,提供扩展点可以扩展自定义函数。提供了强大的插件系统,高度可扩展。

本篇文章中所使用三个项目都完全开源,您可以到 GitHub(EMQ X KuiperEMQ X EdgeMQTTX)中来提交使用过程中遇到的问题,或是 Fork 我们的项目向我们提交修改后的 PR,我们将会及时查阅和处理。也特此感谢社区中所有用户的贡献和反馈。

Shifan Yu

Shifan Yu

EMQ X 研发工程师,负责 EMQ X 的前端开发工作,也是 MQTT X 的开发者和维护者。

为流式数据存储和实时处理而生的流数据库

全托管的 MQTT 云服务,开始 180 天免费试用

推荐阅读

Kuiper 1.1.2 正式发布 - 超轻量 IoT 边缘流处理

Kuiper 1.1.2 对函数插件进行了增强,增加了更多内置函数及规则属性,并添加了与机器学习框架 TensorFlow Lite 集成的示例。

MQTT X 脚本功能使用教程

MQTT X 在 v1.4.2 版本后加入了脚本功能,用户可使用脚本对 Payload 进行自定义转换,本文将通过两个简单的实例来介绍脚本功能的使用。

在树莓派上搭建智能家居网关

本文将使用 树莓派 + EMQ X Edge + EMQ X Kuiper 搭建智能家居网关,实现智能家居设备数据的边缘计算处理,减少家庭私密数据外流。