运行 pubsub kafka 连接器独立模式问题

Running pubsub kafka connector standalone mode issues

所以,我一直在尝试获得 PubSub Kafka 连接器 运行ning 大约一个月,但遇到了各种问题。我在这里回顾了很多关于 Kafka Connect 和 Pubsub 连接器的问题,这些问题帮助我走了很远,但我又被卡住了。当我 运行 这个命令时:

.\bin\windows\connect-standalone.bat 
.\etc\kafka\WorkerConfig.properties .\etc\kafka\configSink.properties .\etc\kafka\configSource.properties

我得到一长串链接的错误 here:

在它尝试启动其余服务器之后,错误 "could not scan file [file name]..." 就开始了。我不确定是否需要设置 rest.host.name 和 rest.port,因为目前,对于 standaloneConfig 值,它显示为

rest.host.name = null

编辑:查看日志文件一段时间后,我发现了以下消息:

Kafka consumer created
Created connector CPSConnector
Initializing task CPSConnector-0 with config {connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector, task.class=com.google.pubsub.kafka.sink.CloudPubSubSinkTask, tasks.max=1, topics=, cps.project=kohls-sis-sandbox, name=CPSConnector, cps.topic=test-pubsub}
Task CPSConnector-0 threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Sink tasks require a list of topics.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

编辑:因此,我通过在我的 configSink 中添加 topics=test 解决了上述问题。当前的错误消息如下。这是否表示您只能 运行 接收器连接器或源连接器?

Failed to create job for .\etc\kafka\configSource.properties
Stopping after connector error
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:97)
Caused by: org.apache.kafka.connect.errors.AlreadyExistsException: Connector CPSConnector already exists
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)

在我的 WorkerConfig.properites 中,我有 bootstrap.servers=localhost:2181。我的 属性 个文件是 here.

我不确定如何修复,因为我设置了我的属性文件,确保 cps-kakfa-connector.jar 在 class 路径中。我也设置了plugin.path=\share\java\kafka\kafka-connect-pubsub

如果有人能指出正确的方向来解决这个问题,那就太好了。我按照这里的说明操作:https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector

当您将其配置属性提交给 Kafka Connect 集群或独立工作程序时,每个连接器实例,无论是源还是接收器,都需要有一个唯一的名称。

在上面的示例中,只需将 Source 命名为与 Sink 不同的名称即可。 例如:

$ head -n 1 configSource.properties
name=CPSSourceConnector
$ head -n 1 configSink.properties
name=CPSSinkConnector

或者,也可以:

$ head -n 1 configSource.properties
name=Tom
$ head -n 1 configSink.properties
name=Jerry