How to Generate a Parquet File Using Pure Java (Including Date & Decimal Types) and Upload It to S3 [Windows] (No HDFS)
I recently had a requirement to generate Parquet files that could be read by Apache Spark using only Java (no additional software installations such as Apache Drill, Hive, Spark, etc.). The files also needed to be saved to S3, so I will be sharing details on how to do both.
There were no simple-to-follow guides on how to do this. I'm also not a Java programmer, so concepts such as Maven and Hadoop were all foreign to me, and it took me almost two weeks to get this working. Below is my personal guide on how I achieved it.
Disclaimer: the code samples below are in no way meant to represent best practice and are offered only as a rough how-to.
Dependencies:
- parquet-avro (1.9.0): https://mvnrepository.com/artifact/org.apache.parquet/parquet-avro/1.9.0 (we use 1.9.0 because this version uses Avro 1.8+, which supports decimals and dates)
- hadoop-aws (2.8.2) [you will not need this if you don't plan to write to S3, but then you will need to add several other dependencies that it would normally pull in for you; I won't cover that scenario, so even if you only intend to generate Parquet files on your local disk you can still add it to your project as a dependency]: https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws/2.8.2 (we use 2.8.2 because it was the latest version at the time)
- Hadoop 2.8.1: https://github.com/steveloughran/winutils/tree/master/hadoop-2.8.1 (we use 2.8.x because it needs to match the Hadoop libraries used by the parquet-avro and hadoop-aws dependencies)
I will be using NetBeans as my IDE.
Some information about Parquet in Java (for noobs such as me):
- In order to serialize your data into Parquet, you must choose one of the popular Java data serialization frameworks: Avro, Protocol Buffers, or Thrift (I will be using Avro (1.8.0), as you can tell from our parquet-avro dependency).
- You will need to use an IDE that supports Maven. This is because the dependencies above have many dependencies of their own, and Maven downloads those for you automatically (like NuGet for Visual Studio).
Prerequisites:
You must have Hadoop on the Windows machine that will run the compiled Java code. The good news is that you don't need to install the entire Hadoop distribution; you only need two files:
- hadoop.dll
- winutils.exe
These can be downloaded from the winutils repository linked above. For this example you will need version 2.8.1 (because of parquet-avro 1.9.0).
- Copy these files to C:\hadoop-2.8.1\bin on the target machine.
- Add a new system variable (not a user variable) called HADOOP_HOME with the value C:\hadoop-2.8.1
- Modify the system Path variable (not the user variable) and append %HADOOP_HOME%\bin to the end.
- Restart the machine for the changes to take effect.
If this configuration is not done properly, you will get the following error at run time: java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
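As a rough, untested-on-every-setup alternative to the machine-wide variables, you can point Hadoop at the winutils directory from inside your Java code and load the native library explicitly; this sketch assumes the two files were copied to C:\hadoop-2.8.1\bin as described above and must run before any Hadoop class is touched:
//Alternative to the HADOOP_HOME/Path environment variables (a sketch, not guaranteed for all setups):
//tell Hadoop where winutils.exe lives...
System.setProperty("hadoop.home.dir", "C:\\hadoop-2.8.1");
//...and load the native library explicitly so the NativeIO$Windows methods can resolve
System.load("C:\\hadoop-2.8.1\\bin\\hadoop.dll");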
Getting started with the code:
- Start by creating a new empty Maven project and adding parquet-avro 1.9.0 and hadoop-aws 2.8.2 as dependencies (the Maven coordinates are in the links above).
- Create your main class, where you can write some code.
First you need to generate a Schema. As far as I can tell, there is no way to generate a schema programmatically at run time: the Schema.Parser class's parse() method only takes a file or a string literal as a parameter, and the schema cannot be modified once it has been created. To get around this, I generate my schema JSON at run time and parse it. Below is an example schema:
String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
+ "\"type\": \"record\"," //Must be set as record
+ "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
+ "\"fields\": ["
+ " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
+ " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
+ " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
+ " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
+ " ]}";
Schema.Parser parser = new Schema.Parser().setValidate(true);
Schema avroSchema = parser.parse(schema);
More details on Avro schemas can be found here: https://avro.apache.org/docs/1.8.0/spec.html
Next we can start generating records (the Avro primitive types are straightforward):
GenericData.Record record = new GenericData.Record(avroSchema);
record.put("myInteger", 1);
record.put("myString", "string value 1");
- In order to generate the decimal logical type, a fixed or bytes primitive type must be used as the actual data type for storage. The current Parquet format only supports fixed-length byte arrays (aka fixed_len_byte_array), so we have to use fixed in our case as well (as shown in the schema). In Java we have to use BigDecimal in order to actually handle decimals, and I have determined that a Decimal(32,4) will never take more than 16 bytes regardless of the value (a 32-digit unscaled value is below 10^32, which fits comfortably in a signed 16-byte integer, whose maximum is roughly 1.7 * 10^38). So we will use a standard byte array size of 16 in the serialization below (and in the schema above):
BigDecimal myDecimalValue = new BigDecimal("99.9999");
//First we need to make sure the BigDecimal matches our schema scale:
myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
//Next we get the decimal value as one BigInteger (like there was no decimal point)
BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
//Finally we serialize the integer
byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();
//We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
byte[] myDecimalBuffer = new byte[16];
if (myDecimalBuffer.length >= decimalBytes.length) {
    //Because we set our fixed byte array size as 16 bytes, we need to
    //pad-left our original value's bytes with zeros
    int myDecimalBufferIndex = myDecimalBuffer.length - 1;
    for (int i = decimalBytes.length - 1; i >= 0; i--) {
        myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
        myDecimalBufferIndex--;
    }
    //Save result
    fixed.bytes(myDecimalBuffer);
} else {
    throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
}
//We can finally write our decimal to our record
record.put("myDecimal", fixed);
For date values, Avro specifies that we need to save the number of days since the epoch as an integer. (If you need the time component as well, e.g. an actual DateTime type, you need to use the Timestamp Avro type, which I will not cover.)
The easiest way I found to get the number of days since the epoch is the joda-time library. If you added the hadoop-aws dependency to your project you should already have it; if not, you will need to add it yourself:
//Get epoch value
MutableDateTime epoch = new MutableDateTime(0l, DateTimeZone.UTC);
DateTime currentDate = new DateTime(); //Can take Java Date in constructor
Days days = Days.daysBetween(epoch, currentDate);
//We can write number of days since epoch into the record
record.put("myDate", days.getDays());
We can finally start writing our Parquet file:
try {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.access.key", "ACCESSKEY");
    conf.set("fs.s3a.secret.key", "SECRETKEY");
    //Below are some other helpful settings
    //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
    //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
    //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
    //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors

    Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
    //Use the path below to save to the local file system instead
    //Path path = new Path("data.parquet");

    try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
            .withSchema(avroSchema)
            .withCompressionCodec(CompressionCodecName.GZIP)
            .withConf(conf)
            .withPageSize(4 * 1024 * 1024)      //For compression
            .withRowGroupSize(16 * 1024 * 1024) //For write buffering (row group size)
            .build()) {
        //We only have one record to write in our example
        writer.write(record);
    }
} catch (Exception ex) {
    ex.printStackTrace(System.out);
}
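If you want to verify the file without Spark, you can read it back with the same dependencies. This is only a rough sketch, assuming the local-path variant from above and the extra imports org.apache.parquet.avro.AvroParquetReader, org.apache.parquet.hadoop.ParquetReader and org.apache.avro.generic.GenericRecord; wrap it in the same try/catch as the writer:
//Read the file back as generic Avro records to check the contents
try (ParquetReader<GenericRecord> reader = AvroParquetReader
        .<GenericRecord>builder(new Path("data.parquet"))
        .withConf(conf)
        .build()) {
    GenericRecord readRecord;
    while ((readRecord = reader.read()) != null) {
        System.out.println(readRecord);
    }
}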
Here is the data loaded into Apache Spark (2.2.0):
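For reference, loading the file back in Spark 2.2.0 from Java might look roughly like the sketch below. This is a separate Spark application, not part of the pure-Java writer above, and it assumes spark-sql 2.2.0 (plus hadoop-aws for s3a paths) on the classpath and the imports org.apache.spark.sql.SparkSession, Dataset and Row:
//Minimal Spark verification sketch (requires org.apache.spark:spark-sql_2.11:2.2.0)
SparkSession spark = SparkSession.builder().master("local[*]").appName("parquet-check").getOrCreate();
Dataset<Row> df = spark.read().parquet("s3a://your-bucket-name/examplefolder/data.parquet");
df.printSchema();
df.show();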
And, for your convenience, the complete source code:
package com.mycompany.stackoverflow;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.Days;
import org.joda.time.MutableDateTime;
public class Main {

    public static void main(String[] args) {
        System.out.println("Start");

        String schema = "{\"namespace\": \"org.myorganization.mynamespace\"," //Not used in Parquet, can put anything
            + "\"type\": \"record\"," //Must be set as record
            + "\"name\": \"myrecordname\"," //Not used in Parquet, can put anything
            + "\"fields\": ["
            + " {\"name\": \"myInteger\", \"type\": \"int\"}," //Required field
            + " {\"name\": \"myString\", \"type\": [\"string\", \"null\"]},"
            + " {\"name\": \"myDecimal\", \"type\": [{\"type\": \"fixed\", \"size\":16, \"logicalType\": \"decimal\", \"name\": \"mydecimaltype1\", \"precision\": 32, \"scale\": 4}, \"null\"]},"
            + " {\"name\": \"myDate\", \"type\": [{\"type\": \"int\", \"logicalType\" : \"date\"}, \"null\"]}"
            + " ]}";

        Schema.Parser parser = new Schema.Parser().setValidate(true);
        Schema avroSchema = parser.parse(schema);

        GenericData.Record record = new GenericData.Record(avroSchema);
        record.put("myInteger", 1);
        record.put("myString", "string value 1");

        BigDecimal myDecimalValue = new BigDecimal("99.9999");
        //First we need to make sure the BigDecimal matches our schema scale:
        myDecimalValue = myDecimalValue.setScale(4, RoundingMode.HALF_UP);
        //Next we get the decimal value as one BigInteger (like there was no decimal point)
        BigInteger myUnscaledDecimalValue = myDecimalValue.unscaledValue();
        //Finally we serialize the integer
        byte[] decimalBytes = myUnscaledDecimalValue.toByteArray();

        //We need to create an Avro 'Fixed' type and pass the decimal schema once more here:
        GenericData.Fixed fixed = new GenericData.Fixed(new Schema.Parser().parse("{\"type\": \"fixed\", \"size\":16, \"precision\": 32, \"scale\": 4, \"name\":\"mydecimaltype1\"}"));
        byte[] myDecimalBuffer = new byte[16];
        if (myDecimalBuffer.length >= decimalBytes.length) {
            //Because we set our fixed byte array size as 16 bytes, we need to
            //pad-left our original value's bytes with zeros
            int myDecimalBufferIndex = myDecimalBuffer.length - 1;
            for (int i = decimalBytes.length - 1; i >= 0; i--) {
                myDecimalBuffer[myDecimalBufferIndex] = decimalBytes[i];
                myDecimalBufferIndex--;
            }
            //Save result
            fixed.bytes(myDecimalBuffer);
        } else {
            throw new IllegalArgumentException(String.format("Decimal size: %d was greater than the allowed max: %d", decimalBytes.length, myDecimalBuffer.length));
        }
        //We can finally write our decimal to our record
        record.put("myDecimal", fixed);

        //Get epoch value
        MutableDateTime epoch = new MutableDateTime(0L, DateTimeZone.UTC);
        DateTime currentDate = new DateTime(); //Can take Java Date in constructor
        Days days = Days.daysBetween(epoch, currentDate);
        //We can write the number of days since epoch into the record
        record.put("myDate", days.getDays());

        try {
            Configuration conf = new Configuration();
            conf.set("fs.s3a.access.key", "ACCESSKEY");
            conf.set("fs.s3a.secret.key", "SECRETKEY");
            //Below are some other helpful settings
            //conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
            //conf.set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
            //conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); // Not needed unless you reference the hadoop-hdfs library.
            //conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName()); // Uncomment if you get "No FileSystem for scheme: file" errors.

            Path path = new Path("s3a://your-bucket-name/examplefolder/data.parquet");
            //Use the path below to save to the local file system instead
            //Path path = new Path("data.parquet");

            try (ParquetWriter<GenericData.Record> writer = AvroParquetWriter.<GenericData.Record>builder(path)
                    .withSchema(avroSchema)
                    .withCompressionCodec(CompressionCodecName.GZIP)
                    .withConf(conf)
                    .withPageSize(4 * 1024 * 1024)      //For compression
                    .withRowGroupSize(16 * 1024 * 1024) //For write buffering (row group size)
                    .build()) {
                //We only have one record to write in our example
                writer.write(record);
            }
        } catch (Exception ex) {
            ex.printStackTrace(System.out);
        }
    }
}