PyFlink unix epoch 时间戳转换问题
PyFlink unix epoch timestamp conversion issue
我有带有 unix 纪元时间戳的事件,我正在使用 table 和源 table 的 Kinesis 连接器。我需要使用与水印相同的时间戳字段。
我如何在 python 中执行此操作?
我正在使用 Flink-1.11 版本,因为这是最新的 AWS 支持。
事件格式:{'event_time': 1633098843692, 'ticker': 'AMZN'}
Python table:
CREATE TABLE event_input_table (
event_time TIMESTAMP,
ticker VARCHAR,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'inputstream1',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json' ,
'aws.credentials.provider' = 'ENV_VAR'
)
CREATE TABLE event_input_table (
event_time BIGINT,
ip_src VARCHAR,
ip_dst VARCHAR,
domain ARRAY<VARCHAR>,
new_time as TO_TIMESTAMP(FROM_UNIXTIME(event_time))
)
我有带有 unix 纪元时间戳的事件,我正在使用 table 和源 table 的 Kinesis 连接器。我需要使用与水印相同的时间戳字段。 我如何在 python 中执行此操作? 我正在使用 Flink-1.11 版本,因为这是最新的 AWS 支持。
事件格式:{'event_time': 1633098843692, 'ticker': 'AMZN'}
Python table:
CREATE TABLE event_input_table (
event_time TIMESTAMP,
ticker VARCHAR,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
)
WITH (
'connector' = 'kinesis',
'stream' = 'inputstream1',
'aws.region' = 'us-east-1',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json' ,
'aws.credentials.provider' = 'ENV_VAR'
)
CREATE TABLE event_input_table (
event_time BIGINT,
ip_src VARCHAR,
ip_dst VARCHAR,
domain ARRAY<VARCHAR>,
new_time as TO_TIMESTAMP(FROM_UNIXTIME(event_time))
)