PyFlink UDAF InternalRow 与 Row
PyFlink UDAF InternalRow vs. Row
我正在尝试通过 PyFlink
中的自定义 UDAF 调用外部函数。我使用的函数要求数据位于字典对象中。我尝试使用row(t.rowtime, t.b, t.c).cast(schema)
来达到这样的效果。
在 UDAF 之外,此表达式效果很好。在 UDAF 内部,此表达式被翻译为 InternalRow
,无法转换为字典对象。
有没有办法强制 UDAF 使用 Row
而不是 InternalRow
?
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment, AggregateFunction, DataTypes
from pyflink.table.expressions import row, col, lit, row_interval
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf
from datetime import datetime, date, time
class TestAccumulator(AggregateFunction):
def create_accumulator(self):
return Row(last_type="")
def accumulate(self, accumulator, value):
accumulator["last_type"] = str(type(value))
def get_value(self, accumulator):
return accumulator["last_type"]
def get_result_type(self):
return DataTypes.STRING()
def get_accumulator_type(self):
return DataTypes.ROW([
DataTypes.FIELD("last_type", DataTypes.STRING()),
])
if __name__ == "__main__":
# create a blink streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
schema = DataTypes.ROW([
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("b", DataTypes.STRING()),
DataTypes.FIELD("c", DataTypes.STRING()),
])
my_udaf = udaf(TestAccumulator())
t = table_env.from_elements([(datetime(1970, 1, 1, 0, 0, 0), 'Hi', 'Hello'),
(datetime(1970, 1, 1, 1, 0, 0), 'Hi', 'hi'),
(datetime(1970, 1, 1, 2, 0, 0), 'Hi2', 'hi'),
(datetime(1970, 1, 1, 3, 0, 0), 'Hi', 'Hello'),
(datetime(1970, 1, 1, 4, 0, 0), 'Hi', 'Hello')], schema=schema)
print(
t.select( my_udaf(row(t.rowtime, t.b, t.c).cast(schema)).alias("udaf")).to_pandas().values
)
输出:
[["<class 'pyflink.fn_execution.coder_impl_fast.InternalRow'>"]]
感谢您报告问题。这是一个错误。我创建了一个 JIRA https://issues.apache.org/jira/browse/FLINK-23121 来修复它。它将在 1.13.2
版本中修复
我正在尝试通过 PyFlink
中的自定义 UDAF 调用外部函数。我使用的函数要求数据位于字典对象中。我尝试使用row(t.rowtime, t.b, t.c).cast(schema)
来达到这样的效果。
在 UDAF 之外,此表达式效果很好。在 UDAF 内部,此表达式被翻译为 InternalRow
,无法转换为字典对象。
有没有办法强制 UDAF 使用 Row
而不是 InternalRow
?
from pyflink.common import Row
from pyflink.table import EnvironmentSettings, TableEnvironment, AggregateFunction, DataTypes
from pyflink.table.expressions import row, col, lit, row_interval
from pyflink.table.window import Tumble
from pyflink.table.udf import udaf
from datetime import datetime, date, time
class TestAccumulator(AggregateFunction):
def create_accumulator(self):
return Row(last_type="")
def accumulate(self, accumulator, value):
accumulator["last_type"] = str(type(value))
def get_value(self, accumulator):
return accumulator["last_type"]
def get_result_type(self):
return DataTypes.STRING()
def get_accumulator_type(self):
return DataTypes.ROW([
DataTypes.FIELD("last_type", DataTypes.STRING()),
])
if __name__ == "__main__":
# create a blink streaming TableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
table_env = TableEnvironment.create(env_settings)
schema = DataTypes.ROW([
DataTypes.FIELD("rowtime", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("b", DataTypes.STRING()),
DataTypes.FIELD("c", DataTypes.STRING()),
])
my_udaf = udaf(TestAccumulator())
t = table_env.from_elements([(datetime(1970, 1, 1, 0, 0, 0), 'Hi', 'Hello'),
(datetime(1970, 1, 1, 1, 0, 0), 'Hi', 'hi'),
(datetime(1970, 1, 1, 2, 0, 0), 'Hi2', 'hi'),
(datetime(1970, 1, 1, 3, 0, 0), 'Hi', 'Hello'),
(datetime(1970, 1, 1, 4, 0, 0), 'Hi', 'Hello')], schema=schema)
print(
t.select( my_udaf(row(t.rowtime, t.b, t.c).cast(schema)).alias("udaf")).to_pandas().values
)
输出:
[["<class 'pyflink.fn_execution.coder_impl_fast.InternalRow'>"]]
感谢您报告问题。这是一个错误。我创建了一个 JIRA https://issues.apache.org/jira/browse/FLINK-23121 来修复它。它将在 1.13.2
版本中修复