如何使 Serdes 与多步 kafka 流一起工作
How to make Serdes work with multi-step kafka streams
我是 Kafka 的新手,我正在使用 Twitter API 作为数据源构建一个入门项目。我已经创建了一个生产者,它可以查询 Twitter API 并将数据发送到我的 kafka 主题,其中包含用于键和值的字符串序列化程序。我的 Kafka Stream 应用程序读取此数据并进行字数统计,但也按推文的日期进行分组。这部分是通过一个名为 wordCounts 的 KTable 来完成的,以利用它的更新插入功能。这个KTable的结构是:
键:{单词:exampleWord,日期:exampleDate},值:numberOfOccurences
然后我尝试将 KTable 流中的数据重组为平面结构,以便稍后将其发送到数据库。您可以在 wordCountsStructured KStream 对象中看到这一点。这会将数据重组为如下所示的结构。该值最初是 JsonObject 但我将其转换为字符串以匹配我设置的 Serdes。
Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}
但是,当我尝试将其发送到我的第二个 kafka 主题时,出现以下错误。
A serializer (key:
org.apache.kafka.common.serialization.StringSerializer / value:
org.apache.kafka.common.serialization.StringSerializer) is not
compatible to the actual key or value type (key type:
com.google.gson.JsonObject / value type: com.google.gson.JsonObject).
Change the default Serdes in StreamConfig or provide correct Serdes
via method parameters.
我对此感到困惑,因为我发送到主题的 KStream 是 <String, String>
类型。有谁知道我该如何解决这个问题?
public class TwitterWordCounter {
private final JsonParser jsonParser = new JsonParser();
public Topology createTopology(){
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("test-topic2");
KTable<JsonObject, Long> wordCounts = textLines
//parse each tweet as a tweet object
.mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
//map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
.flatMapValues(TwitterWordCounter::tweetWordDateMapper)
//update the key so it matches the word-date combination so we can do a groupBy and count instances
.selectKey((key, wordDate) -> wordDate)
.groupByKey()
.count(Materialized.as("Counts"));
/*
In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
so we have to:
1. take the columns which include the dimensional data and put this into the value of the stream.
2. lable the count with 'count' as the column name
*/
KStream<String, String> wordCountsStructured = wordCounts.toStream()
.map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));
KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
(key, value) -> System.out.println("key: " + key + "value:" + value)
);
wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
TwitterWordCounter wordCountApp = new TwitterWordCounter();
KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
try{
List<String> words = Arrays.asList(tweet.tweetText.split("\W+"));
List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
for(String word: words) {
JsonObject tweetJson = new JsonObject();
tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
tweetJson.add("word", new JsonPrimitive(word));
tweetsJson.add(tweetJson);
}
return tweetsJson;
}
catch (Exception e) {
System.out.println(e);
System.out.println(tweet.serialize().toString());
return new ArrayList<JsonObject>();
}
}
public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
key.addProperty("count", countOfWord); //new JsonPrimitive(count));
return key;
}
因为您在 groupBy() 之前执行键更改操作,它将创建一个重新分区主题,并且对于该主题,它将依赖于您已设置为 String Serde 的默认键值 serdes。
您可以将 groupBy()
调用修改为 groupBy(Grouped.with(StringSerde,JsonSerde)
,这应该会有所帮助。
我是 Kafka 的新手,我正在使用 Twitter API 作为数据源构建一个入门项目。我已经创建了一个生产者,它可以查询 Twitter API 并将数据发送到我的 kafka 主题,其中包含用于键和值的字符串序列化程序。我的 Kafka Stream 应用程序读取此数据并进行字数统计,但也按推文的日期进行分组。这部分是通过一个名为 wordCounts 的 KTable 来完成的,以利用它的更新插入功能。这个KTable的结构是:
键:{单词:exampleWord,日期:exampleDate},值:numberOfOccurences
然后我尝试将 KTable 流中的数据重组为平面结构,以便稍后将其发送到数据库。您可以在 wordCountsStructured KStream 对象中看到这一点。这会将数据重组为如下所示的结构。该值最初是 JsonObject 但我将其转换为字符串以匹配我设置的 Serdes。
Key: null, Value: {word: exampleWord, date: exampleDate, Counts: numberOfOccurences}
但是,当我尝试将其发送到我的第二个 kafka 主题时,出现以下错误。
A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: com.google.gson.JsonObject / value type: com.google.gson.JsonObject). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
我对此感到困惑,因为我发送到主题的 KStream 是 <String, String>
类型。有谁知道我该如何解决这个问题?
public class TwitterWordCounter {
private final JsonParser jsonParser = new JsonParser();
public Topology createTopology(){
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("test-topic2");
KTable<JsonObject, Long> wordCounts = textLines
//parse each tweet as a tweet object
.mapValues(tweetString -> new Gson().fromJson(jsonParser.parse(tweetString).getAsJsonObject(), Tweet.class))
//map each tweet object to a list of json objects, each of which containing a word from the tweet and the date of the tweet
.flatMapValues(TwitterWordCounter::tweetWordDateMapper)
//update the key so it matches the word-date combination so we can do a groupBy and count instances
.selectKey((key, wordDate) -> wordDate)
.groupByKey()
.count(Materialized.as("Counts"));
/*
In order to structure the data so that it can be ingested into SQL, the value of each item in the stream must be straightforward: property, value
so we have to:
1. take the columns which include the dimensional data and put this into the value of the stream.
2. lable the count with 'count' as the column name
*/
KStream<String, String> wordCountsStructured = wordCounts.toStream()
.map((key, value) -> new KeyValue<>(null, MapValuesToIncludeColumnData(key, value).toString()));
KStream<String, String> wordCountsPeek = wordCountsStructured.peek(
(key, value) -> System.out.println("key: " + key + "value:" + value)
);
wordCountsStructured.to("test-output2", Produced.with(Serdes.String(), Serdes.String()));
return builder.build();
}
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application1111");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "myIPAddress");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
TwitterWordCounter wordCountApp = new TwitterWordCounter();
KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
//this method is used for taking a tweet and transforming it to a representation of the words in it plus the date
public static List<JsonObject> tweetWordDateMapper(Tweet tweet) {
try{
List<String> words = Arrays.asList(tweet.tweetText.split("\W+"));
List<JsonObject> tweetsJson = new ArrayList<JsonObject>();
for(String word: words) {
JsonObject tweetJson = new JsonObject();
tweetJson.add("date", new JsonPrimitive(tweet.formattedDate().toString()));
tweetJson.add("word", new JsonPrimitive(word));
tweetsJson.add(tweetJson);
}
return tweetsJson;
}
catch (Exception e) {
System.out.println(e);
System.out.println(tweet.serialize().toString());
return new ArrayList<JsonObject>();
}
}
public JsonObject MapValuesToIncludeColumnData(JsonObject key, Long countOfWord) {
key.addProperty("count", countOfWord); //new JsonPrimitive(count));
return key;
}
因为您在 groupBy() 之前执行键更改操作,它将创建一个重新分区主题,并且对于该主题,它将依赖于您已设置为 String Serde 的默认键值 serdes。
您可以将 groupBy()
调用修改为 groupBy(Grouped.with(StringSerde,JsonSerde)
,这应该会有所帮助。