Flink - KafkaSink not writing data to kafka topic

I am trying to read JSON events from Kafka, aggregate them by eventId and category, and write the results to a different Kafka topic through Flink. The program is able to read messages from Kafka, but the KafkaSink is not writing any data back to the other Kafka topic. I'm not sure what mistake I'm making. Could someone take a look and let me know where I'm going wrong? Here is the code I'm using.

KafkaSource<EventMessage> source = KafkaSource.<EventMessage>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setTopics(INPUT_KAFKA_TOPIC)
        .setGroupId(LOCAL_GROUP)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new InputDeserializationSchema())
        .build();

WindowAssigner<Object, TimeWindow> windowAssigner = TumblingEventTimeWindows.of(WINDOW_SIZE);

DataStream<EventMessage> eventStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Event Source");

DataStream<EventSummary> events =
eventStream
        .keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
        .window(windowAssigner)
        .aggregate(new EventAggregator())
        .name("EventAggregator test >> ");

KafkaSink<EventSummary> sink = KafkaSink.<EventSummary>builder()
        .setBootstrapServers(LOCAL_KAFKA_BROKER)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(OUTPUT_KAFKA_TOPIC)
                .setValueSerializationSchema(new OutputSummarySerializationSchema())
                .build())
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();

events.sinkTo(sink);

These are the POJOs I have created for the input messages and the output.

// EventMessage POJO
public class EventMessage implements Serializable {
    private Long timestamp;
    private int eventValue;
    private String eventId;
    private String category;

    public EventMessage() { }

    public EventMessage(Long timestamp, int eventValue, String eventId, String category) {
        this.timestamp = timestamp;
        this.eventValue = eventValue;
        this.eventId = eventId;
        this.category = category;
    }
    .....
}

// EventSummary POJO
public class EventSummary {

    public EventMessage eventMessage;
    public int sum;
    public int count;

    public EventSummary() { }
    ....
}

These are the deserialization and serialization schemas I am using.

public class InputDeserializationSchema implements DeserializationSchema<EventMessage> {

    static ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public EventMessage deserialize(byte[] bytes) throws IOException {

        return objectMapper.readValue(bytes, EventMessage.class);
    }

    @Override
    public boolean isEndOfStream(EventMessage inputMessage) {
        return false;
    }

    @Override
    public TypeInformation<EventMessage> getProducedType() {
        return TypeInformation.of(EventMessage.class);
    }
}

public class OutputSummarySerializationSchema implements SerializationSchema<EventSummary> {

    static ObjectMapper objectMapper = new ObjectMapper();

    Logger logger = LoggerFactory.getLogger(OutputSummarySerializationSchema.class);

    @Override
    public byte[] serialize(EventSummary eventSummary) {
        if (objectMapper == null) {
            // Create the mapper first, then configure field visibility.
            objectMapper = new ObjectMapper();
            objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
        }
        try {
            String json = objectMapper.writeValueAsString(eventSummary);
            return json.getBytes();
        } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
            logger.error("Failed to parse JSON", e);
        }
        return new byte[0];
    }
}

I am using this aggregator to aggregate the JSON messages.

public class EventAggregator implements AggregateFunction<EventMessage, EventSummary, EventSummary> {

    private static final Logger log = LoggerFactory.getLogger(EventAggregator.class);
    @Override
    public EventSummary createAccumulator() {
        return new EventSummary();
    }

    @Override
    public EventSummary add(EventMessage eventMessage, EventSummary eventSummary) {
        eventSummary.eventMessage = eventMessage;
        eventSummary.count += 1;
        eventSummary.sum += eventMessage.getEventValue();

        return eventSummary;
    }

    @Override
    public EventSummary getResult(EventSummary eventSummary) {
        return eventSummary;
    }

    @Override
    public EventSummary merge(EventSummary summary1, EventSummary summary2) {
        return new EventSummary(null,
                summary1.sum + summary2.sum,
                summary1.count + summary2.count);
    }
}

Can someone please help me fix this issue?

Thanks in advance.

For event time windows to work, you must specify a proper WatermarkStrategy. Otherwise, the windows will never close, and no results will be produced.

The role of watermarks is to mark a place in the stream and to indicate that the stream is complete up through some specific timestamp. Until they receive this indicator of stream completeness, windows keep waiting for more events to be assigned to them.
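As a minimal sketch of wiring a watermark strategy into the existing pipeline, assuming EventMessage.getTimestamp() returns the event time in epoch milliseconds and that a 10-second out-of-orderness bound (an arbitrary choice here) is acceptable:

WatermarkStrategy<EventMessage> watermarkStrategy =
        WatermarkStrategy.<EventMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                .withTimestampAssigner((event, recordTimestamp) -> event.getTimestamp());

// Use this strategy instead of WatermarkStrategy.noWatermarks()
// so that event time windows can close and emit results.
DataStream<EventMessage> eventStream =
        env.fromSource(source, watermarkStrategy, "Event Source");

If some Kafka partitions can sit idle, adding .withIdleness(...) to the strategy keeps the watermark advancing despite the inactive partitions.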

To debug the watermarking in isolation, you can switch to a PrintSink until the watermarks are working. Or, to simplify debugging of the KafkaSink, you can switch to using processing time windows until the sink is working.
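As a sketch of that debugging setup, reusing the same keying and aggregator and the existing WINDOW_SIZE constant:

DataStream<EventSummary> events =
        eventStream
                .keyBy(eventMessage -> eventMessage.getCategory() + eventMessage.getEventId())
                // Processing time windows fire based on the wall clock,
                // so they emit results even without watermarks.
                .window(TumblingProcessingTimeWindows.of(WINDOW_SIZE))
                .aggregate(new EventAggregator())
                .name("EventAggregator test >> ");

// Print the aggregated results to the task manager logs instead of Kafka while debugging.
events.print();

Once results show up here, you can switch back to event time windows and the KafkaSink one step at a time to isolate whichever piece is misbehaving.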