Pulsar 客户端线程平衡
Pulsar client thread balance
我正在尝试实现一个具有多个生产者的 Pulsar 客户端,该客户端在线程之间分配负载,但无论传递给 ioThreads() 和 listenerThreads() 的值如何,它总是使第一个线程过载(> 65 % cpu 而其他线程完全空闲)
我尝试了一些方法,包括每小时“动态重新平衡”(最后一种方法),但在过程中关闭它肯定不是最好的方法
这是相关代码
...
// pulsar client
pulsarClient = PulsarClient.builder() //
.operationTimeout(config.getAppPulsarTimeout(), TimeUnit.SECONDS) //
.ioThreads(config.getAppPulsarClientThreads()) //
.listenerThreads(config.getAppPulsarClientThreads()) //
.serviceUrl(config.getPulsarServiceUrl()).build();
...
private createProducers() {
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList) {
String topicName = config.getPulsarTopicOutput().concat(String.valueOf(e));
LOG.info("Creating producer for topic: {}", topicName);
Producer<byte[]> protobufProducer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
.blockIfQueueFull(true).compressionType(CompressionType.NONE)
.sendTimeout(config.getPulsarSendTimeout(), TimeUnit.SECONDS)
.maxPendingMessages(config.getPulsarMaxPendingMessages()).create();
this.mapLink.put(strConsumerTopic.concat(String.valueOf(e)), protobufProducer);
}
}
public void closeProducers() {
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList) {
try {
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).close();
LOG.info("{} producer correctly closed...",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName());
} catch (PulsarClientException e1) {
LOG.error("Producer: {} not closed cause: {}",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName(),
e1.getMessage());
}
}
}
public void rebalancePulsarThreads(boolean firstRun) {
ThreadMXBean threadHandler = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadsInfo = threadHandler.getThreadInfo(threadHandler.getAllThreadIds());
for (ThreadInfo threadInfo : threadsInfo) {
if (threadInfo.getThreadName().contains("pulsar-client-io")) {
// enable cpu time for all threads
threadHandler.setThreadCpuTimeEnabled(true);
// get cpu time for this specific thread
long threadCPUTime = threadHandler.getThreadCpuTime(threadInfo.getThreadId());
int thresholdCPUTime = 65;
if (threadCPUTime > thresholdCPUTime) {
LOG.warn("Pulsar client thread with CPU time greater than {}% - REBALANCING now", thresholdCPUTime);
try {
closeProducers();
} catch (Exception e) {
if (!firstRun) {
// producers will not be available in the first run
// therefore, the logging only happens when it is not the first run
LOG.warn("Unable to close Pulsar client threads on rebalancing: {}", e.getMessage());
}
}
try {
createPulsarProducers();
} catch (Exception e) {
LOG.warn("Unable to create Pulsar client threads on rebalancing: {}", e.getMessage());
}
}
}
}
}
根据您的描述,最有可能的情况是您使用的所有主题都由一个代理提供服务。
如果情况确实如此,并且避免代理之间的主题负载平衡,则使用单个线程是正常的,因为所有这些生产者将共享一个池化的 TCP 连接,并且每个连接都分配给 1 个 IO 线程(侦听器线程用于消费者侦听器)。
如果您想强制使用更多线程,可以增加“每个代理的最大 TCP 连接数”设置,以便使用所有已配置的 IO 线程。
例如:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.ioThreads(16)
.connectionsPerBroker(16)
.create();
我正在尝试实现一个具有多个生产者的 Pulsar 客户端,该客户端在线程之间分配负载,但无论传递给 ioThreads() 和 listenerThreads() 的值如何,它总是使第一个线程过载(> 65 % cpu 而其他线程完全空闲)
我尝试了一些方法,包括每小时“动态重新平衡”(最后一种方法),但在过程中关闭它肯定不是最好的方法
这是相关代码
...
// pulsar client
pulsarClient = PulsarClient.builder() //
.operationTimeout(config.getAppPulsarTimeout(), TimeUnit.SECONDS) //
.ioThreads(config.getAppPulsarClientThreads()) //
.listenerThreads(config.getAppPulsarClientThreads()) //
.serviceUrl(config.getPulsarServiceUrl()).build();
...
private createProducers() {
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList) {
String topicName = config.getPulsarTopicOutput().concat(String.valueOf(e));
LOG.info("Creating producer for topic: {}", topicName);
Producer<byte[]> protobufProducer = pulsarClient.newProducer().topic(topicName).enableBatching(false)
.blockIfQueueFull(true).compressionType(CompressionType.NONE)
.sendTimeout(config.getPulsarSendTimeout(), TimeUnit.SECONDS)
.maxPendingMessages(config.getPulsarMaxPendingMessages()).create();
this.mapLink.put(strConsumerTopic.concat(String.valueOf(e)), protobufProducer);
}
}
public void closeProducers() {
String strConsumerTopic = this.config.getPulsarTopicInput();
List<Integer> protCasesList = this.config.getEventProtoCaseList();
for (Integer e : protCasesList) {
try {
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).close();
LOG.info("{} producer correctly closed...",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName());
} catch (PulsarClientException e1) {
LOG.error("Producer: {} not closed cause: {}",
this.mapLink.get(strConsumerTopic.concat(String.valueOf(e))).getProducerName(),
e1.getMessage());
}
}
}
public void rebalancePulsarThreads(boolean firstRun) {
ThreadMXBean threadHandler = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadsInfo = threadHandler.getThreadInfo(threadHandler.getAllThreadIds());
for (ThreadInfo threadInfo : threadsInfo) {
if (threadInfo.getThreadName().contains("pulsar-client-io")) {
// enable cpu time for all threads
threadHandler.setThreadCpuTimeEnabled(true);
// get cpu time for this specific thread
long threadCPUTime = threadHandler.getThreadCpuTime(threadInfo.getThreadId());
int thresholdCPUTime = 65;
if (threadCPUTime > thresholdCPUTime) {
LOG.warn("Pulsar client thread with CPU time greater than {}% - REBALANCING now", thresholdCPUTime);
try {
closeProducers();
} catch (Exception e) {
if (!firstRun) {
// producers will not be available in the first run
// therefore, the logging only happens when it is not the first run
LOG.warn("Unable to close Pulsar client threads on rebalancing: {}", e.getMessage());
}
}
try {
createPulsarProducers();
} catch (Exception e) {
LOG.warn("Unable to create Pulsar client threads on rebalancing: {}", e.getMessage());
}
}
}
}
}
根据您的描述,最有可能的情况是您使用的所有主题都由一个代理提供服务。
如果情况确实如此,并且避免代理之间的主题负载平衡,则使用单个线程是正常的,因为所有这些生产者将共享一个池化的 TCP 连接,并且每个连接都分配给 1 个 IO 线程(侦听器线程用于消费者侦听器)。
如果您想强制使用更多线程,可以增加“每个代理的最大 TCP 连接数”设置,以便使用所有已配置的 IO 线程。
例如:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.ioThreads(16)
.connectionsPerBroker(16)
.create();