Avro/Parquet如何将实时数据写入HDFS?
how to write real-time data to HDFS with Avro/Parquet?
我在单元测试中进行了以下工作,将 Avro/Parquet 中的单个对象写入我的 Cloudera/HDFS 集群中的文件。
也就是说,考虑到Parquet是一种列式格式,它似乎只能以批处理方式写出整个文件(不支持更新)。
那么,为实时(通过 ActiveMQ/Camel)摄取的数据(1k 的小消息 msg/sec 等)写入文件的最佳做法是什么?
我想我可以聚合我的消息(内存中的缓冲区或其他临时存储)并使用动态文件名以批处理模式将它们写出,但我觉得我在 partitioning/file 命名中遗漏了一些东西手工等...
Configuration conf = new Configuration(false);
conf.set("fs.defaultFS", "hdfs://cloudera-test:8020/cm/user/hive/warehouse");
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
Path path = new Path("/cm/user/hive/warehouse/test1.data");
MyObject object = new MyObject("test");
Schema schema = ReflectData.get().getSchema(object.getClass());
ParquetWriter<InboundWirelessMessageForHDFS> parquetWriter = AvroParquetWriter.<MyObject>builder(path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
.withDataModel(ReflectData.get())
.withDictionaryEncoding(false)
.withConf(conf)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE) //required because the filename doesn't change for this test
.build();
parquetWriter.write(object);
parquetWriter.close();
基于我的(有限的)研究...我假设文件不能附加到(按设计)...所以我必须先批处理实时数据(在内存中或其他地方)用镶木地板写出文件...
How to append data to an existing parquet file
我在单元测试中进行了以下工作,将 Avro/Parquet 中的单个对象写入我的 Cloudera/HDFS 集群中的文件。
也就是说,考虑到Parquet是一种列式格式,它似乎只能以批处理方式写出整个文件(不支持更新)。
那么,为实时(通过 ActiveMQ/Camel)摄取的数据(1k 的小消息 msg/sec 等)写入文件的最佳做法是什么?
我想我可以聚合我的消息(内存中的缓冲区或其他临时存储)并使用动态文件名以批处理模式将它们写出,但我觉得我在 partitioning/file 命名中遗漏了一些东西手工等...
Configuration conf = new Configuration(false);
conf.set("fs.defaultFS", "hdfs://cloudera-test:8020/cm/user/hive/warehouse");
conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, false);
AvroReadSupport.setAvroDataSupplier(conf, ReflectDataSupplier.class);
Path path = new Path("/cm/user/hive/warehouse/test1.data");
MyObject object = new MyObject("test");
Schema schema = ReflectData.get().getSchema(object.getClass());
ParquetWriter<InboundWirelessMessageForHDFS> parquetWriter = AvroParquetWriter.<MyObject>builder(path)
.withSchema(schema)
.withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
.withDataModel(ReflectData.get())
.withDictionaryEncoding(false)
.withConf(conf)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE) //required because the filename doesn't change for this test
.build();
parquetWriter.write(object);
parquetWriter.close();
基于我的(有限的)研究...我假设文件不能附加到(按设计)...所以我必须先批处理实时数据(在内存中或其他地方)用镶木地板写出文件...
How to append data to an existing parquet file