How to write a stream to S3 partitioned by the year, month, and day when records were received?
I have a simple stream that reads some data from a Kafka topic:
import org.apache.spark.sql.functions.from_json
import spark.implicits._

val ds = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1")
  .option("subscribe", "topic1")
  .option("startingOffsets", "earliest")
  .load()

// `schema` is the expected schema of the JSON payload, defined elsewhere
val df = ds.selectExpr("CAST(value AS STRING) AS json")
  .select(from_json($"json", schema).as("data"))
  .select("data.*")
I want to store this data in S3, partitioned by the date it was received, for example:
s3_bucket/year/month/day/data.json
When I go to write the data, I do this:
df.writeStream
  .format("json")
  .outputMode("append")
  .option("path", s3_path)
  .start()
But if I do that, I can only specify a single path. Is there a way to change the S3 path dynamically based on the date?
Use the partitionBy clause:
import org.apache.spark.sql.functions._

df.select(
    dayofmonth(current_date()) as "day",
    month(current_date()) as "month",
    year(current_date()) as "year",
    $"*")
  .writeStream
  .partitionBy("year", "month", "day")
  ... // all other options
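
For completeness, here is a minimal end-to-end sketch of this approach, assuming the same spark session, schema, and parsed df from the question; the bucket and checkpoint paths below are hypothetical placeholders. Note that a streaming file sink also requires a checkpointLocation:

import org.apache.spark.sql.functions.{current_date, dayofmonth, month, year}
import spark.implicits._

val query = df
  .select(
    year(current_date()) as "year",       // partition columns derived from the
    month(current_date()) as "month",     // processing date, i.e. the date the
    dayofmonth(current_date()) as "day",  // micro-batch is written
    $"*")
  .writeStream
  .format("json")
  .outputMode("append")
  .partitionBy("year", "month", "day")
  .option("path", "s3a://my-bucket/data")              // hypothetical bucket
  .option("checkpointLocation", "s3a://my-bucket/chk") // required for file sinks
  .start()

query.awaitTermination()

With partitionBy, the output lands in Hive-style key=value directories, e.g. s3a://my-bucket/data/year=2017/month=6/day=9/part-....json, rather than the bare year/month/day layout from the question, but each day's records still end up under their own date path.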