Flink 批量连接性能
Flink batch join performance
我一直在以批处理模式测试与 TableApi 和 DataStream api 的简单连接。但是我的结果很糟糕,所以一定是我做错了什么。用于连接的数据集约为 900gb 和 3gb。用于测试的环境是具有 10 * m5.xlarge 个工作节点的 EMR。
TableApi 使用的方法是在数据 s3 路径上创建一个 tables,并在目标 s3 路径上对创建的 table 执行插入语句。通过调整任务管理器内存、numberOfTaskSlots、并行度,但无法使其在某种程度上执行 table 时间(至少 1.5 小时)。
在批处理模式下使用 DataStreamApi 时,我总是会遇到一个问题,即 yarn 会终止任务,因为它使用了超过 90% 的磁盘 space。所以我很困惑,如果这是由于代码,或者只是 flink 需要比 spark 更多的磁盘 space。
读取数据流:
val sourceStream: DataStream[SourceObj] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "someSourceName")
.map(x => SourceObj.convertFromString(x))
加入:
val joinedStream = sourceStream.join(sourceStream2)
.where(col1 => sourceStream.col1)
.equalTo(col2 => sourceStream2.col2)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1))
.apply{
(s, c) => JoinedObj(c.col1, s.col2, s.col3)
}
我是不是遗漏了什么或者我只需要扩展集群?
一般来说,你最好使用 Flink 的 Table/SQL API 来实现关系型工作负载,这样它的优化器就有机会帮忙了。
但是如果我没看错的话,
这个特定的连接执行起来会非常昂贵,因为状态中没有任何东西会过期。两个表都将在 Flink 中完全具体化,因为对于此查询,每一行输入都保持相关并可能影响结果。
如果您可以将其转换为某种具有时间约束的联接,优化器可以使用该联接来释放不再有用的行,那么它的行为会好得多。
当您在批处理模式下使用 DataStream API 时,extensively using managed memory in all shuflle/join/reduce operations. Also, as stated in the last paragraph here Flink 会在连接期间将无法容纳在内存中的所有数据溢出到磁盘。
因此,我认为这可能是磁盘 space 短缺问题的原因。我在工作中遇到了同样的问题。
我一直在以批处理模式测试与 TableApi 和 DataStream api 的简单连接。但是我的结果很糟糕,所以一定是我做错了什么。用于连接的数据集约为 900gb 和 3gb。用于测试的环境是具有 10 * m5.xlarge 个工作节点的 EMR。
TableApi 使用的方法是在数据 s3 路径上创建一个 tables,并在目标 s3 路径上对创建的 table 执行插入语句。通过调整任务管理器内存、numberOfTaskSlots、并行度,但无法使其在某种程度上执行 table 时间(至少 1.5 小时)。
在批处理模式下使用 DataStreamApi 时,我总是会遇到一个问题,即 yarn 会终止任务,因为它使用了超过 90% 的磁盘 space。所以我很困惑,如果这是由于代码,或者只是 flink 需要比 spark 更多的磁盘 space。 读取数据流:
val sourceStream: DataStream[SourceObj] = env.fromSource(source, WatermarkStrategy.noWatermarks(), "someSourceName")
.map(x => SourceObj.convertFromString(x))
加入:
val joinedStream = sourceStream.join(sourceStream2)
.where(col1 => sourceStream.col1)
.equalTo(col2 => sourceStream2.col2)
.window(GlobalWindows.create())
.trigger(CountTrigger.of(1))
.apply{
(s, c) => JoinedObj(c.col1, s.col2, s.col3)
}
我是不是遗漏了什么或者我只需要扩展集群?
一般来说,你最好使用 Flink 的 Table/SQL API 来实现关系型工作负载,这样它的优化器就有机会帮忙了。
但是如果我没看错的话, 这个特定的连接执行起来会非常昂贵,因为状态中没有任何东西会过期。两个表都将在 Flink 中完全具体化,因为对于此查询,每一行输入都保持相关并可能影响结果。
如果您可以将其转换为某种具有时间约束的联接,优化器可以使用该联接来释放不再有用的行,那么它的行为会好得多。
当您在批处理模式下使用 DataStream API 时,extensively using managed memory in all shuflle/join/reduce operations. Also, as stated in the last paragraph here Flink 会在连接期间将无法容纳在内存中的所有数据溢出到磁盘。
因此,我认为这可能是磁盘 space 短缺问题的原因。我在工作中遇到了同样的问题。