Why is SourceConnectorConfig reported for a sink connector?
I am trying to create a Kafka sink connector using the spredfast s3 connector. However, for some reason the log output reports SourceConnectorConfig:
INFO ConnectorConfig values:
connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector
key.converter = null
name = transactions-s3-sink
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.ConnectorConfig:180)
INFO Creating connector transactions-s3-sink of type com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178)
INFO Instantiated connector transactions-s3-sink with version 0.0.1 of type class com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181)
INFO Finished creating connector transactions-s3-sink (org.apache.kafka.connect.runtime.Worker:194)
INFO SourceConnectorConfig values:
connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector
key.converter = null
name = transactions-s3-sink
tasks.max = 1
transforms = null
value.converter = class org.apache.kafka.connect.storage.StringConverter
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180)
INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824)
...
INFO Sink task WorkerSinkTask{id=transactions-s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232)
Why is SourceConnectorConfig reported, when further down in the log output I can see that a WorkerSinkTask is created?
The reason is that this connector extends the Connector abstract class rather than the SinkConnector abstract class from Connect's API (see the connector's source code).
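As a result, the Connect framework cannot tell whether this connector is a source or a sink, and the current logic in the code is that anything that is not a sink is treated as a source. That is why you see this inconsistency: the connector-level config is logged as SourceConnectorConfig even though the task that actually runs is a sink task.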
The solution is for the connector to extend the appropriate abstract class (here, org.apache.kafka.connect.sink.SinkConnector).
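To make the fallback concrete, here is a minimal, self-contained sketch of the "not a sink, so assume source" check described above. It is a simplified model, not the actual Connect runtime code: the nested Connector, SinkConnector and SourceConnector classes are hypothetical stand-ins for the real API classes, and DirectS3SinkConnector, FixedS3SinkConnector and configKindFor are names invented for illustration.

```java
// Simplified model of the classification described above.
// All nested classes are hypothetical stand-ins, NOT the real Kafka Connect classes.
class ConnectorKindSketch {

    // Stand-ins for the Connect API base class and its two specializations.
    static abstract class Connector {}
    static abstract class SinkConnector extends Connector {}
    static abstract class SourceConnector extends Connector {}

    // Mirrors the situation in the question: a sink by name,
    // but it extends the generic Connector base class directly.
    static class DirectS3SinkConnector extends Connector {}

    // The suggested fix: extend the sink-specific base class instead.
    static class FixedS3SinkConnector extends SinkConnector {}

    // Assumed shape of the check: anything that is not recognizably
    // a sink falls through to the source branch.
    static String configKindFor(Connector connector) {
        if (connector instanceof SinkConnector) {
            return "SinkConnectorConfig";
        }
        return "SourceConnectorConfig";
    }

    public static void main(String[] args) {
        // Extends Connector directly, so it is classified as a source.
        System.out.println(configKindFor(new DirectS3SinkConnector()));
        // Extends SinkConnector, so it is classified as a sink.
        System.out.println(configKindFor(new FixedS3SinkConnector()));
    }
}
```

In this model, the first call prints SourceConnectorConfig and the second prints SinkConnectorConfig, which matches the mismatch seen in the log and the effect of changing the base class.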