Pyspark DataFrame 循环
Pyspark DataFrame loop
我是 Python 和 DataFrame 的新手。在这里,我正在为 运行 AWS Glue 中的 ETL 作业编写 Python 代码。请在下面找到相同的代码片段。
test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()
现在上面的 test_dataframe 是类型 pyspark.sql.dataframe.DataFrame
现在,我需要循环上面的test_dataframe。据我所知,我只能看到 collect
或 toLocalIterator
。请找到下面的示例代码
for row_val in test_dataframe.collect():
但是这两种方法都很慢而且效率不高。我无法使用 pandas,因为 AWS Glue 不支持它。
请找出我正在做的步骤
来源信息:
productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102
预期结果:
product |similar products
product A|product X, product Y
product B|product X
product C|product Z
这是我正在写的代码
- 我正在使用 productID
获取源的不同数据框
遍历这个不同的数据帧集
a) 从源中获取产品的 matchval 列表
b) 根据 matchval 过滤器识别相似产品
c) 循环获取连接的字符串 ---> 这个使用 rdd.collect 的循环会影响性能
能否就可以做什么分享更好的建议?
请详细说明您想尝试的逻辑。 DF 循环可以通过 SQL 方法完成,或者您也可以遵循以下 RDD 方法
def my_function(each_record):
#my_logic
#loop through for each command.
df.rdd.foreach(my_function)
根据您的输入进一步添加了以下代码
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()
+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B| 1| null| null|
|product A| 1| 1| null|
|product C| null| null| 1|
+---------+---------+---------+---------+
符合您要求的最终方法
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
df.printSchema()
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- matchval1: integer (nullable = true)
|-- similar: string (nullable = true)
|-- matchval3: integer (nullable = true)
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()
+---------+-------------------+
| id| Similar_Items|
+---------+-------------------+
|product B| product X|
|product A|product X,product Y|
|product C| product Z|
+---------+-------------------+
您也可以使用 MAP class。就我而言,我正在遍历数据并计算整行的哈希值。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import hashlib
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "load-test", table_name = "table_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "load-test", table_name = "table_test", transformation_ctx = "datasource0")
def hash_calculation(rec):
md5 = hashlib.md5()
md5.update('{}_{}_{}_{}'.format(rec["funcname"],rec["parameter"],rec["paramtype"],rec["structure"]).encode())
rec["hash"] = md5.hexdigest()
print("looping the recs")
return rec
mapped_dyF = Map.apply(frame = datasource0, f = hash_calculation)
我是 Python 和 DataFrame 的新手。在这里,我正在为 运行 AWS Glue 中的 ETL 作业编写 Python 代码。请在下面找到相同的代码片段。
test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()
现在上面的 test_dataframe 是类型 pyspark.sql.dataframe.DataFrame
现在,我需要循环上面的test_dataframe。据我所知,我只能看到 collect
或 toLocalIterator
。请找到下面的示例代码
for row_val in test_dataframe.collect():
但是这两种方法都很慢而且效率不高。我无法使用 pandas,因为 AWS Glue 不支持它。
请找出我正在做的步骤
来源信息:
productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102
预期结果:
product |similar products
product A|product X, product Y
product B|product X
product C|product Z
这是我正在写的代码
- 我正在使用 productID 获取源的不同数据框
遍历这个不同的数据帧集
a) 从源中获取产品的 matchval 列表
b) 根据 matchval 过滤器识别相似产品
c) 循环获取连接的字符串 ---> 这个使用 rdd.collect 的循环会影响性能
能否就可以做什么分享更好的建议?
请详细说明您想尝试的逻辑。 DF 循环可以通过 SQL 方法完成,或者您也可以遵循以下 RDD 方法
def my_function(each_record):
#my_logic
#loop through for each command.
df.rdd.foreach(my_function)
根据您的输入进一步添加了以下代码
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()
+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B| 1| null| null|
|product A| 1| 1| null|
|product C| null| null| 1|
+---------+---------+---------+---------+
符合您要求的最终方法
df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|") df.printSchema()
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- matchval1: integer (nullable = true)
|-- similar: string (nullable = true)
|-- matchval3: integer (nullable = true)
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()
+---------+-------------------+
| id| Similar_Items|
+---------+-------------------+
|product B| product X|
|product A|product X,product Y|
|product C| product Z|
+---------+-------------------+
您也可以使用 MAP class。就我而言,我正在遍历数据并计算整行的哈希值。
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import hashlib
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "load-test", table_name = "table_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "load-test", table_name = "table_test", transformation_ctx = "datasource0")
def hash_calculation(rec):
md5 = hashlib.md5()
md5.update('{}_{}_{}_{}'.format(rec["funcname"],rec["parameter"],rec["paramtype"],rec["structure"]).encode())
rec["hash"] = md5.hexdigest()
print("looping the recs")
return rec
mapped_dyF = Map.apply(frame = datasource0, f = hash_calculation)