Storm SQS messages not getting acked

I have a topology with one spout (reading from two SQS queues) and five bolts. After processing, when I try to ack from the second bolt, the message is not getting acked.

I am running it in reliable mode and trying to ack in the last bolt. I get the log message below as if the tuples were acked, but the messages are not deleted from the queue, and my overridden ack() method is never invoked. It looks like the default ack method in backtype.storm.task.OutputCollector is being called instead of the overridden one in my spout:

8240 [Thread-24-conversionBolt] INFO  backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]

I have anchored the message ID to the tuple in my SQS queue spout and emit it to the first bolt:

collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());

The ack() and fail() methods are overridden in my queue spout, and the default visibility timeout of the queues is set to 30 seconds.

Code snippet from my topology:

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("firstQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getFirstQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setSpout("secondQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getSecondQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setBolt("transformerBolt", new TransformerBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("firstQueueSpout")
        .shuffleGrouping("secondQueueSpout");

builder.setBolt("conversionBolt", new ConversionBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("transformerBolt");

// Dispatches tuples to the corresponding bolts based on packet type
builder.setBolt("dispatchBolt", new DispatcherBolt(),
        StormConfigurations.getDispatcherBoltThreads())
        .shuffleGrouping("conversionBolt");

Code snippet from SqsQueueSpout (extends BaseRichSpout):

    @Override
    public void nextTuple() {
        if (queue.isEmpty()) {
            ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                    new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
            queue.addAll(receiveMessageResult.getMessages());
        }
        Message message = queue.poll();
        if (message != null) {
            try {
                JSONParser parser = new JSONParser();
                JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
                if (reliable) {
                    // Anchor the tuple using the SQS receipt handle as its message ID
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
                } else {
                    // Unreliable mode: delete the message right away
                    sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                    collector.emit(getStreamId(message), new Values(jsonObj.toString()));
                }
            } catch (ParseException e) {
                LOG.error("SqsQueueSpout ParseException in SqsQueueSpout.nextTuple(): ", e);
            }
        } else {
            // Still empty, go to sleep.
            Utils.sleep(sleepTime);
        }
    }

    public String getStreamId(Message message) {
        return Utils.DEFAULT_STREAM_ID;
    }

    public int getSleepTime() {
        return sleepTime;
    }

    public void setSleepTime(int sleepTime) 
    {
        this.sleepTime = sleepTime;
    }

    @Override
    public void ack(Object msgId) {
        System.out.println("......Inside ack in sqsQueueSpout..............."+msgId);
        // Only called in reliable mode.
        try {
            sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void fail(Object msgId) {
        // Only called in reliable mode.
        try {
            sqs.changeMessageVisibilityAsync(
                    new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
        } catch (AmazonClientException ace) { }
    }

    @Override
    public void close() {
        sqs.shutdown();
        ((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }

Code snippet from my first bolt (extends BaseRichBolt):

public class TransformerBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // some code here to convert the JSON string to a map;
            // Map dataMap and long packetId are sent to the next bolt
            this.collector.emit(input, new Values(dataMap, packetId));
        } catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}

Code snippet from my second bolt:

public class ConversionBolt extends BaseRichBolt 
{
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            Map dataMap = (Map) input.getValue(0);
            Long packetId = (Long) input.getValue(1);

            // this ack is not working
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}

Let me know if you need more information. Can someone shed some light on why the overridden ack() in my spout is not getting invoked (from my second bolt)?

You must ack all incoming tuples in all bolts, i.e., add collector.ack(input) to TransformerBolt.execute(Tuple input).
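
Applied to the bolt above, a minimal sketch of the fix could look like this (the JSON-to-map conversion stays elided, with dataMap and packetId standing in for the values your real code produces):

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // ... convert the JSON string to dataMap / packetId ...
            this.collector.emit(input, new Values(dataMap, packetId)); // anchored emit
            this.collector.ack(input);  // tell the acker this bolt is done with the input
        } catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
            this.collector.fail(input); // lets the spout replay the SQS message
        }
    }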

The log message you see is correct: your code calls collector.ack(...) and this call gets logged. However, an ack call in your topology is not a call to Spout.ack(...): every time a spout emits a tuple with a message ID, that ID is registered with the ackers running in your topology. The ackers receive a message for each ack in a bolt, collect them, and notify the spout once all acks for a tuple have been received. Only when the spout gets that notification from an acker does it call its own ack(Object messageId) method.
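
As a side note, Storm also ships BaseBasicBolt, which anchors emitted tuples to the input and acks the input automatically after execute(Tuple, BasicOutputCollector) returns (throwing a FailedException fails it instead). A sketch of TransformerBolt on that base class, under the same assumptions about the elided conversion code as above:

public class TransformerBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String eventStr = input.getString(0);
        // ... convert the JSON string to dataMap / packetId ...
        collector.emit(new Values(dataMap, packetId)); // anchored and acked for you
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}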

See here for more details: https://storm.apache.org/documentation/Guaranteeing-message-processing.html