Best way to optimize a complex loop that iterates a dataframe
I have a couple of methods here that are taking longer than I would like. I'm currently hitting a wall, since I don't see any obvious way to write them more efficiently.
For context, the code processes a sales dataset to find previous sales orders related to the same customer. However, as you will see, there is a lot of business logic in the middle that may be slowing things down.
I'm considering refactoring this into a PySpark job, but before I do, I'd like to know whether that is the best way to get this done.
I would greatly appreciate any suggestions here.
Some more context:
Each loop takes around 10 minutes to complete, and there are roughly 24k rows in search_keys. These methods are part of a Luigi task.
import sys

import pandas as pd
from tqdm import tqdm


def previous_commits(self, df: pd.DataFrame):
    # Build some filters to slice data:
    search_keys = df.loc[:, ['accountid', 'opportunityid']].drop_duplicates()
    cols_a = ['opportunityid', 'opportunity_type', 'platform', 'closedate']
    cols_b = ['opportunityid', 'productid']
    # Build a list with the previous commit oppy_id:
    commits = [
        {
            'acc_id': acc,
            'current_oppy': oppy,
            'previous_commit': self.fetch_latest_commit(oppy, df.loc[df.accountid == acc, cols_a].drop_duplicates())
        }
        for oppy, acc in tqdm(
            zip(search_keys.opportunityid, search_keys.accountid),
            desc='Finding previous commits data',
            file=sys.stdout,
            total=search_keys.shape[0]
        )
    ]
    # Fetch products for the previous commit as well as the current row oppy:
    products = [
        {
            'current_oppy': x.get('current_oppy'),
            'current_products': self.fetch_products_id(
                [x.get('current_oppy')],
                df.loc[df.accountid == x.get('acc_id'), cols_b].drop_duplicates()
            ),
            'previous_products': self.fetch_products_id(
                x.get('previous_commit'),
                df.loc[df.accountid == x.get('acc_id'), cols_b].drop_duplicates()
            ),
            'previous_recurrent_products': self.fetch_products_id(
                x.get('previous_commit'),
                df.loc[(df.accountid == x.get('acc_id')) & (df.fee_type == 'Recurring'), cols_b].drop_duplicates()
            )
        }
        for x in tqdm(
            commits,
            desc='Finding previous commit products',
            file=sys.stdout
        )
    ]
    # Pick new calculated column and change its name for compatibility:
    df = pd.DataFrame(commits).join(pd.DataFrame(products).set_index('current_oppy'), on='current_oppy')
    df = df.loc[:, ['current_oppy', 'previous_commit', 'current_products', 'previous_recurrent_products']]
    df.columns = ['current_oppy', 'previous_commit', 'current_products', 'previous_products']
    return df


@staticmethod
def fetch_latest_commit(oppy_id: str, data: pd.DataFrame):
    # Build some filters and create a df copy to search against:
    data = data.set_index('opportunityid')
    current_closedate = data.loc[data.index == oppy_id, ['closedate']].iat[0, 0]
    current_platform = data.loc[data.index == oppy_id, ['platform']].iat[0, 0]
    date_filter = data.closedate < current_closedate
    platform_filter = data.platform == current_platform
    eb_filter = data.opportunity_type != 'EB'
    subset = data.loc[date_filter & eb_filter, :].sort_values('closedate', ascending=False)
    if current_platform in {'CL', 'WE'}:
        # Fetch latest commit closedate for the same platform:
        subset = data.loc[date_filter & platform_filter & eb_filter, :].sort_values('closedate', ascending=False)
        latest_commit_date = subset.loc[:, 'closedate'].max()
        latest_commit_filter = subset.closedate == latest_commit_date
    else:
        # Fetch latest commit closedate:
        latest_commit_date = subset.loc[:, 'closedate'].max()
        latest_commit_filter = subset.closedate == latest_commit_date
    # Now try to get the latest commit oppy_id (if available), otherwise, just exit the function
    # and return the current oppy_id. If the latest commit is a NB or NBU
    # deal, then make another lookup to ensure that all the NB info is gathered since they might
    # have different closedates.
    try:
        latest_commit_id = list(subset.loc[latest_commit_filter, :].index)
        latest_commitid_filter = subset.index.isin(latest_commit_id)
        latest_commit_type = subset.loc[latest_commitid_filter, 'opportunity_type'].unique()[0]
    except IndexError:
        return {oppy_id}
    if latest_commit_type == 'RN':
        return set(latest_commit_id)
    else:
        try:
            nb_before_latest_commit_filter = subset.closedate < latest_commit_date
            nb_only_filter = subset.opportunity_type == 'NB'
            nb_commit_id = list(subset.loc[nb_only_filter & nb_before_latest_commit_filter, :].index)
            return set(latest_commit_id + nb_commit_id)
        except IndexError:
            return set(latest_commit_id)


@staticmethod
def fetch_products_id(oppy_ids: list, data: pd.DataFrame):
    data = data.set_index('opportunityid')
    return set(data.loc[data.index.isin(oppy_ids), 'productid'])
"In very simple words, Pandas run operations on a single machine whereas PySpark runs on multiple machines. If you are working on a Machine Learning application where you are dealing with larger datasets, PySpark is a best fit, which could process operations many times (100x) faster than Pandas."
From https://sparkbyexamples.com/pyspark/pandas-vs-pyspark-dataframe-with-examples/
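If Spark is worth the operational overhead, the row-by-row lookup maps naturally onto a window. Here is a minimal, untested sketch of the idea, assuming the pandas frame df from the question is converted to Spark; the CL/WE platform branch and the RN/NB handling are deliberately left out:

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)  # assumed conversion of the pandas frame

# One row per opportunity, EB deals excluded, then look back one row
# per account ordered by closedate:
w = Window.partitionBy('accountid').orderBy('closedate')
previous = (
    sdf.select('accountid', 'opportunityid', 'opportunity_type', 'closedate')
       .dropDuplicates()
       .filter(F.col('opportunity_type') != 'EB')
       .withColumn('previous_commit', F.lag('opportunityid').over(w))
)

The platform-specific rule could be folded in by adding platform to partitionBy for the CL/WE rows.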
You should also consider a window-function approach to fetch the previous order. That would avoid looping over every record.
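To make that concrete, here is a rough pandas sketch of the window idea, under the simplifying assumption that the previous commit is just the latest earlier non-EB order on the same account (the CL/WE and NB special cases would still need to be layered on top):

# Deduplicate and sort once, then shift within each account; this is a
# single vectorized pass instead of ~24k calls to fetch_latest_commit:
orders = (
    df.loc[df.opportunity_type != 'EB', ['accountid', 'opportunityid', 'closedate']]
      .drop_duplicates()
      .sort_values(['accountid', 'closedate'])
)
orders['previous_commit'] = orders.groupby('accountid')['opportunityid'].shift(1)

Merging orders back onto the sales rows by opportunityid would then replace the commits list comprehension, and the three product lookups could likewise become groupby aggregations instead of per-row set lookups.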