消息中心和 Confluent Kafka Connect S3
Message Hub & Confluent Kafka Connect S3
我需要将来自 IBM MHub 主题的消息消费到 IBM 对象存储中。
我将它与带有 Confluent Kafka Connect S3 插件的本地 Kafka 服务器一起工作,作为接收器 Amazon S3 存储桶和文件的独立工作器。两者都成功了。
如果我将 Confluent Kafka Connect S3 配置为 IBM MHub 集群的分布式工作者,我没有收到任何错误,但仍然没有消息最终到达 Amazon S3 存储桶。我也试过文件接收器,也没有运气。
有可能吗?
您可以尝试使用消息中心(现在称为事件流)云对象存储桥:https://console.bluemix.net/docs/services/MessageHub/messagehub115.html#cloud_object_storage_bridge
似乎符合您的要求?
发件人:https://kafka.apache.org/documentation/#connect_running
此处配置的参数供 Kafka Connect 使用的生产者和消费者访问配置、偏移和状态主题。 Kafka source和Kafka sink任务的配置,可以使用相同的参数,但需要加上consumer前缀。和制作人。分别。从 worker 配置继承的唯一参数是 bootstrap.servers,在大多数情况下这就足够了,因为同一个集群通常用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数来允许连接。这些参数需要在工作配置中设置三次,一次用于管理访问,一次用于 Kafka 接收器,一次用于 Kafka 源。
所以解决方案是添加与消费者重复的配置。在 worker 配置中添加前缀,以便所需的 sasl_ssl 设置发生,而不是接收器消费者的默认设置。
IBM 云对象存储也可以。需要凭据,例如。环境变量:AWS_ACCESS_KEY_ID="see cos credentials" & AWS_SECRET_ACCESS_KEY="see cos credentials"
连接器配置:
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "5",
"topics": "your-topic",
"s3.region": "eu-central-1",
"store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
"s3.bucket.name": "your-bucket",
"s3.part.size": "5242880",
"flush.size": "1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"name": "s3-sink"
}
}
我需要将来自 IBM MHub 主题的消息消费到 IBM 对象存储中。
我将它与带有 Confluent Kafka Connect S3 插件的本地 Kafka 服务器一起工作,作为接收器 Amazon S3 存储桶和文件的独立工作器。两者都成功了。
如果我将 Confluent Kafka Connect S3 配置为 IBM MHub 集群的分布式工作者,我没有收到任何错误,但仍然没有消息最终到达 Amazon S3 存储桶。我也试过文件接收器,也没有运气。
有可能吗?
您可以尝试使用消息中心(现在称为事件流)云对象存储桥:https://console.bluemix.net/docs/services/MessageHub/messagehub115.html#cloud_object_storage_bridge
似乎符合您的要求?
发件人:https://kafka.apache.org/documentation/#connect_running
此处配置的参数供 Kafka Connect 使用的生产者和消费者访问配置、偏移和状态主题。 Kafka source和Kafka sink任务的配置,可以使用相同的参数,但需要加上consumer前缀。和制作人。分别。从 worker 配置继承的唯一参数是 bootstrap.servers,在大多数情况下这就足够了,因为同一个集群通常用于所有目的。一个值得注意的例外是安全集群,它需要额外的参数来允许连接。这些参数需要在工作配置中设置三次,一次用于管理访问,一次用于 Kafka 接收器,一次用于 Kafka 源。
所以解决方案是添加与消费者重复的配置。在 worker 配置中添加前缀,以便所需的 sasl_ssl 设置发生,而不是接收器消费者的默认设置。
IBM 云对象存储也可以。需要凭据,例如。环境变量:AWS_ACCESS_KEY_ID="see cos credentials" & AWS_SECRET_ACCESS_KEY="see cos credentials"
连接器配置:
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "5",
"topics": "your-topic",
"s3.region": "eu-central-1",
"store.url": "https://s3.eu-geo.objectstorage.softlayer.net",
"s3.bucket.name": "your-bucket",
"s3.part.size": "5242880",
"flush.size": "1",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE",
"name": "s3-sink"
}
}