当背压开始时,在 rabbitmq 中获取 Producer 的失败回调

Getting failure callback for Producer in rabbitmq when back pressure kicks in

我想使用一些回调找出我的 rabbitmq 生产者的失败消息 api.I 已经使用 [{rabbit, [{vm_memory_high_watermark, 0.001}]}] 配置了 rabbitmq。并尝试推送大量消息,但所有消息都被接受,稍后出现 TimeoutException,消息未发送到队列enter code here,请告诉我如何捕获它。

发送消息代码:

// #create-sink - producer
        final Sink<ByteString, CompletionStage<Done>> amqpSink =
            AmqpSink.createSimple(
                AmqpSinkSettings.create(connectionProvider)
                    .withRoutingKey(AkkaConstants.queueName)
                    .withDeclaration(queueDeclaration));


        // #run-sink
        //final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
        //Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);

        String filePath = "D:\subrata\code\akkaAmqpTest-master\akkaAmqpTest-master\logs2\dummy.txt";
        Path path = Paths.get(filePath);

        // List containing 78198 individual message
        List<String> contents = Files.readAllLines(path);
        System.out.println("********** file reading done ....");
        int times = 5;

        // Send 78198*times message to Queue [From console i can see 400000 number of messages being sent]
        for(int i=0;i<times;i++) {
            Source.from(contents).map(ByteString::fromString).runWith(amqpSink, materializer);
        }
        System.out.println("************* sending to queue is done");

不幸的是,目前不支持开箱即用。理想情况下,生产者将被建模为 Flow,它将所有传入消息发送到 AMQP 代理,并发出相同的消息,结果无论是否已成功发送到代理。 Alpakka 问题跟踪器上有 a ticket to track this possible improvement