Flink RabbitMQ 关联 ID
Flink RabbitMQ Correlation ID
我正在尝试了解如何使用 RabbiMQ 在 Flink 中使用高可用性,使用关联 ID 并启用检查点,但它不起作用。我的生产者代码:
connection = factory.newConnection();
Channel channel = connection.createChannel();
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrId).build();
channel.queueDeclare("flink-poc", true, false, false, null);
MessageQueue queue = new MessageQueue(500); //Queue of messages to be sent to rabbitmq
Message msg = null;
while ((msg = queue.takeMessage()) != null)
channel.basicPublish("", "flink-poc", props, mapper.writeValueAsBytes(msg));
channel.close();
connection.close();
消费者代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setBufferTimeout(100);
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); // start a checkpoint every 1000 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
RabbitSource<Message> rabbitSource = new RabbitSource<Message>(Host, 5672, username, pass, "flink-poc", VirtualHost, true, schema);
messages = env.addSource(rabbitSource, TypeInformation.of(Message.class)
使用此代码,不会向 RabbitMQ 返回 ACK。感谢任何帮助。
更新: 通过预取 (channel.basicQos(15);
),它开始工作,但速度非常低。有什么办法可以改善吗?如果停用setStreamTimeCharacteristic
,结果是有序的,提高了10倍的速度,但仍然很低,这怎么可能?
您对所有消息使用相同的 correlationId,必须为每条消息使用一个新的 correlationId。来自 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.html 'usesCorrelationId - Whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments). Only used when checkpointing is enabled.'
我正在尝试了解如何使用 RabbiMQ 在 Flink 中使用高可用性,使用关联 ID 并启用检查点,但它不起作用。我的生产者代码:
connection = factory.newConnection();
Channel channel = connection.createChannel();
String corrId = java.util.UUID.randomUUID().toString();
BasicProperties props = new AMQP.BasicProperties().builder().correlationId(corrId).build();
channel.queueDeclare("flink-poc", true, false, false, null);
MessageQueue queue = new MessageQueue(500); //Queue of messages to be sent to rabbitmq
Message msg = null;
while ((msg = queue.takeMessage()) != null)
channel.basicPublish("", "flink-poc", props, mapper.writeValueAsBytes(msg));
channel.close();
connection.close();
消费者代码:
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setBufferTimeout(100);
env.enableCheckpointing(1000,CheckpointingMode.EXACTLY_ONCE); // start a checkpoint every 1000 ms
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
RabbitSource<Message> rabbitSource = new RabbitSource<Message>(Host, 5672, username, pass, "flink-poc", VirtualHost, true, schema);
messages = env.addSource(rabbitSource, TypeInformation.of(Message.class)
使用此代码,不会向 RabbitMQ 返回 ACK。感谢任何帮助。
更新: 通过预取 (channel.basicQos(15);
),它开始工作,但速度非常低。有什么办法可以改善吗?如果停用setStreamTimeCharacteristic
,结果是有序的,提高了10倍的速度,但仍然很低,这怎么可能?
您对所有消息使用相同的 correlationId,必须为每条消息使用一个新的 correlationId。来自 https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.html 'usesCorrelationId - Whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments). Only used when checkpointing is enabled.'