Kafka 在第一秒内生成消息很慢
Kafka is slow to produce messages in first seconds
我正在使用 kafka,我做了一个这样的制作人:
synchronized (obj) {
while (true){
long start = Instant.now().toEpochMilli();
for (int i=0; i< NUM_MSG_SEC ; i++)
{
PriceStreamingData data = PriceStreamingData.newBuilder()
.setUser(getRequest().getUser())
.setSecurity(getRequest().getSecurity())
.setTimestamp(Instant.now().toEpochMilli())
.setPrice(new Random().nextDouble()*200)
.build();
record = new ProducerRecord<>(topic, keyBuilder.build(data),
data);
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
counter.incrementAndGet();
if(arg1 != null){
arg1.printStackTrace();
}
}
});
}
long diffCiclo = Instant.now().toEpochMilli() - start;
long diff = Instant.now().toEpochMilli() - startTime;
System.out.println("Number of sent: " + counter.get() +
" Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff );
try {
if(diffCiclo >= 1000){
System.out.println("over 1 second: " + diffCiclo);
}
else {
obj.wait( 1000 - diffCiclo );
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如您所见,它非常简单,只需创建一条新消息并发送即可。
如果我看到日志:
NumberOfSent/Diff(K)
前 10 秒表现非常糟糕
30k per second
60 秒后我有
180k per second
为什么?我怎样才能开始已经达到 180k 的进程?
我的kafka生产者配置是Follwing
Async producer ( but also with sync producer the situation dose not change)
ACKS_CONFIG = 0
BATCH_SIZE_CONFIG = 20000
COMPRESSION_TYPE_CONFIG = none
LINGER_MS_CONFIG = 0
最后一个细节:
NUM_MSG_SEC is set to 200000 or bigger number
我自己找到了解决方案,希望这个 post 对其他人也有用。
问题站在
ProducerConfig.BATCH_SIZE_CONFIG
和
ProducerConfig.LINGER_MS_CONFIG
我的参数是 20000 和 0,为了解决这个问题,我将它们设置为更高的值 200000 和 1000。最后我用参数启动了 JVM:
-XX:MinMetaspaceFreeRatio=100
-XX:MaxMetaspaceFreeRatio=100
因为我看到将元空间设置为合适的值需要更长的时间。
现在制作人直接从140k开始,1秒就180k了。
我正在使用 kafka,我做了一个这样的制作人:
synchronized (obj) {
while (true){
long start = Instant.now().toEpochMilli();
for (int i=0; i< NUM_MSG_SEC ; i++)
{
PriceStreamingData data = PriceStreamingData.newBuilder()
.setUser(getRequest().getUser())
.setSecurity(getRequest().getSecurity())
.setTimestamp(Instant.now().toEpochMilli())
.setPrice(new Random().nextDouble()*200)
.build();
record = new ProducerRecord<>(topic, keyBuilder.build(data),
data);
producer.send(record,new Callback(){
@Override
public void onCompletion(RecordMetadata arg0, Exception arg1) {
counter.incrementAndGet();
if(arg1 != null){
arg1.printStackTrace();
}
}
});
}
long diffCiclo = Instant.now().toEpochMilli() - start;
long diff = Instant.now().toEpochMilli() - startTime;
System.out.println("Number of sent: " + counter.get() +
" Millisecond:" + (diff) + " - NumberOfSent/Diff(K): " + counter.get()/diff );
try {
if(diffCiclo >= 1000){
System.out.println("over 1 second: " + diffCiclo);
}
else {
obj.wait( 1000 - diffCiclo );
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如您所见,它非常简单,只需创建一条新消息并发送即可。 如果我看到日志:
NumberOfSent/Diff(K)
前 10 秒表现非常糟糕
30k per second
60 秒后我有
180k per second
为什么?我怎样才能开始已经达到 180k 的进程?
我的kafka生产者配置是Follwing
Async producer ( but also with sync producer the situation dose not change)
ACKS_CONFIG = 0
BATCH_SIZE_CONFIG = 20000
COMPRESSION_TYPE_CONFIG = none
LINGER_MS_CONFIG = 0
最后一个细节:
NUM_MSG_SEC is set to 200000 or bigger number
我自己找到了解决方案,希望这个 post 对其他人也有用。
问题站在
ProducerConfig.BATCH_SIZE_CONFIG
和
ProducerConfig.LINGER_MS_CONFIG
我的参数是 20000 和 0,为了解决这个问题,我将它们设置为更高的值 200000 和 1000。最后我用参数启动了 JVM:
-XX:MinMetaspaceFreeRatio=100
-XX:MaxMetaspaceFreeRatio=100
因为我看到将元空间设置为合适的值需要更长的时间。
现在制作人直接从140k开始,1秒就180k了。