Snowflake Kafka 连接器配置问题
Snowflake Kafka connector config issue
我正在按照本指南中的步骤进行操作 Snowflake Connector for Kafka
我收到的错误消息是
BadRequestException:连接器配置 {.....} 不包含连接器类型
我运行命令为
sh kafka_2.12-2.3.0/bin/connect-standalone.sh connect-standalone.properties snowflake_kafka_config.json
我的配置文件是
连接-standalone.properties
bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/Users/kafka_test/kafka
jar 文件 snowflake-kafka-connector-0.5.1.jar
在 plugin.path
snowflake_kafka_config.json
{
"name":"Kafka_Test",
"Config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"test",
"snowflake.topic2table.map": "",
"buffer.count.records":"1",
"buffer.flush.time":"60",
"buffer.size.bytes":"65536",
"snowflake.url.name":"<url>",
"snowflake.user.name":"<user_name>",
"snowflake.private.key":"<private_key>",
"snowflake.private.key.passphrase":"<pass_phrase>",
"snowflake.database.name":"<db>",
"snowflake.schema.name":"<schema>",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"value.converter.schema.registry.url":"",
"value.converter.basic.auth.credentials.source":"",
"value.converter.basic.auth.user.info":""
}
}
Kafka是运行在本地,我有一个生产者和消费者,可以看到数据流动。
这与我在 Confluent community Slack 上回答过的问题相同,但我也会 post 将其放在这里以供参考:-)
connect worker 日志显示正在加载连接器 JAR 本身,因此“不包含连接器类型”是因为您的配置格式是 fubar。
您 运行 处于独立模式,但传递的 JSON 文件不会。我个人的意见是始终使用分布式,即使只是它的单个节点。如果您需要回顾一下独立与分布式,请查看此内容:http://rmoff.dev/ksldn19-kafka-connect
如果您必须独立使用,那么您的连接器配置 (snowflake_kafka_config.json
) 需要像这样的属性文件:
param1=argument1
param2=argument2
您可以在此处查看有效的 JSON 个示例(如果您使用分布式模式):https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-zero-to-hero/demo_zero-to-hero-with-kafka-connect.adoc#stream-data-from-kafka-to-elasticsearch
我正在按照本指南中的步骤进行操作 Snowflake Connector for Kafka
我收到的错误消息是
BadRequestException:连接器配置 {.....} 不包含连接器类型
我运行命令为
sh kafka_2.12-2.3.0/bin/connect-standalone.sh connect-standalone.properties snowflake_kafka_config.json
我的配置文件是
连接-standalone.properties
bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/Users/kafka_test/kafka
jar 文件 snowflake-kafka-connector-0.5.1.jar
在 plugin.path
snowflake_kafka_config.json
{
"name":"Kafka_Test",
"Config":{
"connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max":"8",
"topics":"test",
"snowflake.topic2table.map": "",
"buffer.count.records":"1",
"buffer.flush.time":"60",
"buffer.size.bytes":"65536",
"snowflake.url.name":"<url>",
"snowflake.user.name":"<user_name>",
"snowflake.private.key":"<private_key>",
"snowflake.private.key.passphrase":"<pass_phrase>",
"snowflake.database.name":"<db>",
"snowflake.schema.name":"<schema>",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter",
"value.converter.schema.registry.url":"",
"value.converter.basic.auth.credentials.source":"",
"value.converter.basic.auth.user.info":""
}
}
Kafka是运行在本地,我有一个生产者和消费者,可以看到数据流动。
这与我在 Confluent community Slack 上回答过的问题相同,但我也会 post 将其放在这里以供参考:-)
connect worker 日志显示正在加载连接器 JAR 本身,因此“不包含连接器类型”是因为您的配置格式是 fubar。
您 运行 处于独立模式,但传递的 JSON 文件不会。我个人的意见是始终使用分布式,即使只是它的单个节点。如果您需要回顾一下独立与分布式,请查看此内容:http://rmoff.dev/ksldn19-kafka-connect
如果您必须独立使用,那么您的连接器配置 (snowflake_kafka_config.json
) 需要像这样的属性文件:
param1=argument1
param2=argument2
您可以在此处查看有效的 JSON 个示例(如果您使用分布式模式):https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-zero-to-hero/demo_zero-to-hero-with-kafka-connect.adoc#stream-data-from-kafka-to-elasticsearch