如何 transform/fork Kafka 流并将其发送到特定主题?
How do I transform/fork a Kafka stream and send it over to a specific topic?
我正在尝试使用函数 "mapValues" 将原始流 "textlines" 中获得的字符串值转换为 JSONObject 消息到 newStream。然后将我在 newStream 中得到的任何内容流式传输到名为 "testoutput" 的主题上。但是每次消息实际上通过转换块时,我都会得到一个 NullPointerException,其中的错误仅指向 kafka 流库。不知道发生了什么:((
P.S。当我 fork/create 从原始流中创建一个新的kafka流时,新流是否属于原始构建器?因为我需要构建器来创建 KafkaStreams 对象并开始流式传输,所以我不确定是否需要对新流做其他事情,而不仅仅是指定它的去向 .to("topic")
//Testing a Kafka Stream Application
public class testStream {
public static void main(String[] args) throws Exception {
//Configurations
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("mytest2");
//Printout The Inputs just for testing purposes
textlines.foreach(new ForeachAction<String, String>(){
public void apply(String key, String value){
for(int y=0; y<value.length(); y++){
System.out.print(value.charAt(y));
}
System.out.print("\n");
}
});
//Transform String Records to JSON Objects
KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
@Override
public JSONObject apply(String value) {
JSONObject jsnobj = new JSONObject();
//If the first 4 letters of the message is "xxxx" then parse it to a
//JSON Object, otherwise create a dummy
if(value.substring(0, 4).equals("xxxx")){
jsnobj.put("Header_Title", value.substring(0, 4));
jsnobj.put("Data_Part", value.substring(4));
}else{
jsnobj.put("Header_Title", "Not xxxx");
jsnobj.put("Data_Part", "None");
}
return jsnobj;
}
});
//Specify target
newStream.to("testoutput");
//Off you go
KafkaStreams streams=new KafkaStreams(builder, props);
streams.start();
}
}
据我所知,你的问题是这一行:
newStream.to("testoutput");
newStream
具有类型 KStream<String, JSONObject>
.
但是,您的应用程序配置为默认使用 String
serde 到 serialize/deserialize 记录键和记录值:
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
这意味着,当您未在 to()
调用中提供显式 serdes 时,Kafka Streams 将尝试将您的 newStream
编写为 KStream<String, String>
(而不是 KStream<String, JSONObject>
) 回到卡夫卡。
您需要做的是在 to()
调用中提供显式 serdes:
// Sth like this
newStream.to(Serdes.String(), myJsonSerde, "testoutput");
不幸的是,Kafka 还没有包含开箱即用的 JSON serde(已计划)。幸运的是,您可以查看(并复制)Kafka 自己的 Kafka Streams 演示应用程序中包含的示例 JSON serde API:https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
@Michael:我根据您的建议修改了我的代码。多谢。我的 objective 是将读取的字符串解析为 json 值。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("input-topic-name");
// do stuff
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, JsonNode> newStream = textLines.mapValues(new ValueMapper<String,JsonNode>(){
@Override
public JsonNode apply(String value) {
JSONObject jsnObj = new JSONObject();
//If the first 4 letters of the message is "xxxx" then parse it to a
//JSON Object, otherwise create a dummy
jsnObj.put("Header_Title", value.toString());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode json_value = null;
try {
json_value = objectMapper.readTree(jsnObj.toString());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return json_value;
}
});
newStream.to(Serdes.String(), jsonSerde, "json-output");
我正在尝试使用函数 "mapValues" 将原始流 "textlines" 中获得的字符串值转换为 JSONObject 消息到 newStream。然后将我在 newStream 中得到的任何内容流式传输到名为 "testoutput" 的主题上。但是每次消息实际上通过转换块时,我都会得到一个 NullPointerException,其中的错误仅指向 kafka 流库。不知道发生了什么:((
P.S。当我 fork/create 从原始流中创建一个新的kafka流时,新流是否属于原始构建器?因为我需要构建器来创建 KafkaStreams 对象并开始流式传输,所以我不确定是否需要对新流做其他事情,而不仅仅是指定它的去向 .to("topic")
//Testing a Kafka Stream Application
public class testStream {
public static void main(String[] args) throws Exception {
//Configurations
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-teststream");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxxxxxxxx:xxxx");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
//Building Stream
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textlines = builder.stream("mytest2");
//Printout The Inputs just for testing purposes
textlines.foreach(new ForeachAction<String, String>(){
public void apply(String key, String value){
for(int y=0; y<value.length(); y++){
System.out.print(value.charAt(y));
}
System.out.print("\n");
}
});
//Transform String Records to JSON Objects
KStream<String, JSONObject> newStream = textlines.mapValues(new ValueMapper<String,JSONObject>(){
@Override
public JSONObject apply(String value) {
JSONObject jsnobj = new JSONObject();
//If the first 4 letters of the message is "xxxx" then parse it to a
//JSON Object, otherwise create a dummy
if(value.substring(0, 4).equals("xxxx")){
jsnobj.put("Header_Title", value.substring(0, 4));
jsnobj.put("Data_Part", value.substring(4));
}else{
jsnobj.put("Header_Title", "Not xxxx");
jsnobj.put("Data_Part", "None");
}
return jsnobj;
}
});
//Specify target
newStream.to("testoutput");
//Off you go
KafkaStreams streams=new KafkaStreams(builder, props);
streams.start();
}
}
据我所知,你的问题是这一行:
newStream.to("testoutput");
newStream
具有类型 KStream<String, JSONObject>
.
但是,您的应用程序配置为默认使用 String
serde 到 serialize/deserialize 记录键和记录值:
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
这意味着,当您未在 to()
调用中提供显式 serdes 时,Kafka Streams 将尝试将您的 newStream
编写为 KStream<String, String>
(而不是 KStream<String, JSONObject>
) 回到卡夫卡。
您需要做的是在 to()
调用中提供显式 serdes:
// Sth like this
newStream.to(Serdes.String(), myJsonSerde, "testoutput");
不幸的是,Kafka 还没有包含开箱即用的 JSON serde(已计划)。幸运的是,您可以查看(并复制)Kafka 自己的 Kafka Streams 演示应用程序中包含的示例 JSON serde API:https://github.com/apache/kafka/tree/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview
@Michael:我根据您的建议修改了我的代码。多谢。我的 objective 是将读取的字符串解析为 json 值。
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("input-topic-name");
// do stuff
Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
Serializer<JsonNode> jsonSerializer = new JsonSerializer();
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, JsonNode> newStream = textLines.mapValues(new ValueMapper<String,JsonNode>(){
@Override
public JsonNode apply(String value) {
JSONObject jsnObj = new JSONObject();
//If the first 4 letters of the message is "xxxx" then parse it to a
//JSON Object, otherwise create a dummy
jsnObj.put("Header_Title", value.toString());
ObjectMapper objectMapper = new ObjectMapper();
JsonNode json_value = null;
try {
json_value = objectMapper.readTree(jsnObj.toString());
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return json_value;
}
});
newStream.to(Serdes.String(), jsonSerde, "json-output");