Store current timestamp in parquet using ParquetWriter
I am writing data to a parquet file using the following method:
static void toParquet() {
    String schema = "message spark_schema {\n optional binary stringField (UTF8);\n optional INT96 createdAt;\n}";
    MessageType readSchema = MessageTypeParser.parseMessageType(schema);
    Configuration configuration = new Configuration();
    GroupWriteSupport.setSchema(readSchema, configuration);
    SimpleGroupFactory sfg = new SimpleGroupFactory(readSchema);
    Path file = new Path("/home/user/data-" + System.currentTimeMillis() + ".parquet");
    try {
        ParquetWriter<Group> writer = new ParquetWriter<Group>(file, new GroupWriteSupport(), CompressionCodecName.UNCOMPRESSED, 1024, 1024, 512,
                true, false, ParquetProperties.WriterVersion.PARQUET_1_0, configuration);
        for (int i = 0; i < 10000; ++i) {
            writer.write(sfg.newGroup().append("stringField", "abc").append("createdAt", String.valueOf(System.currentTimeMillis())));
        }
        writer.close();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
But it gives me the following exception:
java.lang.IllegalArgumentException: Fixed Binary size 13 does not match field type length 12
    at org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter.writeBytes(FixedLenByteArrayPlainValuesWriter.java:53)
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainFixedLenArrayDictionaryValuesWriter.createDictionaryPage(DictionaryValuesWriter.java:324)
    at org.apache.parquet.column.values.fallback.FallbackValuesWriter.createDictionaryPage(FallbackValuesWriter.java:102)
    at org.apache.parquet.column.impl.ColumnWriterV1.flush(ColumnWriterV1.java:242)
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:126)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:164)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:141)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:123)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:288)
    at com.app.test.SimpleParquet.toParquet(SimpleParquet.java:498)
    at com.app.test.SimpleParquet.main(SimpleParquet.java:63)
Since String.valueOf(System.currentTimeMillis()) produces a 13-character value (epoch seconds would only be 10 digits), how can I supply the current timestamp as an INT96 value, which expects a fixed-length 12-byte array?
I found some posts saying that this timestamp is a combination of the Julian day and the time of day in nanoseconds. How do I convert the current timestamp into that format?
The following code worked for me: I compute a NanoTime and then call its toBinary() method.
public static NanoTime getNanoTime(String time) {
    Timestamp ts = Timestamp.valueOf(time);
    Calendar calendar = getCalendar();
    calendar.setTime(ts);
    // jodd's JDateTime converts the calendar date to a Julian Day Number.
    JDateTime jDateTime = new JDateTime(calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH));
    int days = jDateTime.getJulianDayNumber();
    // Nanoseconds elapsed since midnight of that day.
    long hour = calendar.get(Calendar.HOUR_OF_DAY);
    long minute = calendar.get(Calendar.MINUTE);
    long second = calendar.get(Calendar.SECOND);
    long nanos = ts.getNanos();
    long nanosOfDay = nanos + NANOS_PER_SECOND * second + NANOS_PER_SECOND * SECONDS_PER_MINUTE * minute
            + NANOS_PER_SECOND * SECONDS_PER_MINUTE * MINUTES_PER_HOUR * hour;
    return new NanoTime(days, nanosOfDay);
}
Writing to parquet using the above method:
writer.write(sfg.newGroup().append("stringField", "abc").append("createdAt", getNanoTime("2017-05-23 11:59:43.345717").toBinary()));
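The method above parses a timestamp string; if all you need is the current instant, the same (Julian day, nanos-of-day) pair can be derived directly from System.currentTimeMillis() with plain arithmetic, avoiding the jodd dependency. A minimal sketch, assuming the timestamp should be interpreted as UTC; the class name Int96Timestamp and the helper julianDayAndNanos are hypothetical names of mine, not part of the parquet API:

```java
import java.util.concurrent.TimeUnit;

public class Int96Timestamp {
    // Julian Day Number of the Unix epoch, 1970-01-01 (assumption: UTC, noon-based JDN
    // as used by Spark/Hive for INT96 timestamps).
    private static final long JULIAN_DAY_OF_EPOCH = 2_440_588L;
    private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1);

    /** Splits epoch millis into {julianDay, nanosOfDay}, the pair INT96 encodes. */
    public static long[] julianDayAndNanos(long epochMillis) {
        // floorDiv/floorMod keep pre-1970 (negative) timestamps correct.
        long epochDay = Math.floorDiv(epochMillis, MILLIS_PER_DAY);
        long millisOfDay = Math.floorMod(epochMillis, MILLIS_PER_DAY);
        return new long[] { JULIAN_DAY_OF_EPOCH + epochDay, millisOfDay * 1_000_000L };
    }

    public static void main(String[] args) {
        long[] parts = julianDayAndNanos(System.currentTimeMillis());
        System.out.println("julianDay=" + parts[0] + " nanosOfDay=" + parts[1]);
    }
}
```

The two values map straight onto the answer's API: new NanoTime((int) parts[0], parts[1]).toBinary() yields the 12-byte value the createdAt column expects.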