SparkSQL PostgresQL 数据框分区
SparkSQL PostgresQL Dataframe partitions
我有一个连接到 Postgres 数据库的 SparkSQL 的非常简单的设置,我正在尝试从 table 获取一个 DataFrame,该 DataFrame 具有 X 个分区(假设为 2)。代码如下:
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "select ID, OTHER from TABLE limit 1000");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions","2");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();
由于某种原因,DataFrame 的一个分区几乎包含了所有行。
据我所知,lowerBound/upperBound
是用于微调的参数。在 SparkSQL 的文档 (Spark 1.4.0 - spark-sql_2.11) 中,它说它们用于定义步幅,而不是 filter/range 分区列。但这提出了几个问题:
- 步幅是 Spark 为每个执行器(分区)查询数据库的频率(每次查询返回的元素数)?
- 如果不是,这个参数的目的是什么,它们取决于什么,我如何以 stable 的方式平衡我的 DataFrame 分区(不要求所有分区包含相同数量的元素,只是存在平衡 - 例如 2 个分区 100 个元素 55/45 、 60/40 甚至 65/35 都可以)
似乎找不到这些问题的明确答案,我想知道你们中的一些人是否可以为我解决这个问题,因为现在在处理 X 百万行和所有繁重的数据时影响我的集群性能提升交给一个执行者。
干杯,感谢您的宝贵时间。
确实对分区列使用了下限;请参考此代码(撰写本文时的当前版本):
函数columnPartition
包含分区逻辑的代码和下限/上限的使用。
本质上,下限和上限以及分区数用于计算每个并行任务的增量或拆分。
假设 table 有分区列 "year",并且有 2006 年到 2016 年的数据。
如果您将分区数定义为 10,下限为 2006 年,上限为 2016 年,您将让每个任务获取自己年份的数据 - 理想情况。
即使您错误地指定了下限和/或上限,例如设置 lower = 0 和 upper = 2016,数据传输会有偏差,但是,您不会 "lose" 或无法检索任何数据,因为:
第一个任务将获取 < 0 年的数据。
第二个任务将获取 0 到 2016/10 之间年份的数据。
第三个任务将获取 2016/10 和 2*2016/10 之间的年份数据。
...
最后一个任务的 where 条件为 year->2016。
T.
下界和上界目前已被确定做他们在之前的答案中做的事情。后续将是如何在不查看最小最大值或数据是否严重倾斜的情况下跨分区平衡数据。
如果您的数据库支持 "hash" 函数,它就可以解决问题。
分区列 = "hash(column_name)%num_partitions"
numPartitions = 10 // 随心所欲
下限 = 0
upperBound = numPartitions
只要模数运算 returns 在 [0,numPartitions)
上均匀分布,这就会起作用
我有一个连接到 Postgres 数据库的 SparkSQL 的非常简单的设置,我正在尝试从 table 获取一个 DataFrame,该 DataFrame 具有 X 个分区(假设为 2)。代码如下:
Map<String, String> options = new HashMap<String, String>();
options.put("url", DB_URL);
options.put("driver", POSTGRES_DRIVER);
options.put("dbtable", "select ID, OTHER from TABLE limit 1000");
options.put("partitionColumn", "ID");
options.put("lowerBound", "100");
options.put("upperBound", "500");
options.put("numPartitions","2");
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load();
由于某种原因,DataFrame 的一个分区几乎包含了所有行。
据我所知,lowerBound/upperBound
是用于微调的参数。在 SparkSQL 的文档 (Spark 1.4.0 - spark-sql_2.11) 中,它说它们用于定义步幅,而不是 filter/range 分区列。但这提出了几个问题:
- 步幅是 Spark 为每个执行器(分区)查询数据库的频率(每次查询返回的元素数)?
- 如果不是,这个参数的目的是什么,它们取决于什么,我如何以 stable 的方式平衡我的 DataFrame 分区(不要求所有分区包含相同数量的元素,只是存在平衡 - 例如 2 个分区 100 个元素 55/45 、 60/40 甚至 65/35 都可以)
似乎找不到这些问题的明确答案,我想知道你们中的一些人是否可以为我解决这个问题,因为现在在处理 X 百万行和所有繁重的数据时影响我的集群性能提升交给一个执行者。
干杯,感谢您的宝贵时间。
确实对分区列使用了下限;请参考此代码(撰写本文时的当前版本):
函数columnPartition
包含分区逻辑的代码和下限/上限的使用。
本质上,下限和上限以及分区数用于计算每个并行任务的增量或拆分。
假设 table 有分区列 "year",并且有 2006 年到 2016 年的数据。
如果您将分区数定义为 10,下限为 2006 年,上限为 2016 年,您将让每个任务获取自己年份的数据 - 理想情况。
即使您错误地指定了下限和/或上限,例如设置 lower = 0 和 upper = 2016,数据传输会有偏差,但是,您不会 "lose" 或无法检索任何数据,因为:
第一个任务将获取 < 0 年的数据。
第二个任务将获取 0 到 2016/10 之间年份的数据。
第三个任务将获取 2016/10 和 2*2016/10 之间的年份数据。
...
最后一个任务的 where 条件为 year->2016。
T.
下界和上界目前已被确定做他们在之前的答案中做的事情。后续将是如何在不查看最小最大值或数据是否严重倾斜的情况下跨分区平衡数据。
如果您的数据库支持 "hash" 函数,它就可以解决问题。
分区列 = "hash(column_name)%num_partitions"
numPartitions = 10 // 随心所欲
下限 = 0
upperBound = numPartitions
只要模数运算 returns 在 [0,numPartitions)
上均匀分布,这就会起作用