我的 Kafka 流应用程序刚刚退出,代码 0 什么都不做
My Kafka streaming application just exit with code 0 doing nothing
为了尝试 Kafka 流,我这样做了:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringDeserializer.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaAvroDeserializer.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
但是当我在本地 运行 时,我只得到这个 :
基本上我 运行 它来自我的 IDE,1 秒后它就停止了,而它应该等待主题中推送的新事件。
没看懂
kafka 主题在另一台机器上,但我也编写了一个非常简单的消费者代码,我能够从这个远程主题读取消息。
出于某种原因,这个非常简单的 kafka 流应用程序退出时代码为 0。
我无能为力,知道吗?
由于问题似乎与此处的 slf4j 依赖项有关,因此 pom :
4.0.0
罐
<name>Ingestor :: Bigdata :: Ingestor</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<parent>
<groupId>com.parent-pom</groupId>
<artifactId>parent-pom</artifactId>
<version>4.0.2</version>
</parent>
<groupId>com</groupId>
<artifactId>ingestor</artifactId>
<version>1.0.1-SNAPSHOT</version>
<properties>
<sq.artifact.type>internal</sq.artifact.type>
<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<maven.compiler.source>7</maven.compiler.source>
<maven.compiler.target>7</maven.compiler.target>
<revision>1.0.0-SNAPSHOT</revision>
<sq.scs>fx-dan</sq.scs>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com</groupId>
<artifactId>libs-schemas</artifactId>
<version>1.0.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
更新:
除了缺少 log4j 属性文件之外,错误是由于 Serdes 的配置错误造成的。
更新后的代码如下所示:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
final Serde<String> stringSerde = Serdes.String();
final Serde<AllTypesFxEvents> specificAvroSerde = new SpecificAvroSerde<>();
final boolean isKeySerde = false;
specificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL),
isKeySerde);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
kafkaStreams.start();
我没有异常堆栈了,但是 link 帮助我修复了它。
proper guide for java kafka stream with avro schema registry
你的代码对我有用(即使有错误 values-at 至少不会终止)。请在您的代码中使用 logback 并将记录器级别保持为 DEBUG。这样你就可以仔细观察你的 kafka 流启动时发生了什么。
可能 kafka 线程由于某些我们不能这样猜测的原因而终止。
PS: 对不起,我没有添加评论的声誉。
我解决了我的问题,这里是摘要。
第一个问题是我没有正确设置slf4j。 Kafka 期望有 slf4j 可用,因为这是它用来打印它遇到的错误的东西。没有它会默默地让我的程序失败。
要解决此问题,您需要在您的资源文件夹(在您的 src 文件夹中)中添加一个 log4j.properties 文件。
我的看起来像这样(它不打印 DEBUG 日志):
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
获得日志后,我得到了关于序列化错误的堆栈跟踪。我没有日志了,但另一个让我修复它的 SO post 是:
Use Kafka Streams with Avro Schema Registry
基本上我在 Serde 配置中使用了错误的对象。
更新后的代码如下所示:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
final Serde<String> stringSerde = Serdes.String();
final Serde<AllTypesFxEvents> specificAvroSerde = new SpecificAvroSerde<>();
final boolean isKeySerde = false;
specificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL),
isKeySerde);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
kafkaStreams.start();
为了尝试 Kafka 流,我这样做了:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, StringDeserializer.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaAvroDeserializer.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.start();
}
但是当我在本地 运行 时,我只得到这个 :
基本上我 运行 它来自我的 IDE,1 秒后它就停止了,而它应该等待主题中推送的新事件。
没看懂
kafka 主题在另一台机器上,但我也编写了一个非常简单的消费者代码,我能够从这个远程主题读取消息。
出于某种原因,这个非常简单的 kafka 流应用程序退出时代码为 0。 我无能为力,知道吗?
由于问题似乎与此处的 slf4j 依赖项有关,因此 pom :
4.0.0 罐<name>Ingestor :: Bigdata :: Ingestor</name>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<parent>
<groupId>com.parent-pom</groupId>
<artifactId>parent-pom</artifactId>
<version>4.0.2</version>
</parent>
<groupId>com</groupId>
<artifactId>ingestor</artifactId>
<version>1.0.1-SNAPSHOT</version>
<properties>
<sq.artifact.type>internal</sq.artifact.type>
<maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
<maven.compiler.source>7</maven.compiler.source>
<maven.compiler.target>7</maven.compiler.target>
<revision>1.0.0-SNAPSHOT</revision>
<sq.scs>fx-dan</sq.scs>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>com</groupId>
<artifactId>libs-schemas</artifactId>
<version>1.0.6-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-config</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>common-utils</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
<version>7.0.1</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
更新:
除了缺少 log4j 属性文件之外,错误是由于 Serdes 的配置错误造成的。
更新后的代码如下所示:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
final Serde<String> stringSerde = Serdes.String();
final Serde<AllTypesFxEvents> specificAvroSerde = new SpecificAvroSerde<>();
final boolean isKeySerde = false;
specificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL),
isKeySerde);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
kafkaStreams.start();
我没有异常堆栈了,但是 link 帮助我修复了它。
proper guide for java kafka stream with avro schema registry
你的代码对我有用(即使有错误 values-at 至少不会终止)。请在您的代码中使用 logback 并将记录器级别保持为 DEBUG。这样你就可以仔细观察你的 kafka 流启动时发生了什么。 可能 kafka 线程由于某些我们不能这样猜测的原因而终止。
PS: 对不起,我没有添加评论的声誉。
我解决了我的问题,这里是摘要。
第一个问题是我没有正确设置slf4j。 Kafka 期望有 slf4j 可用,因为这是它用来打印它遇到的错误的东西。没有它会默默地让我的程序失败。
要解决此问题,您需要在您的资源文件夹(在您的 src 文件夹中)中添加一个 log4j.properties 文件。
我的看起来像这样(它不打印 DEBUG 日志):
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
获得日志后,我得到了关于序列化错误的堆栈跟踪。我没有日志了,但另一个让我修复它的 SO post 是:
Use Kafka Streams with Avro Schema Registry
基本上我在 Serde 配置中使用了错误的对象。
更新后的代码如下所示:
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "generic-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.BOOTSTRAP_SERVER);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
streamsConfiguration.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL);
final Serde<String> stringSerde = Serdes.String();
final Serde<AllTypesFxEvents> specificAvroSerde = new SpecificAvroSerde<>();
final boolean isKeySerde = false;
specificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, Utils.SCHEMA_REGISTRY_URL),
isKeySerde);
builder.stream(Utils.ALL_FX_EVENTS_TOPIC).foreach((key, value) -> System.out.println(key));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
kafkaStreams.cleanUp();
kafkaStreams.start();