如何提取 Kafka Streams 消息中嵌入的时间戳
How to extract timestamp embedded in messages in Kafka Streams
我想提取每条消息中嵌入的时间戳并将它们作为 json 有效负载发送到我的数据库中。
我想获取以下三个时间戳。
事件时间: The point in time when an event or data record occurred, i.e. was originally created “by the source”.
处理时间: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed.
摄取时间: The point in time when an event or data record is stored in a topic partition by a Kafka broker.
这是我的流应用程序代码:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
System.out.println("========> o365_user_activity_by_date Log: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString()); // UTC TIME
send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
send.put("user_id", received.get("UserId"));
send.put("operation", received.get("Operation"));
send.put("workload", received.get("Workload"));
keywords.add(send.toString());
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("o365_user_activity_by_date");
在代码中,我只是获取每条记录,对其进行一些流处理并将其发送到不同的主题。
现在,我要发送的每条记录 Event-time
、Processing-time
和 Ingestion-time
都嵌入到负载中。
我看过 FailOnInvalidTimestamp
和 WallclockTimestampExtractor
但我对如何使用它们感到困惑。
请指导我如何实现这一目标。
Timestamp
提取器只能给你一个时间戳,这个时间戳用于 time-based 操作,如 windowed-aggregations 或连接。看来你没有做任何time-based计算思想,因此,从计算的角度来看,这无关紧要。
请注意,一条记录只有一个元数据时间戳字段。此时间戳字段可用于存储可由生产者设置的 event-timestamp。作为替代方案,您可以让代理使用代理摄取时间覆盖生产者提供的时间戳。这是主题配置。
要访问记录元数据时间戳(独立于 event-time 或 ingestion-time),默认时间戳提取器会为您提供此时间戳。如果你想在你的应用程序中访问它,你需要使用 Processor API,即,在你的情况下是 .transform()
而不是 .flatMap
运算符。您的 Transformer
将使用 context
对象进行初始化,该对象允许您访问提取的时间戳。
因为一条记录只能存储一个元数据时间戳,并且因为您想将其用于代理摄取时间,所以上游生产者必须将 event-timestamp 直接放入有效负载中。
对于 processing-time,只需按照您的代码片段中的指示进行系统调用。
我想提取每条消息中嵌入的时间戳并将它们作为 json 有效负载发送到我的数据库中。
我想获取以下三个时间戳。
事件时间: The point in time when an event or data record occurred, i.e. was originally created “by the source”.
处理时间: The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed.
摄取时间: The point in time when an event or data record is stored in a topic partition by a Kafka broker.
这是我的流应用程序代码:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BROKER_URL + ":9092"); // pass from env localhost:9092 ,BROKER_URL + ":9092"
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_o365_user_activity = builder.stream("o365_user_activity");
source_o365_user_activity.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
System.out.println("========> o365_user_activity_by_date Log: " + value);
ArrayList<String> keywords = new ArrayList<String>();
try {
JSONObject send = new JSONObject();
JSONObject received = new JSONObject(value);
send.put("current_date", getCurrentDate().toString()); // UTC TIME
send.put("activity_time", received.get("CreationTime")); // CONSTANTS FINAL STATIC(Topic Names, Cassandra keys)
send.put("user_id", received.get("UserId"));
send.put("operation", received.get("Operation"));
send.put("workload", received.get("Workload"));
keywords.add(send.toString());
} catch (Exception e) {
// TODO: handle exception
System.err.println("Unable to convert to json");
e.printStackTrace();
}
return keywords;
}
}).to("o365_user_activity_by_date");
在代码中,我只是获取每条记录,对其进行一些流处理并将其发送到不同的主题。
现在,我要发送的每条记录 Event-time
、Processing-time
和 Ingestion-time
都嵌入到负载中。
我看过 FailOnInvalidTimestamp
和 WallclockTimestampExtractor
但我对如何使用它们感到困惑。
请指导我如何实现这一目标。
Timestamp
提取器只能给你一个时间戳,这个时间戳用于 time-based 操作,如 windowed-aggregations 或连接。看来你没有做任何time-based计算思想,因此,从计算的角度来看,这无关紧要。
请注意,一条记录只有一个元数据时间戳字段。此时间戳字段可用于存储可由生产者设置的 event-timestamp。作为替代方案,您可以让代理使用代理摄取时间覆盖生产者提供的时间戳。这是主题配置。
要访问记录元数据时间戳(独立于 event-time 或 ingestion-time),默认时间戳提取器会为您提供此时间戳。如果你想在你的应用程序中访问它,你需要使用 Processor API,即,在你的情况下是 .transform()
而不是 .flatMap
运算符。您的 Transformer
将使用 context
对象进行初始化,该对象允许您访问提取的时间戳。
因为一条记录只能存储一个元数据时间戳,并且因为您想将其用于代理摄取时间,所以上游生产者必须将 event-timestamp 直接放入有效负载中。
对于 processing-time,只需按照您的代码片段中的指示进行系统调用。