使用 Apache beam `GroupByKey` 并构建一个新列 - Python
Use Apache beam `GroupByKey` and construct a new column - Python
从这个问题:,我知道如何使用pandas
对多列进行分组并构造一个新的唯一ID,但是如果我想在[=中使用Apache beam
31=] 来实现该问题中描述的相同内容,我如何实现它,然后将新数据写入换行符分隔的 JSON 格式文件(每行是一个 unique_id
数组属于那个 unique_id)?
的对象
假设数据集存储在 csv 文件中。
我是 Apache Beam 的新手,这是我现在拥有的:
import pandas
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("example.csv", names=cols)
agg_df = df.insert(0, 'unique_id',
df.groupby(['postcode', 'house_number'], sort=False).ngroup())
agg_df.to_csv('test_output')
这给了我一个错误:
NotImplementedError: 'ngroup' is not yet supported (BEAM-9547)
这真的很烦人,我对 Apache Beam 不是很熟悉,有人可以帮忙吗...
(参考:https://beam.apache.org/documentation/dsls/dataframes/overview/)
将连续的整数分配给一个集合不是很适合并行计算的事情。它也不是很稳定。有没有其他标识符的原因(例如元组 (postcode, house_number)
或其散列不合适?
从这个问题:pandas
对多列进行分组并构造一个新的唯一ID,但是如果我想在[=中使用Apache beam
31=] 来实现该问题中描述的相同内容,我如何实现它,然后将新数据写入换行符分隔的 JSON 格式文件(每行是一个 unique_id
数组属于那个 unique_id)?
假设数据集存储在 csv 文件中。
我是 Apache Beam 的新手,这是我现在拥有的:
import pandas
import apache_beam as beam
from apache_beam.dataframe.io import read_csv
with beam.Pipeline() as p:
df = p | read_csv("example.csv", names=cols)
agg_df = df.insert(0, 'unique_id',
df.groupby(['postcode', 'house_number'], sort=False).ngroup())
agg_df.to_csv('test_output')
这给了我一个错误:
NotImplementedError: 'ngroup' is not yet supported (BEAM-9547)
这真的很烦人,我对 Apache Beam 不是很熟悉,有人可以帮忙吗...
(参考:https://beam.apache.org/documentation/dsls/dataframes/overview/)
将连续的整数分配给一个集合不是很适合并行计算的事情。它也不是很稳定。有没有其他标识符的原因(例如元组 (postcode, house_number)
或其散列不合适?