Pyspark DataFrame 循环

Pyspark DataFrame loop

我是 Python 和 DataFrame 的新手。在这里,我正在为 运行 AWS Glue 中的 ETL 作业编写 Python 代码。请在下面找到相同的代码片段。

test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()

现在上面的 test_dataframe 是类型 pyspark.sql.dataframe.DataFrame

现在,我需要循环上面的test_dataframe。据我所知,我只能看到 collecttoLocalIterator。请找到下面的示例代码

for row_val in test_dataframe.collect():

但是这两种方法都很慢而且效率不高。我无法使用 pandas,因为 AWS Glue 不支持它。

请找出我正在做的步骤

来源信息:

productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102

预期结果:

product |similar products
product A|product X, product Y
product B|product X
product C|product Z

这是我正在写的代码

  1. 我正在使用 productID
  2. 获取源的不同数据框
  3. 遍历这个不同的数据帧集

    a) 从源中获取产品的 matchval 列表

    b) 根据 matchval 过滤器识别相似产品

    c) 循环获取连接的字符串 ---> 这个使用 rdd.collect 的循环会影响性能

能否就可以做什么分享更好的建议?

请详细说明您想尝试的逻辑。 DF 循环可以通过 SQL 方法完成,或者您也可以遵循以下 RDD 方法

def my_function(each_record):
#my_logic

#loop through for each command. 
df.rdd.foreach(my_function)

根据您的输入进一步添加了以下代码

df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()

+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B|        1|     null|     null|
|product A|        1|        1|     null|
|product C|     null|     null|        1|
+---------+---------+---------+---------+

符合您要求的最终方法

df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|") df.printSchema()

>>> df.printSchema()
root
 |-- id: string (nullable = true)
 |-- matchval1: integer (nullable = true)
 |-- similar: string (nullable = true)
 |-- matchval3: integer (nullable = true)


from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()

+---------+-------------------+
|       id|      Similar_Items|
+---------+-------------------+
|product B|          product X|
|product A|product X,product Y|
|product C|          product Z|
+---------+-------------------+

您也可以使用 MAP class。就我而言,我正在遍历数据并计算整行的哈希值。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import hashlib

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "load-test", table_name = "table_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "load-test", table_name = "table_test", transformation_ctx = "datasource0")

def hash_calculation(rec):
    md5 = hashlib.md5()
    md5.update('{}_{}_{}_{}'.format(rec["funcname"],rec["parameter"],rec["paramtype"],rec["structure"]).encode())
    rec["hash"]  = md5.hexdigest()
    print("looping the recs")
    return rec
    
mapped_dyF =  Map.apply(frame = datasource0, f = hash_calculation)