Kafka Consumer 清空记录
Kafka Consumer empty records
关于这个主题有很多问题,但是,这不是一个重复的问题!
我面临的问题是我试图用 Java 14 和 Kafka 2.5.0 和我的 Consumer returns 建立一个 SpringBoot 项目的空列表记录。
这里的大多数答案都指出了一些被遗忘的属性, or to set the .
在下面的代码段中,我看不出与 docs.confluent.io, eventhough my config-settings seem unconventional (see my setting of the 有任何 逻辑上的 区别。
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\src\main\resources\jaas.conf");
return new KafkaConsumer<>(config);
}
}
然而这有效。我没有收到任何异常(Kafka 或其他),并且已建立连接。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
这是我实际投票的地方:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
我添加了指纹和其他杂乱 以试图找出问题所在。我 运行 我的程序处于调试模式,并将断点放在这一行:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
因此,正如人们所期望的那样,我看到每秒打印一行计数。但是由于我的Consumerreturns没有记录,它永远不会进入forEach
,因此永远不会触发我的断点。
我的题目绝对可以在云端看到,有两个分区。消息源源不断,所以我知道我应该可以捡到一些东西。
我知道连接到集群需要一些时间,但当前时间设置为一刻钟,我至少应该收到一些东西,对吧?作为替代方案,我尝试将 consumer.subscribe()
切换到 consumer.assign()
方法,我指定了我的 TopicPartition,将消费者设置为 consumer.seekToBeginning()
。它 运行 很好,但也没有返回任何内容。
在最常见的示例中找不到的另一件事是我使用自己的 类。因此,我根据 this tutorial.
实现了自定义(反)序列化程序,而不是 KafkaConsumer<String, String>
会不会是我的配置设置问题?轮询超时有问题吗? (反)序列化,还是完全不同的东西?我真的无法查明为什么我得到零记录的任何原因。任何反馈将不胜感激!
问题已解决。这不是您可以从我发布的问题中确定的任何内容,不过,我想澄清一些事情,以防其他人发现自己陷入类似配置的困境。
- 验证收到的密码确实是正确的。 捂脸
是这样的,我以为他正在连接到集群,但我的循环却一直在打印计数,因为执行了 .poll(Duration.ofMillis(1000))
方法 -> 检查他是否可以在给定的超时时间内连接-> 如果连接失败,返回零记录。不会抛出任何错误。正常情况下,2秒左右,应该就可以建立连接了。
- 检查您与数据库的连接。
您永远不希望应用程序停止,这就是为什么我设计了 myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())
方法来记录所有错误,但捕获所有异常。直到我检查了日志,我才意识到我访问远程数据库的权限不正确。
- 所谓的时间戳,应该反序列化为 java.util.Date
错误解析它抛出异常,但我的方法返回 null
。正如此答案中的所有评论一样,这也归结为对此类设置缺乏经验。您会在下面找到经过纠正的 类 作为工作示例(但完全不是最佳实践)。
KafkaConfig:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\src\main\resources\jaas.conf");
return new KafkaConsumer<>(config);
}
}
轮询方法正文:
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
while (true) {
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
}
带有反序列化器的 MyClass 小例子:
@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {
@JsonProperty("UNIQUE_KEY")
private Long uniqueKey;
@JsonProperty("EVENT_TIMESTAMP")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
private Date eventTimestamp;
@JsonProperty("SOME_OTHER_FIELD")
private String someOtherField;
@Override
public MyClass deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
MyClass event = null;
try {
event = mapper
.registerModule(new JavaTimeModule())
.readValue(bytes, MyClass.class);
} catch (Exception e) {
log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
}
return event;
}
}
我希望这对将来的其他人有用。我从挫折和错误中学到了很多。
关于这个主题有很多问题,但是,这不是一个重复的问题!
我面临的问题是我试图用 Java 14 和 Kafka 2.5.0 和我的 Consumer returns 建立一个 SpringBoot 项目的空列表记录。
这里的大多数答案都指出了一些被遗忘的属性,
在下面的代码段中,我看不出与 docs.confluent.io, eventhough my config-settings seem unconventional (see my setting of the
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\src\main\resources\jaas.conf");
return new KafkaConsumer<>(config);
}
}
然而这有效。我没有收到任何异常(Kafka 或其他),并且已建立连接。
// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};
这是我实际投票的地方:
try {
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
int count = 0;
Long start = System.currentTimeMillis();
Long end = System.currentTimeMillis();
while (end - start < 900_000) {
// boolean would be set to true in production
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
System.out.println("visualize number of loops made: " + ++count);
end = System.currentTimeMillis();
}
} catch (KafkaException e) {
e.printStackTrace();
} catch (Exception e) {
System.out.println(e.getMessage());
}
我添加了指纹和其他杂乱 以试图找出问题所在。我 运行 我的程序处于调试模式,并将断点放在这一行:
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
因此,正如人们所期望的那样,我看到每秒打印一行计数。但是由于我的Consumerreturns没有记录,它永远不会进入forEach
,因此永远不会触发我的断点。
我的题目绝对可以在云端看到,有两个分区。消息源源不断,所以我知道我应该可以捡到一些东西。
我知道连接到集群需要一些时间,但当前时间设置为一刻钟,我至少应该收到一些东西,对吧?作为替代方案,我尝试将 consumer.subscribe()
切换到 consumer.assign()
方法,我指定了我的 TopicPartition,将消费者设置为 consumer.seekToBeginning()
。它 运行 很好,但也没有返回任何内容。
在最常见的示例中找不到的另一件事是我使用自己的 类。因此,我根据 this tutorial.
实现了自定义(反)序列化程序,而不是KafkaConsumer<String, String>
会不会是我的配置设置问题?轮询超时有问题吗? (反)序列化,还是完全不同的东西?我真的无法查明为什么我得到零记录的任何原因。任何反馈将不胜感激!
问题已解决。这不是您可以从我发布的问题中确定的任何内容,不过,我想澄清一些事情,以防其他人发现自己陷入类似配置的困境。
- 验证收到的密码确实是正确的。 捂脸
是这样的,我以为他正在连接到集群,但我的循环却一直在打印计数,因为执行了 .poll(Duration.ofMillis(1000))
方法 -> 检查他是否可以在给定的超时时间内连接-> 如果连接失败,返回零记录。不会抛出任何错误。正常情况下,2秒左右,应该就可以建立连接了。
- 检查您与数据库的连接。
您永远不希望应用程序停止,这就是为什么我设计了 myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())
方法来记录所有错误,但捕获所有异常。直到我检查了日志,我才意识到我访问远程数据库的权限不正确。
- 所谓的时间戳,应该反序列化为 java.util.Date
错误解析它抛出异常,但我的方法返回 null
。正如此答案中的所有评论一样,这也归结为对此类设置缺乏经验。您会在下面找到经过纠正的 类 作为工作示例(但完全不是最佳实践)。
KafkaConfig:
@EnableKafka
@Configuration
public class KafkaConfig {
@Bean
public KafkaConsumer<Long, MyClass> consumerConfigs() {
Properties config = new Properties();
config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
System.setProperty("java.security.auth.login.config", ".\src\main\resources\jaas.conf");
return new KafkaConsumer<>(config);
}
}
轮询方法正文:
KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
consumer.subscribe(Collections.singletonList(inputTopic));
while (true) {
ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
records.forEach(record -> {
MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
System.out.println(result);
});
consumer.commitSync();
}
带有反序列化器的 MyClass 小例子:
@Data
@Slf4J
public class MyClass implements Deserializer<MyClass> {
@JsonProperty("UNIQUE_KEY")
private Long uniqueKey;
@JsonProperty("EVENT_TIMESTAMP")
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
private Date eventTimestamp;
@JsonProperty("SOME_OTHER_FIELD")
private String someOtherField;
@Override
public MyClass deserialize(String s, byte[] bytes) {
ObjectMapper mapper = new ObjectMapper();
MyClass event = null;
try {
event = mapper
.registerModule(new JavaTimeModule())
.readValue(bytes, MyClass.class);
} catch (Exception e) {
log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
}
return event;
}
}
我希望这对将来的其他人有用。我从挫折和错误中学到了很多。