pubSub Source:两次收到相同的消息
pubSubSource: Receving the same message twice
描述
- 我在 Kafka 连接分布式模式下有一个 pubSubSource 连接器,它只是从 PubSub 订阅中读取并写入 Kafka 主题。问题是,即使我向 GCP PubSub 发布一条消息,我也会在我的 Kafka 主题中收到此消息两次。
如何重现
部署Kafka和Kafka连接
创建具有以下 pubSubSource
配置的连接器:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "pubSubSource",
"config": {
"connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"tasks.max":"1",
"cps.subscription":"pubsub-test-sub",
"kafka.topic":"kafka-sub-topic",
"cps.project":"test-project123",
"gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json"
}
}'
以下是 Kafka 连接配置:
"plugin.path": "/usr/share/java,/usr/share/confluent-hub-components"
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
"value.converter": "org.apache.kafka.connect.json.JsonConverter"
"key.converter.schemas.enable": "false"
"value.converter.schemas.enable": "false"
"internal.key.converter": "org.apache.kafka.connect.json.JsonConverter"
"internal.value.converter": "org.apache.kafka.connect.json.JsonConverter"
"config.storage.replication.factor": "1"
"offset.storage.replication.factor": "1"
"status.storage.replication.factor": "1"
使用以下命令向 PubSub 主题发布消息:
gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
从目标 Kafka 主题读取消息:
/usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-topic --from-beginning
# Output
{"someKey":"someValue"}
{"someKey":"someValue"}
为什么会这样,是不是我做错了什么?
我在 https://cloud.google.com/pubsub/docs/faq 找到了以下信息,看来您也遇到了同样的问题。您可以尝试生成大消息并查看结果是否相同吗?
详情来自link:
为什么重复邮件太多?
Pub/Sub 保证至少传递一次消息,这意味着偶尔会出现重复。但是,高重复率可能表明客户端未在配置的 ack_deadline_seconds 内确认消息,并且 Pub/Sub 正在重试消息传递。这可以在请求订阅的监控指标 pubsub.googleapis.com/subscription/pull_ack_message_operation_count 和推送订阅的 pubsub.googleapis.com/subscription/push_request_count 中观察到。在 /response_code 中查找升高的过期值或 webhook_timeout 值。如果有很多小消息,这种情况尤其可能发生,因为 Pub/Sub 可能会在内部对消息进行批处理,部分确认的批处理将被完全重新传送。
另一种可能性是订户未确认某些消息,因为处理这些特定消息的代码路径失败,并且从未进行确认调用;或者推送端点从不响应或响应错误。
如何检测重复邮件?
Pub/Sub 为每条消息分配一个唯一的 message_id ,可用于检测订阅者收到的重复消息。但是,这不会让您检测到对同一数据的多个发布请求所导致的重复。检测这些将需要发布者提供唯一的消息标识符。有关进一步讨论,请参阅 Pub/Sub I/O。
描述
- 我在 Kafka 连接分布式模式下有一个 pubSubSource 连接器,它只是从 PubSub 订阅中读取并写入 Kafka 主题。问题是,即使我向 GCP PubSub 发布一条消息,我也会在我的 Kafka 主题中收到此消息两次。
如何重现
部署Kafka和Kafka连接
创建具有以下
pubSubSource
配置的连接器:curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "pubSubSource", "config": { "connector.class":"com.google.pubsub.kafka.source.CloudPubSubSourceConnector", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "tasks.max":"1", "cps.subscription":"pubsub-test-sub", "kafka.topic":"kafka-sub-topic", "cps.project":"test-project123", "gcp.credentials.file.path":"/tmp/gcp-creds/account-key.json" } }'
以下是 Kafka 连接配置:
"plugin.path": "/usr/share/java,/usr/share/confluent-hub-components" "key.converter": "org.apache.kafka.connect.json.JsonConverter" "value.converter": "org.apache.kafka.connect.json.JsonConverter" "key.converter.schemas.enable": "false" "value.converter.schemas.enable": "false" "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter" "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter" "config.storage.replication.factor": "1" "offset.storage.replication.factor": "1" "status.storage.replication.factor": "1"
使用以下命令向 PubSub 主题发布消息:
gcloud pubsub topics publish test-topic --message='{"someKey":"someValue"}'
从目标 Kafka 主题读取消息:
/usr/bin/kafka-console-consumer --bootstrap-server xx.xxx.xxx.xx:9092 --topic kafka-topic --from-beginning # Output {"someKey":"someValue"} {"someKey":"someValue"}
为什么会这样,是不是我做错了什么?
我在 https://cloud.google.com/pubsub/docs/faq 找到了以下信息,看来您也遇到了同样的问题。您可以尝试生成大消息并查看结果是否相同吗?
详情来自link:
为什么重复邮件太多? Pub/Sub 保证至少传递一次消息,这意味着偶尔会出现重复。但是,高重复率可能表明客户端未在配置的 ack_deadline_seconds 内确认消息,并且 Pub/Sub 正在重试消息传递。这可以在请求订阅的监控指标 pubsub.googleapis.com/subscription/pull_ack_message_operation_count 和推送订阅的 pubsub.googleapis.com/subscription/push_request_count 中观察到。在 /response_code 中查找升高的过期值或 webhook_timeout 值。如果有很多小消息,这种情况尤其可能发生,因为 Pub/Sub 可能会在内部对消息进行批处理,部分确认的批处理将被完全重新传送。
另一种可能性是订户未确认某些消息,因为处理这些特定消息的代码路径失败,并且从未进行确认调用;或者推送端点从不响应或响应错误。
如何检测重复邮件? Pub/Sub 为每条消息分配一个唯一的 message_id ,可用于检测订阅者收到的重复消息。但是,这不会让您检测到对同一数据的多个发布请求所导致的重复。检测这些将需要发布者提供唯一的消息标识符。有关进一步讨论,请参阅 Pub/Sub I/O。