Kafka-Stream Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1

I am building a simple Kafka Streams application that streams data from an existing topic into another topic.

My producer is streaming Meetup's open data into a topic named test, and I want to process it into a testout topic.

The code is as follows:

package com.mycompany.app;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;


public class App {

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-first-streams-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);

        KStreamBuilder builder = new KStreamBuilder();

        builder.stream("test").to("testout");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();

        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
            }
        }));
    }
}

However, when I run the application:

java -cp target/my-app-1.0-SNAPSHOT.jar com.mycompany.app.App

I get this exception:

Exception in thread "StreamThread-1" java.lang.IllegalArgumentException: Invalid timestamp -1
    at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:60)
    at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:72)
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)

I have searched for a long time but still cannot figure out why this error occurs.

Any ideas?

From: http://docs.confluent.io/3.1.0/streams/faq.html#invalid-timestamp-exception

This error means that the timestamp extractor of your Kafka Streams application failed to extract a valid timestamp from a record. Typically, this points to a problem with the record (e.g., the record does not contain a timestamp at all), but it could also indicate a problem or bug in the timestamp extractor used by the application.

When does a record not contain a valid timestamp:

  • If you are using the default ConsumerRecordTimestampExtractor, it is most likely that your records do not carry an embedded timestamp (embedded record timestamps got introduced in Kafka’s message format in Kafka 0.10). This might happen, if for example, you consume a topic that is written by old Kafka producer clients (i.e., version 0.9 or earlier) or by third-party producer clients. Another situation where this may happen is after upgrading your Kafka cluster from 0.9 to 0.10, where all the data that was generated with 0.9 does not include the 0.10 message timestamps.
  • If you are using a custom timestamp extractor, make sure that your extractor is properly handling invalid (negative) timestamps, where “properly” depends on the semantics of your application. For example, you can return a default or an estimated timestamp if you cannot extract a valid timestamp (maybe the timestamp field in your data is just missing).
  • You can also switch to processing-time semantics via WallclockTimestampExtractor; whether such a fallback is an appropriate response to this situation depends on your use case. However, as a first step you should identify and fix the root cause for why such problematic records were written to Kafka in the first place. In a second step you may consider applying workarounds (as described above) when dealing with such records (for example, if you need to process those records after all). Another option is to regenerate the records with correct timestamps and write them to a new Kafka topic.
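Following the FAQ's second suggestion, one way to handle this is a custom extractor that falls back to processing time whenever a record carries no embedded timestamp. The sketch below targets the 0.10.x Streams API (the `TimestampExtractor` interface and the `StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG` key); the fall-back-to-current-time policy is an assumption on my part — whether it is acceptable depends on your application's semantics:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch: use the record's embedded timestamp when present, otherwise
// fall back to wall-clock time (e.g. for data written by a pre-0.10
// producer, which carries timestamp -1).
public class FallbackTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        long timestamp = record.timestamp();
        // Any negative value means there is no valid embedded timestamp.
        return timestamp >= 0 ? timestamp : System.currentTimeMillis();
    }
}
```

You would then register it in the properties of the application above, next to the other `StreamsConfig` entries:

```java
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          FallbackTimestampExtractor.class.getName());
```

Alternatively, setting that property to `WallclockTimestampExtractor.class.getName()` switches the whole application to processing-time semantics, as the FAQ's third bullet describes.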