如何计算 Apache Beam 的百分比变化?即 pandas.DataFrame.pct_change
How to calculate percentage change in Apache Beam? i.e. pandas.DataFrame.pct_change
我是 Apache Beam 的新手,在这个看似非常简单的事情上卡了几个小时:
如何在 Apache Beam 中完成 pandas.DataFrame.pct_change?
我正在从 CSV 读取数据(使用 beam.io.ReadFromText),比如:
0 90
1 91
2 85
我想把它变成行之间的百分比变化,即
0 NaN
1 0.011111
2 -0.065934
如何在 Apache Beam 管道中做到这一点?
祝一切顺利!
Beam 相对于 Pandas 的主要优势是能够并行执行大量操作。并行性也发生在读取中,因此没有像 Pandas.
中那样的“下一步”的简单概念
这就是为什么需要固定顺序的操作(例如,来自 Pandas 的所有滚动功能)在 Beam(和其他并行 ETL 框架)中更难执行的主要原因。他们几乎需要将所有元素发送给同一个工作人员并在那里执行操作,因此您正在失去 Beam 的优势,使用 Pandas.
可能会更好
BUT,因为你有一个 row
字段告诉我们顺序,我们可以使用 row
字段作为 [=14] =] 和 SlidingWindows
而不会失去并行性。
由于 Combiner(我们对事物进行分组的方式)不是 commutative/associative,因此我们需要高级组合器。在这两个答案中有关于这个概念的更多信息 1
p = beam.Pipeline()
class RollingChange(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, list, input):
list.append(input)
return list
def merge_accumulators(self, accumulators):
final_list = []
for list in accumulators:
final_list += list
return final_list
def extract_output(self, list_of_list):
if len(list_of_list) == 2:
first = list_of_list[0]
second = list_of_list[1]
second["change"] = second["value"] / first["value"] - 1
return second
elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
list_of_list[0]["change"] = 0
return list_of_list[0]
else:
pass
elements = [
{"row": 0, "value": 90},
{"row": 1, "value": 91},
{"row": 2, "value": 85},
{"row": 3, "value": 100},
{"row": 4, "value": 200}
]
(p | Create(elements)
| Map(lambda x: window.TimestampedValue(x, x['row'])) # adds row as timestamp for windows
| WindowInto(window.SlidingWindows(2, 1))
| beam.core.CombineGlobally(RollingChange()).without_defaults()
| beam.core.Filter(lambda x: x != None) # filters the last row (4)
| Map(print))
p.run()
这个输出是(注意顺序可能会改变)
{'row': 1, 'value': 91, 'change': 0.011111111111111072}
{'row': 0, 'value': 90, 'change': 0}
{'row': 2, 'value': 85, 'change': -0.06593406593406592}
{'row': 3, 'value': 100, 'change': 0.17647058823529416}
{'row': 4, 'value': 200, 'change': 1.0}
我是 Apache Beam 的新手,在这个看似非常简单的事情上卡了几个小时:
如何在 Apache Beam 中完成 pandas.DataFrame.pct_change?
我正在从 CSV 读取数据(使用 beam.io.ReadFromText),比如:
0 90
1 91
2 85
我想把它变成行之间的百分比变化,即
0 NaN
1 0.011111
2 -0.065934
如何在 Apache Beam 管道中做到这一点?
祝一切顺利!
Beam 相对于 Pandas 的主要优势是能够并行执行大量操作。并行性也发生在读取中,因此没有像 Pandas.
中那样的“下一步”的简单概念这就是为什么需要固定顺序的操作(例如,来自 Pandas 的所有滚动功能)在 Beam(和其他并行 ETL 框架)中更难执行的主要原因。他们几乎需要将所有元素发送给同一个工作人员并在那里执行操作,因此您正在失去 Beam 的优势,使用 Pandas.
可能会更好BUT,因为你有一个 row
字段告诉我们顺序,我们可以使用 row
字段作为 [=14] =] 和 SlidingWindows
而不会失去并行性。
由于 Combiner(我们对事物进行分组的方式)不是 commutative/associative,因此我们需要高级组合器。在这两个答案中有关于这个概念的更多信息 1
p = beam.Pipeline()
class RollingChange(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, list, input):
list.append(input)
return list
def merge_accumulators(self, accumulators):
final_list = []
for list in accumulators:
final_list += list
return final_list
def extract_output(self, list_of_list):
if len(list_of_list) == 2:
first = list_of_list[0]
second = list_of_list[1]
second["change"] = second["value"] / first["value"] - 1
return second
elif len(list_of_list) == 1 and list_of_list[0]['row'] == 0:
list_of_list[0]["change"] = 0
return list_of_list[0]
else:
pass
elements = [
{"row": 0, "value": 90},
{"row": 1, "value": 91},
{"row": 2, "value": 85},
{"row": 3, "value": 100},
{"row": 4, "value": 200}
]
(p | Create(elements)
| Map(lambda x: window.TimestampedValue(x, x['row'])) # adds row as timestamp for windows
| WindowInto(window.SlidingWindows(2, 1))
| beam.core.CombineGlobally(RollingChange()).without_defaults()
| beam.core.Filter(lambda x: x != None) # filters the last row (4)
| Map(print))
p.run()
这个输出是(注意顺序可能会改变)
{'row': 1, 'value': 91, 'change': 0.011111111111111072}
{'row': 0, 'value': 90, 'change': 0}
{'row': 2, 'value': 85, 'change': -0.06593406593406592}
{'row': 3, 'value': 100, 'change': 0.17647058823529416}
{'row': 4, 'value': 200, 'change': 1.0}