Pulsar 消息监听器的多线程
Multithreading with pulsar message listeners
我是 java message listeners
和 apache pulsar
的新手。
假设我已经像这样维护了一个监听器,
private MessageListener<byte[]> generateListener() {
MessageListener<byte[]> listener = (consumer, respMsg) -> {
String respM = new String(respMsg.getValue(), StandardCharsets.UTF_8);
logger.info(respM);
consumer.acknowledgeAsync(respMsg);
};
return listener;
}
像这样的 Consumer 实例,
Consumer<byte[]> c = consumerBuilder.messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync().get();
我想知道这个侦听器将如何处理多个传入消息?是否会像 JMS 侦听器那样在单独的线程中处理每条消息?如果是这样,那么我如何配置要使用的线程数 - 是使用 ClientBuilder.listenerThreads()
属性?
在维护多个消费者时,是否需要为每个消费者维护多个侦听器对象,即类似这样的东西 -
consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync()
?
ClientBuilder#listenerThreads
方法允许配置内部 thread-pool 的大小,它将在将从该客户端创建的所有 Consumers
或 Readers
中共享和使用.
Pulsar Client 将保证单个消费者的 MessageListener
将始终由同一线程调用,即提供的 MessageListener
不需要是 thread-safe.所以,是的,最好为每个 Consumer
.
使用专用的 MessageListener
对象
请注意,这也确保了排序。
所以基本上,如果您只使用一个 Consumer
,那么您可以将 listenerThreads 保持为 1
(这是默认值)。
这是一个完整的例子,可以用来观察行为:
public class PulsarConsumerListenerExample {
public static void main(String[] args) throws PulsarClientException {
int numListenerThread = 2;
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.listenerThreads(numListenerThread)
.build();
final List<Consumer<?>> consumers = new ArrayList<>();
for (int i = 0; i < numListenerThread; i++) {
consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (Consumer<?> consumer : consumers) {
try {
consumer.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}));
}
private static Consumer<String> createConsumerWithLister(final PulsarClient client,
final String topic,
final String subscription,
final String consumerName) throws PulsarClientException {
return client.newConsumer(Schema.STRING)
.topic(topic)
.consumerName(consumerName)
.subscriptionName(subscription)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Failover)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((MessageListener<String>) (consumer, msg) -> {
System.out.printf(
"[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
consumerName,
Thread.currentThread().getName(),
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledgeAsync(msg);
})
.subscribe();
}
}
我是 java message listeners
和 apache pulsar
的新手。
假设我已经像这样维护了一个监听器,
private MessageListener<byte[]> generateListener() {
MessageListener<byte[]> listener = (consumer, respMsg) -> {
String respM = new String(respMsg.getValue(), StandardCharsets.UTF_8);
logger.info(respM);
consumer.acknowledgeAsync(respMsg);
};
return listener;
}
像这样的 Consumer 实例,
Consumer<byte[]> c = consumerBuilder.messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync().get();
我想知道这个侦听器将如何处理多个传入消息?是否会像 JMS 侦听器那样在单独的线程中处理每条消息?如果是这样,那么我如何配置要使用的线程数 - 是使用 ClientBuilder.listenerThreads()
属性?
在维护多个消费者时,是否需要为每个消费者维护多个侦听器对象,即类似这样的东西 -
consumerBuilder.clone().messageListener(generateListener()).topic(topicName).subscriptionName("Consumer-" + i).subscribeAsync()
?
ClientBuilder#listenerThreads
方法允许配置内部 thread-pool 的大小,它将在将从该客户端创建的所有 Consumers
或 Readers
中共享和使用.
Pulsar Client 将保证单个消费者的 MessageListener
将始终由同一线程调用,即提供的 MessageListener
不需要是 thread-safe.所以,是的,最好为每个 Consumer
.
MessageListener
对象
请注意,这也确保了排序。
所以基本上,如果您只使用一个 Consumer
,那么您可以将 listenerThreads 保持为 1
(这是默认值)。
这是一个完整的例子,可以用来观察行为:
public class PulsarConsumerListenerExample {
public static void main(String[] args) throws PulsarClientException {
int numListenerThread = 2;
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.listenerThreads(numListenerThread)
.build();
final List<Consumer<?>> consumers = new ArrayList<>();
for (int i = 0; i < numListenerThread; i++) {
consumers.add(createConsumerWithLister(client, "my-topic", "my-subscription", "C" + i));
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
for (Consumer<?> consumer : consumers) {
try {
consumer.close();
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}));
}
private static Consumer<String> createConsumerWithLister(final PulsarClient client,
final String topic,
final String subscription,
final String consumerName) throws PulsarClientException {
return client.newConsumer(Schema.STRING)
.topic(topic)
.consumerName(consumerName)
.subscriptionName(subscription)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Failover)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((MessageListener<String>) (consumer, msg) -> {
System.out.printf(
"[%s/%s]Message received: key=%s, value=%s, topic=%s, id=%s%n",
consumerName,
Thread.currentThread().getName(),
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledgeAsync(msg);
})
.subscribe();
}
}