使用 spark streaming 从 kafka 读取流并为其分配模式
Reading a stream from kafka using spark streaming and assigning a schema to it
我正在尝试从 kafka 读取一个流,其中的值是一串以逗号分隔的值(代表数据集中的列)
objective 是读取两个这样的流并加入它们。
如果我从一个文件中读取,有一种方法可以通过为输入流分配一个分隔符和一个模式来实现。这是我能做的:
val stearm_L: DataFrame = spark.readStream
.option("sep", ";")
.schema(schema_L)
.csv("inputFileSteam_L")
如果我从 kafka 而不是文件读取,我该如何做同样的事情?
而不是 csv("filename")
,您基本上将其替换为 format("kafka")
。
Spark Streaming 部分下有一个关于 Kafka 集成的页面以获取更多详细信息。
关于解析 CSV,请参阅 spark streaming: read CSV string from kafka, write to parquet
我正在尝试从 kafka 读取一个流,其中的值是一串以逗号分隔的值(代表数据集中的列) objective 是读取两个这样的流并加入它们。
如果我从一个文件中读取,有一种方法可以通过为输入流分配一个分隔符和一个模式来实现。这是我能做的:
val stearm_L: DataFrame = spark.readStream
.option("sep", ";")
.schema(schema_L)
.csv("inputFileSteam_L")
如果我从 kafka 而不是文件读取,我该如何做同样的事情?
而不是 csv("filename")
,您基本上将其替换为 format("kafka")
。
Spark Streaming 部分下有一个关于 Kafka 集成的页面以获取更多详细信息。
关于解析 CSV,请参阅 spark streaming: read CSV string from kafka, write to parquet