kafka ksqlDB 进程时间戳,格式为 yyyy-MM-dd HH:mm:ss.SSSSSS

kafka ksqlDB process timestamp with `yyyy-MM-dd HH:mm:ss.SSSSSS` format

我使用 kafka-connect 将 table 从 MySQL 流式传输到 kafka 主题。 table 包含一些 datetime(6) 列类型的列,喜欢这个 1611290740285818

当我使用 ksqlDB 将此时间戳转换为字符串时:

    SELECT TIMESTAMPTOSTRING(my_timestamp, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') AS DT6
      FROM my_topic
      EMIT CHANGES;

显示的字符串实际上是+53114-10-20 14:12:20.712000,而实际时间应该是2021-01-22 04:45:40.285818

任何人都可以告诉我我的查询有什么问题吗?

@Aydin 的回答是正确的。您共享的 bigint 值是微秒,而 ksqlDB 的 TIMESTAMPTOSTRING 函数需要 毫秒 。您指定的时间格式字符串只是告诉 ksqlDB 如何格式化时间戳,而不是如何解释它。这是一个例子:

-- Create a sample stream
ksql> CREATE STREAM TMP (TS BIGINT) WITH (KAFKA_TOPIC='TMP', PARTITIONS=1, VALUE_FORMAT='AVRO');

 Message
----------------
 Stream created
----------------

-- Populate it with example data
ksql> INSERT INTO TMP VALUES (1611290740285818);

-- Query the stream from the beginning
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.

-- Reproduce the described behaviour
ksql> SELECT TS, TIMESTAMPTOSTRING(TS, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') FROM TMP EMIT CHANGES;
+--------------------+------------------------------+
|TS                  |KSQL_COL_0                    |
+--------------------+------------------------------+
|1611290740285818    |+53029-10-09 09:11:25.818000  |
^CQuery terminated

通过将微秒除以 1000,它们变成毫秒,并且该函数的行为与您预期的一样:

ksql> SELECT TS, 
             TIMESTAMPTOSTRING(TS/1000, 'yyyy-MM-dd HH:mm:ss.SSS','UTC') 
        FROM TMP 
      EMIT CHANGES;
+------------------+-------------------------+
|TS                |KSQL_COL_0               |
+------------------+-------------------------+
|1611290740285818  |2021-01-22 04:45:40.285  |