如何在 Apache Flink 中使用 ActiveMQ?

How do I use ActiveMQ in Apache Flink?

我正在通过 ActiveMQ 获取我的数据,我想使用 Apache Flink DataStreams 实时处理这些数据。支持许多消息服务,如 RabbitMQ 和 Kafka,但我看不到对 ActiveMQ 的任何支持。我该如何使用它?

由于不支持 ActiveMQ,我建议实施自定义源。

您基本上必须实现 SourceFunction 接口。 如果你想拥有恰好一次的语义,你可以将你的实现基于 MultipleIdsMessageAcknowledgingSourceBase class.

我建议您开始实施 SourceFunction

找到 Flink 的 JMS 连接器:

https://github.com/jkirsch/senser/blob/master/src/main/java/edu/tuberlin/senser/images/flink/io/FlinkJMSStreamSource.java