Flink - How to serialize a POJO to Kafka Sink

I am trying to sink my data stream using KafkaSink, for which I am using the following code:

    KafkaSink<Events> sink = KafkaSink.<Events>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

Here, SimpleStringSchema() does not fit, because I am returning an Events POJO. This is the POJO I have been using:

    import java.util.Date;

    public class Events {

        private Date windowStart;
        private Date windowEnd;
        private String metric;
        private String eventId;
        private long count;

        public Events(Date windowStart, Date windowEnd,
                      String metric, String eventId,
                      long count) {
            this.windowStart = windowStart;
            this.windowEnd = windowEnd;
            this.metric = metric;
            this.eventId = eventId;
            this.count = count;
        }

        public Date getWindowStart() {
            return windowStart;
        }

        public void setWindowStart(Date windowStart) {
            this.windowStart = windowStart;
        }

        public Date getWindowEnd() {
            return windowEnd;
        }

        public void setWindowEnd(Date windowEnd) {
            this.windowEnd = windowEnd;
        }

        public String getMetric() {
            return metric;
        }

        public void setMetric(String metric) {
            this.metric = metric;
        }

        public String getEventId() {
            return eventId;
        }

        public void setEventId(String eventId) {
            this.eventId = eventId;
        }

        public long getCount() {
            return count;
        }

        public void setCount(long count) {
            this.count = count;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder("Events{");
            sb.append("windowStart=").append(windowStart);
            sb.append(", windowEnd=").append(windowEnd);
            sb.append(", metric=").append(metric);
            sb.append(", eventId=").append(eventId);
            sb.append(", count=").append(count);
            sb.append("}");

            return sb.toString();
        }
    }

I can't figure out a SerializationSchema that would work here for the POJO. I tried the following:

    public class EventsSerializationSchema implements DeserializationSchema<Events>, SerializationSchema<Events> {

        private static final ObjectMapper objectMapper = new ObjectMapper();
        private String topic;

        public EventsSerializationSchema(){
        }

        public EventsSerializationSchema(String topic) {
            this.topic = topic;
        }

        @Override
        public ProducerRecord<byte[], byte[]> serialize(
                final Events events, @Nullable final Long timestamp) {
            try {
                //if topic is null, default topic will be used
                return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(events));
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException("Could not serialize record: " + events, e);
            }
        }
    }

However, this does not work, because I am not sure how to serialize it. Can somebody please help? P.S.: Since I am using Flink 1.14, FlinkKafkaProducer is deprecated in this version.

Thanks in advance.

EventsSerializationSchema is implementing the wrong interfaces. You want to implement either SerializationSchema or KafkaSerializationSchema, depending on whether you would rather implement

    byte[] serialize(T element)

or

    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp)
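
For example, a minimal sketch of the first option, reusing the Jackson ObjectMapper from the attempt in the question (this assumes Jackson is on the classpath; the class name is reused purely for illustration), could look like this:

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.api.common.serialization.SerializationSchema;

    public class EventsSerializationSchema implements SerializationSchema<Events> {

        private static final ObjectMapper objectMapper = new ObjectMapper();

        @Override
        public byte[] serialize(Events events) {
            try {
                // Serialize the POJO to JSON bytes; Kafka stores this byte[] as the record value
                return objectMapper.writeValueAsBytes(events);
            } catch (JsonProcessingException e) {
                throw new IllegalArgumentException("Could not serialize record: " + events, e);
            }
        }
    }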

See KafkaProducerJob.java and UsageRecordSerializationSchema.java for an example.
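
With a schema along those lines, the KafkaSink from the question could then be built with it in place of SimpleStringSchema, roughly like this:

    KafkaSink<Events> sink = KafkaSink.<Events>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                // custom JSON schema sketched above, instead of SimpleStringSchema
                .setValueSerializationSchema(new EventsSerializationSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();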