如何将来自两个不同数据帧的数据与 pyspark 结合起来?

How to combine the data from two different dataframes with pyspark?

我有两个不同的(并且非常大的)数据框(详情如下)。我需要合并他们两个的数据。由于这些数据帧很大(第一个数据帧有数百万行,第二个数据帧有数千行),我尝试使用 AWS EMR 服务。但我不太明白它是如何在那里完成的,我看到的教程大多只显示一个数据框的说明。所以,我一直想知道如何将 pyspark 用于两个不同的数据帧。

详情如下:

第一个数据框,比如 df,包含有关人们在不同日子看电视的数据。它看起来像这样:

    id     date     other_data

0   0   2020-01-01  some data
1   1   2020-02-01  some data
2   2   2020-03-01  some data
3   3   2020-04-01  some data
4   4   2020-05-01  some data

这里,id是观看者的id,date是观看日期,other_data包含其他信息(比如观看时长,频道等)

第二个数据帧,比如 program,包含有关程序的数据。它看起来像这样:

       date      program    start_time  end_time

0   2020-01-01  program 1   14:00:00    15:00:00
1   2020-01-01  program 2   15:00:00    16:00:00
2   2020-01-01  program 3   16:00:00    17:00:00
3   2020-01-01  program 4   17:00:00    18:00:00
4   2020-01-01  program 5   18:00:00    19:00:00

这里,date是日期,program是节目名称,start_timeend_time是节目开始和结束的时间。

基本上,我需要做的是创建一个包含来自这两个数据帧的所有信息的数据帧。我需要这个最终数据框为每个用户和每个程序单独一行。换句话说,我需要一个数据框,它可以在同一天为每个程序复制第一个数据框中的每一行。

这可能看起来有点混乱,但这是我想要接收的最终数据帧的示例:

    id     date     other_data   program    start_time  end_time

0   0   2020-01-01  some data   program 1   14:00:00    15:00:00
1   0   2020-01-01  some data   program 2   15:00:00    16:00:00
2   0   2020-01-01  some data   program 3   16:00:00    17:00:00
3   0   2020-01-01  some data   program 4   17:00:00    18:00:00
4   0   2020-01-01  some data   program 5   18:00:00    19:00:00

如您所见,此最终数据框包含每个用户的数据以及该用户观看电视的同一天播放的每个节目。在这种特殊情况下,id=0 的用户在 01/01/2020 上看过电视。同一天,program 1program 2program 3program 4program 5也出现了。因此,我需要为每个程序留出一行及其详细信息。而且,当然,我需要第一个数据帧中的数据(包含在 other_data 中)。

到目前为止,我创建了以下方法:我遍历第一个数据帧,对于每一行,我在第二个数据帧中找到具有相同日期的所有行,合并它并添加到第三个(最终)数据帧.

这是我使用的代码:

ids = []  # users' id
dates = []  # dates
other_data = []  # other data from the first dataframe 
programs = []  # all programs
start_times = []  # starting times
end_times = []  # ending times

for i, row in df.iterrows():
    temp = program.loc[program['date'] == row['date']]  # find all programs on the same date
    
    for j, program_row in temp.iterrows():  # iterate over the programs on the same date
        # append all the info
        ids.append(row['id'])
        dates.append(row['date'])
        other_data.append(row['other_data'])
        programs.append(program_row['program'])
        start_times.append(program_row['start_time'])
        end_times.append(program_row['end_time'])
        
# create final dataframe
final = pd.DataFrame({'id': ids, 'date': dates, 'other_data': other_data, 'program': programs, 
                      'start_time': start_times, 'end_time': end_times})

这种方法可行,但速度极慢(考虑到数据帧的大小)。因此,我想知道如何使用 ERM by AWS 将这项工作分配给几个工作人员。如果我理解正确,我需要在工作人员之间拆分第一个数据帧 df,同时为他们提供完整的 program 数据帧。有可能这样做吗?又如何?

非常感谢任何帮助或建议!

df 和 program 似乎都是 Pandas 数据帧,merging/joining 是所需的操作,参见 pandas.DataFrame.merge。试试这个:

import pandas as pd

finial = pd.merge(df, program, on=['date'], how='inner')

如果 Pandas 版本太慢,您可以将数据帧转换为 PySPark 数据帧并执行以下步骤:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("convert").getOrCreate()

df_spark = spark.createDataFrame(df)
program_spark = spark.createDataFrame(program)

final_spark = df_spark.join(F.broadcast(program), on=['date'], how='inner')

这里假设dataframe程序是一个小dataframe——如果不是请去掉广播

希望它能解决您的问题并消除此处的慢速循环。