将 Eclipse Ditto 连接到 Apache Kafka
Connecting Eclipse Ditto to Apache Kafka
我按照本文档中解释的说明将 Apache Kafka 连接到 Eclipse Ditto。
https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html
我不确定以下内容。
1) ["ditto:outbound-auth-subject", "..."] 在授权上下文中。
2) "address": "topic/key"
请让我知道他们!提前谢谢你!
编辑:
请找出我用来连接Ditto和Kafka的命令
curl -X POST -i -u devops:foobar -H 'Content-Type: application/json' -d '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
"aggregate": false
},
"piggybackCommand": {
"type": "connectivity.commands:createConnection",
"connection": {
"id": "MyKafkaConnection1",
"connectionType": "kafka",
"connectionStatus": "open",
"uri": "tcp://radsah:password@localhost:9092",
"specificConfig": {
"bootstrapServers": "10.196.2.218:9092",
"saslMechanism": "plain"
},
"failoverEnabled": true,
"targets": [
{
"address": "digital-twins",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}],
"mappingContext": {
"mappingEngine": "JavaScript",
"options": {
"incomingScript": "function mapToDittoProtocolMsg(\n headers,\n textPayload,\n bytePayload,\n contentType\n) {\n\n if (contentType !== \"application/json\") {\n return null;\n }\n\n var jsonData = JSON.parse(textPayload);\n var temperature = jsonData.temp;\n var humidity = jsonData.hum;\n \n var path;\n var value;\n if (temperature != null && humidity != null) {\n path = \"/features\";\n value = {\n temperature: {\n properties: {\n value: temperature\n }\n },\n humidity: {\n properties: {\n value: humidity\n }\n }\n };\n } else if (temperature != null) {\n path = \"/features/temperature/properties/value\";\n value = temperature;\n } else if (humidity != null) {\n path = \"/features/humidity/properties/value\";\n value = humidity;\n }\n \n if (!path || !value) {\n return null;\n }\n\n return Ditto.buildDittoProtocolMsg(\n \"org.eclipse.ditto\",\n headers[\"device_id\"],\n \"things\",\n \"twin\",\n \"commands\",\n \"modify\",\n path,\n headers,\n value\n );\n}"
}
}
}
}
}' http://localhost:8080/devops/piggyback/connectivity?timeout=8000
我已经使用 Hono 注册了一个设备,我正在将数据发送到 Ditto。同上成功接收数据。但是我想把这个接收到的数据发送给Kafka。
Kafka 和Ditto 成功建立连接。但是我没有收到 kafka-consumer "digital-twins"。我错过了什么吗?
使用策略命令编辑:
curl -X PUT 'http://localhost:8080/api/2/policies/org.eclipse.ditto:5100' -u 'ditto:ditto' -H 'Content-Type: application/json' -d '{
"entries": {
"owner": {
"subjects": {
"nginx:ditto": {
"type": "nginx basic auth user"
}
},
"resources": {
"thing:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"policy:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"message:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
}
}
}
}
}
关于授权上下文你可以看看the authorization section in our connections documentation。它必须包含在您的事物的策略或 ACL 中定义的主题。
例如:
事物 "foo:bar" 的策略已定义主题 "somePrefix:someValue" 的整个事物的读取权限。
{
"policyId": "foo:bar",
"entries": {
... //Maybe more entries
"MyKafkaConnection": {
"subjects": {
"somePrefix:someValue": {
"type": "my description for this subject"
}
},
"resources": {
"thing:/": {
"grant": [
"READ"
],
"revoke": []
},
"message:/": {
"grant": [
"READ"
],
"revoke": []
}
}
}
}
}
在您提到的示例中,与 "foo:bar" 相关的事件将通过 kafka 连接发布在您在地址字段中指定的主题上。
我按照本文档中解释的说明将 Apache Kafka 连接到 Eclipse Ditto。
https://www.eclipse.org/ditto/connectivity-protocol-bindings-kafka2.html
我不确定以下内容。
1) ["ditto:outbound-auth-subject", "..."] 在授权上下文中。
2) "address": "topic/key"
请让我知道他们!提前谢谢你!
编辑:
请找出我用来连接Ditto和Kafka的命令
curl -X POST -i -u devops:foobar -H 'Content-Type: application/json' -d '{
"targetActorSelection": "/system/sharding/connection",
"headers": {
"aggregate": false
},
"piggybackCommand": {
"type": "connectivity.commands:createConnection",
"connection": {
"id": "MyKafkaConnection1",
"connectionType": "kafka",
"connectionStatus": "open",
"uri": "tcp://radsah:password@localhost:9092",
"specificConfig": {
"bootstrapServers": "10.196.2.218:9092",
"saslMechanism": "plain"
},
"failoverEnabled": true,
"targets": [
{
"address": "digital-twins",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/messages"
],
"authorizationContext": ["ditto:outbound-auth-subject"]
}],
"mappingContext": {
"mappingEngine": "JavaScript",
"options": {
"incomingScript": "function mapToDittoProtocolMsg(\n headers,\n textPayload,\n bytePayload,\n contentType\n) {\n\n if (contentType !== \"application/json\") {\n return null;\n }\n\n var jsonData = JSON.parse(textPayload);\n var temperature = jsonData.temp;\n var humidity = jsonData.hum;\n \n var path;\n var value;\n if (temperature != null && humidity != null) {\n path = \"/features\";\n value = {\n temperature: {\n properties: {\n value: temperature\n }\n },\n humidity: {\n properties: {\n value: humidity\n }\n }\n };\n } else if (temperature != null) {\n path = \"/features/temperature/properties/value\";\n value = temperature;\n } else if (humidity != null) {\n path = \"/features/humidity/properties/value\";\n value = humidity;\n }\n \n if (!path || !value) {\n return null;\n }\n\n return Ditto.buildDittoProtocolMsg(\n \"org.eclipse.ditto\",\n headers[\"device_id\"],\n \"things\",\n \"twin\",\n \"commands\",\n \"modify\",\n path,\n headers,\n value\n );\n}"
}
}
}
}
}' http://localhost:8080/devops/piggyback/connectivity?timeout=8000
我已经使用 Hono 注册了一个设备,我正在将数据发送到 Ditto。同上成功接收数据。但是我想把这个接收到的数据发送给Kafka。
Kafka 和Ditto 成功建立连接。但是我没有收到 kafka-consumer "digital-twins"。我错过了什么吗?
使用策略命令编辑:
curl -X PUT 'http://localhost:8080/api/2/policies/org.eclipse.ditto:5100' -u 'ditto:ditto' -H 'Content-Type: application/json' -d '{
"entries": {
"owner": {
"subjects": {
"nginx:ditto": {
"type": "nginx basic auth user"
}
},
"resources": {
"thing:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"policy:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
},
"message:/": {
"grant": [
"READ","WRITE"
],
"revoke": []
}
}
}
}
}
关于授权上下文你可以看看the authorization section in our connections documentation。它必须包含在您的事物的策略或 ACL 中定义的主题。
例如:
事物 "foo:bar" 的策略已定义主题 "somePrefix:someValue" 的整个事物的读取权限。
{
"policyId": "foo:bar",
"entries": {
... //Maybe more entries
"MyKafkaConnection": {
"subjects": {
"somePrefix:someValue": {
"type": "my description for this subject"
}
},
"resources": {
"thing:/": {
"grant": [
"READ"
],
"revoke": []
},
"message:/": {
"grant": [
"READ"
],
"revoke": []
}
}
}
}
}
在您提到的示例中,与 "foo:bar" 相关的事件将通过 kafka 连接发布在您在地址字段中指定的主题上。