Apache Pulsar Async Consumer Setup (CompletableFuture)
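I am trying to set up an asynchronous consumer for Apache Pulsar, but my problem is that only one message is received and no further messages come through unless I restart my Spring Boot application. Unfortunately, the documentation on using CompletableFuture with Pulsar is not very good, and I am new to it.
My code is below: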
@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {
    Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
            .subscriptionName(this.pulsarSubscriptionName)
            .topic(this.pulsarTopicName)
            .ackTimeout(240, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();
    while (true) {
        CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
        asyncMessage.thenAcceptAsync(incomingMessage -> {
            TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();
            LOGGER.info(String.format("***New provisioning request received for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));
            consumer.acknowledgeAsync(incomingMessage.getMessageId());
        });
    }
}
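The Javadoc does say "receiveAsync() should be called subsequently once returned CompletableFuture gets complete with received message. Else it creates backlog of receive requests in the application.", but I am unsure how to do this. I think this is what is causing the issue.
Documentation:
Pulsar docs: https://pulsar.apache.org/docs/en/client-libraries-java/#async-receive
Pulsar Java receiveAsync() docs: https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Consumer.html
*** UPDATE *** I have added the while loop, but when I do, my Spring Boot application's memory consumption climbs as high as 10 GB. I am not sure, but I wonder whether this is how the futures are meant to be set up.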
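To prevent this excessive memory consumption, you need to add an intermediate data structure to your class that limits the number of outstanding CompletableFutures, such as a LinkedBlockingQueue, as shown below. The while loop in the question calls receiveAsync() as fast as it can without waiting for any future to complete, so pending futures pile up without bound.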
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AsyncConsumerDemo {

    // Bounded queue: put() blocks once 1000 receive requests are outstanding
    private static final LinkedBlockingQueue<CompletableFuture<Message<String>>> outstandingMessages =
            new LinkedBlockingQueue<>(1000);

    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public static void main(String[] args) throws PulsarClientException, InterruptedException, ExecutionException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://broker:6650")
                .build();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                .subscriptionName("pulsarSubscriptionName")
                .topic("pulsarTopicName")
                .ackTimeout(240, TimeUnit.SECONDS)
                .subscriptionType(SubscriptionType.Shared)
                .subscribe();

        new Thread(() -> { // Message retrieval thread
            CompletableFuture<Message<String>> future;
            try {
                while ((future = consumer.receiveAsync()) != null) {
                    // Blocks while the queue is full, which caps the pending futures
                    outstandingMessages.put(future);
                }
            } catch (InterruptedException e) {
                // put() was interrupted: restore the flag and let the thread exit
                Thread.currentThread().interrupt();
            }
        }).start();

        for (int numConsumers = 0; numConsumers < 10; numConsumers++) {
            executor.submit(() -> { // Message consumer thread
                while (true) {
                    try {
                        CompletableFuture<Message<String>> future = outstandingMessages.take();
                        Message<String> msg = future.get(); // Wait for the message to arrive
                        // Process the message
                        consumer.acknowledgeAsync(msg.getMessageId());
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }
}
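The Javadoc note quoted in the question (call receiveAsync() again only once the returned future has completed) can also be followed directly, without a queue, by re-arming the receive from inside the completion callback. Below is a minimal sketch of that pattern, not a definitive implementation. To keep it runnable without a broker, it uses only the JDK: `fakeReceive` is a hypothetical stand-in for `consumer::receiveAsync`, and `receiveLoop`, `runDemo` and `ChainedReceiveDemo` are names made up for this illustration. Note that `Consumer` here is `java.util.function.Consumer`, not the Pulsar one.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ChainedReceiveDemo {

    // Requests one message, handles it, and only then requests the next one,
    // so at most one receive request is outstanding at any time.
    // The chain stops if the receive or the handler completes exceptionally.
    public static <T> void receiveLoop(Supplier<CompletableFuture<T>> receiveAsync,
                                       Consumer<T> handler) {
        receiveAsync.get()
                .thenAcceptAsync(handler)
                // Re-arm on a fresh task so the call stack does not grow
                .thenRunAsync(() -> receiveLoop(receiveAsync, handler));
    }

    // Runs the loop against a fake source that yields `count` messages and
    // then fails, and returns the messages in the order they were handled.
    public static List<String> runDemo(int count) {
        AtomicInteger next = new AtomicInteger();
        List<String> handled = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);

        // Hypothetical stand-in for consumer::receiveAsync (assumption)
        Supplier<CompletableFuture<String>> fakeReceive = () -> {
            int n = next.getAndIncrement();
            if (n >= count) {
                CompletableFuture<String> closed = new CompletableFuture<>();
                closed.completeExceptionally(new IllegalStateException("source closed"));
                return closed;
            }
            return CompletableFuture.supplyAsync(() -> "msg-" + n);
        };

        receiveLoop(fakeReceive, msg -> {
            handled.add(msg);
            done.countDown();
        });

        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5));
    }
}
```

With a real Pulsar consumer, the same idea would presumably be `receiveLoop(consumer::receiveAsync, msg -> { /* process */ consumer.acknowledgeAsync(msg.getMessageId()); })`. The trade-off against the queue-based version is throughput: messages are handled strictly one at a time, whereas the bounded queue allows up to 1000 pending receives and 10 worker threads.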