当消息映射到另一个流时,流中消息的时间戳会发生什么变化?
What happens to the timestamp of a message in a stream when it's mapped into another stream?
我有一个应用程序,我在其中处理流并将其转换为另一个流。这是一个示例:
public void run(final String... args) {
final Serde<Event> eventSerde = new EventSerde();
final Properties props = streamingConfig.getProperties(
applicationName,
concurrency,
Serdes.String(),
eventSerde
);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventStream = builder.stream(inputStream);
final Serde<Device> deviceSerde = new DeviceSerde();
eventStream
.map((key, event) -> {
final Device device = modelMapper.map(event, Device.class);
return new KeyValue<>(key, device);
})
.to("device_topic", Produced.with(Serdes.String(), deviceSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
以下是有关该应用的一些详细信息:
Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6
虽然在输入流内的消息中设置了时间戳,但我还放置了 TimestampExtractor 的实现以确保将正确的时间戳附加到所有消息中(因为其他生产者可能将消息发送到同一主题)。
在代码中,我收到了事件流,基本上将它们转换为不同的对象,并最终将这些对象路由到不同的流中。
我试图了解在这种特殊情况下,我设置的初始时间戳是否仍附加到发布到 device_topic 的消息中。
接收端(设备流)是这样的:
@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
log.trace("[{}] Received device: {}", timestamp, device);
}
不幸的是,打印的时间戳似乎是挂钟时间。这是预期的行为还是我遗漏了什么?
Spring Kafka 1.3.x 使用非常老的 0.11 客户端;也许它不会传播时间戳。我刚刚使用 Boot 2.1.3 和 Spring Kafka 2.2.4 进行了测试,时间戳传播正常...
@SpringBootApplication
@EnableKafkaStreams
public class So54771130Application {
public static void main(String[] args) {
SpringApplication.run(So54771130Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so54771130", 0, 42L, null, "baz");
};
}
@Bean
public KStream<String, String> stream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("so54771130");
stream
.map((k, v) -> {
System.out.println("Mapping:" + v);
return new KeyValue<>(null, "bar");
})
.to("so54771130-1");
return stream;
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54771130", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54771130-1", 1, (short) 1);
}
@KafkaListener(id = "so54771130", topics = "so54771130-1")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
System.out.println(in + "@" + ts);
}
}
和
Mapping:baz
bar@42
我有一个应用程序,我在其中处理流并将其转换为另一个流。这是一个示例:
public void run(final String... args) {
final Serde<Event> eventSerde = new EventSerde();
final Properties props = streamingConfig.getProperties(
applicationName,
concurrency,
Serdes.String(),
eventSerde
);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimestampExtractor.class);
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> eventStream = builder.stream(inputStream);
final Serde<Device> deviceSerde = new DeviceSerde();
eventStream
.map((key, event) -> {
final Device device = modelMapper.map(event, Device.class);
return new KeyValue<>(key, device);
})
.to("device_topic", Produced.with(Serdes.String(), deviceSerde));
final Topology topology = builder.build();
final KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
}
以下是有关该应用的一些详细信息:
Spring Boot 1.5.17
Kafka 2.1.0
Kafka Streams 2.1.0
Spring Kafka 1.3.6
虽然在输入流内的消息中设置了时间戳,但我还放置了 TimestampExtractor 的实现以确保将正确的时间戳附加到所有消息中(因为其他生产者可能将消息发送到同一主题)。
在代码中,我收到了事件流,基本上将它们转换为不同的对象,并最终将这些对象路由到不同的流中。
我试图了解在这种特殊情况下,我设置的初始时间戳是否仍附加到发布到 device_topic 的消息中。
接收端(设备流)是这样的:
@KafkaListener(topics = "device_topic")
public void onDeviceReceive(final Device device, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) final long timestamp) {
log.trace("[{}] Received device: {}", timestamp, device);
}
不幸的是,打印的时间戳似乎是挂钟时间。这是预期的行为还是我遗漏了什么?
Spring Kafka 1.3.x 使用非常老的 0.11 客户端;也许它不会传播时间戳。我刚刚使用 Boot 2.1.3 和 Spring Kafka 2.2.4 进行了测试,时间戳传播正常...
@SpringBootApplication
@EnableKafkaStreams
public class So54771130Application {
public static void main(String[] args) {
SpringApplication.run(So54771130Application.class, args);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("so54771130", 0, 42L, null, "baz");
};
}
@Bean
public KStream<String, String> stream(StreamsBuilder builder) {
KStream<String, String> stream = builder.stream("so54771130");
stream
.map((k, v) -> {
System.out.println("Mapping:" + v);
return new KeyValue<>(null, "bar");
})
.to("so54771130-1");
return stream;
}
@Bean
public NewTopic topic1() {
return new NewTopic("so54771130", 1, (short) 1);
}
@Bean
public NewTopic topic2() {
return new NewTopic("so54771130-1", 1, (short) 1);
}
@KafkaListener(id = "so54771130", topics = "so54771130-1")
public void listen(String in, @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
System.out.println(in + "@" + ts);
}
}
和
Mapping:baz
bar@42