Pyspark rdd 转置
Pyspark rdd Transpose
我在配置单元测试数据库中有以下 emp table
1 ram 2000.0 101 market
2 shyam 3000.0 102 IT
3 sam 4000.0 103 finance
4 remo 1000.0 103 finance
我想在 pyspark 中转置此 table,前两列相同,最后 3 列堆叠。
我在 pyspark 中完成了以下操作 shell
test = sqlContext.sql("select * from testing.emp")
data = test.flatMap (lambda row: [Row (id=row ['id'],name=row['name'],column_name=col,column_val=row [col]) for col in ('sal','dno','dname')])
emp = sqlContext.createDataFrame(data)
emp.registerTempTable('mytempTable')
sqlContext.sql('create table testing.test(id int,name string,column_name string,column_val int) row format delimited fields terminated by ","')
sqlContext.sql('INSERT INTO TABlE testing.test select * from mytempTable')
预期输出是
1 ram sal 2000
1 ram dno 101
1 ram dname market
2 shyam sal 3000
2 shyam dno 102
2 shyam dname IT
3 sam sal 4000
3 sam dno 103
3 sam dname finance
4 remo sal 1000
4 remo dno 103
4 remo dname finance
但是我得到的输出是
NULL 2000.0 1 NULL
NULL NULL 1 NULL
NULL NULL 1 NULL
NULL 3000.0 2 NULL
NULL NULL 2 NULL
NULL NULL 2 NULL
NULL 4000.0 3 NULL
NULL NULL 3 NULL
NULL NULL 3 NULL
NULL 1000.0 4 NULL
NULL NULL 4 NULL
NULL NULL 4 NULL
如果 table
中有很多列,请告诉我如何循环列
抱歉我才注意到"hive table "
cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).enableHiveSupport().getOrCreate()
df = spark.table("default.test").cache()
cols = df.columns[2:5]
df = df.rdd.map(lambda x: Row(id=x[0], name=x[1], val=dict(zip(cols, x[2:5]))))
df = spark.createDataFrame(df)
df.createOrReplaceTempView('mytempTable')
sql = """
select
id,
name,
explode(val) AS (k,v)
from mytempTable
"""
df = spark.sql(sql)
df.show()
在 HIVE 中:
> desc test;
OK
id string
somebody string
sal string
dno string
dname string
dt string
# Partition Information
# col_name data_type comment
dt string
P.S。
您只能在没有 Spark 的情况下使用 sql 作为 :
select
a.id,
a.somebody,
b.k,
b.v
from (
select
id,
somebody,
map('sal',sal,
'dno',dno,
'dname',dname) as val
from default.test
) a
lateral VIEW explode(val) b as k,v
关于您关于小镶木地板文件的问题:
cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.enableHiveSupport().config(conf=cfg).getOrCreate()
df = spark.sparkContext.parallelize(range(26))
df = df.map(lambda x: (x, chr(x + 97), '2017-01-12'))
df = spark.createDataFrame(df, schema=['idx', 'val', 'dt']).coalesce(1)
df.write.saveAsTable('default.testing', mode='overwrite', partitionBy='dt', format='parquet')
small parquet files amount = DataFrame partitions amount
您可以使用df.coalesce或df.repartition来减少DataFrame分区数量
但我不确定是否存在将DataFrame分区减少到只有一个的隐患(例如:OOM?)
还有另一种合并小文件的方法,只需要使用 HIVE sql:
set mapred.reduce.tasks=1;
insert overwrite table default.yourtable partition (dt='2017-01-13')
select col from tmp.yourtable where dt='2017-01-13';
我在配置单元测试数据库中有以下 emp table
1 ram 2000.0 101 market
2 shyam 3000.0 102 IT
3 sam 4000.0 103 finance
4 remo 1000.0 103 finance
我想在 pyspark 中转置此 table,前两列相同,最后 3 列堆叠。
我在 pyspark 中完成了以下操作 shell
test = sqlContext.sql("select * from testing.emp")
data = test.flatMap (lambda row: [Row (id=row ['id'],name=row['name'],column_name=col,column_val=row [col]) for col in ('sal','dno','dname')])
emp = sqlContext.createDataFrame(data)
emp.registerTempTable('mytempTable')
sqlContext.sql('create table testing.test(id int,name string,column_name string,column_val int) row format delimited fields terminated by ","')
sqlContext.sql('INSERT INTO TABlE testing.test select * from mytempTable')
预期输出是
1 ram sal 2000
1 ram dno 101
1 ram dname market
2 shyam sal 3000
2 shyam dno 102
2 shyam dname IT
3 sam sal 4000
3 sam dno 103
3 sam dname finance
4 remo sal 1000
4 remo dno 103
4 remo dname finance
但是我得到的输出是
NULL 2000.0 1 NULL
NULL NULL 1 NULL
NULL NULL 1 NULL
NULL 3000.0 2 NULL
NULL NULL 2 NULL
NULL NULL 2 NULL
NULL 4000.0 3 NULL
NULL NULL 3 NULL
NULL NULL 3 NULL
NULL 1000.0 4 NULL
NULL NULL 4 NULL
NULL NULL 4 NULL
如果 table
中有很多列,请告诉我如何循环列抱歉我才注意到"hive table "
cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).enableHiveSupport().getOrCreate()
df = spark.table("default.test").cache()
cols = df.columns[2:5]
df = df.rdd.map(lambda x: Row(id=x[0], name=x[1], val=dict(zip(cols, x[2:5]))))
df = spark.createDataFrame(df)
df.createOrReplaceTempView('mytempTable')
sql = """
select
id,
name,
explode(val) AS (k,v)
from mytempTable
"""
df = spark.sql(sql)
df.show()
在 HIVE 中:
> desc test;
OK
id string
somebody string
sal string
dno string
dname string
dt string
# Partition Information
# col_name data_type comment
dt string
P.S。 您只能在没有 Spark 的情况下使用 sql 作为 :
select
a.id,
a.somebody,
b.k,
b.v
from (
select
id,
somebody,
map('sal',sal,
'dno',dno,
'dname',dname) as val
from default.test
) a
lateral VIEW explode(val) b as k,v
关于您关于小镶木地板文件的问题:
cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.enableHiveSupport().config(conf=cfg).getOrCreate()
df = spark.sparkContext.parallelize(range(26))
df = df.map(lambda x: (x, chr(x + 97), '2017-01-12'))
df = spark.createDataFrame(df, schema=['idx', 'val', 'dt']).coalesce(1)
df.write.saveAsTable('default.testing', mode='overwrite', partitionBy='dt', format='parquet')
small parquet files amount = DataFrame partitions amount
您可以使用df.coalesce或df.repartition来减少DataFrame分区数量
但我不确定是否存在将DataFrame分区减少到只有一个的隐患(例如:OOM?)
还有另一种合并小文件的方法,只需要使用 HIVE sql:
set mapred.reduce.tasks=1;
insert overwrite table default.yourtable partition (dt='2017-01-13')
select col from tmp.yourtable where dt='2017-01-13';