是否可以使用文件源作为输入和 jdbc-sink 作为 kafka 的输出?
Is it possible to use a file-source as input AND jdbc-sink as output with kafka?
我目前正在处理 Kafka 项目,我的问题是我能够使用文件源连接器读取文件并将数据存储到主题中。
我的配置:
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
然后我努力使用 Jdbc-sink 连接器将数据发送到我的 Postgres 数据库。
我的配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=sensor
connection.url=jdbc:postgresql://localhost:5432/pg_data_eng
connection.user=vagrant
connection.password=vagrant
auto.create=true
key.converter=org.apache.kafka.connect.json.JsonConverter
schemas.enable=false
请注意,我尝试了很多不同的配置,但没有任何效果。
我可以使用 REST API 看到我的错误:
http://localhost:18083/connectors/jdbc-sink/tasks/0/status
我明白了:
{"id":0,"state":"FAILED","worker_id":"127.0.1.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more\n"}
我看到 Value schema must be of type Struct
是主要问题,可能与模式注册表有关。
我也尝试过添加 value.converter.schema.registry.url=http://localhost:8081
但是还是不行。
我确实在互联网上研究了一些教程,但其中 none 是关于文件源和 jdbc-sink 的,所以我的问题是:是否有可能这样做吗?
问题是 FileSourceConnect
returns 一个字符串模式连接记录,而不是一个结构(这是 JDBC 接收器和其他连接通常期望的)。
您必须使用转换将值包装到结构中。
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
# Add this
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line
然后输入文件
Foo
Bar
转换后的消息会变成这样(请手动消费主题确认)
{"line":"Foo"}
{"line":"Bar"}
因此,您的数据库需要一个 line
文本列。
I also tried that by adding value.converter.schema.registry.url=http://localhost:8081 But still not working.
你需要使用 Avro 才能工作,而不是 JSONConverter
我目前正在处理 Kafka 项目,我的问题是我能够使用文件源连接器读取文件并将数据存储到主题中。
我的配置:
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
然后我努力使用 Jdbc-sink 连接器将数据发送到我的 Postgres 数据库。
我的配置:
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=sensor
connection.url=jdbc:postgresql://localhost:5432/pg_data_eng
connection.user=vagrant
connection.password=vagrant
auto.create=true
key.converter=org.apache.kafka.connect.json.JsonConverter
schemas.enable=false
请注意,我尝试了很多不同的配置,但没有任何效果。 我可以使用 REST API 看到我的错误:
http://localhost:18083/connectors/jdbc-sink/tasks/0/status
我明白了:
{"id":0,"state":"FAILED","worker_id":"127.0.1.1:8083","trace":"org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)\
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.kafka.connect.errors.ConnectException: Value schema must be of type Struct
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:82)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:63)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:78)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:74)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
... 10 more\n"}
我看到 Value schema must be of type Struct
是主要问题,可能与模式注册表有关。
我也尝试过添加 value.converter.schema.registry.url=http://localhost:8081
但是还是不行。
我确实在互联网上研究了一些教程,但其中 none 是关于文件源和 jdbc-sink 的,所以我的问题是:是否有可能这样做吗?
问题是 FileSourceConnect
returns 一个字符串模式连接记录,而不是一个结构(这是 JDBC 接收器和其他连接通常期望的)。
您必须使用转换将值包装到结构中。
connector.class=FileStreamSource
tasks.max=1
file=/vagrant/fake_sensor.dat
topic=sensor
# Add this
transforms=HoistField
transforms.HoistField.type=org.apache.kafka.connect.transforms.HoistField$Value
transforms.HoistField.field=line
然后输入文件
Foo
Bar
转换后的消息会变成这样(请手动消费主题确认)
{"line":"Foo"}
{"line":"Bar"}
因此,您的数据库需要一个 line
文本列。
I also tried that by adding value.converter.schema.registry.url=http://localhost:8081 But still not working.
你需要使用 Avro 才能工作,而不是 JSONConverter