Alpakka JMS 交易
Alpakka JMS Transaction
我正在使用 Alpakka
及其 JMS
连接器从 Oracle AQ
中取出数据。按照 this 指南,我可以想出下面非常基本的实现。
我的问题是如何让它成为事务性的,这样我就可以保证在抛出异常时我的消息不会丢失。
object ConsumerApp extends App {
implicit val system: ActorSystem = ActorSystem("actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())
val out = JmsSource.textSource(
JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)
val sink = Sink.foreach { message: String =>
println("in sink: " + message)
throw new Exception("") // !!! MESSAGE IS LOST !!!
}
out.runWith(sink, materializer)
}
如果是PL/SQL
,解法是这样的:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW (44);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
DBMS_AQ.dequeue (
queue_name => 'My_Queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle
);
-- do something with the message
COMMIT;
END;
流阶段失败时的默认行为是关闭整个流。您必须决定如何使用退避策略 handle errors in the stream. One approach, for example, is to restart 流。
此外,由于您使用的是 Alpakka JMS 连接器,请将 acknowledgement mode 设置为 ClientAcknowledge
(自 Alpakka 0.15 起可用)。使用此配置,未确认的消息可以通过 JMS 源再次传递。例如:
val jmsSource: Source[Message, NotUsed] = JmsSource(
JmsSourceSettings(connectionFactory)
.withQueue("My_Queue")
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
)
val result = jmsSource
.map {
case textMessage: TextMessage =>
val text = textMessage.getText
textMessage.acknowledge()
text
}
.runForeach(println)
我正在使用 Alpakka
及其 JMS
连接器从 Oracle AQ
中取出数据。按照 this 指南,我可以想出下面非常基本的实现。
我的问题是如何让它成为事务性的,这样我就可以保证在抛出异常时我的消息不会丢失。
object ConsumerApp extends App {
implicit val system: ActorSystem = ActorSystem("actor-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val connectionFactory = AQjmsFactory.getConnectionFactory(getOracleDataSource())
val out = JmsSource.textSource(
JmsSourceSettings(connectionFactory).withQueue("My_Queue")
)
val sink = Sink.foreach { message: String =>
println("in sink: " + message)
throw new Exception("") // !!! MESSAGE IS LOST !!!
}
out.runWith(sink, materializer)
}
如果是PL/SQL
,解法是这样的:
DECLARE
dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
message_handle RAW (44);
msg SYS.AQ$_JMS_TEXT_MESSAGE;
BEGIN
DBMS_AQ.dequeue (
queue_name => 'My_Queue',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => msg,
msgid => message_handle
);
-- do something with the message
COMMIT;
END;
流阶段失败时的默认行为是关闭整个流。您必须决定如何使用退避策略 handle errors in the stream. One approach, for example, is to restart 流。
此外,由于您使用的是 Alpakka JMS 连接器,请将 acknowledgement mode 设置为 ClientAcknowledge
(自 Alpakka 0.15 起可用)。使用此配置,未确认的消息可以通过 JMS 源再次传递。例如:
val jmsSource: Source[Message, NotUsed] = JmsSource(
JmsSourceSettings(connectionFactory)
.withQueue("My_Queue")
.withAcknowledgeMode(AcknowledgeMode.ClientAcknowledge)
)
val result = jmsSource
.map {
case textMessage: TextMessage =>
val text = textMessage.getText
textMessage.acknowledge()
text
}
.runForeach(println)