在 HDFS 中写入 avro 文件 - 存在
Write avro file in HDFS - exists
目前我正在学习 spark streaming 和 avro,所以我的第一个例子是,读取 Spark RDD 并构建通用记录,创建 avro 文件,这个文件我应该写在 HDFS 中。现在我可以打开 avro 文件并且我确实附加到 HDFS 文件存在?
这段代码写了一个 avro 文件,但是当我尝试添加或附加时,它失败了。为此我使用 java 8
public static void saveAvro(GenericRecord record, Schema schema) throws IOException {
DatumWriter<GenericRecord> bdPersonDatumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(bdPersonDatumWriter);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro"),
conf);
Path F = new Path("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro");
fs.setReplication(F, (short) 1);
if (!fs.exists(F)) {
System.out.println("File not exists.. creating....");
OutputStream out = fs.create(F, (short) 1);
System.out.println("OutputStream create.");
dataFileWriter.create(schema, out);
System.out.println("dataFileWriter create.");
dataFileWriter.append(record);
System.out.println("dataFileWriter append OK {0} .");
} else {
//Here fail, not open file.. avro stored in HDFS
System.out.println("File exists....");
// I want to add information to an existing avro file.
dataFileWriter.append(record);
System.out.println("dataFileWriter append OK {1} .");
}
dataFileWriter.close();
System.out.println("dataFileWriter closed.");
}
追加存在文件 avro HDFS 的堆栈跟踪:
Exception in thread "main" org.apache.avro.AvroRuntimeException: not
open
at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:88)
at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311)
at com.test.avro.App.saveAvro(App.java:83)
at com.test.avro.App.main(App.java:55)
DataFileWriter appendTo 方法只接受文件 java.nio。我尝试做的是正确的还是有其他方法?
编辑 1.
我想向现有文件添加信息。
第一个代码片段显示了您为创建 avro 文件而尝试进行的实现。这是我的火花流的框架代码:
JavaStreamingContext jssc = sparkConfigurationBuilder
.buildJSC(sparkConfigurationBuilder.buildSparkConfiguration());
jssc.sparkContext().checkpointFile("c:\tmp");
Map<String, Object> kafkaParams = sparkDriverUtils.getKafkaProperties();
Collection<String> topics = Arrays.asList(sparkDriverUtils.getTopics().trim().split(","));// 1 o more topics
LOGGER.warn("Lista de Topics: " + topics.toString());
...
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
//This DSTream resulto to avro..
JavaDStream<Transactions> transactionsDS = transactions.map(f-> {
Transactions txn = jsonMapperUtil.rowToTransaction(f);
LOGGER.warn("Retornar : JavaDStream<Transactions>");
return txn;
});
现在我想将 transactionsDS 结果保存为 HDFS 中的 avro 文件。我有一个问题,JavaStreamingContext 我可以为数据集创建 SparkSession 还是应该更改订阅 kafka 代理的方式?
此致。
DataFileWriter appendTo method only accepts a File java.nio
正确。 Avro 与 HDFS 路径没有连接。
为了“附加到 HDFS 文件”,您需要将它们下载到本地,然后覆盖它们的全部内容
除此之外,您提到了 Spark Streaming,但显示的代码中没有任何部分实际使用 Spark API 调用
目前我正在学习 spark streaming 和 avro,所以我的第一个例子是,读取 Spark RDD 并构建通用记录,创建 avro 文件,这个文件我应该写在 HDFS 中。现在我可以打开 avro 文件并且我确实附加到 HDFS 文件存在?
这段代码写了一个 avro 文件,但是当我尝试添加或附加时,它失败了。为此我使用 java 8
public static void saveAvro(GenericRecord record, Schema schema) throws IOException {
DatumWriter<GenericRecord> bdPersonDatumWriter = new GenericDatumWriter<>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(bdPersonDatumWriter);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro"),
conf);
Path F = new Path("hdfs://sandbox-hdp.hortonworks.com:8020/tmp/poc/ResultHDFSTest.avro");
fs.setReplication(F, (short) 1);
if (!fs.exists(F)) {
System.out.println("File not exists.. creating....");
OutputStream out = fs.create(F, (short) 1);
System.out.println("OutputStream create.");
dataFileWriter.create(schema, out);
System.out.println("dataFileWriter create.");
dataFileWriter.append(record);
System.out.println("dataFileWriter append OK {0} .");
} else {
//Here fail, not open file.. avro stored in HDFS
System.out.println("File exists....");
// I want to add information to an existing avro file.
dataFileWriter.append(record);
System.out.println("dataFileWriter append OK {1} .");
}
dataFileWriter.close();
System.out.println("dataFileWriter closed.");
}
追加存在文件 avro HDFS 的堆栈跟踪:
Exception in thread "main" org.apache.avro.AvroRuntimeException: not open at org.apache.avro.file.DataFileWriter.assertOpen(DataFileWriter.java:88) at org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:311) at com.test.avro.App.saveAvro(App.java:83) at com.test.avro.App.main(App.java:55)
DataFileWriter appendTo 方法只接受文件 java.nio。我尝试做的是正确的还是有其他方法?
编辑 1. 我想向现有文件添加信息。
第一个代码片段显示了您为创建 avro 文件而尝试进行的实现。这是我的火花流的框架代码:
JavaStreamingContext jssc = sparkConfigurationBuilder
.buildJSC(sparkConfigurationBuilder.buildSparkConfiguration());
jssc.sparkContext().checkpointFile("c:\tmp");
Map<String, Object> kafkaParams = sparkDriverUtils.getKafkaProperties();
Collection<String> topics = Arrays.asList(sparkDriverUtils.getTopics().trim().split(","));// 1 o more topics
LOGGER.warn("Lista de Topics: " + topics.toString());
...
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
//This DSTream resulto to avro..
JavaDStream<Transactions> transactionsDS = transactions.map(f-> {
Transactions txn = jsonMapperUtil.rowToTransaction(f);
LOGGER.warn("Retornar : JavaDStream<Transactions>");
return txn;
});
现在我想将 transactionsDS 结果保存为 HDFS 中的 avro 文件。我有一个问题,JavaStreamingContext 我可以为数据集创建 SparkSession 还是应该更改订阅 kafka 代理的方式?
此致。
DataFileWriter appendTo method only accepts a File java.nio
正确。 Avro 与 HDFS 路径没有连接。
为了“附加到 HDFS 文件”,您需要将它们下载到本地,然后覆盖它们的全部内容
除此之外,您提到了 Spark Streaming,但显示的代码中没有任何部分实际使用 Spark API 调用