Alpakka AMQP:如何检测声明异常?
Alpakka AMQP : How to detect declaration exception?
我有一个带有声明的 AMQP 源和 AMQP 接收器:
List<Declaration> declarations = new ArrayList<Declaration>() {{
add(QueueDeclaration.create(sourceExchangeName));
add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
}};
amqpSource = AmqpSource
.committableSource(
NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
.withDeclarations(declarations),
bufferSize);
AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
.withExchange("DEST_XCHANGE")
.withRoutingKey("ROUTE123")
.withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
BuiltinExchangeType.DIRECT.getType()));
amqpSink = AmqpSink.create(amqpWriteSettings);
然后我就有了流量..
amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)
现在,在我启动应用程序后,发送到源队列的消息没有被使用。后来我发现这是由于声明过程中发生的错误。 (即,当我在 Source 和 Sink 设置中删除 .withDeclarations(..) 时它工作正常。
所以我的问题:
- 如何检测 AMQP 源和接收器是否启动并且 运行 正常?
- 如何忽略声明异常?
- 如果发生任何异常,我如何知道并使系统发生故障?
为了回答 1 和 3,AmqpSink
实现了一个 CompletionStage<Done>
,您必须保留它并处理(注册一些回调函数)以观察流的失败和完成。在文档示例中,我们阻止了生产代码中不好的完成阶段 (https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink), that's probably because the sample is included in one of the Alpakka tests. Prefer the usual CompletionStage
callback/transformation methods instead (see for example this introduction)。
当发生错误时,CompletionStage
将失败,当流 materialized/starting 启动时或在处理元素期间,或者在源到达末尾并且每个元素都经过您的处理后完成流入水槽。这意味着对于启动流,如果它没有很快失败,它是 运行.
对于问题 2 不确定是否可以忽略声明异常,可能是那些总是连接失败。
我有一个带有声明的 AMQP 源和 AMQP 接收器:
List<Declaration> declarations = new ArrayList<Declaration>() {{
add(QueueDeclaration.create(sourceExchangeName));
add(BindingDeclaration.create(sourceExchangeName, sourceExchangeName).withRoutingKey(sourceRoutingKey));
}};
amqpSource = AmqpSource
.committableSource(
NamedQueueSourceSettings.create(connectionProvider, sourceExchangeName)
.withDeclarations(declarations),
bufferSize);
AmqpWriteSettings amqpWriteSettings = AmqpWriteSettings.create(connectionProvider)
.withExchange("DEST_XCHANGE")
.withRoutingKey("ROUTE123")
.withDeclaration(ExchangeDeclaration.create(destinationExchangeName,
BuiltinExchangeType.DIRECT.getType()));
amqpSink = AmqpSink.create(amqpWriteSettings);
然后我就有了流量..
amqpSource.map(doSomething).async().map(doSomethingElse).async().to(amqpSink)
现在,在我启动应用程序后,发送到源队列的消息没有被使用。后来我发现这是由于声明过程中发生的错误。 (即,当我在 Source 和 Sink 设置中删除 .withDeclarations(..) 时它工作正常。
所以我的问题:
- 如何检测 AMQP 源和接收器是否启动并且 运行 正常?
- 如何忽略声明异常?
- 如果发生任何异常,我如何知道并使系统发生故障?
为了回答 1 和 3,AmqpSink
实现了一个 CompletionStage<Done>
,您必须保留它并处理(注册一些回调函数)以观察流的失败和完成。在文档示例中,我们阻止了生产代码中不好的完成阶段 (https://doc.akka.io/docs/alpakka/current/amqp.html#with-sink), that's probably because the sample is included in one of the Alpakka tests. Prefer the usual CompletionStage
callback/transformation methods instead (see for example this introduction)。
当发生错误时,CompletionStage
将失败,当流 materialized/starting 启动时或在处理元素期间,或者在源到达末尾并且每个元素都经过您的处理后完成流入水槽。这意味着对于启动流,如果它没有很快失败,它是 运行.
对于问题 2 不确定是否可以忽略声明异常,可能是那些总是连接失败。