通过 python 将 parquet int96 时间戳转换为 datetime/date

parquet int96 timestamp conversion to datetime/date via python

TL;DR

我想将 int96 值(如 ACIE4NxJAAAKhSUA 转换为 可读 时间戳格式,如 2020- 03-02 14:34:22 或任何可以正常解释的内容...我主要使用 python 所以我希望构建一个函数来执行此转换。如果有另一个函数可以做相反的事情——那就更好了。

背景

我正在使用 parquet-tools 通过此命令将原始 parquet 文件(使用 snappy 压缩)转换为原始 JSON:

C:\Research> java -jar parquet-tools-1.8.2.jar cat --json original-file.snappy.parquet > parquet-output.json

在 JSON 中,我将这些值视为时间戳:

{... "_id":"101836","timestamp":"ACIE4NxJAAAKhSUA"}

我确定 "ACIE4NxJAAAKhSUA" 的时间戳值确实是 int96(这也通过读取 parquet 文件的模式得到证实....

message spark_schema {
 ...(stuff)...
  optional binary _id (UTF8);
  optional int96 timestamp;
}

我认为这也被称为 Impala 时间戳(至少我是这样收集的)

进一步的问题研究

我一直在到处寻找关于如何 "read" int96 值(到 python 的函数或信息——我想用那种语言保留它,因为我最熟悉它)并输出时间戳——我什么也没找到。

这是我已经研究过的一篇文章(与此主题相关):

关于折旧的 int96 时间戳

请不要让我停止在 parquet 文件中使用 old/depreciated 时间戳格式,根据我迄今为止所做的研究,我很清楚这一点。我是 file/data 的接收者 -- 我无法更改创建时使用的格式。

如果有另一种方法来控制初始 JSON 输出以提供 "non int96" 值——我也会对此感兴趣。

非常感谢您对 SO 社区的帮助!

parquet-tools 将无法将格式类型从 INT96 更改为 INT64。您在 json 输出中观察到的是存储在 INT96 TimestampType 中的时间戳的字符串表示形式。您将需要 spark 以 INT64 TimestampType 中的时间戳重写此镶木地板,然后 json 输出将生成一个时间戳(以您想要的格式)。

您需要在 Spark 中设置特定配置 -

spark-shell --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MICROS

2020-03-16 11:37:50 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://192.168.0.20:4040
Spark context available as 'sc' (master = local[*], app id = local-1584383875924).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
Type in expressions to have them evaluated.
Type :help for more information.

val sourceDf = spark.read.parquet("original-file.snappy.parquet")
2020-03-16 11:38:31 WARN  Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
sourceDf: org.apache.spark.sql.DataFrame = [application: struct<name: string, upgrades: struct<value: double> ... 3 more fields>, timestamp: timestamp ... 16 more fields]

scala> sourceDf.repartition(1).write.parquet("Downloads/output")

Parquet-tools 将显示正确的时间戳类型

parquet-tools schema Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet 

message spark_schema {
  ...
  optional binary _id (UTF8);
  optional int64 timestamp (TIMESTAMP_MICROS);
  ...
}

并且 json 转储给出 -

parquet-tools cat --json Downloads/output/part-00000-edba239b-e696-4b4e-8fd3-c7cca9eea6bf-c000.snappy.parquet

{..."_id":"101836", "timestamp":1583973827000000}

记录的时间戳以纳秒为单位。希望这对您有所帮助!

Doug,来自 arrow/cpp/src/parquet/types.h 的这段代码显示了 Int96 时间戳是如何在内部存储的:

constexpr int64_t kJulianToUnixEpochDays = INT64_C(2440588);
constexpr int64_t kSecondsPerDay = INT64_C(60 * 60 * 24);
constexpr int64_t kMillisecondsPerDay = kSecondsPerDay * INT64_C(1000);
constexpr int64_t kMicrosecondsPerDay = kMillisecondsPerDay * INT64_C(1000);
constexpr int64_t kNanosecondsPerDay = kMicrosecondsPerDay * INT64_C(1000);

MANUALLY_ALIGNED_STRUCT(1) Int96 { uint32_t value[3]; };
STRUCT_END(Int96, 12);

static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds) {
  std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
}

static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
  // We do the computations in the unsigned domain to avoid unsigned behaviour
  // on overflow.
  uint64_t days_since_epoch =
      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
  uint64_t nanoseconds = 0;

  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
  return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + nanoseconds);
}