从 Kafka 读取并写入 parquet 中的 hdfs
Read from Kafka and write to hdfs in parquet
我是 BigData 生态系统的新手,有点入门。
我已经阅读了几篇关于使用 spark 流读取 kafka 主题的文章,但想知道是否可以使用 spark 作业而不是流从 kafka 读取?
如果是的话,你们能帮我指出一些可以帮助我入门的文章或代码片段吗?
我的问题的第二部分是以 parquet 格式写入 hdfs。
一旦我从 Kafka 读到,我想我会有一个 rdd。
将此 rdd 转换为数据帧,然后将数据帧写入镶木地板文件。
这是正确的做法吗?
感谢任何帮助。
谢谢
要从 Kafka 读取数据并将其以 Parquet 格式写入 HDFS,使用 Spark Batch 作业而不是流,您可以使用 Spark Structured Streaming.
Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以像表达对静态数据的批处理计算一样表达流式计算。 Spark SQL 引擎将处理 运行 它以增量和连续的方式处理,并随着流数据不断到达而更新最终结果。您可以使用 Scala 中的 Dataset/DataFrame API、Java、Python 或 R 来表达流聚合、事件时间 windows、流到批连接等。计算是在同一个优化的 Spark SQL 引擎上执行的。最后,系统通过检查点和预写日志确保端到端的恰好一次容错保证。简而言之,Structured Streaming 提供了快速、可扩展、容错、端到端的 exactly-once 流处理,而无需用户对流进行推理。
Kafka 作为内置源附带,即我们可以从 Kafka 轮询数据。它与 Kafka 代理版本 0.10.0 或更高版本兼容。
为了以批处理模式从 Kafka 中提取数据,您可以为定义的偏移量范围创建一个 Dataset/DataFrame。
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
源中的每一行都具有以下架构:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
现在,要将数据以parquet格式写入HDFS,可以编写如下代码:
df.write.parquet("hdfs://data.parquet")
关于Spark Structured Streaming + Kafka的更多信息,请参考以下指南 - Kafka Integration Guide
希望对您有所帮助!
使用卡夫卡流。 Spark Streaming 是一个用词不当(它的引擎盖下的小批量,至少高达 2.2)。
https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/
关于这个话题,你已经有了几个很好的答案。
只是想强调一下 - 小心直接流入镶木地板 table。
当 parquet 行组大小足够大时(为简单起见,您可以说文件大小应为 64-256Mb),以利用字典压缩、bloom 过滤器等(一个 parquet 文件可以有多个),Parquet 的性能表现出色其中的行块,并且通常每个文件中确实有多个行块;尽管行块不能跨越多个镶木地板文件)
如果您直接流式传输到 parquet table,那么您很可能会得到一堆小的 parquet 文件(取决于 mini-batch Spark Streaming 的大小和音量数据的)。查询此类文件可能会非常慢。例如,Parquet 可能需要读取所有文件的 headers 来协调模式,这是一个很大的开销。如果是这种情况,您将需要一个单独的进程,例如,作为解决方法,读取旧文件并将它们写入 "merged"(这不是简单的 file-level 合并,一个进程实际上需要读入所有镶木地板数据并溢出更大的镶木地板文件)。
此解决方法可能会破坏数据的原始用途 "streaming"。你也可以在这里看看其他技术——比如 Apache Kudu、Apache Kafka、Apache Druid、Kinesis 等,它们可以在这里更好地工作。
更新:由于我发布了这个答案,现在这里有一个新的强者 - Delta Lake。 https://delta.io/ 如果你用惯了parquet,你会发现Delta非常有吸引力(实际上,Delta是建立在parquet层+元数据之上的)。三角洲湖提供:
Spark 上的 ACID 事务:
- 可序列化的隔离级别确保读者永远不会看到不一致的数据。
- 可扩展的元数据处理:利用 Spark 的分布式处理能力轻松处理 petabyte-scale table 秒的所有元数据,数十亿个文件。
- 流批统一:Delta Lake中的一个table是一个批处理table,也是一个流源和流槽。流式数据摄取、批量历史回填、交互式查询都开箱即用。
- 架构实施:自动处理架构变化以防止在摄取期间插入错误记录。
- 时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可重现的机器学习实验。
- 更新和删除:支持合并、更新和删除操作,以启用复杂的用例,如 change-data-capture、slowly-changing-dimension (SCD) 操作、流更新插入等。
我是 BigData 生态系统的新手,有点入门。
我已经阅读了几篇关于使用 spark 流读取 kafka 主题的文章,但想知道是否可以使用 spark 作业而不是流从 kafka 读取? 如果是的话,你们能帮我指出一些可以帮助我入门的文章或代码片段吗?
我的问题的第二部分是以 parquet 格式写入 hdfs。 一旦我从 Kafka 读到,我想我会有一个 rdd。 将此 rdd 转换为数据帧,然后将数据帧写入镶木地板文件。 这是正确的做法吗?
感谢任何帮助。
谢谢
要从 Kafka 读取数据并将其以 Parquet 格式写入 HDFS,使用 Spark Batch 作业而不是流,您可以使用 Spark Structured Streaming.
Structured Streaming 是一种基于 Spark SQL 引擎构建的可扩展且容错的流处理引擎。您可以像表达对静态数据的批处理计算一样表达流式计算。 Spark SQL 引擎将处理 运行 它以增量和连续的方式处理,并随着流数据不断到达而更新最终结果。您可以使用 Scala 中的 Dataset/DataFrame API、Java、Python 或 R 来表达流聚合、事件时间 windows、流到批连接等。计算是在同一个优化的 Spark SQL 引擎上执行的。最后,系统通过检查点和预写日志确保端到端的恰好一次容错保证。简而言之,Structured Streaming 提供了快速、可扩展、容错、端到端的 exactly-once 流处理,而无需用户对流进行推理。
Kafka 作为内置源附带,即我们可以从 Kafka 轮询数据。它与 Kafka 代理版本 0.10.0 或更高版本兼容。
为了以批处理模式从 Kafka 中提取数据,您可以为定义的偏移量范围创建一个 Dataset/DataFrame。
// Subscribe to 1 topic defaults to the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
源中的每一行都具有以下架构:
| Column | Type |
|:-----------------|--------------:|
| key | binary |
| value | binary |
| topic | string |
| partition | int |
| offset | long |
| timestamp | long |
| timestampType | int |
现在,要将数据以parquet格式写入HDFS,可以编写如下代码:
df.write.parquet("hdfs://data.parquet")
关于Spark Structured Streaming + Kafka的更多信息,请参考以下指南 - Kafka Integration Guide
希望对您有所帮助!
使用卡夫卡流。 Spark Streaming 是一个用词不当(它的引擎盖下的小批量,至少高达 2.2)。
https://eng.verizondigitalmedia.com/2017/04/28/Kafka-to-Hdfs-ParquetSerializer/
关于这个话题,你已经有了几个很好的答案。
只是想强调一下 - 小心直接流入镶木地板 table。 当 parquet 行组大小足够大时(为简单起见,您可以说文件大小应为 64-256Mb),以利用字典压缩、bloom 过滤器等(一个 parquet 文件可以有多个),Parquet 的性能表现出色其中的行块,并且通常每个文件中确实有多个行块;尽管行块不能跨越多个镶木地板文件)
如果您直接流式传输到 parquet table,那么您很可能会得到一堆小的 parquet 文件(取决于 mini-batch Spark Streaming 的大小和音量数据的)。查询此类文件可能会非常慢。例如,Parquet 可能需要读取所有文件的 headers 来协调模式,这是一个很大的开销。如果是这种情况,您将需要一个单独的进程,例如,作为解决方法,读取旧文件并将它们写入 "merged"(这不是简单的 file-level 合并,一个进程实际上需要读入所有镶木地板数据并溢出更大的镶木地板文件)。
此解决方法可能会破坏数据的原始用途 "streaming"。你也可以在这里看看其他技术——比如 Apache Kudu、Apache Kafka、Apache Druid、Kinesis 等,它们可以在这里更好地工作。
更新:由于我发布了这个答案,现在这里有一个新的强者 - Delta Lake。 https://delta.io/ 如果你用惯了parquet,你会发现Delta非常有吸引力(实际上,Delta是建立在parquet层+元数据之上的)。三角洲湖提供:
Spark 上的 ACID 事务:
- 可序列化的隔离级别确保读者永远不会看到不一致的数据。
- 可扩展的元数据处理:利用 Spark 的分布式处理能力轻松处理 petabyte-scale table 秒的所有元数据,数十亿个文件。
- 流批统一:Delta Lake中的一个table是一个批处理table,也是一个流源和流槽。流式数据摄取、批量历史回填、交互式查询都开箱即用。
- 架构实施:自动处理架构变化以防止在摄取期间插入错误记录。
- 时间旅行:数据版本控制支持回滚、完整的历史审计跟踪和可重现的机器学习实验。
- 更新和删除:支持合并、更新和删除操作,以启用复杂的用例,如 change-data-capture、slowly-changing-dimension (SCD) 操作、流更新插入等。