运行 pubsub kafka 连接器独立模式问题
Running pubsub kafka connector standalone mode issues
所以,我一直在尝试获得 PubSub Kafka 连接器 运行ning 大约一个月,但遇到了各种问题。我在这里回顾了很多关于 Kafka Connect 和 Pubsub 连接器的问题,这些问题帮助我走了很远,但我又被卡住了。当我 运行 这个命令时:
.\bin\windows\connect-standalone.bat
.\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties
我得到一长串链接的错误 here:
在它尝试启动其余服务器之后,错误 "could not scan file [file name]..." 就开始了。我不确定是否需要设置 rest.host.name 和 rest.port,因为目前,对于 standaloneConfig 值,它显示为
rest.host.name = null
编辑:查看日志文件一段时间后,我发现了以下消息:
Kafka consumer created
Created connector CPSConnector
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub}
Task CPSConnector-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics.
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
编辑:因此,我通过在我的 configSink 中添加 topics=test
解决了上述问题。当前的错误消息如下。这是否表示您只能 运行 接收器连接器或源连接器?
Failed to create job for .\etc\kafka\configSource.properties
Stopping after connector error
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
在我的 WorkerConfig.properites 中,我有 bootstrap.servers=localhost:2181
。我的 属性 个文件是 here.
我不确定如何修复,因为我设置了我的属性文件,确保 cps-kakfa-connector.jar 在 class 路径中。我也设置了plugin.path=\share\java\kafka\kafka-connect-pubsub
。
如果有人能指出正确的方向来解决这个问题,那就太好了。我按照这里的说明操作:https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector
当您将其配置属性提交给 Kafka Connect 集群或独立工作程序时,每个连接器实例,无论是源还是接收器,都需要有一个唯一的名称。
在上面的示例中,只需将 Source 命名为与 Sink 不同的名称即可。
例如:
$ head -n 1 configSource.properties
name=CPSSourceConnector
$ head -n 1 configSink.properties
name=CPSSinkConnector
或者,也可以:
$ head -n 1 configSource.properties
name=Tom
$ head -n 1 configSink.properties
name=Jerry
所以,我一直在尝试获得 PubSub Kafka 连接器 运行ning 大约一个月,但遇到了各种问题。我在这里回顾了很多关于 Kafka Connect 和 Pubsub 连接器的问题,这些问题帮助我走了很远,但我又被卡住了。当我 运行 这个命令时:
.\bin\windows\connect-standalone.bat
.\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties
我得到一长串链接的错误 here:
在它尝试启动其余服务器之后,错误 "could not scan file [file name]..." 就开始了。我不确定是否需要设置 rest.host.name 和 rest.port,因为目前,对于 standaloneConfig 值,它显示为
rest.host.name = null
编辑:查看日志文件一段时间后,我发现了以下消息:
Kafka consumer created
Created connector CPSConnector
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub}
Task CPSConnector-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics.
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
编辑:因此,我通过在我的 configSink 中添加 topics=test
解决了上述问题。当前的错误消息如下。这是否表示您只能 运行 接收器连接器或源连接器?
Failed to create job for .\etc\kafka\configSource.properties
Stopping after connector error
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
在我的 WorkerConfig.properites 中,我有 bootstrap.servers=localhost:2181
。我的 属性 个文件是 here.
我不确定如何修复,因为我设置了我的属性文件,确保 cps-kakfa-connector.jar 在 class 路径中。我也设置了plugin.path=\share\java\kafka\kafka-connect-pubsub
。
如果有人能指出正确的方向来解决这个问题,那就太好了。我按照这里的说明操作:https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector
当您将其配置属性提交给 Kafka Connect 集群或独立工作程序时,每个连接器实例,无论是源还是接收器,都需要有一个唯一的名称。
在上面的示例中,只需将 Source 命名为与 Sink 不同的名称即可。 例如:
$ head -n 1 configSource.properties
name=CPSSourceConnector
$ head -n 1 configSink.properties
name=CPSSinkConnector
或者,也可以:
$ head -n 1 configSource.properties
name=Tom
$ head -n 1 configSink.properties
name=Jerry