Apache 方解石:将整数转换为日期时间
Apache calcite: cast integer to datetime
我正在使用 Beam SQL 并尝试将整数转换为日期时间字段。
Schema resultSchema =
Schema.builder()
.addInt64Field("detectedCount")
.addStringField("sensor")
.addInt64Field("timestamp")
.build();
PCollection<Row> sensorRawUnboundedTimestampedSubset =
sensorRowUnbounded.apply(
SqlTransform.query(
"select PCOLLECTION.payload.`value`.`count` detectedCount, \n"
+ "PCOLLECTION.payload.`value`.`id` sensor, \n"
+ "PCOLLECTION.`timestamp` `timestamp` \n"
+ "from PCOLLECTION "))
.setRowSchema(resultSchema);
为了一些计算和开窗,我想convert/cast timestamp
到Datetime
字段?请提供一些将 resultSchema
中的 timestamp
转换为 DateTime
的指针。数据类型。
Beam (or in Calcite) 中没有开箱即用的方法。简短版本 - Calcite 或 Beam 无法知道您实际如何将日期或时间戳存储在整数中。然而,假设你有千分之一秒,这应该有效:
@Test
public void testBlah() throws Exception {
// input schema, has timestamps as epoch millis
Schema schema = Schema.builder().addInt64Field("ts").addStringField("st").build();
DateTime ts1 = new DateTime(2019, 8, 9, 10, 11, 12);
DateTime ts2 = new DateTime(2019, 8, 9, 10, 11, 12);
PCollection<Row> input =
pipeline
.apply(
"createRows",
Create.of(
Row.withSchema(schema).addValues(ts1.getMillis(), "two").build(),
Row.withSchema(schema).addValues(ts2.getMillis(), "twelve").build()))
.setRowSchema(schema);
PCollection<Row> result =
input.apply(
SqlTransform.query(
"SELECT \n"
+ "(TIMESTAMP '1970-01-01 00:00:00' + ts * INTERVAL '0.001' SECOND) as ts, \n"
+ "st \n"
+ "FROM \n"
+ "PCOLLECTION"));
// output schema, has timestamps as DateTime
Schema outSchema = Schema.builder().addDateTimeField("ts").addStringField("st").build();
PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(outSchema).addValues(ts1, "two").build(),
Row.withSchema(outSchema).addValues(ts2, "twelve").build());
pipeline.run();
}
或者您始终可以在 java 中执行此操作,而不是在 SQL 中执行,只需将自定义 ParDo
应用于 SqlTransform
的输出即可。在 ParDo
从 Row
对象中提取整数时间戳,将其转换为 DateTime
然后发出它,例如作为具有不同架构的另一行的一部分。
我正在使用 Beam SQL 并尝试将整数转换为日期时间字段。
Schema resultSchema =
Schema.builder()
.addInt64Field("detectedCount")
.addStringField("sensor")
.addInt64Field("timestamp")
.build();
PCollection<Row> sensorRawUnboundedTimestampedSubset =
sensorRowUnbounded.apply(
SqlTransform.query(
"select PCOLLECTION.payload.`value`.`count` detectedCount, \n"
+ "PCOLLECTION.payload.`value`.`id` sensor, \n"
+ "PCOLLECTION.`timestamp` `timestamp` \n"
+ "from PCOLLECTION "))
.setRowSchema(resultSchema);
为了一些计算和开窗,我想convert/cast timestamp
到Datetime
字段?请提供一些将 resultSchema
中的 timestamp
转换为 DateTime
的指针。数据类型。
Beam (or in Calcite) 中没有开箱即用的方法。简短版本 - Calcite 或 Beam 无法知道您实际如何将日期或时间戳存储在整数中。然而,假设你有千分之一秒,这应该有效:
@Test
public void testBlah() throws Exception {
// input schema, has timestamps as epoch millis
Schema schema = Schema.builder().addInt64Field("ts").addStringField("st").build();
DateTime ts1 = new DateTime(2019, 8, 9, 10, 11, 12);
DateTime ts2 = new DateTime(2019, 8, 9, 10, 11, 12);
PCollection<Row> input =
pipeline
.apply(
"createRows",
Create.of(
Row.withSchema(schema).addValues(ts1.getMillis(), "two").build(),
Row.withSchema(schema).addValues(ts2.getMillis(), "twelve").build()))
.setRowSchema(schema);
PCollection<Row> result =
input.apply(
SqlTransform.query(
"SELECT \n"
+ "(TIMESTAMP '1970-01-01 00:00:00' + ts * INTERVAL '0.001' SECOND) as ts, \n"
+ "st \n"
+ "FROM \n"
+ "PCOLLECTION"));
// output schema, has timestamps as DateTime
Schema outSchema = Schema.builder().addDateTimeField("ts").addStringField("st").build();
PAssert.that(result)
.containsInAnyOrder(
Row.withSchema(outSchema).addValues(ts1, "two").build(),
Row.withSchema(outSchema).addValues(ts2, "twelve").build());
pipeline.run();
}
或者您始终可以在 java 中执行此操作,而不是在 SQL 中执行,只需将自定义 ParDo
应用于 SqlTransform
的输出即可。在 ParDo
从 Row
对象中提取整数时间戳,将其转换为 DateTime
然后发出它,例如作为具有不同架构的另一行的一部分。