Storm 中的数据并行性
Data parallelism in Storm
我已经阅读了有关 Apache 风暴的信息并做了一些基础教程。我有以下拓扑结构,我想用风暴来实现,但不确定如何处理数据分布。
业务要求是
实时评估客户的投资组合。
简化形式包括:
1) 接受实时市场价格(货币、商品等...)
2) 对于每个价格变动计算每个头寸的当前利润并将其转换为客户账户货币
3) 分析每个客户所有头寸的总数 p/l 和交易量,并在需要时生成信号
4) 在客户层面计算必须是顺序的并且atomic/serialized。
IE。所有头寸都必须根据其进入系统的订单中的每个价格变动进行评估,并且即使客户有 100 个头寸,也必须根据相同的价格计算总计。
5) 分析由 symbol/customer type/country /etc...汇总的系统中所有头寸的交易量/趋势,并使它们在某种仪表板中可用。
所有订单都执行并存储在rdbms中。
我的主要问题是如何在每个节点处理它自己的部分的不同节点上跨 Storm 螺栓分配成千上万个位置。使用 Modulo 足以对客户进行分区,但是我如何为每个螺栓实例提供 id,以便每个实例只处理自己的等量客户? Storm 中有开箱即用的功能吗?
另一个问题是如何有效地进行上述聚合?
你可以使用 fieldsGrouping
。你可以声明一个元组分组的字段(在你的例子中,id
)。
我假设您的输入流是 JSON 具有 id 和正文字段的对象
{"id":"1234","body":"some body"}
还假设您的拓扑有一个 spout,两个螺栓,即 BoltA 和 BoltB。
在 BoltB 中,覆盖 declareOutputFields 方法并填写详细信息。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","log"));
}
你可以像下面这样声明拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
.shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
在这种情况下,来自 boltA
的具有相同 ID 的元组将被传送到 boltB
的相同实例
我已经阅读了有关 Apache 风暴的信息并做了一些基础教程。我有以下拓扑结构,我想用风暴来实现,但不确定如何处理数据分布。 业务要求是 实时评估客户的投资组合。 简化形式包括: 1) 接受实时市场价格(货币、商品等...) 2) 对于每个价格变动计算每个头寸的当前利润并将其转换为客户账户货币 3) 分析每个客户所有头寸的总数 p/l 和交易量,并在需要时生成信号 4) 在客户层面计算必须是顺序的并且atomic/serialized。 IE。所有头寸都必须根据其进入系统的订单中的每个价格变动进行评估,并且即使客户有 100 个头寸,也必须根据相同的价格计算总计。 5) 分析由 symbol/customer type/country /etc...汇总的系统中所有头寸的交易量/趋势,并使它们在某种仪表板中可用。
所有订单都执行并存储在rdbms中。 我的主要问题是如何在每个节点处理它自己的部分的不同节点上跨 Storm 螺栓分配成千上万个位置。使用 Modulo 足以对客户进行分区,但是我如何为每个螺栓实例提供 id,以便每个实例只处理自己的等量客户? Storm 中有开箱即用的功能吗? 另一个问题是如何有效地进行上述聚合?
你可以使用 fieldsGrouping
。你可以声明一个元组分组的字段(在你的例子中,id
)。
我假设您的输入流是 JSON 具有 id 和正文字段的对象
{"id":"1234","body":"some body"}
还假设您的拓扑有一个 spout,两个螺栓,即 BoltA 和 BoltB。
在 BoltB 中,覆盖 declareOutputFields 方法并填写详细信息。
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","log"));
}
你可以像下面这样声明拓扑
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
.shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
在这种情况下,来自 boltA
的具有相同 ID 的元组将被传送到 boltB