针对 WSO2 消息代理的 MQTT Oauth2 身份验证

MQTT Oauth2 authentication against WSO2 Message broker

我想让 MQTT 代理通过 OAUTH2 服务器进行身份验证。

我正在使用 Paho 在 Python 3 中对 MQTT 客户端进行编码。

我已成功连接到 IS 以获取访问令牌。

我已经通过 MQTT 消息代理成功发送了 json 条消息。

我已经按照官方说明配置了 WSO2 Message Broker 和 Identity Server 一起工作。

当我尝试连接身份验证信息时,我无法正确发送要处理的访问令牌。

client = mqtt.Client("oauthmqttcli") 
client.connect(broker_address, port=1883) #connect to broker
client.username_pw_set(access_token)
client.publish("test","{'a':'b'}")

在日志中我看到了这个:

TID: [] [] [2018-09-21 10:56:33,019] ERROR {org.dna.mqtt.wso2.MqttLogExceptionHandler} -  ValueEvent exception occurred on disruptor. {org.dna.mqtt.wso2.MqttLogExceptionHandler}
java.lang.NullPointerException
        at org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor.processPublish(ProtocolProcessor.java:390)
        at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:171)
        at org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging.onEvent(SimpleMessaging.java:47)
        at com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
TID: [] [] [2018-09-21 10:56:33,026] ERROR {org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler} -  Connection reset by peer {org.dna.mqtt.moquette.server.netty.metrics.MessageMetricsHandler}
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:116)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:748)
TID: [] [] [2018-09-21 10:56:33,027]  INFO {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor} -  Lost connection with client oauthmqttcli {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor}
TID: [] [] [2018-09-21 10:56:33,027]  WARN {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor} -  MQTTAuthorizationSubject for client ID oauthmqttcli is not removed since the entry does not exist {org.dna.mqtt.moquette.messaging.spi.impl.ProtocolProcessor}

如何将访问令牌发送到要访问的服务器? Message Broker 在哪里搜索访问令牌?

终于找到了。访问令牌必须作为用户名和空白密码发送。

client = mqtt.Client("dev7") 
client.username_pw_set(username=access_token,password="")
client.connect(broker_address, port=broker_port) #connect to broker
print("Connected")
client.publish("test","{'c':'Payload!'}")
print("Sending message")