How to use chaining in pyspark?
I have a dataframe named Incito, and the Supplier Inv No column of that dataframe consists of comma-separated values. I need to recreate the dataframe by appropriately repeating those comma-separated values, using PySpark. I am currently using the following Python code for that. Can I convert it to PySpark? Is it possible with PySpark?
from itertools import chain
import numpy as np
import pandas as pd

def chainer(s):
    return list(chain.from_iterable(s.str.split(',')))

incito['Supplier Inv No'] = incito['Supplier Inv No'].astype(str)

# calculate lengths of splits
lens = incito['Supplier Inv No'].str.split(',').map(len)

# create new dataframe, repeating or chaining as appropriate
dfnew = pd.DataFrame({'Supplier Inv No': chainer(incito['Supplier Inv No']),
                      'Forwarder': np.repeat(incito['Forwarder'], lens),
                      'Mode': np.repeat(incito['Mode'], lens),
                      'File No': np.repeat(incito['File No'], lens),
                      'ETD': np.repeat(incito['ETD'], lens),
                      'Flight No': np.repeat(incito['Flight No'], lens),
                      'Shipped Country': np.repeat(incito['Shipped Country'], lens),
                      'Port': np.repeat(incito['Port'], lens),
                      'Delivered_Country': np.repeat(incito['Delivered_Country'], lens),
                      'AirWeight': np.repeat(incito['AirWeight'], lens),
                      'FREIGHT CHARGE': np.repeat(incito['FREIGHT CHARGE'], lens)})
This is what I tried in PySpark, but I did not get the expected result.
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as F
import pandas as pd
conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
ddf = spark.createDataFrame(dfnew)
exploded = ddf.withColumn('d', F.explode("Supplier Inv No"))
exploded.show()
Something like this, using repeat?
from pyspark.sql import functions as F
df = (spark
      .sparkContext
      .parallelize([
          ('ABCD',),
          ('EFGH',),
      ])
      .toDF(['col_a'])
      )

(df
 .withColumn('col_b', F.repeat(F.col('col_a'), 2))
 .withColumn('col_c', F.repeat(F.lit('X'), 10))
 .show()
 )
# +-----+--------+----------+
# |col_a| col_b| col_c|
# +-----+--------+----------+
# | ABCD|ABCDABCD|XXXXXXXXXX|
# | EFGH|EFGHEFGH|XXXXXXXXXX|
# +-----+--------+----------+
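Note that F.repeat duplicates characters within a single value, so every input row still yields one output row. If the aim, as in the question, is to turn each comma-separated Supplier Inv No entry into its own row while carrying the other columns along, a minimal sketch using split plus explode should work. The sample data below is made up and only mirrors two of the question's columns; it also assumes the spark session created earlier:

from pyspark.sql import functions as F

# hypothetical sample mirroring the question's Incito dataframe (two columns only)
sample = spark.createDataFrame(
    [('INV1,INV2', 'FWD-A'), ('INV3', 'FWD-B')],
    ['Supplier Inv No', 'Forwarder'],
)

# split turns the string into an array; explode then emits one row per element,
# repeating the remaining columns automatically
result = sample.withColumn(
    'Supplier Inv No',
    F.explode(F.split(F.col('Supplier Inv No'), ','))
)
result.show()
# expected output (roughly):
# +---------------+---------+
# |Supplier Inv No|Forwarder|
# +---------------+---------+
# |           INV1|    FWD-A|
# |           INV2|    FWD-A|
# |           INV3|    FWD-B|
# +---------------+---------+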