具有多个租户的条件过滤逻辑的 spark 作业
spark jobs with conditional filtering logic for multiple tenants
我有一个table喜欢
tenant_id,start_date,end_date,use_fancy_transformation
1,20180101,20180201,true
2,20180103,20180115,false
和像
这样的数据框
tenant_id,thing,date,value
1,1,20180105,1
2,2,20180107,2
1,2,20180120,2
现在有一些业务逻辑应该(有条件地)执行不同的事情。最简单的情况是从第一个 table 导出指定日期范围内的数据。显然,每个租户的日期范围都不同。
所以当执行(伪代码)时df.filter(date between(tenant2_start, tenant2_end).write.partitionBy("tenant_id", "current_date").parquet
过滤器被应用并且tenant1的内容丢失。
我想到的可能的解决方案(none真的很满足我,都显得笨拙):
- 不并行化配置 table 然后迭代每个条目。然而,这可能需要一些相当昂贵的转换来重新运行(最好只计算一次)。
- 仅在代价高昂的转换之后对配置对象进行迭代可能是一种可能性 - 但我认为这不是一个好的解决方案,因为当第一个租户的导出工作正常但第二个租户的导出失败时,那么spark/ yarn 将从头开始并重新计算两者。
- 另一种可能的解决方案是通过工作流 运行ner(如 Oozie / Airflow)从外部对配置进行参数化,但是,这会触发大量工作(以防存在大量租户) .记住代价高昂的转型。
你看到优雅的出路了吗?
非等值连接可以很好地解决问题
我有一个table喜欢
tenant_id,start_date,end_date,use_fancy_transformation
1,20180101,20180201,true
2,20180103,20180115,false
和像
这样的数据框tenant_id,thing,date,value
1,1,20180105,1
2,2,20180107,2
1,2,20180120,2
现在有一些业务逻辑应该(有条件地)执行不同的事情。最简单的情况是从第一个 table 导出指定日期范围内的数据。显然,每个租户的日期范围都不同。
所以当执行(伪代码)时df.filter(date between(tenant2_start, tenant2_end).write.partitionBy("tenant_id", "current_date").parquet
过滤器被应用并且tenant1的内容丢失。
我想到的可能的解决方案(none真的很满足我,都显得笨拙):
- 不并行化配置 table 然后迭代每个条目。然而,这可能需要一些相当昂贵的转换来重新运行(最好只计算一次)。
- 仅在代价高昂的转换之后对配置对象进行迭代可能是一种可能性 - 但我认为这不是一个好的解决方案,因为当第一个租户的导出工作正常但第二个租户的导出失败时,那么spark/ yarn 将从头开始并重新计算两者。
- 另一种可能的解决方案是通过工作流 运行ner(如 Oozie / Airflow)从外部对配置进行参数化,但是,这会触发大量工作(以防存在大量租户) .记住代价高昂的转型。
你看到优雅的出路了吗?
非等值连接可以很好地解决问题