融合的 kafka 到 s3 连接失败,线程 [KafkaBasedLog 工作线程中出现错误意外异常 -
confluent kafka to s3 connection failed with ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread -
我在 EC2 上设置了从 kafka 读取并写入 S3 的融合 (4.0) 连接器。
单机测试顺利:
bin/connect-standalone etc/standalone/example-connect-worker.properties etc/standalone/example-connect-s3-sink.properties
但是分布式版本一直失败
[2018-01-30 21:26:05,860] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1097)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
at org.apache.kafka.connect.util.KafkaBasedLog.access0(KafkaBasedLog.java:69)
at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)
我只想首先使用连接器 class 等于 FileStreamSinkConnector
接收器配置文件如下:
name=local-file-sink
#connector.class=FileStreamSink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=tests3
s3.bucket=tests3
s3.prefix=tests3
s3.endpoint=http://localhost:9090
s3.path_style=true
local.buffer.dir=/tmp/connect-system-test
非常感谢!
当您使用 ./bin/connect-distributed
启动分布式 Connect worker 时,您只能通过命令行提供 worker 的属性。
要通过将其配置发布到工作人员的 REST 端点来加载连接器,您可以使用 curl
或等效命令。
例如:
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors
其中 config.json
是一个包含连接器属性的文件。
更多信息在这里:
https://docs.confluent.io/current/connect/managing.html#distributed-example
我在 EC2 上设置了从 kafka 读取并写入 S3 的融合 (4.0) 连接器。
单机测试顺利:
bin/connect-standalone etc/standalone/example-connect-worker.properties etc/standalone/example-connect-s3-sink.properties
但是分布式版本一直失败
[2018-01-30 21:26:05,860] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-configs,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:334)
java.lang.IllegalStateException: Consumer is not subscribed to any topics or assigned any partitions
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1097)
at org.apache.kafka.connect.util.KafkaBasedLog.poll(KafkaBasedLog.java:256)
at org.apache.kafka.connect.util.KafkaBasedLog.access0(KafkaBasedLog.java:69)
at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:327)
我只想首先使用连接器 class 等于 FileStreamSinkConnector
接收器配置文件如下:
name=local-file-sink
#connector.class=FileStreamSink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=test.sink.txt
topics=tests3
s3.bucket=tests3
s3.prefix=tests3
s3.endpoint=http://localhost:9090
s3.path_style=true
local.buffer.dir=/tmp/connect-system-test
非常感谢!
当您使用 ./bin/connect-distributed
启动分布式 Connect worker 时,您只能通过命令行提供 worker 的属性。
要通过将其配置发布到工作人员的 REST 端点来加载连接器,您可以使用 curl
或等效命令。
例如:
curl -X POST -H "Content-Type: application/json" --data @config.json http://localhost:8083/connectors
其中 config.json
是一个包含连接器属性的文件。
更多信息在这里: https://docs.confluent.io/current/connect/managing.html#distributed-example