从 1 个 kafka 主题中获取 2 个不同的数据到 2 个数据帧中
Get 2 different data from 1 kafka topic into 2 dataframes
我有这样的作业:
- 使用 python 读取 2 个文件夹 song_data 和 log_data.
中的 json 个文件
- 使用 Python Kafka 将 song_data 和 log_data 文件类型的混合发布到 Kafka 主题中。
- 使用 Pyspark 从上述 Kafka 主题中消费数据。
- 使用流处理来使用来自 song_data 的消息并创建 2 个数据帧、歌曲和艺术家。并从 log_data 生成数据帧作为用户、时间。
- 从维度表的数据帧创建歌曲。
我在从 1 个主题读取不同的文件时遇到问题,2 个文件夹包含 json 文件,但 1 个是歌曲数据,1 个是日志。如何仅从 1 个主题中获取自己的数据?
不清楚为什么不能只使用两个主题,每个文件一个。特别是如果它们没有匹配的模式,这对 SparkSQL 很重要。
How can I get their own data from just 1 topics ?
从第 2 步开始。
以如下格式将数据写入您的单个主题(content
仅用于示例目的)
{"type": "song", "content": "..."}
或
{"type": "log", "content": "..."}
然后,在SparkSQL中,你可以这样做
df = spark.readStream.format("kafka")... # TODO: apply a schema to the data to get a "type" column
song_data = df.where(df("type") == "song").select("content")
log_data = df.where(df("type") == "log").select("content")
您也可以在 Python-Kafka 中进行相同的过滤,而不需要数据帧或 Spark 环境。
我有这样的作业:
- 使用 python 读取 2 个文件夹 song_data 和 log_data. 中的 json 个文件
- 使用 Python Kafka 将 song_data 和 log_data 文件类型的混合发布到 Kafka 主题中。
- 使用 Pyspark 从上述 Kafka 主题中消费数据。
- 使用流处理来使用来自 song_data 的消息并创建 2 个数据帧、歌曲和艺术家。并从 log_data 生成数据帧作为用户、时间。
- 从维度表的数据帧创建歌曲。
我在从 1 个主题读取不同的文件时遇到问题,2 个文件夹包含 json 文件,但 1 个是歌曲数据,1 个是日志。如何仅从 1 个主题中获取自己的数据?
不清楚为什么不能只使用两个主题,每个文件一个。特别是如果它们没有匹配的模式,这对 SparkSQL 很重要。
How can I get their own data from just 1 topics ?
从第 2 步开始。
以如下格式将数据写入您的单个主题(content
仅用于示例目的)
{"type": "song", "content": "..."}
或
{"type": "log", "content": "..."}
然后,在SparkSQL中,你可以这样做
df = spark.readStream.format("kafka")... # TODO: apply a schema to the data to get a "type" column
song_data = df.where(df("type") == "song").select("content")
log_data = df.where(df("type") == "log").select("content")
您也可以在 Python-Kafka 中进行相同的过滤,而不需要数据帧或 Spark 环境。