如何使用 Spark 创建从 Kafka 到 Hdfs 的数据湖 - 存储在自定义目录中?
How to create a data lake from Kafka to Hdfs with Spark - storing in custom directories?
我将 RDD 转换为以下结构的数据帧:
+-------------+--------------------+
| key| value|
+-------------+--------------------+
|1556110998000|{"eventId":"55108...|
|1556110998000|{"eventId":"558ac...|
|1556110998000|{"eventId":"553c0...|
|1556111001600|{"eventId":"56886...|
|1556111001600|{"eventId":"569ad...|
|1556111001600|{"eventId":"56b34...|
|1556110998000|{"eventId":"55d1b...|
...
键是四舍五入到一小时的时间戳,值是 json 字符串。
我想要的是根据时间戳将值存储到不同的桶中。所以基本上我想要的结构如下:
...
/datalake/2019/03/31/03
/datalake/2019/03/31/04
/datalake/2019/03/31/05
...
/datalake/2019/04/25/08
/datalake/2019/04/25/09
...
仅使用 eventsRdd.saveAsTextFile("/datalake");
存储实际的 rdd 并不能解决问题,因为所有事件最终都在一个文件中。此外,此文件在下一个 "round".
中被覆盖
那我该怎么做呢?我读了一些关于分区的文章,但它们并没有真正帮助。我实际上正在考虑切换到 Kafka Connect 而根本不使用 Spark。
下面是我尝试存储事件的一些代码(目前仅在本地 fs 上)
private void saveToDatalake(JavaRDD<E> eventsRdd) {
JavaPairRDD<Long, String> longEJavaPairRdd = eventsRdd
.mapToPair(event -> new Tuple2<>(calculateRoundedDownTimestampFromSeconds(event.getTimestamp()), serialize(event)));
SparkSession sparkSession = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]").getOrCreate();
StructType dataFrameSchema = DataTypes
.createStructType(new StructField[]
{DataTypes.createStructField("key", DataTypes.LongType, false),
DataTypes.createStructField("value", DataTypes.StringType, false),
});
JavaRDD<Row> rowRdd = longEJavaPairRdd.map(pair -> RowFactory.create(pair._1, pair._2));
Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowRdd, dataFrameSchema);
Dataset<Row> buckets = dataFrame.select("key").dropDuplicates();
//buckets.show();
buckets.foreach(bucket -> {
Dataset<Row> valuesPerBucket = dataFrame.where(dataFrame.col("key").equalTo(bucket)).select("value");
//valuesPerBucket.show();
long timestamp = bucket.getLong(0);
valuesPerBucket.rdd().saveAsTextFile("/data/datalake/" + calculateSubpathFromTimestamp(timestamp));
});
}
private String calculateSubpathFromTimestamp(long timestamp) {
String FORMAT = "yyyy/MM/dd/HH";
ZoneId zone = ZoneId.systemDefault();
DateTimeFormatter df = DateTimeFormatter.ofPattern(FORMAT).withZone(zone);
String time = df.format(Instant.ofEpochMilli(timestamp));
System.out.println("Formatted Date " + time);
return time;
}
我们通过使用 Kafka Connect HDFS 连接器并提供自定义序列化程序 class 将来自 Kafka 的 Protobuf 消息转换为 JSON.
来完成它
我将 RDD 转换为以下结构的数据帧:
+-------------+--------------------+
| key| value|
+-------------+--------------------+
|1556110998000|{"eventId":"55108...|
|1556110998000|{"eventId":"558ac...|
|1556110998000|{"eventId":"553c0...|
|1556111001600|{"eventId":"56886...|
|1556111001600|{"eventId":"569ad...|
|1556111001600|{"eventId":"56b34...|
|1556110998000|{"eventId":"55d1b...|
...
键是四舍五入到一小时的时间戳,值是 json 字符串。
我想要的是根据时间戳将值存储到不同的桶中。所以基本上我想要的结构如下:
...
/datalake/2019/03/31/03
/datalake/2019/03/31/04
/datalake/2019/03/31/05
...
/datalake/2019/04/25/08
/datalake/2019/04/25/09
...
仅使用 eventsRdd.saveAsTextFile("/datalake");
存储实际的 rdd 并不能解决问题,因为所有事件最终都在一个文件中。此外,此文件在下一个 "round".
那我该怎么做呢?我读了一些关于分区的文章,但它们并没有真正帮助。我实际上正在考虑切换到 Kafka Connect 而根本不使用 Spark。
下面是我尝试存储事件的一些代码(目前仅在本地 fs 上)
private void saveToDatalake(JavaRDD<E> eventsRdd) {
JavaPairRDD<Long, String> longEJavaPairRdd = eventsRdd
.mapToPair(event -> new Tuple2<>(calculateRoundedDownTimestampFromSeconds(event.getTimestamp()), serialize(event)));
SparkSession sparkSession = SparkSession.builder().appName("Build a DataFrame from Scratch").master("local[*]").getOrCreate();
StructType dataFrameSchema = DataTypes
.createStructType(new StructField[]
{DataTypes.createStructField("key", DataTypes.LongType, false),
DataTypes.createStructField("value", DataTypes.StringType, false),
});
JavaRDD<Row> rowRdd = longEJavaPairRdd.map(pair -> RowFactory.create(pair._1, pair._2));
Dataset<Row> dataFrame = sparkSession.sqlContext().createDataFrame(rowRdd, dataFrameSchema);
Dataset<Row> buckets = dataFrame.select("key").dropDuplicates();
//buckets.show();
buckets.foreach(bucket -> {
Dataset<Row> valuesPerBucket = dataFrame.where(dataFrame.col("key").equalTo(bucket)).select("value");
//valuesPerBucket.show();
long timestamp = bucket.getLong(0);
valuesPerBucket.rdd().saveAsTextFile("/data/datalake/" + calculateSubpathFromTimestamp(timestamp));
});
}
private String calculateSubpathFromTimestamp(long timestamp) {
String FORMAT = "yyyy/MM/dd/HH";
ZoneId zone = ZoneId.systemDefault();
DateTimeFormatter df = DateTimeFormatter.ofPattern(FORMAT).withZone(zone);
String time = df.format(Instant.ofEpochMilli(timestamp));
System.out.println("Formatted Date " + time);
return time;
}
我们通过使用 Kafka Connect HDFS 连接器并提供自定义序列化程序 class 将来自 Kafka 的 Protobuf 消息转换为 JSON.
来完成它