Storm SQS messages not getting acked
I have a topology with one spout reading from two SQS queues, plus five bolts. After processing, when I try to ack from the second bolt, the message is not getting acked.
I am running the topology in reliable mode and acking in the last bolt. I get the log message below as if the message were acked, but it is not deleted from the queue and my overridden ack() method is never called. It looks like the default ack method in backtype.storm.task.OutputCollector is invoked instead of the overridden method in my spout:
8240 [Thread-24-conversionBolt] INFO backtype.storm.daemon.task - Emitting: conversionBolt__ack_ack [-7578372739434961741 -8189877254603774958]
I anchor the tuple with the message's receipt handle as the message ID in my SQS queue spout and emit it to the first bolt:
collector.emit(getStreamId(message), new Values(jsonObj.toString()), message.getReceiptHandle());
I have overridden the ack() and fail() methods in my queue spout. The queue's default visibility timeout is set to 30 seconds.
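As an aside, the visibility timeout can also be set per receive call instead of relying on the queue default. A minimal sketch, assuming the same AWS SDK for Java v1 client used in the question (the 30-second value mirrors the queue setting above):

ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
        .withMaxNumberOfMessages(10)
        .withVisibilityTimeout(30); // messages stay hidden from other consumers for 30s
ReceiveMessageResult result = sqs.receiveMessage(request);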
Code snippet from my topology:
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("firstQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getFirstQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setSpout("secondQueueSpout",
        new SqsQueueSpout(StormConfigurations.getQueueURL()
                + StormConfigurations.getSecondQueueName(), true),
        StormConfigurations.getAwsQueueSpoutThreads());

builder.setBolt("transformerBolt", new TransformerBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("firstQueueSpout")
        .shuffleGrouping("secondQueueSpout");

builder.setBolt("conversionBolt", new ConversionBolt(),
        StormConfigurations.getTranformerBoltThreads())
        .shuffleGrouping("transformerBolt");

// To dispatch it to the corresponding bolts based on packet type
builder.setBolt("dispatchBolt", new DispatcherBolt(),
        StormConfigurations.getDispatcherBoltThreads())
        .shuffleGrouping("conversionBolt");
Code snippet from SqsQueueSpout (extends BaseRichSpout):
@Override
public void nextTuple() {
    if (queue.isEmpty()) {
        ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(
                new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10));
        queue.addAll(receiveMessageResult.getMessages());
    }
    Message message = queue.poll();
    if (message != null) {
        try {
            JSONParser parser = new JSONParser();
            JSONObject jsonObj = (JSONObject) parser.parse(message.getBody());
            // ack(message.getReceiptHandle());
            if (reliable) {
                // Anchored emit: the receipt handle serves as the message ID
                collector.emit(getStreamId(message), new Values(jsonObj.toString()),
                        message.getReceiptHandle());
            } else {
                // Delete it right away
                sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                collector.emit(getStreamId(message), new Values(jsonObj.toString()));
            }
        } catch (ParseException e) {
            LOG.error("ParseException in SqsQueueSpout.nextTuple(): ", e);
        }
    } else {
        // Still empty, go to sleep.
        Utils.sleep(sleepTime);
    }
}

public String getStreamId(Message message) {
    return Utils.DEFAULT_STREAM_ID;
}

public int getSleepTime() {
    return sleepTime;
}

public void setSleepTime(int sleepTime) {
    this.sleepTime = sleepTime;
}

@Override
public void ack(Object msgId) {
    System.out.println("......Inside ack in SqsQueueSpout..............." + msgId);
    // Only called in reliable mode.
    try {
        sqs.deleteMessageAsync(new DeleteMessageRequest(queueUrl, (String) msgId));
    } catch (AmazonClientException ace) { }
}

@Override
public void fail(Object msgId) {
    // Only called in reliable mode.
    try {
        sqs.changeMessageVisibilityAsync(
                new ChangeMessageVisibilityRequest(queueUrl, (String) msgId, 0));
    } catch (AmazonClientException ace) { }
}

@Override
public void close() {
    sqs.shutdown();
    ((AmazonSQSAsyncClient) sqs).getExecutorService().shutdownNow();
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("message"));
}
Code snippet from my first bolt (extends BaseRichBolt):
public class TransformerBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    public static final Logger LOG = LoggerFactory.getLogger(TransformerBolt.class);
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String eventStr = input.getString(0);
            // some code here to convert the json string to map
            // Map dataMap, long packetId being sent to next bolt
            this.collector.emit(input, new Values(dataMap, packetId));
        } catch (Exception e) {
            LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("dataMap", "packetId"));
    }
}
Code snippet from the second bolt:
public class ConversionBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            Map dataMap = (Map) input.getValue(0);
            Long packetId = (Long) input.getValue(1);
            // this ack is not working
            this.collector.ack(input);
        } catch (Exception e) {
            this.collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
Let me know if you need more information. Can someone shed light on why the overridden ack() in my spout is not getting called (from my second bolt)?
You have to ack all incoming tuples in all bolts, i.e., add collector.ack(input) to TransformerBolt.execute(Tuple input).
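A minimal sketch of the fixed execute method (convertToMap and extractPacketId are hypothetical placeholders for the conversion code elided in the question):

@Override
public void execute(Tuple input) {
    try {
        String eventStr = input.getString(0);
        Map<String, Object> dataMap = convertToMap(eventStr);  // placeholder
        long packetId = extractPacketId(dataMap);              // placeholder
        // Anchored emit: the new tuple stays part of the spout tuple's tree
        this.collector.emit(input, new Values(dataMap, packetId));
        // The missing piece: ack the input tuple itself
        this.collector.ack(input);
    } catch (Exception e) {
        LOG.warn("Exception while converting AWS SQS to HashMap :{}", e);
        // Failing here lets the spout's fail() reset the SQS visibility timeout
        this.collector.fail(input);
    }
}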
The log message you see is correct: your code calls collector.ack(...) and this call gets logged. However, a call to ack in a bolt is not a call to Spout.ack(...). Each time a spout emits a tuple with a message ID, that ID is registered with the acker tasks running in your topology. The ackers receive a message for every ack issued by a bolt, collect them, and notify the spout once all acks for a tuple tree have arrived. Only when the spout receives this notification from an acker does it call its own ack(Object messageId) method.
See here for details: https://storm.apache.org/documentation/Guaranteeing-message-processing.html
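Relatedly, Storm's tuple timeout interacts with the SQS visibility timeout: if the tuple tree is not completed before the visibility timeout expires, SQS redelivers the message regardless of what Storm does. A sketch of the relevant topology configuration, with illustrative values that are my assumptions rather than anything from the question:

Config conf = new Config();
// Ackers are on by default; one acker task is usually enough for a small topology
conf.setNumAckers(1);
// Keep Storm's tuple timeout below the 30s SQS visibility timeout, so a timed-out
// tuple is failed by Storm (and made visible via fail()) before SQS redelivers it
conf.setMessageTimeoutSecs(25);
StormSubmitter.submitTopology("sqs-topology", conf, builder.createTopology());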