kafka ksqlDB 进程时间戳,格式为 yyyy-MM-dd HH:mm:ss.SSSSSS
kafka ksqlDB process timestamp with `yyyy-MM-dd HH:mm:ss.SSSSSS` format
我使用 kafka-connect
将 table 从 MySQL
流式传输到 kafka 主题。 table 包含一些 datetime(6) 列类型的列,喜欢这个 1611290740285818
。
当我使用 ksqlDB 将此时间戳转换为字符串时:
SELECT TIMESTAMPTOSTRING(my_timestamp, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') AS DT6
FROM my_topic
EMIT CHANGES;
显示的字符串实际上是+53114-10-20 14:12:20.712000
,而实际时间应该是2021-01-22 04:45:40.285818
。
任何人都可以告诉我我的查询有什么问题吗?
@Aydin 的回答是正确的。您共享的 bigint 值是微秒,而 ksqlDB 的 TIMESTAMPTOSTRING
函数需要 毫秒 。您指定的时间格式字符串只是告诉 ksqlDB 如何格式化时间戳,而不是如何解释它。这是一个例子:
-- Create a sample stream
ksql> CREATE STREAM TMP (TS BIGINT) WITH (KAFKA_TOPIC='TMP', PARTITIONS=1, VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
-- Populate it with example data
ksql> INSERT INTO TMP VALUES (1611290740285818);
-- Query the stream from the beginning
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
-- Reproduce the described behaviour
ksql> SELECT TS, TIMESTAMPTOSTRING(TS, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') FROM TMP EMIT CHANGES;
+--------------------+------------------------------+
|TS |KSQL_COL_0 |
+--------------------+------------------------------+
|1611290740285818 |+53029-10-09 09:11:25.818000 |
^CQuery terminated
通过将微秒除以 1000,它们变成毫秒,并且该函数的行为与您预期的一样:
ksql> SELECT TS,
TIMESTAMPTOSTRING(TS/1000, 'yyyy-MM-dd HH:mm:ss.SSS','UTC')
FROM TMP
EMIT CHANGES;
+------------------+-------------------------+
|TS |KSQL_COL_0 |
+------------------+-------------------------+
|1611290740285818 |2021-01-22 04:45:40.285 |
我使用 kafka-connect
将 table 从 MySQL
流式传输到 kafka 主题。 table 包含一些 datetime(6) 列类型的列,喜欢这个 1611290740285818
。
当我使用 ksqlDB 将此时间戳转换为字符串时:
SELECT TIMESTAMPTOSTRING(my_timestamp, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') AS DT6
FROM my_topic
EMIT CHANGES;
显示的字符串实际上是+53114-10-20 14:12:20.712000
,而实际时间应该是2021-01-22 04:45:40.285818
。
任何人都可以告诉我我的查询有什么问题吗?
@Aydin 的回答是正确的。您共享的 bigint 值是微秒,而 ksqlDB 的 TIMESTAMPTOSTRING
函数需要 毫秒 。您指定的时间格式字符串只是告诉 ksqlDB 如何格式化时间戳,而不是如何解释它。这是一个例子:
-- Create a sample stream
ksql> CREATE STREAM TMP (TS BIGINT) WITH (KAFKA_TOPIC='TMP', PARTITIONS=1, VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
-- Populate it with example data
ksql> INSERT INTO TMP VALUES (1611290740285818);
-- Query the stream from the beginning
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
-- Reproduce the described behaviour
ksql> SELECT TS, TIMESTAMPTOSTRING(TS, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') FROM TMP EMIT CHANGES;
+--------------------+------------------------------+
|TS |KSQL_COL_0 |
+--------------------+------------------------------+
|1611290740285818 |+53029-10-09 09:11:25.818000 |
^CQuery terminated
通过将微秒除以 1000,它们变成毫秒,并且该函数的行为与您预期的一样:
ksql> SELECT TS,
TIMESTAMPTOSTRING(TS/1000, 'yyyy-MM-dd HH:mm:ss.SSS','UTC')
FROM TMP
EMIT CHANGES;
+------------------+-------------------------+
|TS |KSQL_COL_0 |
+------------------+-------------------------+
|1611290740285818 |2021-01-22 04:45:40.285 |