Camel Apache 和 Activemq 重新传送到原始队列

Camel Apache & Activemq redeliver to original queue

我有这样的路线:

from("timer://foo?fixedRate=true&period="+TimeUnit.MINUTES.toMillis(1))
            .process(exchange -> {

                System.out.println("Inserting: "+ new Date());

                exchange.getIn().setHeader("CamelHttpMethod", "GET");
                exchange.getIn().setHeader("Content-Type", "application/json");
                exchange.getIn().setHeader("accept", "application/json");

            })
            .to("https://jsonplaceholder.typicode.com/posts")
            .process(exchange -> { new TestProcess();})
            .to("activemq:jasonplaceholder");
from("activemq:jasonplaceholder")
            .transacted()
            .unmarshal(new ListJacksonDataFormat(Post.class))
            .process(exchange -> {
                @SuppressWarnings("unchecked")
                List<Post> posts = exchange.getIn().getBody(List.class);
                System.out.println(posts);

                posts.forEach(post -> {
                    iDaoPost.insertPost(post);
                });


            });

如果发生错误,我想将我的 JSON 消息回滚到原始队列。 目前,我的消息被传递到activemq的DLQ,但我想在出现错误时将其重新传递到源队列。 谢谢

好吧,理论上你可以用 ExceptionClause which is part of Camel's error handling

onException(ExceptionTypeYouWantToCatch.class)
    .to("activemq:jasonplaceholder");

但是,我不知道它是否可以在事务性消费者中工作以生产到您从消费的相同队列中。只是因为我从来没有这样做过。

DLQ(死信队列)和 DLT(死信主题)已经到位,因为如果您不将错误消息移开,它们将阻止您的整个处理过程。

想象一下当消息无法解组(或有任何其他持续性错误)时会发生什么:

  • 发生错误
  • 消息被移回源队列
  • 消息再次被消费(如果没有其他消息,立即)
  • 发生错误
  • 消息被移回源队列
  • 等等

我猜您想实施某种重试来处理临时错误。

我对这种情况所做的是

  • 将错误消息移动到错误队列(不是 DLQ)
  • 添加另一个消费错误队列的消费者
  • 当错误消息被消费时,带有重试计数的消息头会递增
  • 然后将消息延迟(例如1h)移回源队列
  • 如果同一消息从错误队列中被多次使用(例如 5 次),它会被移到 DLQ
  • DLQ 中的消息必须由人工分析