将 Spark 流式传输 PySpark 数据帧写入 Cassandra 会覆盖 table 而不是附加
Writing Spark streaming PySpark dataframe to Cassandra overwrites table instead of appending
我是 运行 一个由 Kafka、Spark 和 Cassandra 组成的单节点集群。所有本地都在同一台机器上。
通过一个简单的 Python 脚本,我每 5 秒将一些虚拟数据流式传输到 Kafka 主题中。然后使用 Spark 结构化流,我将此数据流(一次一行)读入具有 startingOffset
= latest
的 PySpark DataFrame。最后,我试图将这一行附加到已经存在的 Cassandra table.
我一直在关注 (How to write streaming Dataset to Cassandra?) and (Cassandra Sink for PySpark Structured Streaming from Kafka topic)。
一行数据已成功写入 Cassandra table,但我的问题是每次 覆盖 而不是追加 到 table 的结尾。我可能做错了什么?
这是我的代码:
用于在 Cassandra 中创建 kafkaspark
键空间后跟 randintstream
table 的 CQL DDL:
DESCRIBE keyspaces;
CREATE KEYSPACE kafkaspark
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE kafkaspark;
CREATE TABLE randIntStream (
key int,
value int,
topic text,
partition int,
offset bigint,
timestamp timestamp,
timestampType int,
PRIMARY KEY (partition, topic)
);
启动 PySpark shell
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
将来自 Kafka 主题的最新消息读入流式 DataFrame:
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
一些转换和检查模式:
df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()
写入 Cassandra 的函数:
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("append") \
.save()
最后,查询从 Spark 写入 Cassandra:
query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
SELECT *
在 Cassandra 中 table:
如果该行总是在 Cassandra 中重写,那么您在 table 中的主键可能不正确 - 您需要确保每一行都有一个唯一的主键。如果您从 Spark 创建 Cassandra table,那么默认情况下它只将第一列作为分区键,并且它本身可能不是唯一的。
提供架构后更新:
是的,这就是我所指的情况 - 您有一个 (partition, topic)
的主键,但是您从该主题读取的特定分区的每一行都将具有相同的主键值,因此它将覆盖以前的版本。您需要使您的主键唯一 - 例如,将 offset
或 timestamp
列添加到主键(尽管如果您在同一毫秒内生成数据,timestamp
可能不是唯一的).
P.S。此外,在连接器 3.0.0 中,您不需要 foreachBatch
:
df4.writeStream \
.trigger(processingTime="5 seconds") \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("update") \
.start()
P.P.S 如果你只想将数据从 Kafka 移动到 Cassandra,你可以考虑使用 DataStax's Kafka Connector 与 Spark 相比,它可能更轻量级。
我是 运行 一个由 Kafka、Spark 和 Cassandra 组成的单节点集群。所有本地都在同一台机器上。
通过一个简单的 Python 脚本,我每 5 秒将一些虚拟数据流式传输到 Kafka 主题中。然后使用 Spark 结构化流,我将此数据流(一次一行)读入具有 startingOffset
= latest
的 PySpark DataFrame。最后,我试图将这一行附加到已经存在的 Cassandra table.
我一直在关注 (How to write streaming Dataset to Cassandra?) and (Cassandra Sink for PySpark Structured Streaming from Kafka topic)。
一行数据已成功写入 Cassandra table,但我的问题是每次 覆盖 而不是追加 到 table 的结尾。我可能做错了什么?
这是我的代码:
用于在 Cassandra 中创建 kafkaspark
键空间后跟 randintstream
table 的 CQL DDL:
DESCRIBE keyspaces;
CREATE KEYSPACE kafkaspark
WITH REPLICATION = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};
USE kafkaspark;
CREATE TABLE randIntStream (
key int,
value int,
topic text,
partition int,
offset bigint,
timestamp timestamp,
timestampType int,
PRIMARY KEY (partition, topic)
);
启动 PySpark shell
./bin/pyspark --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,com.datastax.spark:spark-cassandra-connector_2.12:3.0.0 --conf spark.cassandra.connection.host=127.0.0.1,spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
将来自 Kafka 主题的最新消息读入流式 DataFrame:
df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("startingOffsets","latest").option("subscribe","topic1").load()
一些转换和检查模式:
df2 = df.withColumn("key", df["key"].cast("string")).withColumn("value", df["value"].cast("string"))
df3 = df2.withColumn("key", df2["key"].cast("integer")).withColumn("value", df2["value"].cast("integer"))
df4 = df3.withColumnRenamed("timestampType","timestamptype")
df4.printSchema()
写入 Cassandra 的函数:
def writeToCassandra(writeDF, epochId):
writeDF.write \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("append") \
.save()
最后,查询从 Spark 写入 Cassandra:
query = df4.writeStream \
.trigger(processingTime="5 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
SELECT *
在 Cassandra 中 table:
如果该行总是在 Cassandra 中重写,那么您在 table 中的主键可能不正确 - 您需要确保每一行都有一个唯一的主键。如果您从 Spark 创建 Cassandra table,那么默认情况下它只将第一列作为分区键,并且它本身可能不是唯一的。
提供架构后更新:
是的,这就是我所指的情况 - 您有一个 (partition, topic)
的主键,但是您从该主题读取的特定分区的每一行都将具有相同的主键值,因此它将覆盖以前的版本。您需要使您的主键唯一 - 例如,将 offset
或 timestamp
列添加到主键(尽管如果您在同一毫秒内生成数据,timestamp
可能不是唯一的).
P.S。此外,在连接器 3.0.0 中,您不需要 foreachBatch
:
df4.writeStream \
.trigger(processingTime="5 seconds") \
.format("org.apache.spark.sql.cassandra") \
.options(table="randintstream", keyspace="kafkaspark") \
.mode("update") \
.start()
P.P.S 如果你只想将数据从 Kafka 移动到 Cassandra,你可以考虑使用 DataStax's Kafka Connector 与 Spark 相比,它可能更轻量级。