Flink - How to serialize a POJO to a Kafka Sink
I am trying to write my data stream to Kafka using KafkaSink, and for that I am using the following code:
KafkaSink<Events> sink = KafkaSink.<Events>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
Here, SimpleStringSchema() does not fit, because what I am returning is an events POJO. This is the POJO I have been using.
import java.util.Date;

public class Events {

    private Date windowStart;
    private Date windowEnd;
    private String metric;
    private String eventId;
    private long count;

    public Events(Date windowStart, Date windowEnd,
                  String metric, String eventId,
                  long count) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        this.metric = metric;
        this.eventId = eventId;
        this.count = count;
    }

    public Date getWindowStart() {
        return windowStart;
    }

    public void setWindowStart(Date windowStart) {
        this.windowStart = windowStart;
    }

    public Date getWindowEnd() {
        return windowEnd;
    }

    public void setWindowEnd(Date windowEnd) {
        this.windowEnd = windowEnd;
    }

    public String getMetric() {
        return metric;
    }

    public void setMetric(String metric) {
        this.metric = metric;
    }

    public String getEventId() {
        return eventId;
    }

    public void setEventId(String eventId) {
        this.eventId = eventId;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("Events{");
        sb.append("windowStart=").append(windowStart);
        sb.append(", windowEnd=").append(windowEnd);
        sb.append(", metric=").append(metric);
        sb.append(", eventId=").append(eventId);
        sb.append(", count=").append(count);
        sb.append("}");
        return sb.toString();
    }
}
For this POJO, I cannot think of a SerializationSchema that could be used here. I tried the following approach:
public class EventsSerializationSchema implements DeserializationSchema<Events>, SerializationSchema<Events> {

    private static final ObjectMapper objectMapper = new ObjectMapper();
    private String topic;

    public EventsSerializationSchema() {
    }

    public EventsSerializationSchema(String topic) {
        this.topic = topic;
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            final Events events, @Nullable final Long timestamp) {
        try {
            // if topic is null, default topic will be used
            return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(events));
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + events, e);
        }
    }
}
However, this does not work, because I am not sure how to serialize it. Can someone help?
P.S.: Since I am using Flink 1.14, FlinkKafkaProducer is deprecated in this version.
Thanks in advance.
EventsSerializationSchema implements the wrong interface. You want to implement either SerializationSchema or KafkaSerializationSchema, depending on whether you would rather implement byte[] serialize(T element) or ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp).
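For the first option, here is a minimal sketch of a JSON value serializer, assuming the Events POJO above and Jackson's ObjectMapper as in the original attempt:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;

// Serializes an Events POJO to JSON bytes (the byte[] serialize(T element) variant).
public class EventsSerializationSchema implements SerializationSchema<Events> {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(Events events) {
        try {
            return objectMapper.writeValueAsBytes(events);
        } catch (JsonProcessingException e) {
            throw new IllegalArgumentException("Could not serialize record: " + events, e);
        }
    }
}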
For examples, see KafkaProducerJob.java and UsageRecordSerializationSchema.java.
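With a SerializationSchema along the lines of the sketch above, the sink builder from the question should presumably stay the same apart from replacing SimpleStringSchema with the new schema:

KafkaSink<Events> sink = KafkaSink.<Events>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                // swap in the JSON schema instead of SimpleStringSchema
                .setValueSerializationSchema(new EventsSerializationSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();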