Azure IoT 中心设备消息,消息正文上的路由过滤器不起作用
Azure IoTHub DeviceMessage, route filter on message body not working
我正在按照 https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-monitoring-notifications-with-azure-logic-apps 中所述的说明使用 Azure 逻辑应用程序创建 IoT 远程监控和通知。
遥测模拟器(Java 使用 com.microsoft.azure.sdk.iot -> iot-device-client -> 1.14.0 版本)
public class SimulatedDevice {
// The device connection string to authenticate the device with your IoT hub.
// Using the Azure CLI:
// az iot hub device-identity show-connection-string --hub-name {YourIoTHubName}
// --device-id MyJavaDevice --output table
private static String connString = "#ConnectionString";
private static IotHubClientProtocol protocol = IotHubClientProtocol.AMQPS;
private static DeviceClient client;
// Specify the telemetry to send to your IoT hub.
private static class TelemetryDataPoint {
public double temperature;
public double humidity;
public String isTrue = "true";
// Serialize object to JSON format.
public String serialize() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
// Print the acknowledgement received from IoT Hub for the telemetry message
// sent.
private static class EventCallback implements IotHubEventCallback {
public void execute(IotHubStatusCode status, Object context) {
System.out.println("IoT Hub responded to message with status: " + status.name());
if (context != null) {
synchronized (context) {
context.notify();
}
}
}
}
private static class MessageSender implements Runnable {
public void run() {
try {
// Initialize the simulated telemetry.
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
int i = 0;
while (i < 100000) {
// Simulate telemetry.
double currentTemperature = minTemperature + rand.nextDouble() * 15;
double currentHumidity = minHumidity + rand.nextDouble() * 20;
TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint();
telemetryDataPoint.temperature = currentTemperature;
telemetryDataPoint.humidity = currentHumidity;
// Add the telemetry to the message body as JSON.
String msgStr = telemetryDataPoint.serialize();
byte[] bodyClone = msgStr.getBytes(StandardCharsets.UTF_8);
Message msg = new Message(bodyClone);
// Add a custom application property to the message.
// An IoT hub can filter on these properties without access to the message body.
msg.setProperty("temperatureAlert", (currentTemperature > 30) ? "true" : "false");
msg.setMessageType(MessageType.DEVICE_TELEMETRY);
System.out.println("Sending message string: " + msgStr);
System.out.println("Sending message: " + msg);
Object lockobj = new Object();
// Send the message.
EventCallback callback = new EventCallback();
client.sendEventAsync(msg, callback, lockobj);
synchronized (lockobj) {
lockobj.wait();
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("Finished.");
}
}
}
public static void main(String[] args) throws IOException, URISyntaxException {
// Connect to the IoT hub.
client = new DeviceClient(connString, protocol);
client.open();
// Create new thread and start sending messages
MessageSender sender = new MessageSender();
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.execute(sender);
// Stop the application.
System.out.println("Press ENTER to exit.");
System.in.read();
executor.shutdownNow();
client.closeNow();
}
}
对于查询字符串 temperatureAlert = "true",一切正常。但是对于查询字符串 $body.temperature > 30,我没有收到任何消息。
为了让 IoT Hub 知道消息是否可以根据其 body 内容进行路由,消息必须包含特定的 headers 来描述其 body 的内容和编码。特别是,消息必须同时具有以下两个 headers,才能基于消息 body 进行路由:
- 内容类型为 "application/json"
- 内容编码必须匹配以下之一:
- "utf-8"
- "utf-16"
- "utf-32"
在创建消息对象(Message msg = new Message(bodyClone);)的语句之后添加以下两行:
msg.setContentEncoding("utf-8");
msg.setContentType("application/json");
我正在按照 https://docs.microsoft.com/en-us/azure/iot-hub/iot-hub-monitoring-notifications-with-azure-logic-apps 中所述的说明使用 Azure 逻辑应用程序创建 IoT 远程监控和通知。
遥测模拟器(Java 使用 com.microsoft.azure.sdk.iot -> iot-device-client -> 1.14.0 版本)
public class SimulatedDevice {
// The device connection string to authenticate the device with your IoT hub.
// Using the Azure CLI:
// az iot hub device-identity show-connection-string --hub-name {YourIoTHubName}
// --device-id MyJavaDevice --output table
private static String connString = "#ConnectionString";
private static IotHubClientProtocol protocol = IotHubClientProtocol.AMQPS;
private static DeviceClient client;
// Specify the telemetry to send to your IoT hub.
private static class TelemetryDataPoint {
public double temperature;
public double humidity;
public String isTrue = "true";
// Serialize object to JSON format.
public String serialize() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
// Print the acknowledgement received from IoT Hub for the telemetry message
// sent.
private static class EventCallback implements IotHubEventCallback {
public void execute(IotHubStatusCode status, Object context) {
System.out.println("IoT Hub responded to message with status: " + status.name());
if (context != null) {
synchronized (context) {
context.notify();
}
}
}
}
private static class MessageSender implements Runnable {
public void run() {
try {
// Initialize the simulated telemetry.
double minTemperature = 20;
double minHumidity = 60;
Random rand = new Random();
int i = 0;
while (i < 100000) {
// Simulate telemetry.
double currentTemperature = minTemperature + rand.nextDouble() * 15;
double currentHumidity = minHumidity + rand.nextDouble() * 20;
TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint();
telemetryDataPoint.temperature = currentTemperature;
telemetryDataPoint.humidity = currentHumidity;
// Add the telemetry to the message body as JSON.
String msgStr = telemetryDataPoint.serialize();
byte[] bodyClone = msgStr.getBytes(StandardCharsets.UTF_8);
Message msg = new Message(bodyClone);
// Add a custom application property to the message.
// An IoT hub can filter on these properties without access to the message body.
msg.setProperty("temperatureAlert", (currentTemperature > 30) ? "true" : "false");
msg.setMessageType(MessageType.DEVICE_TELEMETRY);
System.out.println("Sending message string: " + msgStr);
System.out.println("Sending message: " + msg);
Object lockobj = new Object();
// Send the message.
EventCallback callback = new EventCallback();
client.sendEventAsync(msg, callback, lockobj);
synchronized (lockobj) {
lockobj.wait();
}
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("Finished.");
}
}
}
public static void main(String[] args) throws IOException, URISyntaxException {
// Connect to the IoT hub.
client = new DeviceClient(connString, protocol);
client.open();
// Create new thread and start sending messages
MessageSender sender = new MessageSender();
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.execute(sender);
// Stop the application.
System.out.println("Press ENTER to exit.");
System.in.read();
executor.shutdownNow();
client.closeNow();
}
}
对于查询字符串 temperatureAlert = "true",一切正常。但是对于查询字符串 $body.temperature > 30,我没有收到任何消息。
为了让 IoT Hub 知道消息是否可以根据其 body 内容进行路由,消息必须包含特定的 headers 来描述其 body 的内容和编码。特别是,消息必须同时具有以下两个 headers,才能基于消息 body 进行路由:
- "application/json" 的内容类型
- 内容编码必须匹配以下之一:
- "utf-8"
- "utf-16"
- "utf-32"
在创建消息对象(Message msg = new Message(bodyClone);)的语句之后添加以下两行:
msg.setContentEncoding("utf-8");
msg.setContentType("application/json");