使用流分析的 Azure IoT 中心云到设备消息
Azure IoT hub cloud to device message using Stream Analytics
我正在通过如下路径发送数据。
Android -> IoT Hub -> Stream Analytics -> SQL
我在流分析的查询中调用机器学习函数。现在,我想 return 机器学习的结果到 Android 设备。为了在 Android 设备上接收云到设备的消息,我已经使用 Device Explorer 完成并进行了测试。我在看官方教程,1, 2 but still no clue on how to send cloud to device message using the Stream Analytics. He说使用service bus和function app但是没说详细。我是 Azure 的新手。希望有人能给我一些指导或任何 link 以便我可以更多地了解如何实施它。提前致谢。
您可以使用 Azure 函数(预览版) 输出 ASA 作业以通过Azure IoT 中心面向服务的端点。
下面是这个函数的一个例子。
run.csx:
#r "Newtonsoft.Json"
using System.Configuration;
using System.Text;
using System.Net;
using Microsoft.Azure.Devices;
using Newtonsoft.Json;
// create proxy
static Microsoft.Azure.Devices.ServiceClient client = ServiceClient.CreateFromConnectionString(ConfigurationManager.AppSettings["myIoTHub"]);
public static async Task<HttpResponseMessage> Run(string input, HttpRequestMessage req, TraceWriter log)
{
log.Info($"ASA Job: {input}");
var data = JsonConvert.DeserializeAnonymousType(input, new[] { new { xyz = "", IoTHub = new { ConnectionDeviceId = ""}}});
if(!string.IsNullOrEmpty(data[0]?.IoTHub?.ConnectionDeviceId))
{
string deviceId = data[0].IoTHub.ConnectionDeviceId;
log.Info($"Device: {deviceId}");
// cloud-to-device message
var msg = JsonConvert.SerializeObject(new { temp = 20.5 });
var c2dmsg = new Microsoft.Azure.Devices.Message(Encoding.ASCII.GetBytes(msg));
// send AMQP message
await client.SendAsync(deviceId, c2dmsg);
}
return req.CreateResponse(HttpStatusCode.NoContent);
}
function.json:
{
"bindings": [
{
"authLevel": "function",
"name": "input",
"type": "httpTrigger",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
],
"disabled": false
}
project.json:
{
"frameworks": {
"net46":{
"dependencies": {
"Microsoft.Azure.Devices": "1.3.2"
}
}
}
}
附录 A
为了测试目的,进行以下步骤:
- 在您的 Azure Function App 中添加设置 myIoTHub
Azure IoT 中心的连接字符串。
- 创建 HttpTrigger 函数,我们将其命名为 HttpASA_1
- 使用以上内容更新 run.csx、function.json 和 project.json 文件
- 将以下示例用于 测试 请求正文:
[
{
"time": "2017-11-26T12:52:23.4292501Z",
"counter": 57,
"windSpeed": 8.7358,
"temperature": 16.63,
"humidity": 79.42,
"EventProcessedUtcTime": "2017-11-26T12:52:21.3568252Z",
"PartitionId": 2,
"EventEnqueuedUtcTime": "2017-11-26T12:52:22.435Z",
"IoTHub":{
"MessageId":空,
"CorrelationId":空,
"ConnectionDeviceId": "设备 1",
"ConnectionDeviceGenerationId": "636189812948967054",
"EnqueuedTime": "2017-11-26T12:52:21.562Z",
"StreamId": 空
}
}
]
为您实际的设备 ID 更改值 Device1。
- 现在,AF已经可以测试了。按按钮 运行 到 运行 示例并查看日志进度。
- 您应该会在您的设备上看到 AF 发送的 C2D 消息,或者您可以下载一个小型测试器 Azure IoT Hub Tester 来模拟您的 MQTT 设备。以下屏幕片段显示了该测试仪:
- 现在,在这一步中,我们可以转到 ASA 作业来调用此 HttpASA_1 函数。请注意,ASA 作业仅调用 HttpTrigger 函数。以下屏幕片段显示了为我们的 Azure 函数添加输出:
当您在 导入选项 组合框中选择订阅时,您应该会在组合框中看到此功能。完成后,按 保存 按钮并查看屏幕上的通知消息。 ASA 将向您的 AF 发送验证消息,其响应状态应为 20x 代码。
- 最后,您可以转到 查询 为您的 AF 生成输出。以下屏幕显示了所有遥测数据到 AF 的简单输出:
注意,inpsim 是我的 iothub 输入。
ASA 以下列格式输出 HttpTrigger 函数的负载,请参阅我的示例:
[
{
"time": "2017-11-26T12:52:23.4292501Z",
"counter": 57,
"windSpeed": 8.7358,
"temperature": 16.63,
"humidity": 79.42,
"EventProcessedUtcTime": "2017-11-26T12:52:21.3568252Z",
"PartitionId": 2,
"EventEnqueuedUtcTime": "2017-11-26T12:52:22.435Z",
"IoTHub": {
"MessageId": null,
"CorrelationId": null,
"ConnectionDeviceId": "Device1",
"ConnectionDeviceGenerationId": "636189812948967054",
"EnqueuedTime": "2017-11-26T12:52:21.562Z",
"StreamId": null
}
}
]
请注意,我的遥测数据 (*) 是 counter、temperature、humidity和时间戳 time,因此其他属性由 ASA 作业隐式创建。根据查询,您可以为 C2D 消息创建任何业务属性。
我正在通过如下路径发送数据。
Android -> IoT Hub -> Stream Analytics -> SQL
我在流分析的查询中调用机器学习函数。现在,我想 return 机器学习的结果到 Android 设备。为了在 Android 设备上接收云到设备的消息,我已经使用 Device Explorer 完成并进行了测试。我在看官方教程,1, 2 but still no clue on how to send cloud to device message using the Stream Analytics. He说使用service bus和function app但是没说详细。我是 Azure 的新手。希望有人能给我一些指导或任何 link 以便我可以更多地了解如何实施它。提前致谢。
您可以使用 Azure 函数(预览版) 输出 ASA 作业以通过Azure IoT 中心面向服务的端点。
下面是这个函数的一个例子。
run.csx:
#r "Newtonsoft.Json"
using System.Configuration;
using System.Text;
using System.Net;
using Microsoft.Azure.Devices;
using Newtonsoft.Json;
// create proxy
static Microsoft.Azure.Devices.ServiceClient client = ServiceClient.CreateFromConnectionString(ConfigurationManager.AppSettings["myIoTHub"]);
public static async Task<HttpResponseMessage> Run(string input, HttpRequestMessage req, TraceWriter log)
{
log.Info($"ASA Job: {input}");
var data = JsonConvert.DeserializeAnonymousType(input, new[] { new { xyz = "", IoTHub = new { ConnectionDeviceId = ""}}});
if(!string.IsNullOrEmpty(data[0]?.IoTHub?.ConnectionDeviceId))
{
string deviceId = data[0].IoTHub.ConnectionDeviceId;
log.Info($"Device: {deviceId}");
// cloud-to-device message
var msg = JsonConvert.SerializeObject(new { temp = 20.5 });
var c2dmsg = new Microsoft.Azure.Devices.Message(Encoding.ASCII.GetBytes(msg));
// send AMQP message
await client.SendAsync(deviceId, c2dmsg);
}
return req.CreateResponse(HttpStatusCode.NoContent);
}
function.json:
{
"bindings": [
{
"authLevel": "function",
"name": "input",
"type": "httpTrigger",
"direction": "in"
},
{
"name": "$return",
"type": "http",
"direction": "out"
}
],
"disabled": false
}
project.json:
{
"frameworks": {
"net46":{
"dependencies": {
"Microsoft.Azure.Devices": "1.3.2"
}
}
}
}
附录 A
为了测试目的,进行以下步骤:
- 在您的 Azure Function App 中添加设置 myIoTHub Azure IoT 中心的连接字符串。
- 创建 HttpTrigger 函数,我们将其命名为 HttpASA_1
- 使用以上内容更新 run.csx、function.json 和 project.json 文件
- 将以下示例用于 测试 请求正文:
[ { "time": "2017-11-26T12:52:23.4292501Z", "counter": 57, "windSpeed": 8.7358, "temperature": 16.63, "humidity": 79.42, "EventProcessedUtcTime": "2017-11-26T12:52:21.3568252Z", "PartitionId": 2, "EventEnqueuedUtcTime": "2017-11-26T12:52:22.435Z", "IoTHub":{ "MessageId":空, "CorrelationId":空, "ConnectionDeviceId": "设备 1", "ConnectionDeviceGenerationId": "636189812948967054", "EnqueuedTime": "2017-11-26T12:52:21.562Z", "StreamId": 空 } } ]
为您实际的设备 ID 更改值 Device1。
- 现在,AF已经可以测试了。按按钮 运行 到 运行 示例并查看日志进度。
- 您应该会在您的设备上看到 AF 发送的 C2D 消息,或者您可以下载一个小型测试器 Azure IoT Hub Tester 来模拟您的 MQTT 设备。以下屏幕片段显示了该测试仪:
- 现在,在这一步中,我们可以转到 ASA 作业来调用此 HttpASA_1 函数。请注意,ASA 作业仅调用 HttpTrigger 函数。以下屏幕片段显示了为我们的 Azure 函数添加输出:
当您在 导入选项 组合框中选择订阅时,您应该会在组合框中看到此功能。完成后,按 保存 按钮并查看屏幕上的通知消息。 ASA 将向您的 AF 发送验证消息,其响应状态应为 20x 代码。
- 最后,您可以转到 查询 为您的 AF 生成输出。以下屏幕显示了所有遥测数据到 AF 的简单输出:
注意,inpsim 是我的 iothub 输入。
ASA 以下列格式输出 HttpTrigger 函数的负载,请参阅我的示例:
[
{
"time": "2017-11-26T12:52:23.4292501Z",
"counter": 57,
"windSpeed": 8.7358,
"temperature": 16.63,
"humidity": 79.42,
"EventProcessedUtcTime": "2017-11-26T12:52:21.3568252Z",
"PartitionId": 2,
"EventEnqueuedUtcTime": "2017-11-26T12:52:22.435Z",
"IoTHub": {
"MessageId": null,
"CorrelationId": null,
"ConnectionDeviceId": "Device1",
"ConnectionDeviceGenerationId": "636189812948967054",
"EnqueuedTime": "2017-11-26T12:52:21.562Z",
"StreamId": null
}
}
]
请注意,我的遥测数据 (*) 是 counter、temperature、humidity和时间戳 time,因此其他属性由 ASA 作业隐式创建。根据查询,您可以为 C2D 消息创建任何业务属性。