Flink - KafkaSink 没有将数据写入 kafka 主题
Flink - KafkaSink not writing data to kafka topic
我正在尝试通过 Flink 从 Kafka 读取 JSON 事件,按 eventId 和类别(category)对其进行聚合,然后把结果写入另一个 Kafka 主题。程序能够从 Kafka 读取消息,但 KafkaSink 没有把数据写到输出主题。我不确定自己哪里出了错,有人能帮忙检查一下并指出问题所在吗?下面是我使用的代码。
// Kafka source: reads record values from INPUT_KAFKA_TOPIC, starting at the earliest
// offsets, and decodes each value into an EventMessage via InputDeserializationSchema.
KafkaSource<EventMessage> source = KafkaSource.<EventMessage>builder()
.setBootstrapServers(LOCAL_KAFKA_BROKER)
.setTopics(INPUT_KAFKA_TOPIC)
.setGroupId(LOCAL_GROUP)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new InputDeserializationSchema())
.build();
// Tumbling windows of WINDOW_SIZE based on event time.
WindowAssigner<Object, TimeWindow> windowAssigner = TumblingEventTimeWindows.of(WINDOW_SIZE);
// NOTE(review): the source is attached with WatermarkStrategy.noWatermarks(). Event-time
// windows need watermarks to know when they are complete; without a proper
// WatermarkStrategy they never close, so no result ever reaches the sink.
DataStream<EventMessage> eventStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Event Source");
// Key by category + eventId, window, and fold each window into one EventSummary.
DataStream<EventSummary> events =
eventStream
.keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
.window(windowAssigner)
.aggregate(new EventAggregator())
.name("EventAggregator test >> ");
// Kafka sink: writes each EventSummary to OUTPUT_KAFKA_TOPIC as a value-only record
// encoded by OutputSummarySerializationSchema, with at-least-once delivery.
KafkaSink<EventSummary> sink = KafkaSink.<EventSummary>builder()
.setBootstrapServers(LOCAL_KAFKA_BROKER)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_KAFKA_TOPIC)
.setValueSerializationSchema(new OutputSummarySerializationSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
events.sinkTo(sink);
这些是我为输入消息和输出创建的 POJO。
# EventMessage POJO
/**
 * Input event POJO produced by InputDeserializationSchema from the incoming Kafka records.
 * The pipeline keys the stream by category + eventId and EventAggregator sums eventValue.
 */
public class EventMessage implements Serializable {
// NOTE(review): presumably the event time of the record; the unit is not visible here - confirm.
private Long timestamp;
// Numeric value of the event; added to EventSummary.sum by EventAggregator.
private int eventValue;
// Event identifier; second half of the stream key.
private String eventId;
// Event category; first half of the stream key.
private String category;
// No-argument constructor (presumably needed so Jackson can instantiate the POJO - confirm).
public EventMessage() { }
/** Creates a fully populated event. */
public EventMessage(Long timestamp, int eventValue, String eventId, String category) {
this.timestamp = timestamp;
this.eventValue = eventValue;
this.eventId = eventId;
this.category = category;
}
// Remaining members are elided in the original post; getCategory(), getEventId() and
// getEventValue() are called elsewhere in this file, so they are expected to live here.
.....
}
# EventSummary POJO
/**
 * Aggregation result POJO: used by EventAggregator both as accumulator and as output,
 * and serialized to JSON by OutputSummarySerializationSchema.
 */
public class EventSummary {
// Most recent EventMessage added to this summary (null after EventAggregator.merge).
public EventMessage eventMessage;
// Running total of EventMessage.getEventValue() over the aggregated events.
public int sum;
// Number of events aggregated so far.
public int count;
// No-argument constructor; EventAggregator.createAccumulator() uses it to start empty.
public EventSummary() { }
// Remaining members are elided in the original post; EventAggregator.merge calls a
// three-argument constructor (eventMessage, sum, count) that is expected to live here.
....
}
这些是我正在使用的反序列化和序列化模式。
/**
 * Flink {@link DeserializationSchema} that turns the raw bytes of a record value into an
 * {@link EventMessage} by parsing them with Jackson.
 */
public class InputDeserializationSchema implements DeserializationSchema<EventMessage> {
    /** Jackson mapper shared by all instances of this schema. */
    static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses one record value.
     *
     * @param bytes serialized record value
     * @return the decoded event
     * @throws IOException if Jackson cannot read the bytes as an {@link EventMessage}
     */
    @Override
    public EventMessage deserialize(byte[] bytes) throws IOException {
        final EventMessage decoded = objectMapper.readValue(bytes, EventMessage.class);
        return decoded;
    }

    /** Always {@code false}: no element ever marks the end of the stream. */
    @Override
    public boolean isEndOfStream(EventMessage inputMessage) {
        return false;
    }

    /** Reports {@link EventMessage} as the type produced by this schema. */
    @Override
    public TypeInformation<EventMessage> getProducedType() {
        final TypeInformation<EventMessage> producedType = TypeInformation.of(EventMessage.class);
        return producedType;
    }
}
/**
 * Flink {@link SerializationSchema} that encodes an {@link EventSummary} as a UTF-8 JSON
 * document using Jackson.
 */
public class OutputSummarySerializationSchema implements SerializationSchema<EventSummary> {
    /** Jackson mapper shared by all instances; recreated lazily in serialize() if it is null. */
    static ObjectMapper objectMapper = new ObjectMapper();
    Logger logger = LoggerFactory.getLogger(OutputSummarySerializationSchema.class);

    /**
     * Serializes one summary to JSON.
     *
     * @param eventSummary the aggregated result to encode
     * @return the UTF-8 encoded JSON, or an empty array if Jackson fails to serialize the
     *     summary (the failure is logged, not rethrown)
     */
    @Override
    public byte[] serialize(EventSummary eventSummary) {
        if (objectMapper == null) {
            // Create the mapper first, then configure it; configuring before assigning
            // would dereference the null field and throw a NullPointerException.
            ObjectMapper mapper = new ObjectMapper();
            mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
            objectMapper = mapper;
        }
        try {
            String json = objectMapper.writeValueAsString(eventSummary);
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            return json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to serialize EventSummary to JSON", e);
        }
        // Best-effort fallback kept from the original: emit an empty payload on failure.
        return new byte[0];
    }
}
我使用下面这个聚合器来聚合 JSON 消息。
/**
 * Flink {@link AggregateFunction} that folds {@link EventMessage}s into an
 * {@link EventSummary}, which serves as both the accumulator and the result type.
 * The summary tracks the number of events, the sum of their values, and the last event added.
 */
public class EventAggregator implements AggregateFunction<EventMessage, EventSummary, EventSummary> {
    private static final Logger log = LoggerFactory.getLogger(EventAggregator.class);

    /** Starts from a default-constructed summary. */
    @Override
    public EventSummary createAccumulator() {
        final EventSummary emptySummary = new EventSummary();
        return emptySummary;
    }

    /**
     * Folds one event into the accumulator in place.
     *
     * @param eventMessage the incoming event
     * @param eventSummary the accumulator to update
     * @return the same accumulator instance, updated
     */
    @Override
    public EventSummary add(EventMessage eventMessage, EventSummary eventSummary) {
        // Remember the latest event, then bump the counters.
        eventSummary.eventMessage = eventMessage;
        eventSummary.count = eventSummary.count + 1;
        eventSummary.sum = eventSummary.sum + eventMessage.getEventValue();
        return eventSummary;
    }

    /** The accumulator is already the final result. */
    @Override
    public EventSummary getResult(EventSummary eventSummary) {
        return eventSummary;
    }

    /**
     * Combines two partial summaries into a new one by adding their sums and counts.
     * The merged summary carries no event message (it is set to {@code null}).
     */
    @Override
    public EventSummary merge(EventSummary summary1, EventSummary summary2) {
        final int mergedSum = summary1.sum + summary2.sum;
        final int mergedCount = summary1.count + summary2.count;
        return new EventSummary(null, mergedSum, mergedCount);
    }
}
有人可以帮我解决这个问题吗?
提前致谢。
要让事件时间窗口正常工作,您必须指定合适的 WatermarkStrategy;否则,窗口永远不会关闭,也就不会产生任何结果。
水印的作用是在流中标记一个位置,表示截至该位置,时间戳不晚于某个特定值的事件已经全部到达。在收到这种完整性标记之前,窗口会一直等待可能分配给它们的更多事件。
为了便于调试水印,您可以先切换到 PrintSink,直到水印正常工作为止;或者,为了便于调试 KafkaSink,您可以先改用处理时间窗口,直到接收器正常工作为止。
我正在尝试通过 Flink 从 Kafka 读取 JSON 事件,按 eventId 和类别(category)对其进行聚合,然后把结果写入另一个 Kafka 主题。程序能够从 Kafka 读取消息,但 KafkaSink 没有把数据写到输出主题。我不确定自己哪里出了错,有人能帮忙检查一下并指出问题所在吗?下面是我使用的代码。
// Kafka source: reads record values from INPUT_KAFKA_TOPIC, starting at the earliest
// offsets, and decodes each value into an EventMessage via InputDeserializationSchema.
KafkaSource<EventMessage> source = KafkaSource.<EventMessage>builder()
.setBootstrapServers(LOCAL_KAFKA_BROKER)
.setTopics(INPUT_KAFKA_TOPIC)
.setGroupId(LOCAL_GROUP)
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new InputDeserializationSchema())
.build();
// Tumbling windows of WINDOW_SIZE based on event time.
WindowAssigner<Object, TimeWindow> windowAssigner = TumblingEventTimeWindows.of(WINDOW_SIZE);
// NOTE(review): the source is attached with WatermarkStrategy.noWatermarks(). Event-time
// windows need watermarks to know when they are complete; without a proper
// WatermarkStrategy they never close, so no result ever reaches the sink.
DataStream<EventMessage> eventStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Event Source");
// Key by category + eventId, window, and fold each window into one EventSummary.
DataStream<EventSummary> events =
eventStream
.keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
.window(windowAssigner)
.aggregate(new EventAggregator())
.name("EventAggregator test >> ");
// Kafka sink: writes each EventSummary to OUTPUT_KAFKA_TOPIC as a value-only record
// encoded by OutputSummarySerializationSchema, with at-least-once delivery.
KafkaSink<EventSummary> sink = KafkaSink.<EventSummary>builder()
.setBootstrapServers(LOCAL_KAFKA_BROKER)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic(OUTPUT_KAFKA_TOPIC)
.setValueSerializationSchema(new OutputSummarySerializationSchema())
.build())
.setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
events.sinkTo(sink);
这些是我为输入消息和输出创建的 POJO。
# EventMessage POJO
/**
 * Input event POJO produced by InputDeserializationSchema from the incoming Kafka records.
 * The pipeline keys the stream by category + eventId and EventAggregator sums eventValue.
 */
public class EventMessage implements Serializable {
// NOTE(review): presumably the event time of the record; the unit is not visible here - confirm.
private Long timestamp;
// Numeric value of the event; added to EventSummary.sum by EventAggregator.
private int eventValue;
// Event identifier; second half of the stream key.
private String eventId;
// Event category; first half of the stream key.
private String category;
// No-argument constructor (presumably needed so Jackson can instantiate the POJO - confirm).
public EventMessage() { }
/** Creates a fully populated event. */
public EventMessage(Long timestamp, int eventValue, String eventId, String category) {
this.timestamp = timestamp;
this.eventValue = eventValue;
this.eventId = eventId;
this.category = category;
}
// Remaining members are elided in the original post; getCategory(), getEventId() and
// getEventValue() are called elsewhere in this file, so they are expected to live here.
.....
}
# EventSummary POJO
/**
 * Aggregation result POJO: used by EventAggregator both as accumulator and as output,
 * and serialized to JSON by OutputSummarySerializationSchema.
 */
public class EventSummary {
// Most recent EventMessage added to this summary (null after EventAggregator.merge).
public EventMessage eventMessage;
// Running total of EventMessage.getEventValue() over the aggregated events.
public int sum;
// Number of events aggregated so far.
public int count;
// No-argument constructor; EventAggregator.createAccumulator() uses it to start empty.
public EventSummary() { }
// Remaining members are elided in the original post; EventAggregator.merge calls a
// three-argument constructor (eventMessage, sum, count) that is expected to live here.
....
}
这些是我正在使用的反序列化和序列化模式。
/**
 * Flink {@link DeserializationSchema} that turns the raw bytes of a record value into an
 * {@link EventMessage} by parsing them with Jackson.
 */
public class InputDeserializationSchema implements DeserializationSchema<EventMessage> {
    /** Jackson mapper shared by all instances of this schema. */
    static ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Parses one record value.
     *
     * @param bytes serialized record value
     * @return the decoded event
     * @throws IOException if Jackson cannot read the bytes as an {@link EventMessage}
     */
    @Override
    public EventMessage deserialize(byte[] bytes) throws IOException {
        final EventMessage decoded = objectMapper.readValue(bytes, EventMessage.class);
        return decoded;
    }

    /** Always {@code false}: no element ever marks the end of the stream. */
    @Override
    public boolean isEndOfStream(EventMessage inputMessage) {
        return false;
    }

    /** Reports {@link EventMessage} as the type produced by this schema. */
    @Override
    public TypeInformation<EventMessage> getProducedType() {
        final TypeInformation<EventMessage> producedType = TypeInformation.of(EventMessage.class);
        return producedType;
    }
}
/**
 * Flink {@link SerializationSchema} that encodes an {@link EventSummary} as a UTF-8 JSON
 * document using Jackson.
 */
public class OutputSummarySerializationSchema implements SerializationSchema<EventSummary> {
    /** Jackson mapper shared by all instances; recreated lazily in serialize() if it is null. */
    static ObjectMapper objectMapper = new ObjectMapper();
    Logger logger = LoggerFactory.getLogger(OutputSummarySerializationSchema.class);

    /**
     * Serializes one summary to JSON.
     *
     * @param eventSummary the aggregated result to encode
     * @return the UTF-8 encoded JSON, or an empty array if Jackson fails to serialize the
     *     summary (the failure is logged, not rethrown)
     */
    @Override
    public byte[] serialize(EventSummary eventSummary) {
        if (objectMapper == null) {
            // Create the mapper first, then configure it; configuring before assigning
            // would dereference the null field and throw a NullPointerException.
            ObjectMapper mapper = new ObjectMapper();
            mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
            objectMapper = mapper;
        }
        try {
            String json = objectMapper.writeValueAsString(eventSummary);
            // Explicit charset: the no-arg getBytes() depends on the platform default.
            return json.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to serialize EventSummary to JSON", e);
        }
        // Best-effort fallback kept from the original: emit an empty payload on failure.
        return new byte[0];
    }
}
我使用下面这个聚合器来聚合 JSON 消息。
/**
 * Flink {@link AggregateFunction} that folds {@link EventMessage}s into an
 * {@link EventSummary}, which serves as both the accumulator and the result type.
 * The summary tracks the number of events, the sum of their values, and the last event added.
 */
public class EventAggregator implements AggregateFunction<EventMessage, EventSummary, EventSummary> {
    private static final Logger log = LoggerFactory.getLogger(EventAggregator.class);

    /** Starts from a default-constructed summary. */
    @Override
    public EventSummary createAccumulator() {
        final EventSummary emptySummary = new EventSummary();
        return emptySummary;
    }

    /**
     * Folds one event into the accumulator in place.
     *
     * @param eventMessage the incoming event
     * @param eventSummary the accumulator to update
     * @return the same accumulator instance, updated
     */
    @Override
    public EventSummary add(EventMessage eventMessage, EventSummary eventSummary) {
        // Remember the latest event, then bump the counters.
        eventSummary.eventMessage = eventMessage;
        eventSummary.count = eventSummary.count + 1;
        eventSummary.sum = eventSummary.sum + eventMessage.getEventValue();
        return eventSummary;
    }

    /** The accumulator is already the final result. */
    @Override
    public EventSummary getResult(EventSummary eventSummary) {
        return eventSummary;
    }

    /**
     * Combines two partial summaries into a new one by adding their sums and counts.
     * The merged summary carries no event message (it is set to {@code null}).
     */
    @Override
    public EventSummary merge(EventSummary summary1, EventSummary summary2) {
        final int mergedSum = summary1.sum + summary2.sum;
        final int mergedCount = summary1.count + summary2.count;
        return new EventSummary(null, mergedSum, mergedCount);
    }
}
有人可以帮我解决这个问题吗?
提前致谢。
要让事件时间窗口正常工作,您必须指定合适的 WatermarkStrategy;否则,窗口永远不会关闭,也就不会产生任何结果。
水印的作用是在流中标记一个位置,表示截至该位置,时间戳不晚于某个特定值的事件已经全部到达。在收到这种完整性标记之前,窗口会一直等待可能分配给它们的更多事件。
为了便于调试水印,您可以先切换到 PrintSink,直到水印正常工作为止;或者,为了便于调试 KafkaSink,您可以先改用处理时间窗口,直到接收器正常工作为止。