如何在 java 中使用 mqtt 从云 (iothub) 检索数据
How to retrieve data from cloud (iothub)using mqtt in java
我是 IoTHub 的新手。我已使用 python.The 协议成功将消息发送到物联网集线器 (D2C) 我们使用的是 mqtt.We 正在尝试使用 java 从云(物联网集线器)检索数据,但无法找到找到一种从云端接收消息的正确方法。我怀疑我们是否可以直接从物联网中心读取消息,或者我们需要将传入消息重定向到事件中心以检索消息。
我也尝试在 java 中同时从 iothub 读取消息,同时将数据发送到云端,但我收到如下错误..(与服务器失去连接。重新连接 0 次。)
我用这段代码从iothub读取数据,
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.service.sdk.IotHubServiceClientProtocol;
import java.io.IOException;
import java.net.URISyntaxException;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Kafkareception {
public static void main(String[] args) throws IOException {
try {
String connString = "HostName=";
IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
DeviceClient client = new DeviceClient(connString, protocol);
MessageCallback callback = new AppMessageCallback();
client.setMessageCallback(callback, null);
client.open();
} catch (URISyntaxException ex) {
Logger.getLogger(Kafkareception.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static class AppMessageCallback implements MessageCallback {
public IotHubMessageResult execute(Message msg, Object context) {
System.out.println(new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET) + "Received message from hub: ");
return IotHubMessageResult.COMPLETE;
}
}
}
根据您提供的信息,您可能尝试使用 DeviceClient
设置一台设备到 Azure IoT 中心的两个活动连接:一个正在发送 D2C 消息,一个是“从 iothub 读取数据”。你得到这个错误可能是因为:
IoT Hub only supports one active MQTT connection per device. Any new
MQTT connection on behalf of the same device ID causes IoT Hub to drop
the existing connection.
参考:Communicate with your IoT hub using the MQTT protocol.
如果你想接收发送到 Azure IoT 中心的 D2C 消息,你可以 use Event Hub-compatible endpoint(Java)。无需自行将传入消息重定向到事件中心。
IoT Hub exposes the messages/events built-in endpoint for your
back-end services to read the device-to-cloud messages received by
your hub. This endpoint is Event Hub-compatible, which enables you to
use any of the mechanisms the Event Hubs service supports for reading
messages.
参考:Understand Azure IoT Hub messaging and IoT Hub endpoints.
我从iothub.We读取了数据可以使用代码
import java.io.IOException;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.servicebus.*;
import java.nio.charset.Charset;
import java.time.*;
import java.util.function.*;
public class Datafetch {
public static void main(String[] args) throws IOException {
EventHubClient client0 = receiveMessages("0");
EventHubClient client1 = receiveMessages("1");
System.out.println("Press ENTER to exit.");
System.in.read();
try {
client0.closeSync();
client1.closeSync();
System.exit(0);
} catch (ServiceBusException sbe) {
System.exit(1);
}
}
private static EventHubClient receiveMessages(final String partitionId) {
String connStr = "Endpoint={youreventhubcompatibleendpoint};EntityPath={youreventhubcompatiblename};SharedAccessKeyName=iothubowner;SharedAccessKey={youriothubkey}";
EventHubClient client = null;
try {
client = EventHubClient.createFromConnectionStringSync(connStr);
} catch (Exception e) {
System.out.println("Failed to create client: " + e.getMessage());
System.exit(1);
}
try {
client.createReceiver(
EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
partitionId,
Instant.now()).thenAccept(new Consumer<PartitionReceiver>() {
public void accept(PartitionReceiver receiver) {
System.out.println("** Created receiver on partition " + partitionId);
try {
while (true) {
Iterable<EventData> receivedEvents = receiver.receive(100).get();
System.out.println(receivedEvents);
int batchSize = 0;
if (receivedEvents != null) {
for (EventData receivedEvent : receivedEvents) {
System.out.println(String.format("Offset: %s, SeqNo: %s, EnqueueTime: %s",
receivedEvent.getSystemProperties().getOffset(),
receivedEvent.getSystemProperties().getSequenceNumber(),
receivedEvent.getSystemProperties().getEnqueuedTime()));
System.out.println(String.format("| Device ID: %s", receivedEvent.getSystemProperties().getClass()));
System.out.println(String.format("| Message Payload: %s", new String(receivedEvent.getBody(),
Charset.defaultCharset())));
batchSize++;
}
}
System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s", partitionId, batchSize));
}
} catch (Exception e) {
System.out.println("Failed to receive messages: " + e.getMessage());
}
}
});
} catch (Exception e) {
System.out.println("Failed to create receiver: " + e.getMessage());
}
return client;
}
}
我是 IoTHub 的新手。我已使用 python.The 协议成功将消息发送到物联网集线器 (D2C) 我们使用的是 mqtt.We 正在尝试使用 java 从云(物联网集线器)检索数据,但无法找到找到一种从云端接收消息的正确方法。我怀疑我们是否可以直接从物联网中心读取消息,或者我们需要将传入消息重定向到事件中心以检索消息。
我也尝试在 java 中同时从 iothub 读取消息,同时将数据发送到云端,但我收到如下错误..(与服务器失去连接。重新连接 0 次。)
我用这段代码从iothub读取数据,
import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import com.microsoft.azure.sdk.iot.service.sdk.IotHubServiceClientProtocol;
import java.io.IOException;
import java.net.URISyntaxException;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Kafkareception {
public static void main(String[] args) throws IOException {
try {
String connString = "HostName=";
IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
DeviceClient client = new DeviceClient(connString, protocol);
MessageCallback callback = new AppMessageCallback();
client.setMessageCallback(callback, null);
client.open();
} catch (URISyntaxException ex) {
Logger.getLogger(Kafkareception.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static class AppMessageCallback implements MessageCallback {
public IotHubMessageResult execute(Message msg, Object context) {
System.out.println(new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET) + "Received message from hub: ");
return IotHubMessageResult.COMPLETE;
}
}
}
根据您提供的信息,您可能尝试使用 DeviceClient
设置一台设备到 Azure IoT 中心的两个活动连接:一个正在发送 D2C 消息,一个是“从 iothub 读取数据”。你得到这个错误可能是因为:
IoT Hub only supports one active MQTT connection per device. Any new MQTT connection on behalf of the same device ID causes IoT Hub to drop the existing connection.
参考:Communicate with your IoT hub using the MQTT protocol.
如果你想接收发送到 Azure IoT 中心的 D2C 消息,你可以 use Event Hub-compatible endpoint(Java)。无需自行将传入消息重定向到事件中心。
IoT Hub exposes the messages/events built-in endpoint for your back-end services to read the device-to-cloud messages received by your hub. This endpoint is Event Hub-compatible, which enables you to use any of the mechanisms the Event Hubs service supports for reading messages.
参考:Understand Azure IoT Hub messaging and IoT Hub endpoints.
我从iothub.We读取了数据可以使用代码
import java.io.IOException;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.servicebus.*;
import java.nio.charset.Charset;
import java.time.*;
import java.util.function.*;
public class Datafetch {
public static void main(String[] args) throws IOException {
EventHubClient client0 = receiveMessages("0");
EventHubClient client1 = receiveMessages("1");
System.out.println("Press ENTER to exit.");
System.in.read();
try {
client0.closeSync();
client1.closeSync();
System.exit(0);
} catch (ServiceBusException sbe) {
System.exit(1);
}
}
private static EventHubClient receiveMessages(final String partitionId) {
String connStr = "Endpoint={youreventhubcompatibleendpoint};EntityPath={youreventhubcompatiblename};SharedAccessKeyName=iothubowner;SharedAccessKey={youriothubkey}";
EventHubClient client = null;
try {
client = EventHubClient.createFromConnectionStringSync(connStr);
} catch (Exception e) {
System.out.println("Failed to create client: " + e.getMessage());
System.exit(1);
}
try {
client.createReceiver(
EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
partitionId,
Instant.now()).thenAccept(new Consumer<PartitionReceiver>() {
public void accept(PartitionReceiver receiver) {
System.out.println("** Created receiver on partition " + partitionId);
try {
while (true) {
Iterable<EventData> receivedEvents = receiver.receive(100).get();
System.out.println(receivedEvents);
int batchSize = 0;
if (receivedEvents != null) {
for (EventData receivedEvent : receivedEvents) {
System.out.println(String.format("Offset: %s, SeqNo: %s, EnqueueTime: %s",
receivedEvent.getSystemProperties().getOffset(),
receivedEvent.getSystemProperties().getSequenceNumber(),
receivedEvent.getSystemProperties().getEnqueuedTime()));
System.out.println(String.format("| Device ID: %s", receivedEvent.getSystemProperties().getClass()));
System.out.println(String.format("| Message Payload: %s", new String(receivedEvent.getBody(),
Charset.defaultCharset())));
batchSize++;
}
}
System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s", partitionId, batchSize));
}
} catch (Exception e) {
System.out.println("Failed to receive messages: " + e.getMessage());
}
}
});
} catch (Exception e) {
System.out.println("Failed to create receiver: " + e.getMessage());
}
return client;
}
}