Flink Table 到 DataStream:如何访问列名?
Flink Table to DataStream: how to access column name?
我想使用 Flink SQL 将 Kafka 主题消费到 table,然后将其转换回 DataStream。
这里是 SOURCE_DDL
:
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
使用Flink,我执行DDL。
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
然后我转成DataStream,在map(e => ...)
部分做下游逻辑
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
在 map(e => ...)
部分中,我想访问列名,在本例中为 last_5_clicks
。为什么?因为我可能有不同的源和不同的列名(例如 last_10min_page_view
),但我想重用 map(e => ...)
.
中的代码
有办法吗?谢谢。
从 Flink 1.12 开始,可以通过 Table.getSchema.getFieldNames
访问它。从 1.13 版本开始,可以通过 Row.getFieldNames
.
访问
我想使用 Flink SQL 将 Kafka 主题消费到 table,然后将其转换回 DataStream。
这里是 SOURCE_DDL
:
CREATE TABLE kafka_source (
user_id BIGINT,
datetime TIMESTAMP(3),
last_5_clicks STRING
) WITH (
'connector' = 'kafka',
'topic' = 'aiinfra.fct.userfeature.0',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'test-group',
'format' = 'json'
)
使用Flink,我执行DDL。
val settings = EnvironmentSettings.newInstance.build
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(streamEnv, settings)
tableEnv.executeSql(SOURCE_DDL)
val table = tableEnv.from("kafka_source")
然后我转成DataStream,在map(e => ...)
部分做下游逻辑
tableEnv.toRetractStream[(Long, java.sql.Timestamp, String)](table).map(e => ...)
在 map(e => ...)
部分中,我想访问列名,在本例中为 last_5_clicks
。为什么?因为我可能有不同的源和不同的列名(例如 last_10min_page_view
),但我想重用 map(e => ...)
.
有办法吗?谢谢。
从 Flink 1.12 开始,可以通过 Table.getSchema.getFieldNames
访问它。从 1.13 版本开始,可以通过 Row.getFieldNames
.