在 table 数据上加入流数据并在流接收时更新 table,这可能吗?
Joining streaming data on table data and update the table as the stream receives , is it possible?
我正在使用 spark-sql 2.4.1、spark-cassandra-connector_2.11-2.4.1.jar 和 java8。
我有一个场景,我需要使用 C*/Cassandra table data.
加入流数据
如果 record/join 发现我需要将现有的 C* table 记录复制到另一个 table_bkp 并用最新数据更新实际的 C* table 记录。
随着流数据的到来,我需要执行此操作。
这可以使用 spark-sql steaming 来完成吗?
如果可以,该怎么做?有什么注意事项要注意吗?
对于每个批次如何新鲜获取 C* table 数据?
What is wrong I am doing here
我有两个 table,如下所示“master_table”和“backup_table”
table kspace.master_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );
table kspace.backup_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
backup_timestamp timestamp,
PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC, backup_timestamp DESC);
Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
Update the record in "master_table" with latest record.
为了实现上述目标,我正在做 PoC/Code 如下所示
Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));
String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";
Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);
writeDfToCassandra( baseDs.toDF(), keyspace, master_table);
u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");
Dataset<Row> joinUpdatedRecordsDs = sparkSession.sql(
" select p.statement_id, p.statement_flag, p.statement_date,"
+ "p.x_val,p.y_val,p.z_val "
+ " from persisted_records as p "
+ "join u_records as u "
+ "on p.statement_id = u.statement_id and p.statement_date = u.statement_date");
Dataset<Row> updated_records = joinUpdatedRecordsDs
.withColumn("backup_timestamp",current_timestamp());
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
Sample data
第一个带有“I”标志的记录
master_table
backup_table
对于带有“U”标志的第二条记录,即与前面相同,除了“y_val”列数据
master_table
backup_table
预计
但实际table数据是
问题:
Till show the dataframe(updated_records) 显示正确的数据。
但是当我将相同的数据帧(updated_records)插入 table 时,C* backup_table 数据显示与 master_table 的最新记录完全相同,但假设有 [= 的较早记录=77=].
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
那么我在上面的程序代码中做错了什么?
根据您需要检查的数据量,有多种不同性能级别的方法可以做到这一点。
例如,如果您仅按分区键查找数据,最有效的做法是在 Dstream 上使用 joinWithCassandraTable。对于每个批次,这将提取与传入分区键匹配的记录。在结构化流式传输中,这将通过正确编写的 SQL join 和 DSE 自动发生。如果未使用 DSE,它将对每个批次完全扫描 table。
如果相反,您需要每个批次的整个 table,将 DStream 批次与 CassandraRDD 结合起来将导致在每个批次上完全重新读取 RDD。如果不重写整个 table,这会更加昂贵。
如果您只是更新记录而不检查它们以前的值,只需将传入数据直接写入 C* table 就足够了。 C* 使用 upserts 和 last write win 行为,并且只会覆盖以前的值(如果它们存在的话)。
我正在使用 spark-sql 2.4.1、spark-cassandra-connector_2.11-2.4.1.jar 和 java8。 我有一个场景,我需要使用 C*/Cassandra table data.
加入流数据如果 record/join 发现我需要将现有的 C* table 记录复制到另一个 table_bkp 并用最新数据更新实际的 C* table 记录。
随着流数据的到来,我需要执行此操作。 这可以使用 spark-sql steaming 来完成吗? 如果可以,该怎么做?有什么注意事项要注意吗?
对于每个批次如何新鲜获取 C* table 数据?
What is wrong I am doing here
我有两个 table,如下所示“master_table”和“backup_table”
table kspace.master_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
PRIMARY KEY (( statement_id ), statement_date)
) WITH CLUSTERING ORDER BY ( statement_date DESC );
table kspace.backup_table(
statement_id int,
statement_flag text,
statement_date date,
x_val double,
y_val double,
z_val double,
backup_timestamp timestamp,
PRIMARY KEY ((statement_id ), statement_date, backup_timestamp )
) WITH CLUSTERING ORDER BY ( statement_date DESC, backup_timestamp DESC);
Each streaming record would have "statement_flag" which might be "I" or "U".
If record with "I" comes we directly insert into "master_table".
If record with "U" comes , need to check if there is any record for given ( statement_id ), statement_date in "master_table".
If there is record in "master_table" copy that one to "backup_table" with current timestamp i.e. backup_timestamp
Update the record in "master_table" with latest record.
为了实现上述目标,我正在做 PoC/Code 如下所示
Dataset<Row> baseDs = //streaming data from topic
Dataset<Row> i_records = baseDs.filter(col("statement_flag").equalTo("I"));
Dataset<Row> u_records = baseDs.filter(col("statement_flag").equalTo("U"));
String keyspace="kspace";
String master_table = "master_table";
String backup_table = "backup_table";
Dataset<Row> cassandraMasterTableDs = getCassandraTableData(sparkSession, keyspace , master_table);
writeDfToCassandra( baseDs.toDF(), keyspace, master_table);
u_records.createOrReplaceTempView("u_records");
cassandraMasterTableDs.createOrReplaceTempView("persisted_records");
Dataset<Row> joinUpdatedRecordsDs = sparkSession.sql(
" select p.statement_id, p.statement_flag, p.statement_date,"
+ "p.x_val,p.y_val,p.z_val "
+ " from persisted_records as p "
+ "join u_records as u "
+ "on p.statement_id = u.statement_id and p.statement_date = u.statement_date");
Dataset<Row> updated_records = joinUpdatedRecordsDs
.withColumn("backup_timestamp",current_timestamp());
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
Sample data
第一个带有“I”标志的记录
master_table
backup_table
对于带有“U”标志的第二条记录,即与前面相同,除了“y_val”列数据
master_table
backup_table
预计
但实际table数据是
问题:
Till show the dataframe(updated_records) 显示正确的数据。 但是当我将相同的数据帧(updated_records)插入 table 时,C* backup_table 数据显示与 master_table 的最新记录完全相同,但假设有 [= 的较早记录=77=].
updated_records.show(); //Showing correct results
writeDfToCassandra( updated_records.toDF(), keyspace, backup_table); // But here/backup_table copying the latest "master_table" records
那么我在上面的程序代码中做错了什么?
根据您需要检查的数据量,有多种不同性能级别的方法可以做到这一点。
例如,如果您仅按分区键查找数据,最有效的做法是在 Dstream 上使用 joinWithCassandraTable。对于每个批次,这将提取与传入分区键匹配的记录。在结构化流式传输中,这将通过正确编写的 SQL join 和 DSE 自动发生。如果未使用 DSE,它将对每个批次完全扫描 table。
如果相反,您需要每个批次的整个 table,将 DStream 批次与 CassandraRDD 结合起来将导致在每个批次上完全重新读取 RDD。如果不重写整个 table,这会更加昂贵。
如果您只是更新记录而不检查它们以前的值,只需将传入数据直接写入 C* table 就足够了。 C* 使用 upserts 和 last write win 行为,并且只会覆盖以前的值(如果它们存在的话)。