Mule FailedToQueueEventException 石英连接

Mule FailedToQueueEventException Quartz connection

我有下面的 quartz 入站组件来触发 kafka 事件。但它似乎在抛出 SEDA 队列异常。

<quartz:connector name="myQuartzConnector" validateConnections="true">
    <receiver-threading-profile maxThreadsActive="1"/>
</quartz:connector>

<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
    <quartz:inbound-endpoint jobName="Trigger-Kafka-Consumer-Quartz-Job" repeatInterval="1" responseTimeout="10000" connector-ref="myQuartzConnector" doc:name="Quartz">
        <quartz:event-generator-job/>
    </quartz:inbound-endpoint>
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>

Quartz用于触发Kafka消费流。在 Kafka 消费者连接在 java 组件中结束之前,控件不会 return 返回调度程序。 Kafka 消费者连接永远不会结束,因为它处于递归 while(true) 循环中。如果卡夫卡连接结束,石英调度程序应该重新触发重新打开卡夫卡连接的 java 组件。

Message               : The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
Payload               : {NullPayload}
Payload Type          : org.mule.transport.NullPayload
Element               : null @ message-gateway-profile-update-api:null:null
--------------------------------------------------------------------------------
Root Exception stack trace:
org.mule.api.service.FailedToQueueEventException: The queue for 'SEDA Stage quartz-scheduler-kafka-consumer-trigger-flow.stage1' did not accept new event within 30000 MILLISECONDS.
    at org.mule.processor.SedaStageInterceptingMessageProcessor.enqueue(SedaStageInterceptingMessageProcessor.java:139)
    at org.mule.processor.SedaStageInterceptingMessageProcessor.processNextAsync(SedaStageInterceptingMessageProcessor.java:102)
    at org.mule.processor.AsyncInterceptingMessageProcessor.process(AsyncInterceptingMessageProcessor.java:103)
    at org.mule.execution.ExceptionToMessagingExceptionExecutionInterceptor.execute(ExceptionToMessagingExceptionExecutionInterceptor.java:27)

Flow snapshot

The control does not return back to the schedule

这是不正确的说法。调度程序不知道之前的流程是如何结束的。事实上,旧流程继续执行,新流程的事件开始。 如果将记录器作为调度程序之后的第一个组件放在流程的开头,您可以看到它。

实际上,由于您正在侦听消息,所以它应该不是调度程序。监听器应该是流的来源。它应该是这样的:

<flow name="quartz-scheduler-kafka-consumer-trigger-flow">
    <component class="org.my.myKafkaCOnsumer" doc:name="Java KafkaConsumer"/>
</flow>

这里有更多关于多个时间表的信息https://simpleflatservice.com/mule4/Multipleschedules.html

错误是由于 Quartz 端点设置为触发速度快于流处理消息的速度。这个流程有一个默认的处理策略queued-asynchronous, which means that the events triggered by the Quartz endpoint are sent to a SEDA queue, then processed by flow threads as available. The Quartz endpoint is set to repeat every 1 ms, which is very low. There is very little chance of the component to process in that time. When the thread pool used by the flow is exhausted, then the SEDA queue starts to get filled. When the elements in the queue exceed the default timeout to get a thread to get execute, you receive the error. This issue is described in the KB https://help.mulesoft.com/s/article/Error-The-queue-for-SEDA-queue-name-did-not-accept-new-event-within-30000-MILLISECONDS

您可以将流处理策略更改为同步以重用来自连接器的队列来执行并避免排队,但 repeatInterval 似乎小得不切实际。

附带说明,Quartz connector has been deprecated since a long time ago. It has been replaced by the Poll scope