Spark JDBC filter records outside of boundaries
I'm trying to optimize a daily job that pulls three months of data from a MySQL table into Parquet on HDFS. It currently uses mysqldump in a very creative way, but there is a spark/hdfs ecosystem available, so I figured I'd use that instead.
Background
I defined the database read like this:
# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period
df = session.read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable", "table1") \
    .option("user", username) \
    .option("password", password) \
    .option("partitionColumn", "time_col") \
    .option("upperBound", end_time) \
    .option("lowerBound", start_time) \
    .option("numPartitions", partitions) \
    .load()
This works really well, except that the first and last partitions contain a billion records I don't even want.
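The inflated edge partitions come from how Spark builds the per-partition predicates: it splits the range between lowerBound and upperBound into numPartitions strides and leaves the first and last partitions open-ended so that no rows are ever dropped. Roughly, the generated queries look like this sketch (boundary values are illustrative, assuming the same epoch range and stride as in the EXPLAIN examples further down):

SELECT * FROM table1 WHERE `time_col` < 1585783600 OR `time_col` IS NULL    -- first partition, unbounded below
SELECT * FROM table1 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400    -- a middle partition
SELECT * FROM table1 WHERE `time_col` >= 1585862800    -- last partition, unbounded above

So the first and last queries sweep in every row before start_time and after end_time.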
To filter out the vast majority of the table, I updated dbtable like so:
.option("dtable", "(select * from table1 WHERE time_col >= {} and time_col < {}) as table2".format(start_time, end_time))
That approach worked. The job runs fine when end_time - start_time is small, but it does not scale up to three months.
This is because every per-partition query now includes a derived table:
EXPLAIN SELECT * FROM (SELECT * From table1 WHERE time_col >=1585780000 AND time_col < 1585866400 ) as table2 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
| id | select_type | table      | type  | possible_keys | key      | key_len | ref  | rows     | Extra       |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
|  1 | PRIMARY     | <derived2> | ALL   | NULL          | NULL     | NULL    | NULL | 25048354 | Using where |
|  2 | DERIVED     | table1     | range | time_col      | time_col | 4       | NULL | 25048354 | Using where |
+----+-------------+------------+-------+---------------+----------+---------+------+----------+-------------+
By contrast, this is what the generated query looks like when I just use dbtable = "table1"; much simpler and faster:
explain SELECT * From table1 WHERE `time_col` >= 1585808800 AND `time_col` < 1585812400;
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
| id | select_type | table  | type  | possible_keys | key      | key_len | ref  | rows    | Extra       |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
|  1 | SIMPLE      | table1 | range | time_col      | time_col | 4       | NULL | 1097631 | Using where |
+----+-------------+--------+-------+---------------+----------+---------+------+---------+-------------+
Question
Is there any way to filter out data outside of upperBound and lowerBound while keeping the simpler query? Something like preventing the first and last partitions from being run at all, or overriding dbtable at runtime so that it just uses table1 in place of the subquery?
Parameters
I only have read access to the table on MySQL 5.7; I cannot create views or indexes.
I'm developing on Spark 3.1, but I believe production runs on Spark 2.
Yes, I've considered Spark Structured Streaming and other streaming options, but that's not the direction we're going right now.
I found that I can avoid the subquery by adding a where() call. Example:
# time_col column is epoch as an integer
# start_time is beginning of three month period
# end_time is end of three month period
df = session.read \
    .format("jdbc") \
    .option("url", url) \
    .option("driver", driver) \
    .option("dbtable", "table1") \
    .option("user", username) \
    .option("password", password) \
    .option("partitionColumn", "time_col") \
    .option("upperBound", end_time) \
    .option("lowerBound", start_time) \
    .option("numPartitions", partitions) \
    .load()
# This filters out everything outside of boundaries
# without creating a subquery
df = df.where('time_col >= {} AND time_col < {}'.format(start_time, end_time))
Spark is able to combine this clause with the clauses created by the partitioning logic, so there is no subquery and performance is much better.
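To confirm the filter actually reaches MySQL and to round out the job, here is a short follow-up sketch; the explain() check is just a sanity step, and the HDFS path and write mode are illustrative assumptions rather than anything from the real setup:

# The physical plan should show the time_col range under PushedFilters on the
# JDBC scan, instead of wrapping table1 in a derived-table subquery
df.explain()

# Write the three-month window out as Parquet on HDFS (path/mode illustrative)
df.write.mode("overwrite").parquet("hdfs:///data/table1_last_3_months")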