How to retrieve data from the cloud (IoT Hub) using MQTT in Java

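I am new to IoT Hub. I have successfully sent device-to-cloud (D2C) messages to the IoT hub using Python. The protocol we are using is MQTT. We are now trying to retrieve data from the cloud (IoT hub) using Java, but we cannot find a proper way to receive messages from the cloud. I am not sure whether we can read messages directly from the IoT hub, or whether we need to redirect incoming messages to an event hub in order to retrieve them.

I also tried reading messages from the IoT hub in Java while sending data to the cloud at the same time, but I get the following error: (Lost connection to the server. Reconnecting 0 times.)

I used this code to read data from the IoT hub: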

import com.microsoft.azure.sdk.iot.device.DeviceClient;
import com.microsoft.azure.sdk.iot.device.IotHubMessageResult;
import com.microsoft.azure.sdk.iot.device.Message;
import com.microsoft.azure.sdk.iot.device.MessageCallback;
import com.microsoft.azure.sdk.iot.device.IotHubClientProtocol;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Kafkareception {

    public static void main(String[] args) throws IOException {
        try {
            String connString = "HostName=";
            IotHubClientProtocol protocol = IotHubClientProtocol.MQTT;
            DeviceClient client = new DeviceClient(connString, protocol);

            MessageCallback callback = new AppMessageCallback();
            client.setMessageCallback(callback, null);
            client.open();
        } catch (URISyntaxException ex) {
            Logger.getLogger(Kafkareception.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    private static class AppMessageCallback implements MessageCallback {

        public IotHubMessageResult execute(Message msg, Object context) {
            // Print the label first, then the decoded message payload.
            System.out.println("Received message from hub: " + new String(msg.getBytes(), Message.DEFAULT_IOTHUB_MESSAGE_CHARSET));

            return IotHubMessageResult.COMPLETE;
        }
    }
}

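Based on the information you provided, you are probably using DeviceClient to open two active connections from one device to Azure IoT Hub: one sending D2C messages and one "reading data from iothub". You most likely get this error because: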

IoT Hub only supports one active MQTT connection per device. Any new MQTT connection on behalf of the same device ID causes IoT Hub to drop the existing connection.

Reference: Communicate with your IoT hub using the MQTT protocol.
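The rule quoted above can be illustrated with a small toy model. This is only a sketch of the described behavior, not IoT Hub's actual implementation: `ToyHub`, `Connection`, and the device ID and labels are hypothetical names invented for the illustration. It shows why a second client that connects with the same device ID causes the first one to lose its connection.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (not IoT Hub's real implementation) of "one active MQTT connection per device". */
class SingleConnectionModel {

    /** Hypothetical connection handle; "open" becomes false once the hub drops it. */
    static final class Connection {
        final String deviceId;
        final String label;
        boolean open = true;

        Connection(String deviceId, String label) {
            this.deviceId = deviceId;
            this.label = label;
        }
    }

    /** Hypothetical hub that remembers only the most recent connection per device ID. */
    static final class ToyHub {
        private final Map<String, Connection> active = new HashMap<>();

        Connection connect(String deviceId, String label) {
            Connection previous = active.get(deviceId);
            if (previous != null) {
                previous.open = false; // the older connection for this device is dropped
            }
            Connection current = new Connection(deviceId, label);
            active.put(deviceId, current);
            return current;
        }
    }

    public static void main(String[] args) {
        ToyHub hub = new ToyHub();
        Connection sender = hub.connect("device-1", "Python sender");
        Connection reader = hub.connect("device-1", "Java reader");
        System.out.println("sender open: " + sender.open); // prints "sender open: false"
        System.out.println("reader open: " + reader.open); // prints "reader open: true"
    }
}
```

If both programs keep reconnecting, each one repeatedly evicts the other, which would be consistent with the "lost connection" error in the question. Using a single client for both sending and receiving, or a separate device identity for the second program, avoids the conflict.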

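If you want to receive the D2C messages that were sent to Azure IoT Hub, you can use the Event Hub-compatible endpoint (Java). There is no need to redirect the incoming messages to an event hub yourself.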

IoT Hub exposes the messages/events built-in endpoint for your back-end services to read the device-to-cloud messages received by your hub. This endpoint is Event Hub-compatible, which enables you to use any of the mechanisms the Event Hubs service supports for reading messages.

Reference: Understand Azure IoT Hub messaging and IoT Hub endpoints.

I was able to read the data from the IoT hub. We can use the following code:

import java.io.IOException;
import com.microsoft.azure.eventhubs.*;
import com.microsoft.azure.servicebus.*;

import java.nio.charset.Charset;
import java.time.*;
import java.util.function.*;

public class Datafetch {

    public static void main(String[] args) throws IOException {
        EventHubClient client0 = receiveMessages("0");
        EventHubClient client1 = receiveMessages("1");
        System.out.println("Press ENTER to exit.");
        System.in.read();
        try {
            client0.closeSync();
            client1.closeSync();
            System.exit(0);
        } catch (ServiceBusException sbe) {
            System.exit(1);
        }
    }

    private static EventHubClient receiveMessages(final String partitionId) {

        String connStr = "Endpoint={youreventhubcompatibleendpoint};EntityPath={youreventhubcompatiblename};SharedAccessKeyName=iothubowner;SharedAccessKey={youriothubkey}";
        EventHubClient client = null;
        try {
            client = EventHubClient.createFromConnectionStringSync(connStr);
        } catch (Exception e) {
            System.out.println("Failed to create client: " + e.getMessage());
            System.exit(1);
        }
        try {
            client.createReceiver(
                    EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
                    partitionId,
                    Instant.now()).thenAccept(new Consumer<PartitionReceiver>() {
                        public void accept(PartitionReceiver receiver) {
                            System.out.println("** Created receiver on partition " + partitionId);
                            try {
                                while (true) {
                                    Iterable<EventData> receivedEvents = receiver.receive(100).get();
                                    System.out.println(receivedEvents);
                                    int batchSize = 0;
                                    if (receivedEvents != null) {
                                        for (EventData receivedEvent : receivedEvents) {
                                            System.out.println(String.format("Offset: %s, SeqNo: %s, EnqueueTime: %s",
                                                    receivedEvent.getSystemProperties().getOffset(),
                                                    receivedEvent.getSystemProperties().getSequenceNumber(),
                                                    receivedEvent.getSystemProperties().getEnqueuedTime()));
                                            // IoT Hub records the sending device's ID in this system property.
                                            System.out.println(String.format("| Device ID: %s", receivedEvent.getSystemProperties().get("iothub-connection-device-id")));
                                            System.out.println(String.format("| Message Payload: %s", new String(receivedEvent.getBody(),
                                                    Charset.defaultCharset())));
                                            batchSize++;
                                        }
                                    }
                                    System.out.println(String.format("Partition: %s, ReceivedBatch Size: %s", partitionId, batchSize));
                                }
                            } catch (Exception e) {
                                System.out.println("Failed to receive messages: " + e.getMessage());
                            }
                        }
                    });
        } catch (Exception e) {
            System.out.println("Failed to create receiver: " + e.getMessage());
        }
        return client;
    }
}
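The connection string in the code above still contains `{...}` placeholders that have to be replaced with real values. As an optional safeguard, a small check like the one below could be run before creating the client. It is a minimal sketch using only the standard library; `ConnectionStringCheck`, `parse`, and `findProblems` are hypothetical helper names, not part of any Azure SDK. Each pair is split on the first `=` only, because a Base64 shared access key may itself end in `=`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that sanity-checks an Event Hub-compatible connection string. */
class ConnectionStringCheck {

    // Keys taken from the connection string format used in the code above.
    static final String[] REQUIRED_KEYS = {
        "Endpoint", "EntityPath", "SharedAccessKeyName", "SharedAccessKey"
    };

    /** Splits "Key=Value;Key=Value" into a map, splitting each pair on the FIRST '=' only. */
    static Map<String, String> parse(String connStr) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String pair : connStr.split(";")) {
            if (pair.trim().isEmpty()) {
                continue; // tolerate a trailing ';'
            }
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Not a Key=Value pair: " + pair);
            }
            parts.put(pair.substring(0, eq).trim(), pair.substring(eq + 1).trim());
        }
        return parts;
    }

    /** Returns readable problems; an empty list means nothing obviously wrong was found. */
    static List<String> findProblems(String connStr) {
        List<String> problems = new ArrayList<>();
        Map<String, String> parts = parse(connStr);
        for (String key : REQUIRED_KEYS) {
            String value = parts.get(key);
            if (value == null || value.isEmpty()) {
                problems.add("missing " + key);
            } else if (value.startsWith("{") && value.endsWith("}")) {
                problems.add("placeholder not replaced for " + key);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        String template = "Endpoint={youreventhubcompatibleendpoint};"
                + "EntityPath={youreventhubcompatiblename};"
                + "SharedAccessKeyName=iothubowner;SharedAccessKey={youriothubkey}";
        // Reports the three placeholders that were left unreplaced.
        System.out.println(findProblems(template));
    }
}
```

Calling `findProblems(connStr)` at the start of `receiveMessages` and exiting when the list is non-empty would turn a confusing client creation failure into a clearer message. The check only looks at the shape of the string and cannot tell whether the key is actually valid.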