如何检查 gcp pubsub empty/inactive 订阅
How to check gcp pubsub empty/inactive subscription
我有一个订阅 GCP 主题的应用程序,当那里有一些消息时,它会下载它们并将它们发送到 ActiveMQ 上的队列。
为了加快这个过程,我正在使用 executorService 并启动多个线程以将消息发送到 activeMQ。由于这个订阅应该是一项持续的任务,我将代码放在 while(true) 循环中,因此我无法以正常方式关闭 executorService,因为我将创建和关闭执行程序服务每个循环。
我正在寻找一种优雅的方法来在订阅为空(主题中没有数据)2 或 3 分钟或一些不活动时关闭 executorService window。当然,当有一些新数据时,它会再次开始。
以下是我不喜欢的想法,它只是我在订阅检索不到数据时递增的计数器。
我正在寻找一种更优雅的方法。
@Service
@Slf4j
public class PubSubSubscriberService {
private static final int EMPTY_SUBSCRIPTION_COUNTER = 4;
private static final Logger businessLogger = LoggerFactory.getLogger("BusinessLogger");
private Queue<PubsubMessage> messages = new ConcurrentLinkedQueue<>();
public void pullMessagesAndSendToBroker(CompositeConfigurationElement cce) {
var patchSize = cce.getSubscriber().getPatchSize();
var nThreads = cce.getSubscriber().getSendingParallelThreads();
var scheduledTasks = 0;
var subscribeCounter = 0;
ThreadPoolExecutor threadPoolExecutor = null;
while (true) {
try {
if (subscribeCounter < EMPTY_SUBSCRIPTION_COUNTER) {
log.info("Creating Executor Service for uploading to broker with a thread pool of Size: " + nThreads);
threadPoolExecutor = getThreadPoolExecutor(nThreads);
}
var subscriber = this.getSubscriber(cce);
this.startSubscriber(subscriber, cce);
this.checkActivity(threadPoolExecutor, subscribeCounter++);
// send patches of {{ messagesPerIteration }}
while (this.messages.size() > patchSize) {
if (poolIsReady(threadPoolExecutor, nThreads)) {
UploadTask task = new UploadTask(this.messages, cce, cf, patchSize);
threadPoolExecutor.submit(task);
scheduledTasks ++;
}
subscribeCounter = 0;
}
// send the rest
if (this.messages.size() > 0) {
UploadTask task = new UploadTask(this.messages, cce, cf, patchSize);
threadPoolExecutor.submit(task);
scheduledTasks ++;
subscribeCounter = 0;
}
if (scheduledTasks > 0) {
businessLogger.info("Scheduled " + scheduledTasks + " upload tasks of size upto: " + patchSize + ", preparing to start subscribing for 30 more sec") ;
scheduledTasks = 0;
}
} catch ( Exception e) {
e.printStackTrace();
businessLogger.error(e.getMessage());
}
}
您的池占用很少 space 和内存,并且在不使用时几乎不消耗 CPU。为您的池容量设置一个最大限制,并在尝试缩小它时使用它。如果要处理的消息太多,任务将排队等待空闲的执行程序池来完成任务。
如果您担心上下可扩展性,您的设计可能会被审查。您可以在集群中触发事件并在其他 pods 上并行处理它们,而不是 pod 内部的 executorPool。这些 pods 将能够根据流量向上和向下扩展(查看 Knative)
我有一个订阅 GCP 主题的应用程序,当那里有一些消息时,它会下载它们并将它们发送到 ActiveMQ 上的队列。
为了加快这个过程,我正在使用 executorService 并启动多个线程以将消息发送到 activeMQ。由于这个订阅应该是一项持续的任务,我将代码放在 while(true) 循环中,因此我无法以正常方式关闭 executorService,因为我将创建和关闭执行程序服务每个循环。
我正在寻找一种优雅的方法来在订阅为空(主题中没有数据)2 或 3 分钟或一些不活动时关闭 executorService window。当然,当有一些新数据时,它会再次开始。
以下是我不喜欢的想法,它只是我在订阅检索不到数据时递增的计数器。
我正在寻找一种更优雅的方法。
@Service
@Slf4j
public class PubSubSubscriberService {
private static final int EMPTY_SUBSCRIPTION_COUNTER = 4;
private static final Logger businessLogger = LoggerFactory.getLogger("BusinessLogger");
private Queue<PubsubMessage> messages = new ConcurrentLinkedQueue<>();
public void pullMessagesAndSendToBroker(CompositeConfigurationElement cce) {
var patchSize = cce.getSubscriber().getPatchSize();
var nThreads = cce.getSubscriber().getSendingParallelThreads();
var scheduledTasks = 0;
var subscribeCounter = 0;
ThreadPoolExecutor threadPoolExecutor = null;
while (true) {
try {
if (subscribeCounter < EMPTY_SUBSCRIPTION_COUNTER) {
log.info("Creating Executor Service for uploading to broker with a thread pool of Size: " + nThreads);
threadPoolExecutor = getThreadPoolExecutor(nThreads);
}
var subscriber = this.getSubscriber(cce);
this.startSubscriber(subscriber, cce);
this.checkActivity(threadPoolExecutor, subscribeCounter++);
// send patches of {{ messagesPerIteration }}
while (this.messages.size() > patchSize) {
if (poolIsReady(threadPoolExecutor, nThreads)) {
UploadTask task = new UploadTask(this.messages, cce, cf, patchSize);
threadPoolExecutor.submit(task);
scheduledTasks ++;
}
subscribeCounter = 0;
}
// send the rest
if (this.messages.size() > 0) {
UploadTask task = new UploadTask(this.messages, cce, cf, patchSize);
threadPoolExecutor.submit(task);
scheduledTasks ++;
subscribeCounter = 0;
}
if (scheduledTasks > 0) {
businessLogger.info("Scheduled " + scheduledTasks + " upload tasks of size upto: " + patchSize + ", preparing to start subscribing for 30 more sec") ;
scheduledTasks = 0;
}
} catch ( Exception e) {
e.printStackTrace();
businessLogger.error(e.getMessage());
}
}
您的池占用很少 space 和内存,并且在不使用时几乎不消耗 CPU。为您的池容量设置一个最大限制,并在尝试缩小它时使用它。如果要处理的消息太多,任务将排队等待空闲的执行程序池来完成任务。
如果您担心上下可扩展性,您的设计可能会被审查。您可以在集群中触发事件并在其他 pods 上并行处理它们,而不是 pod 内部的 executorPool。这些 pods 将能够根据流量向上和向下扩展(查看 Knative)