Java - Flink 在 kafka 接收器上发送空对象
Java - Flink sending empty object on kafka sink
在我的 flink 脚本上,我有一个从一个 kafka 主题获取的流,对其进行操作并使用接收器将其发送回 kafka。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties p = new Properties();
p.setProperty("bootstrap.servers", servers_ip_list);
p.setProperty("gropu.id", "Flink");
FlinkKafkaConsumer<Event_N> kafkaData_N =
new FlinkKafkaConsumer("CorID_0", new Ev_Des_Sch_N(), p);
WatermarkStrategy<Event_N> wmStrategy =
WatermarkStrategy
.<Event_N>forMonotonousTimestamps()
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((Event, timestamp) -> {
return Event.get_Time();
});
DataStream<Event_N> stream_N = env.addSource(
kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));
上面的部分工作正常没有任何问题,下面的部分是我遇到问题的地方。
String ProducerTopic = "CorID_0_f1";
DataStream<Stream_Blocker_Pojo.block> box_stream_p= stream_N
.keyBy((Event_N CorrID) -> CorrID.get_CorrID())
.map(new Stream_Blocker_Pojo());
FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
ProducerTopic,
new ObjSerializationSchema(ProducerTopic),
p,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
box_stream_p.addSink(myProducer);
没有错误,一切正常,这是 Stream_Blocker_Pojo,我在其中映射一个流,对其进行操作并发送一个新流。(我简化了我的代码,只保留 4 个变量并删除所有数学和数据处理)。
public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block>
{
public class block {
public Double block_id;
public Double block_var2 ;
public Double block_var3;
public Double block_var4;}
private transient ValueState<block> state_a;
@Override
public void open(Configuration parameters) throws Exception {
state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
}
public block map(Event_N input) throws Exception {
p1.Stream_Blocker_Pojo.block current_a = state_a.value();
if (current_a == null) {
current_a = new p1.Stream_Blocker_Pojo.block();
current_a.block_id = 0.0;
current_a.block_var2 = 0.0;
current_a.block_var3 = 0.0;
current_a.block_var4 = 0.0;}
current_a.block_id = input.f_num_id;
current_a.block_var2 = input.f_num_2;
current_a.block_var3 = input.f_num_3;
current_a.tblock_var4 = input.f_num_4;
state_a.update(current_a);
return new block();
};
}
这是 Kafka 序列化模式的实现。
public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block>{
private String topic;
private ObjectMapper mapper;
public ObjSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
byte[] b = null;
if (mapper == null) {
mapper = new ObjectMapper();
}
try {
b= mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
}
return new ProducerRecord<byte[], byte[]>(topic, b);
}
}
当我使用 kafka 打开我从 Flink 脚本发送的消息时,我发现所有变量都是“null”
CorrID b'{"block_id":null,"block_var1":null,"block_var2":null,"block_var3":null," block_var4":空}
我好像发送了一个没有任何值的空对象。但我很难理解我做错了什么。我认为问题可能出在我对 Stream_Blocker_Pojo 的实现中,或者可能出在 ObjSerializationSchema 中,我们将不胜感激。谢谢
这里有两个可能的问题:
- 您确定您传递的
block
类型的变量没有空字段吗?您可能需要调试该部分以确保。
- 原因也可能在
ObjectMapper
,您应该为您的 block
提供可用的 getter 和 setter,否则 Jackson 可能无法访问它们。
在我的 flink 脚本上,我有一个从一个 kafka 主题获取的流,对其进行操作并使用接收器将其发送回 kafka。
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties p = new Properties();
p.setProperty("bootstrap.servers", servers_ip_list);
p.setProperty("gropu.id", "Flink");
FlinkKafkaConsumer<Event_N> kafkaData_N =
new FlinkKafkaConsumer("CorID_0", new Ev_Des_Sch_N(), p);
WatermarkStrategy<Event_N> wmStrategy =
WatermarkStrategy
.<Event_N>forMonotonousTimestamps()
.withIdleness(Duration.ofMinutes(1))
.withTimestampAssigner((Event, timestamp) -> {
return Event.get_Time();
});
DataStream<Event_N> stream_N = env.addSource(
kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));
上面的部分工作正常没有任何问题,下面的部分是我遇到问题的地方。
String ProducerTopic = "CorID_0_f1";
DataStream<Stream_Blocker_Pojo.block> box_stream_p= stream_N
.keyBy((Event_N CorrID) -> CorrID.get_CorrID())
.map(new Stream_Blocker_Pojo());
FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
ProducerTopic,
new ObjSerializationSchema(ProducerTopic),
p,
FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance
box_stream_p.addSink(myProducer);
没有错误,一切正常,这是 Stream_Blocker_Pojo,我在其中映射一个流,对其进行操作并发送一个新流。(我简化了我的代码,只保留 4 个变量并删除所有数学和数据处理)。
public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block>
{
public class block {
public Double block_id;
public Double block_var2 ;
public Double block_var3;
public Double block_var4;}
private transient ValueState<block> state_a;
@Override
public void open(Configuration parameters) throws Exception {
state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
}
public block map(Event_N input) throws Exception {
p1.Stream_Blocker_Pojo.block current_a = state_a.value();
if (current_a == null) {
current_a = new p1.Stream_Blocker_Pojo.block();
current_a.block_id = 0.0;
current_a.block_var2 = 0.0;
current_a.block_var3 = 0.0;
current_a.block_var4 = 0.0;}
current_a.block_id = input.f_num_id;
current_a.block_var2 = input.f_num_2;
current_a.block_var3 = input.f_num_3;
current_a.tblock_var4 = input.f_num_4;
state_a.update(current_a);
return new block();
};
}
这是 Kafka 序列化模式的实现。
public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block>{
private String topic;
private ObjectMapper mapper;
public ObjSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
byte[] b = null;
if (mapper == null) {
mapper = new ObjectMapper();
}
try {
b= mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
}
return new ProducerRecord<byte[], byte[]>(topic, b);
}
}
当我使用 kafka 打开我从 Flink 脚本发送的消息时,我发现所有变量都是“null”
CorrID b'{"block_id":null,"block_var1":null,"block_var2":null,"block_var3":null," block_var4":空}
我好像发送了一个没有任何值的空对象。但我很难理解我做错了什么。我认为问题可能出在我对 Stream_Blocker_Pojo 的实现中,或者可能出在 ObjSerializationSchema 中,我们将不胜感激。谢谢
这里有两个可能的问题:
- 您确定您传递的
block
类型的变量没有空字段吗?您可能需要调试该部分以确保。 - 原因也可能在
ObjectMapper
,您应该为您的block
提供可用的 getter 和 setter,否则 Jackson 可能无法访问它们。