火花结构化流:没有正确写入
spark structured streaming: not writing correctly
我正在将 JSON 的抄表记录从 kafka_2.11-0.10.0.1 流式传输到 Spark 2.1。我切换到结构化流媒体;尽管 kafka 消费者确认传入数据,但我的控制台和 writeStream 没有移动。我正在使用
进行测试
pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
我的代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.appName("interval") \
.master("local[4]") \
.getOrCreate()
schema = StructType().add("customer_id", StringType())
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
query = df.writeStream \
.option("checkpointLocation", "/user/XX/checkpoint5") \
.format("parquet") \
.start("/user/XX/interval5")
它使用 388 字节的镶木地板文件创建检查点和数据目录。然而,从未写入流式数据。
$ hdfs dfs -ls interval5
drwxr-xr-x ... interval5/_spark_metadata
-rw-r--r-- ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r-- ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet
kafka-consumer 确认正在发送数据:
{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}
kafka-consumer 显示流式数据。
我想我错过了出列和保存流式处理行的重要步骤 - 一天的拖网 Whosebug 没有帮助。
(编辑以删除对控制台的引用;因为它不相关)。
使用 .option("startingOffsets", "latest")
,您应该只期望在您开始流式查询后发布的消息。
因此,预期的操作过程是启动流式查询,然后发布消息。
Nothing is written into the parquet files.
自从您使用 .format("console")
以来,您将看到 没有任何内容 保存到 parquet 文件中。您必须将其更改为 parquet
并重新启动查询。
相同的结构化流 .py 代码在 spark-submit 中工作,但它从不使用 pspark 处理任何数据;没有错误消息、控制台输出或镶木地板数据(目录创建和元数据除外)。
去图吧。
我正在将 JSON 的抄表记录从 kafka_2.11-0.10.0.1 流式传输到 Spark 2.1。我切换到结构化流媒体;尽管 kafka 消费者确认传入数据,但我的控制台和 writeStream 没有移动。我正在使用
进行测试pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
我的代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark = SparkSession \
.builder \
.appName("interval") \
.master("local[4]") \
.getOrCreate()
schema = StructType().add("customer_id", StringType())
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xxx.xx.xxx:9092") \
.option("subscribe", "test") \
.option("startingOffsets", "earliest") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
query = df.writeStream \
.option("checkpointLocation", "/user/XX/checkpoint5") \
.format("parquet") \
.start("/user/XX/interval5")
它使用 388 字节的镶木地板文件创建检查点和数据目录。然而,从未写入流式数据。
$ hdfs dfs -ls interval5
drwxr-xr-x ... interval5/_spark_metadata
-rw-r--r-- ... interval5/part-00000-0b2eb00a-c361-4dfe-a24e-9589d150a911.snappy.parquet
-rw-r--r-- ... interval5/part-00000-e0cb12d1-9c29-4eb0-92a8-688f468a42ce.snappy.parquet
kafka-consumer 确认正在发送数据:
{"customer_id":"customer_736"}
{"customer_id":"customer_995"}
{"customer_id":"customer_1899"}
{"customer_id":"customer_35"}
kafka-consumer 显示流式数据。
我想我错过了出列和保存流式处理行的重要步骤 - 一天的拖网 Whosebug 没有帮助。 (编辑以删除对控制台的引用;因为它不相关)。
使用 .option("startingOffsets", "latest")
,您应该只期望在您开始流式查询后发布的消息。
因此,预期的操作过程是启动流式查询,然后发布消息。
Nothing is written into the parquet files.
自从您使用 .format("console")
以来,您将看到 没有任何内容 保存到 parquet 文件中。您必须将其更改为 parquet
并重新启动查询。
相同的结构化流 .py 代码在 spark-submit 中工作,但它从不使用 pspark 处理任何数据;没有错误消息、控制台输出或镶木地板数据(目录创建和元数据除外)。 去图吧。