Apache Storm 2.1.0 本地 DRPC 没有 return 任何响应,尽管最后一个螺栓很好地向收集器发送了一个元组
Apache Storm 2.1.0 local DRPC does not return any response although a tuple is well emitted to the collector by the last bolt
我在尝试 运行 包含一个螺栓的 DRPC 拓扑并通过本地集群查询它时遇到问题。使用 IntelliJ 调试后,bolt 确实执行了,但是 JCQueue 在 bolt 执行后陷入无限循环,直到向服务器发送超时。
这是用于构建拓扑构建器的代码:
public static LinearDRPCTopologyBuilder createBuilder()
{
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new LinearDRPCTopologyBuilder("sales");
builder.addBolt(bolt, 1).localOrShuffleGrouping();
return builder;
}
MRedisLookupBolt 只是 IBasicBolt 对 Jedis 执行 hget 命令的一个非常简单的实现。 MRedisLookupBolt 的 execute
方法只是发出 Values
的实例,其中包含两个字段的值,这些字段声明如下:
declarer.declare(new Fields("id", "Value"));
在这样的单元测试中构建和查询拓扑:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder();
LocalCluster.LocalTopology topo = cluster.submitTopology(
"Sales-fetch", conf, builder.createLocalTopology(drpc));
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}
阅读日志时,我确定数据是红色的,并且所有内容都已发出
但最后,我用我的测试方法轻轻地打印出了这个堆栈跟踪。当然,没有值分配给结果变量,过程永远不会到达最后的打印指令:
我在这里缺少一些东西。我的理解是:BoltExecutor 用来检索执行哪个螺栓的 ID 的 JCQueue 永远不会结束,尽管只有一个参数发送到本地 DRPC 并且只有一个螺栓声明到拓扑中。我已经尝试向拓扑添加更多螺栓或更改用于创建它的构建器实现,但没有成功。
我使用 Apache Storm 2.1.0 找到了适合我的用例的解决方案。
似乎按照文档的建议调用本地集群的submitTopology
方法并没有使用LinearDRPCTopologyBuilder
构建拓扑正确结束执行器版本2.1.0。
通过仔细查看源代码,可以了解如何将 LinearDRPCTopologyBuilder
逻辑直接应用于 TopologyBuilder
。
这是应用于 createBuilder
方法的更改:
public static TopologyBuilder createBuilder(ILocalDRPC localDRPC)
{
var spout = Optional.ofNullable(localDRPC)
.map(drpc -> new DRPCSpout("sales", drpc))
.orElse(new DRPCSpout("sales"));
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new TopologyBuilder();
builder.setSpout("drpc", spout);
builder.setBolt("redisLookup", bolt, 1)
.shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults())
.shuffleGrouping("redisLookup");
return builder;
}
这里是一个执行的例子:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder(drpc);
cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}
不幸的是,此解决方案不允许使用 LinearDRPCTopologyBuilder
的所有嵌入式工具,并且暗示要构建所有拓扑流 'by hand'。有必要将映射器行为更改为,因为字段的显示顺序与以前不同。
我在尝试 运行 包含一个螺栓的 DRPC 拓扑并通过本地集群查询它时遇到问题。使用 IntelliJ 调试后,bolt 确实执行了,但是 JCQueue 在 bolt 执行后陷入无限循环,直到向服务器发送超时。
这是用于构建拓扑构建器的代码:
public static LinearDRPCTopologyBuilder createBuilder()
{
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new LinearDRPCTopologyBuilder("sales");
builder.addBolt(bolt, 1).localOrShuffleGrouping();
return builder;
}
MRedisLookupBolt 只是 IBasicBolt 对 Jedis 执行 hget 命令的一个非常简单的实现。 MRedisLookupBolt 的 execute
方法只是发出 Values
的实例,其中包含两个字段的值,这些字段声明如下:
declarer.declare(new Fields("id", "Value"));
在这样的单元测试中构建和查询拓扑:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder();
LocalCluster.LocalTopology topo = cluster.submitTopology(
"Sales-fetch", conf, builder.createLocalTopology(drpc));
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}
阅读日志时,我确定数据是红色的,并且所有内容都已发出
但最后,我用我的测试方法轻轻地打印出了这个堆栈跟踪。当然,没有值分配给结果变量,过程永远不会到达最后的打印指令:
我在这里缺少一些东西。我的理解是:BoltExecutor 用来检索执行哪个螺栓的 ID 的 JCQueue 永远不会结束,尽管只有一个参数发送到本地 DRPC 并且只有一个螺栓声明到拓扑中。我已经尝试向拓扑添加更多螺栓或更改用于创建它的构建器实现,但没有成功。
我使用 Apache Storm 2.1.0 找到了适合我的用例的解决方案。
似乎按照文档的建议调用本地集群的submitTopology
方法并没有使用LinearDRPCTopologyBuilder
构建拓扑正确结束执行器版本2.1.0。
通过仔细查看源代码,可以了解如何将 LinearDRPCTopologyBuilder
逻辑直接应用于 TopologyBuilder
。
这是应用于 createBuilder
方法的更改:
public static TopologyBuilder createBuilder(ILocalDRPC localDRPC)
{
var spout = Optional.ofNullable(localDRPC)
.map(drpc -> new DRPCSpout("sales", drpc))
.orElse(new DRPCSpout("sales"));
var bolt = new MRedisLookupBolt(createRedisConfiguration(), new RedisTurnoverMapper());
var builder = new TopologyBuilder();
builder.setSpout("drpc", spout);
builder.setBolt("redisLookup", bolt, 1)
.shuffleGrouping("drpc");
builder.setBolt("return", new ReturnResults())
.shuffleGrouping("redisLookup");
return builder;
}
这里是一个执行的例子:
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
try(LocalDRPC drpc = new LocalDRPC())
{
LocalCluster cluster = new LocalCluster();
var builder = BasicRedisRPCTopology.createBuilder(drpc);
cluster.submitTopology("Sales-fetch", conf, builder.createTopology());
var result = drpc.execute("sales", "XXXXX");
System.out.println("################ Result: " + result);
}
catch (Exception e)
{
e.printStackTrace();
}
不幸的是,此解决方案不允许使用 LinearDRPCTopologyBuilder
的所有嵌入式工具,并且暗示要构建所有拓扑流 'by hand'。有必要将映射器行为更改为,因为字段的显示顺序与以前不同。