Apache Storm Trident and Kafka Spout Integration
I couldn't find good documentation on integrating Kafka with Apache Storm Trident properly. I tried looking through the related questions posted here, but none of them had enough information.
I want to connect Trident with Kafka as an OpaqueTridentKafkaSpout. Here is the sample code I currently have:
GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(properties.getProperty("topic", "mytopic"));
Broker brokerForPartition0 = new Broker("IP1",9092);
Broker brokerForPartition1 = new Broker("IP2", 9092);
Broker brokerForPartition2 = new Broker("IP3:9092");
globalPartitionInformation.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
globalPartitionInformation.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
globalPartitionInformation.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts staticHosts = new StaticHosts(globalPartitionInformation);
TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(staticHosts, properties.getProperty("topic", "mytopic"));
tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig);
With this, I can generate the stream for my topology, as shown in the code below:
TridentTopology topology = new TridentTopology();
Stream analyticsStream = topology.newStream("spout", kafkaSpout).parallelismHint(Integer.valueOf(properties.getProperty("spout", "6")));
Even though I supply a parallelism hint and have mapped my partitions, only 1 Kafka spout executor is running, so I cannot scale it well.
Can anyone guide me to a better way of integrating Apache Storm Trident (2.0.0) with Apache Kafka (1.0), each running as a 3-node cluster?
Also, once it has finished reading from Kafka, I keep getting these logs:
2018-04-09 14:17:34.119 o.a.s.k.KafkaUtils Thread-15-spout-spout-executor[79 79] [INFO] Metrics Tick: Not enough data to calculate spout lag.
2018-04-09 14:17:34.129 o.a.s.k.KafkaUtils Thread-21-spout-spout-executor[88 88] [INFO] Metrics Tick: Not enough data to calculate spout lag.
And in the Storm UI I can see acks for the messages above. Are there any suggestions for ignoring these metrics ticks?
In any case, if you are on Storm 2.0.0, I think you should switch to the storm-kafka-client Trident spout. The storm-kafka module is only intended to support older Kafka versions, since the underlying Kafka API it relies on (SimpleConsumer) has been removed. The new module supports Kafka from 0.10.0.0 onward.
You can find an example Trident topology for the new spout here: https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
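To give a rough idea of the wiring, here is a minimal sketch along the lines of that example, using the storm-kafka-client classes. The broker addresses, topic name, and consumer group id below are placeholders, and the exact config class the Trident spout constructor accepts has shifted between Storm releases, so adjust it to match the linked example for the version you are on:
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Consumer configuration; partition discovery and assignment are handled by the Kafka consumer,
// so there is no hand-written partition-to-broker mapping anymore.
KafkaSpoutConfig<String, String> spoutConfig =
        KafkaSpoutConfig.builder("IP1:9092,IP2:9092,IP3:9092", "mytopic")            // placeholder brokers/topic
                .setProp("group.id", "analytics-group")                               // placeholder consumer group
                .setRecordTranslator(r -> new Values(r.value()), new Fields("str"))   // emit the record value as field "str"
                .build();

// Opaque Trident spout from storm-kafka-client, the replacement for OpaqueTridentKafkaSpout.
KafkaTridentSpoutOpaque<String, String> kafkaSpout = new KafkaTridentSpoutOpaque<>(spoutConfig);

TridentTopology topology = new TridentTopology();
Stream analyticsStream = topology.newStream("spout", kafkaSpout).parallelismHint(3);
With this spout the parallelism should be able to scale up to the number of partitions of the topic, since each spout task consumes a subset of the Kafka partitions.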