
simulate命令,配合模拟脚本生成多个太阳能和风力发电站 MQTT 状态数据采集上报与客户端(虚拟电站)。
虚拟电站将连接到 EMQX Platform 上,周期性生成模拟数据,并向指定 MQTT 主题发布自身状态数据; EMQX Platform 在接收到消息后,使用内置的规则引擎和数据集成功能,将其存储到 Snowflake 中; Snowflake 保存数据后,在其平台上进行数据分析。
典型的数据格式如下:

{"id": "6b50f69c-9c9b-48e7-ae9d-849e6e5e5dd5","city": "San Francisco","model": "Solar-Model-A1","regionID": "01","type": "Solar","ratedPower": 15.5,"timestamp": "2024-07-10T12:34:56Z","powerOutput": 12.3,"windSpeed": null,"solarRadiation": 720,"rotationSpeed": null}
准备 MQTTX 模拟数据
simulate命令,可以使用 Node.js 编写模拟脚本,实现预期的模拟消息生成与发布。
solar-wind-power-plant.js文件,将本章节提供的模拟脚本粘贴进去。您也可以参照此处对脚本内容进行修改:https://mqttx.app/zh/docs/cli/get-started#%E6%A8%A1%E6%8B%9F%E5%99%A8
simulate运行脚本,指定脚本路径和模拟的客户端数量:
mqttx simulate --file ./solar-wind-power-plant.js -c 10
--file选项指定运行
./solar-wind-power-plant.js脚本文件
-c选项指定模拟客户端数量为 10 个
mqttx/simulate/Solar-Wind-Power-Plant/{clientid}主题发布一条消息。
sub命令订阅主题,验证消息是否正常发布:
mqttx sub -t mqttx/simulate/Solar-Wind-Power-Plant/+ -v
const store = {index: 0};function transformToFloat(val) {if (typeof val !== 'number') {val = Number(val);}const _val = val.toFixed(2);if (_val.endsWith('.00')) {return parseFloat(_val) + 0.01;}return parseFloat(_val);}function getWindPower(hour, faker) {if (hour >= 8 && hour < 18) {return faker.datatype.float({ min: 900, max: 1100 });} else {return faker.datatype.float({ min: 600, max: 900 });}}function calculateWindSpeed(rotationSpeed) {// 假设转速和风速之间的线性关系return rotationSpeed 60; // 简单的线性关系}function getSolarPower(hour, isCloudy, faker) {if (hour >= 6 && hour < 18) {let power = faker.datatype.float({ min: 5, max: 20 });if (isCloudy) {power *= 0.8;}return power;} else {return faker.datatype.float({ min: 0, max: 1 });}}function calculateSolarRadiation(powerOutput) {// 假设功率和光照强度之间的线性关系return powerOutput * 50; // 简单的线性关系}function generator(faker, options) {const clientid = options.clientid;const currentTimestamp = Date.now(); // 使用当前时间const currentDate = new Date(currentTimestamp).toISOString().split('T')[0];if (!store[clientid]) {const deviceType = faker.helpers.arrayElement(['Wind', 'Solar']);const ratedPower = deviceType === 'Wind' ? 1500 : faker.datatype.float({ min: 5, max: 20 });store[clientid] = {id: faker.datatype.uuid(),city: faker.address.city(),model: faker.helpers.arrayElement(['Model_A', 'Model_B', 'Model_C']),regionID: faker.helpers.arrayElement(['01', '02', '03', '04']),type: deviceType,ratedPower,currentDate,isCloudy: faker.datatype.boolean(0.3), // 30% 概率是阴天powerOutput: 0,windSpeed: deviceType === 'Wind' ? null : 0,solarRadiation: deviceType === 'Solar' ? null : 0,rotationSpeed: deviceType === 'Wind' ? faker.datatype.float({ min: 0, max: 1500 }) : null};}const data = store[clientid];const hour = new Date(currentTimestamp).getHours();// 新的一天时,重新确定是否是阴天if (data.currentDate !== currentDate) {data.currentDate = currentDate;data.isCloudy = faker.datatype.boolean(0.3); // 30% 概率是阴天}if (data.type === 'Wind') {data.rotationSpeed = faker.datatype.float({ min: 0, max: 1500 });data.powerOutput = getWindPower(hour, faker);data.windSpeed = calculateWindSpeed(data.rotationSpeed);} else if (data.type === 'Solar') {data.powerOutput = getSolarPower(hour, data.isCloudy, faker);data.solarRadiation = calculateSolarRadiation(data.powerOutput);}return {message: JSON.stringify({id: data.id,city: data.city,model: data.model,regionID: data.regionID,type: data.type,ratedPower: transformToFloat(data.ratedPower),timestamp: currentTimestamp,powerOutput: transformToFloat(data.powerOutput),windSpeed: data.windSpeed ? transformToFloat(data.windSpeed) : 0,solarRadiation: data.solarRadiation ? transformToFloat(data.solarRadiation) : 0,rotationSpeed: data.rotationSpeed ? transformToFloat(data.rotationSpeed) : 0})};}const name = 'Solar-Wind-Power-Plant';const author = 'EMQX Team';const dataFormat = 'JSON';const version = '0.0.1';const description = `Solar and wind power plant simulator, mock data generated with current timestamp.Cloudiness is determined at the start of each day.`;module.exports = {generator,name,author,dataFormat,version,description,};
准备 Snowflake 环境
IOT_DATA的数据库;

IOT_DATA数据库下的
PUBLICSchema

Standard,参考场景描述,对应的 Snowflake 建表语句如下:
CREATE TABLE RenewableEnergyData (id STRING,city STRING,model STRING,regionID STRING,type STRING,ratedPower FLOAT,timestamp TIMESTAMP,powerOutput FLOAT,windSpeed FLOAT,solarRadiation FLOAT,rotationSpeed FLOAT);
准备连接所需信息

// sql-api-generate-jwt.js.const crypto = require('crypto')const fs = require('fs');var jwt = require('jsonwebtoken');// 根据实际情况修改以下值// 证书私钥文件路径var privateKeyFile = fs.readFileSync('./rsa_key.p8');// 证书密码(如果有)var mypassphrase = '';// 账户 ID,英文字符需要大写var accountID = "OXTPEXE-LCF92X4";// 注册用户名,英文字符需要大写var username = 'XXXXXX'privateKeyObject = crypto.createPrivateKey({ key: privateKeyFile, format: 'pem', passphrase: mypassphrase });var privateKey = privateKeyObject.export({ format: 'pem', type: 'pkcs8' });publicKeyObject = crypto.createPublicKey({ key: privateKey, format: 'pem' });var publicKey = publicKeyObject.export({ format: 'der', type: 'spki' });const FP = crypto.createHash('sha256').update(publicKey, 'utf8').digest('base64')var publicKeyFingerprint = 'SHA256:' + FP;var signOptions = {iat: Date.now(),iss: `${accountID}.${username}.${publicKeyFingerprint}`,sub: `${accountID}.${username}`,exp: Date.now() + 1000 * 60 * 60};var token = jwt.sign(signOptions, privateKey, { algorithm: 'RS256' });console.log("\nToken: \n\n" + token);

至此,我们已经完成了所有准备工作。接下来,我们需要在 EMQX Platform 配置规则引擎与数据集成来实现。
在 EMQX Platform 上配置数据集成
截止 EMQX Platform 企业版 v5.7.1 版本,原生的 Snowflake 数据集成还在开发中,您需要通过 EMQX Platform 的 HTTP 动作 + Snowflake REST API 进行数据写入。

admin,
public;
SELECTpayloadFROM"mqttx/simulate/Solar-Wind-Power-Plant/+"
名称:填入任意名称; 连接器:点击右侧 + 按钮,填入生成 REST API 请求参数章节中的 URL 和请求头,并完成创建; 请求体:此处应该为 JSON 格式,指定数据库、插入 Snowflake SQL 语句以及绑定参数。 bindings
字段中,可以使用${filed}
语法来提取规则 SQL 的处理结果实现数据的插入。
{"statement": "INSERT INTO IOT_DATA.PUBLIC.RenewableEnergyData (id, city, model, regionID, type, ratedPower, timestamp, powerOutput, windSpeed, solarRadiation, rotationSpeed)\n VALUES (:1, :2, :3, :4, :5, :6, :7, :8, :9, :10, :11);","timeout": 60,"database": "IOT_DATA","bindings": {"1": { "type": "TEXT", "value": "${payload.id}" },"2": { "type": "TEXT", "value": "${payload.city}" },"3": { "type": "TEXT", "value": "${payload.model}" },"4": { "type": "TEXT", "value": "${payload.regionID}" },"5": { "type": "TEXT", "value": "${payload.type}" },"6": { "type": "FIXED", "value": "${payload.ratedPower}" },"7": { "type": "TEXT", "value": "${payload.timestamp}" },"8": { "type": "FIXED", "value": "${payload.powerOutput}" },"9": { "type": "FIXED", "value": "${payload.windSpeed}" },"10": { "type": "FIXED", "value": "${payload.solarRadiation}" },"11": { "type": "FIXED", "value": "${payload.rotationSpeed}" }}}
5. 其他参数留空,创建动作并保存规则。
接下来,我们在 Snowflake 中进行配置,实现数据的分析和可视化展示。
Snowflake 数据分析与可视化
首先,我们检查数据是否成功写入到 Snowflake 中。
1. 登录 Snowflake 控制台,打开 Projects → Worksheets 页面,新建一个 SQL Worksheets;
2. 选中 IOT_DATA 数据库,输入以下 SQL 并执行
可以看到 RenewableenErgydata
表中数据条目数量不为 0。
select count(*) from iot_data.public.renewableenergydata

获取瞬时发电量:可以实时了解当前的发电情况,通过查询最后一次上报的数据来实现。例如,使用 SQL 查询获取最新的风力发电和太阳能发电数据,并将结果展示在图表中。这可以帮助您迅速掌握当前的发电状态,及时发现并处理异常情况。 获取历史发电量:可以分析过去一段时间的发电情况,通过查询并汇总历史数据来实现。例如,使用 SQL 查询过去一天、一周或一个月的发电数据,并生成相应的图表。这可以帮助您了解发电趋势,评估设备性能,并制定优化策略。

您还可以通过其他方式,例如 Snowflake AI/ML Studio,实现异常检测和数据分类,还可以对历史数据进行训练,自动处理发电区域、季节性数据,实现未来发电量趋势的预测。
结语


文章转载自EMQX,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




