Java - Flink sending empty object on Kafka sink

In my Flink job I have a stream that is read from a Kafka topic, operated on, and then sent back to Kafka with a sink.

    public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    Properties p = new Properties();
    p.setProperty("bootstrap.servers", servers_ip_list);
    p.setProperty("group.id", "Flink");

   
    FlinkKafkaConsumer<Event_N> kafkaData_N =
            new FlinkKafkaConsumer("CorID_0", new Ev_Des_Sch_N(), p);
    WatermarkStrategy<Event_N> wmStrategy =
            WatermarkStrategy
                    .<Event_N>forMonotonousTimestamps()
                    .withIdleness(Duration.ofMinutes(1))
                    .withTimestampAssigner((Event, timestamp) -> {
                        return Event.get_Time();
                    });
    DataStream<Event_N> stream_N = env.addSource(
            kafkaData_N.assignTimestampsAndWatermarks(wmStrategy));

The part above works fine without any problems; the part below is where I run into the issue.

    String ProducerTopic = "CorID_0_f1";

    DataStream<Stream_Blocker_Pojo.block> box_stream_p= stream_N
                .keyBy((Event_N CorrID) -> CorrID.get_CorrID())
                .map(new Stream_Blocker_Pojo());

    FlinkKafkaProducer<Stream_Blocker_Pojo.block> myProducer = new FlinkKafkaProducer<>(
                ProducerTopic,
                new ObjSerializationSchema(ProducerTopic),
                p,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE); // fault-tolerance

     box_stream_p.addSink(myProducer);

There are no errors and everything runs. Here is Stream_Blocker_Pojo, in which I map one stream, operate on it, and emit a new stream. (I have simplified my code, keeping only 4 variables and removing all the math and data processing.)

public class Stream_Blocker_Pojo extends RichMapFunction<Event_N, Stream_Blocker_Pojo.block>
{

        public class block {
        public Double block_id;
        public Double block_var2 ;
        public Double block_var3;
        public Double block_var4;}
        
        private transient ValueState<block> state_a;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            state_a = getRuntimeContext().getState(new ValueStateDescriptor<>("BoxState_a", block.class));
        }

        public block map(Event_N input) throws Exception {

        p1.Stream_Blocker_Pojo.block current_a = state_a.value();

            if (current_a == null) {
                current_a = new p1.Stream_Blocker_Pojo.block();
                current_a.block_id = 0.0;
                current_a.block_var2 = 0.0;
                current_a.block_var3 = 0.0;
                current_a.block_var4 = 0.0;}

        
            current_a.block_id = input.f_num_id;
            current_a.block_var2 = input.f_num_2;
            current_a.block_var3 = input.f_num_3;
            current_a.block_var4 = input.f_num_4;
          
            state_a.update(current_a);
            return new block();
        };   
    }

Here is the implementation of the Kafka serialization schema.

public class ObjSerializationSchema implements KafkaSerializationSchema<Stream_Blocker_Pojo.block>{

    private String topic;
    private ObjectMapper mapper;

    public ObjSerializationSchema(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Stream_Blocker_Pojo.block obj, Long timestamp) {
        byte[] b = null;
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            b= mapper.writeValueAsBytes(obj);
        } catch (JsonProcessingException e) {

        }
        return new ProducerRecord<byte[], byte[]>(topic, b);
    }

}

When I open the messages sent from the Flink job on the Kafka side, I find that all the variables are "null":

CorrID b'{"block_id":null,"block_var1":null,"block_var2":null,"block_var3":null,"block_var4":null}'

It seems I am sending an empty object without any values, but I am struggling to understand what I did wrong. I think the problem may be in my implementation of Stream_Blocker_Pojo, or possibly in ObjSerializationSchema. Any help would be appreciated. Thanks.

There are two possible problems here:

  1. Are you sure the `block` instances you are passing along actually have non-null fields? You may want to debug that part to make sure.
  2. The cause may also lie in the ObjectMapper: you should provide getters and setters for your `block` class, otherwise Jackson may not be able to access its fields.
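Following up on point 1: in the posted `map` method, `current_a` is populated and written to state, but the method then returns `new block()`, a freshly constructed instance whose `Double` fields are all still `null`. That alone would explain the all-null JSON, regardless of the serializer. A minimal self-contained sketch (a simplified stand-in for `block`, no Flink or Jackson dependencies) showing the difference:

```java
public class BlockDemo {
    // Simplified stand-in for Stream_Blocker_Pojo.block
    static class Block {
        public Double blockId;
        public Double blockVar2;
    }

    // Mirrors the buggy map(): populates a local instance but returns a new one
    static Block buggyMap(double id, double v2) {
        Block current = new Block();
        current.blockId = id;
        current.blockVar2 = v2;
        return new Block(); // BUG: discards current; all fields of the result stay null
    }

    // Fixed version: return the populated instance
    static Block fixedMap(double id, double v2) {
        Block current = new Block();
        current.blockId = id;
        current.blockVar2 = v2;
        return current;
    }

    public static void main(String[] args) {
        Block buggy = buggyMap(1.0, 2.0);
        Block fixed = fixedMap(1.0, 2.0);
        System.out.println("buggy: " + buggy.blockId + ", " + buggy.blockVar2); // null, null
        System.out.println("fixed: " + fixed.blockId + ", " + fixed.blockVar2); // 1.0, 2.0
    }
}
```

In the posted code the equivalent fix would be `return current_a;` instead of `return new block();` at the end of `map`.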