Convert Spark SQL to DataFrame API
I'm new to PySpark, and I want to convert the Spark SQL below to the DataFrame API:
sql("SELECT
t.transaction_category_id,
sum(t.transaction_amount) AS sum_amount,
count(DISTINCT t.user_id) AS num_users
FROM transactions t
JOIN users u USING (user_id)
WHERE t.is_blocked = False
AND u.is_active = 1
GROUP BY t.transaction_category_id
ORDER BY sum_amount DESC").show()
The transactions table is large and its data is unevenly distributed (skewed). I'm looking into whether a broadcast join / salting can be applied here?
The join part of the query would look like this:
import pyspark.sql.functions as f

output_df = (
    transactions.alias('t')
    # hint the optimizer to broadcast the (smaller) users table
    .join(users.alias('u').hint('broadcast'), ['user_id'], 'inner')
    .where((f.col('t.is_blocked') == False) & (f.col('u.is_active') == 1))
    .groupBy(f.col('t.transaction_category_id'))
    .agg(
        f.sum(f.col('t.transaction_amount')).alias('sum_amount'),
        # count_distinct was added in PySpark 3.2; use f.countDistinct on older versions
        f.count_distinct(f.col('t.user_id')).alias('num_users')
    )
    # the SQL sorts by sum_amount DESC, so sort descending here as well
    .orderBy(f.col('sum_amount').desc())
)
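If you want to confirm the hint actually took effect, you can inspect the physical plan; a BroadcastHashJoin node (rather than a SortMergeJoin) means the users table will be broadcast. A minimal check, assuming the output_df built above:

# Print the physical plan; look for "BroadcastHashJoin"
# instead of "SortMergeJoin" in the output.
output_df.explain()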
You can also use the following:
import pyspark.sql.functions as func
from pyspark.sql.functions import broadcast  # broadcast must be imported explicitly

output_df = (
    transactions
    .join(broadcast(users), users.user_id == transactions.user_id)
    .where((transactions.is_blocked == False) & (users.is_active == 1))
    .groupBy(transactions.transaction_category_id)
    .agg(
        func.countDistinct(users.user_id).alias('num_users'),
        func.sum(transactions.transaction_amount).alias('sum_amount'),
    )
    .select(transactions.transaction_category_id, 'num_users', 'sum_amount')
    .orderBy(func.desc('sum_amount'))  # match the ORDER BY sum_amount DESC in the SQL
)
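As for salting: with a broadcast join there is no shuffle of the transactions table on user_id, so join-side salting is usually unnecessary. Salting only helps when users is too large to broadcast and user_id is skewed in a shuffle join. A rough sketch of that fallback, assuming the same transactions and users DataFrames, where SALT_BUCKETS is a hypothetical tuning knob you would pick based on the skew:

import pyspark.sql.functions as f

SALT_BUCKETS = 16  # assumption: tune to the degree of skew

# Add a random salt to the large, skewed side so hot user_ids
# spread across SALT_BUCKETS partitions instead of one.
transactions_salted = transactions.withColumn(
    'salt', (f.rand() * SALT_BUCKETS).cast('int')
)

# Replicate each users row once per salt value so every
# (user_id, salt) pair on the left side still finds its match.
users_salted = users.withColumn(
    'salt', f.explode(f.array([f.lit(i) for i in range(SALT_BUCKETS)]))
)

salted_join = transactions_salted.join(users_salted, ['user_id', 'salt'], 'inner')

Also note that on Spark 3.x, Adaptive Query Execution can split skewed shuffle-join partitions automatically (spark.sql.adaptive.skewJoin.enabled), which is often simpler than manual salting.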