如何在 Storm 上同步 KafkaSpout
How to make a synchronous KafkaSpout on Storm
我正在尝试制作一个同步使用来自 Kafka 的消息的 Kafka 消费者。
我遇到的实际问题是消息队列存储在 Storm Spout 中。
我想做的是让 Storm 等待 Kafka ACK 回复,然后才让 Storm 消费下一条消息。
我正在使用 Storm KafkaSpout:
/**
* Creates a configured kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return An instance of configured KafkaSpout
*/
public KafkaSpout getkafkaSpout(String topicName){
return new KafkaSpout(this.getSpoutConfig(topicName));
}
/**
* Create the necessary configuration to create a new kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return Spout configuration
*/
public SpoutConfig getSpoutConfig(String topicName) {
SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
return spoutConfig;
}
builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);
我已经更新到 Storm 2.0.0,我使用 storm-kafka-client。但是如果我配置
Storm queue to 50: setMaxSpoutPending(50);
当我向 Kafka 发送大量数据时,Storm 停止使用它。
我已经使用下一个配置配置了 Kafka 消费者:
KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
.setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
.setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
.setOffsetCommitPeriodMs(10000) //Set automatic confirmation time (in ms)
.setFirstPollOffsetStrategy(LATEST) //Set to pull the latest messages
.setRetry(kafkaSpoutRetryService)
.build();
当 Storm 消耗了 50 条与 MaxSpoutPending 配置相同的消息时,它将停止消耗更多消息。也许下一个螺栓没有正确发送 ACK?我在 KafkaConsumerSpout 之后使用下一个螺栓:
public class testBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("MQTTmessage"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector boc) {
System.out.println("\n\n\n\nLLEGA BIENN AL SPLIT TEXT BOLT\n\n");
System.out.println("TUPLE "+tuple);
String text = tuple.getString(4);
List<String> lines = Arrays.asList(text.split("\r?\n"));
lines.forEach(line -> {
boc.emit(new Values(line));
});
}
}
关于节流 spout:是的,您可以通过将拓扑配置中的 topology.max.spout.pending
选项设置为 1
来执行此操作。如果您想获得良好的吞吐量,我不会真的推荐它,但我假设您已经仔细考虑了为什么需要拓扑以这种方式运行。
关于新的 spout:stream1:9092
服务器 Kafka 运行 是否开启,kafkaToStormAlarms
是您要发送到的主题吗?如果没有,那可能是你的问题。否则,请检查 storm/logs/workers-artifacts
中的 worker 日志,它可能会告诉您为什么 spout 没有发出任何东西。
最后,是的,你绝对应该使用 storm-kafka-client
而不是 storm-kafka
,否则你将无法升级到 Storm 2.0.0,或与此相关的最新 Kafka 版本。
我正在尝试制作一个同步使用来自 Kafka 的消息的 Kafka 消费者。
我遇到的实际问题是消息队列存储在 Storm Spout 中。
我想做的是让 Storm 等待 Kafka ACK 回复,然后才让 Storm 消费下一条消息。
我正在使用 Storm KafkaSpout:
/**
* Creates a configured kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return An instance of configured KafkaSpout
*/
public KafkaSpout getkafkaSpout(String topicName){
return new KafkaSpout(this.getSpoutConfig(topicName));
}
/**
* Create the necessary configuration to create a new kafka spout.
* @param topicName Topic where the kafka spout subscribes
* @return Spout configuration
*/
public SpoutConfig getSpoutConfig(String topicName) {
SpoutConfig spoutConfig=new SpoutConfig(this.getZkHosts(),topicName, "", String.join("-",topicName,RandomStringUtils.randomAlphanumeric(20)));
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime=kafka.api.OffsetRequest.LatestTime();
return spoutConfig;
}
builder.setSpout("kafkaPackedData", stormConfig.getkafkaSpout("topic_kafka"), 2);
我已经更新到 Storm 2.0.0,我使用 storm-kafka-client。但是如果我配置
Storm queue to 50: setMaxSpoutPending(50);
当我向 Kafka 发送大量数据时,Storm 停止使用它。
我已经使用下一个配置配置了 Kafka 消费者:
KafkaSpoutConfig spoutConf = KafkaSpoutConfig.builder("stream1:9092", "kafkaToStormAlarms")
.setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000") //Set session timeout
.setProp(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000") //Set request timeout
.setOffsetCommitPeriodMs(10000) //Set automatic confirmation time (in ms)
.setFirstPollOffsetStrategy(LATEST) //Set to pull the latest messages
.setRetry(kafkaSpoutRetryService)
.build();
当 Storm 消耗了 50 条与 MaxSpoutPending 配置相同的消息时,它将停止消耗更多消息。也许下一个螺栓没有正确发送 ACK?我在 KafkaConsumerSpout 之后使用下一个螺栓:
public class testBolt extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("MQTTmessage"));
}
@Override
public void execute(Tuple tuple, BasicOutputCollector boc) {
System.out.println("\n\n\n\nLLEGA BIENN AL SPLIT TEXT BOLT\n\n");
System.out.println("TUPLE "+tuple);
String text = tuple.getString(4);
List<String> lines = Arrays.asList(text.split("\r?\n"));
lines.forEach(line -> {
boc.emit(new Values(line));
});
}
}
关于节流 spout:是的,您可以通过将拓扑配置中的 topology.max.spout.pending
选项设置为 1
来执行此操作。如果您想获得良好的吞吐量,我不会真的推荐它,但我假设您已经仔细考虑了为什么需要拓扑以这种方式运行。
关于新的 spout:stream1:9092
服务器 Kafka 运行 是否开启,kafkaToStormAlarms
是您要发送到的主题吗?如果没有,那可能是你的问题。否则,请检查 storm/logs/workers-artifacts
中的 worker 日志,它可能会告诉您为什么 spout 没有发出任何东西。
最后,是的,你绝对应该使用 storm-kafka-client
而不是 storm-kafka
,否则你将无法升级到 Storm 2.0.0,或与此相关的最新 Kafka 版本。