Solace 源连接器 "tasks.max" 如何连接到队列?
How does a Solace source connector "tasks.max" connect to a queue?
我们正在实施一个 Kafka 主题,它是一个 3 机集群。我们将退出 Solace 队列,并想知道 tasks.max
参数的作用以及它如何划分消息。我们无权访问安慰队列,只能调用并查看我们是否正在从缓冲区中删除消息。任何见解都会很棒! hornetQ 中的当前系统是 运行 5 个线程。
环境:
- 分布式模式
- 1 个主题
- 3 个副本
- 每台机器 5 个分区
Solace-kafka-connector-source-master
我们已经在一个任务中独立尝试过它并且它有效。由于我们无法控制安慰队列,因此我们可以查看是否正在丢弃消息。
{
"name": "solaceSourceConnector",
"config": {
"connector.class":
"com.solace.source.connector.SolaceSourceConnector",
"tasks.max": "1",
"kafka.topic": "solacetest",
"sol.host": "HOSTNAME",
"sol.username": "USERNAME",
"sol.password": "PASSWORD",
"sol.vpn_name": "VPN IP",
"sol.topics": "soltest, soltest1,solacetest2",
"sol.queue": "testQ",
"sol.message_processor_class": "com.solace.source.connector.msgProcessors.SolaceSampleKeyedMessageProcessor",
"sol.generate_send_timestamps": "true",
"sol.generate_rcv_timestamps": "true",
"sol.sub_ack_window_size": "255",
"sol.generate_sequence_numbers": "true",
"sol.calculate_message_expiration": "true",
"sol.subscriber_dto_override": "true",
"sol.channel_properties.connect_retries": "-1",
"sol.channel_properties.reconnect_retries": "-1",
"sol.kafka_message_key": "DESTINATION",
"sol.ssl_trust_store": "/opt/PKI/skeltonCA/heinz1.ts",
"sol.ssl_trust_store_pasword": "sasquatch",
"sol.ssl_trust_store_format": "JKS",
"sol.ssl_key_store": "/opt/PKI/skeltonCA/heinz1.ks",
"sol.ssl_key_store_password": "sasquatch",
"sol.ssl_key_store_format": "JKS",
"sol.ssl_key_store_normalized_format": "JKS",
"sol.ssl_private_key_alias": "heinz1",
"sol.ssl_private_key_password": "sasquatch"
}
}
我正在努力做到这一点,这样我们就不会丢失缓冲区中的消息。
max.tasks 1 将与独占队列一起使用,因为这种 Solace 队列只能有一个活动订阅者。它还将保证从 Solace 到 Kafka 的消息顺序得以保留。
如果 Solace 队列是非独占(即共享)队列,则增加 max.tasks 将定义从队列中消耗的并发 Solace 客户端数量的上限。这允许使用队列中的多个消费者进行水平扩展以获得更好的吞吐量。但是,Solace 不保证非独占队列的多个消费者之间的消息顺序,因此可能无法保留消息顺序。
在任何一种情况下,消息都不会丢失,因为在成功确认已写入 Kafka 之前,它们不会被确认回 Solace 并被删除。
我们正在实施一个 Kafka 主题,它是一个 3 机集群。我们将退出 Solace 队列,并想知道 tasks.max
参数的作用以及它如何划分消息。我们无权访问安慰队列,只能调用并查看我们是否正在从缓冲区中删除消息。任何见解都会很棒! hornetQ 中的当前系统是 运行 5 个线程。
环境:
- 分布式模式
- 1 个主题
- 3 个副本
- 每台机器 5 个分区
Solace-kafka-connector-source-master
我们已经在一个任务中独立尝试过它并且它有效。由于我们无法控制安慰队列,因此我们可以查看是否正在丢弃消息。
{
"name": "solaceSourceConnector",
"config": {
"connector.class":
"com.solace.source.connector.SolaceSourceConnector",
"tasks.max": "1",
"kafka.topic": "solacetest",
"sol.host": "HOSTNAME",
"sol.username": "USERNAME",
"sol.password": "PASSWORD",
"sol.vpn_name": "VPN IP",
"sol.topics": "soltest, soltest1,solacetest2",
"sol.queue": "testQ",
"sol.message_processor_class": "com.solace.source.connector.msgProcessors.SolaceSampleKeyedMessageProcessor",
"sol.generate_send_timestamps": "true",
"sol.generate_rcv_timestamps": "true",
"sol.sub_ack_window_size": "255",
"sol.generate_sequence_numbers": "true",
"sol.calculate_message_expiration": "true",
"sol.subscriber_dto_override": "true",
"sol.channel_properties.connect_retries": "-1",
"sol.channel_properties.reconnect_retries": "-1",
"sol.kafka_message_key": "DESTINATION",
"sol.ssl_trust_store": "/opt/PKI/skeltonCA/heinz1.ts",
"sol.ssl_trust_store_pasword": "sasquatch",
"sol.ssl_trust_store_format": "JKS",
"sol.ssl_key_store": "/opt/PKI/skeltonCA/heinz1.ks",
"sol.ssl_key_store_password": "sasquatch",
"sol.ssl_key_store_format": "JKS",
"sol.ssl_key_store_normalized_format": "JKS",
"sol.ssl_private_key_alias": "heinz1",
"sol.ssl_private_key_password": "sasquatch"
}
}
我正在努力做到这一点,这样我们就不会丢失缓冲区中的消息。
max.tasks 1 将与独占队列一起使用,因为这种 Solace 队列只能有一个活动订阅者。它还将保证从 Solace 到 Kafka 的消息顺序得以保留。
如果 Solace 队列是非独占(即共享)队列,则增加 max.tasks 将定义从队列中消耗的并发 Solace 客户端数量的上限。这允许使用队列中的多个消费者进行水平扩展以获得更好的吞吐量。但是,Solace 不保证非独占队列的多个消费者之间的消息顺序,因此可能无法保留消息顺序。
在任何一种情况下,消息都不会丢失,因为在成功确认已写入 Kafka 之前,它们不会被确认回 Solace 并被删除。