使用 Spark Streaming 原子写入 Cassandra
Atomic Writes to Cassandra with Spark Streaming
我对 Cassandra (2.1.11) 和 Spark (1.4.1) 都很陌生,我想知道是否有人 seen/developed 有原子写入两个不同 Cassandra 的解决方案 tables 使用 Spark Streaming。
我目前有两个 table 保存相同的数据集,但具有不同的分区键。为了简单起见,我将使用熟悉的 User table 示例来解释:
CREATE TABLE schema1.user_by_user_id
(
user_id uuid
,email_address text
,num int //a value that is frequently updated
,PRIMARY KEY (user_id)
);
CREATE TABLE schema1.user_by_email_address
(
email_address text
,user_id uuid
,num int //a value that is frequently updated
,PRIMARY KEY (email_address)
);
email_address
列将具有高基数(实际上它将在 user_id
值的数量的 50% 到 100% 之间)。高基数使得二级索引表现不佳,因此需要第二个table.
我正在使用 Spark Streaming 来处理 num
列中的更改并更新这两个 table。据我了解,saveToCassandra()
方法在 UNLOGGED BATCH 中为 RDD 中的每个项目执行写入,从而执行原子写入(如 "Save a Collection of Objects" 部分 here 中所述)。但是,saveToCassandra()
只能用于保存到单个 table。为了保持 schema1.user_by_user_id
和 schema1.user_by_email_address
table 同步,我必须发出两个单独的 saveToCassandra()
调用:
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))
每个调用中发生的写入是以原子方式完成的,但两个调用一起不是原子的。第二次调用中的某些错误会使两个 table 不同步。
显然我的数据集和实际 table 结构比这更复杂,但我已尝试以尽可能简单的方式传达我的问题的要点。虽然我的问题是为了能够保存到两个 table,但我欢迎任何关于数据模型更改的替代建议,这将完全消除这种需要。
首先要了解的是:UNLOGGED 批次 不是 原子。参见 documentation。 UNLOGGED 批处理唯一能给你的是使用相同时间戳进行多次写入的能力。
因此,如果您想对 saveToCassandra
进行多次调用,并让它们的行为如同一次调用,只需对两次调用都进行 specify the WRITETIME。当一切都完成后,所有修改的数据将具有相同的时间戳。
至于你如何更新多个 tables 原子的问题......你不能。 Cassandra 不支持它。
我能想到的最好的建议是创建您自己的批处理日志,您可以在崩溃后查阅它以确定需要重新同步的内容。
想象一下这样的事情:
CREATE TABLE batch_log
(
id uuid,
updated_users set<uuid>,
PRIMARY KEY(id)
)
开始作业时,生成一个新的 uuid 作为该作业的 ID。然后,您将进行 3 次保存:
rdd.saveToCassandra("schema1", "batch_log", SomeColumns("batch_id", "user_id" append)
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))
如果您的批处理完成时没有任何崩溃,您可以删除创建的 batch_log
行。
但是,如果系统在中途崩溃,那么一旦一切恢复正常,您可以查阅 batch_log
以获取已更新的用户列表。去查询这些用户的电子邮件地址,然后更新 user_by_email_address
table。完成此修复后,您可以删除 batch_log
.
实际上,您正在实施 "by hand" 一个 Cassandra LOGGED BATCH。
我对 Cassandra (2.1.11) 和 Spark (1.4.1) 都很陌生,我想知道是否有人 seen/developed 有原子写入两个不同 Cassandra 的解决方案 tables 使用 Spark Streaming。
我目前有两个 table 保存相同的数据集,但具有不同的分区键。为了简单起见,我将使用熟悉的 User table 示例来解释:
CREATE TABLE schema1.user_by_user_id
(
user_id uuid
,email_address text
,num int //a value that is frequently updated
,PRIMARY KEY (user_id)
);
CREATE TABLE schema1.user_by_email_address
(
email_address text
,user_id uuid
,num int //a value that is frequently updated
,PRIMARY KEY (email_address)
);
email_address
列将具有高基数(实际上它将在 user_id
值的数量的 50% 到 100% 之间)。高基数使得二级索引表现不佳,因此需要第二个table.
我正在使用 Spark Streaming 来处理 num
列中的更改并更新这两个 table。据我了解,saveToCassandra()
方法在 UNLOGGED BATCH 中为 RDD 中的每个项目执行写入,从而执行原子写入(如 "Save a Collection of Objects" 部分 here 中所述)。但是,saveToCassandra()
只能用于保存到单个 table。为了保持 schema1.user_by_user_id
和 schema1.user_by_email_address
table 同步,我必须发出两个单独的 saveToCassandra()
调用:
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))
每个调用中发生的写入是以原子方式完成的,但两个调用一起不是原子的。第二次调用中的某些错误会使两个 table 不同步。
显然我的数据集和实际 table 结构比这更复杂,但我已尝试以尽可能简单的方式传达我的问题的要点。虽然我的问题是为了能够保存到两个 table,但我欢迎任何关于数据模型更改的替代建议,这将完全消除这种需要。
首先要了解的是:UNLOGGED 批次 不是 原子。参见 documentation。 UNLOGGED 批处理唯一能给你的是使用相同时间戳进行多次写入的能力。
因此,如果您想对 saveToCassandra
进行多次调用,并让它们的行为如同一次调用,只需对两次调用都进行 specify the WRITETIME。当一切都完成后,所有修改的数据将具有相同的时间戳。
至于你如何更新多个 tables 原子的问题......你不能。 Cassandra 不支持它。
我能想到的最好的建议是创建您自己的批处理日志,您可以在崩溃后查阅它以确定需要重新同步的内容。
想象一下这样的事情:
CREATE TABLE batch_log
(
id uuid,
updated_users set<uuid>,
PRIMARY KEY(id)
)
开始作业时,生成一个新的 uuid 作为该作业的 ID。然后,您将进行 3 次保存:
rdd.saveToCassandra("schema1", "batch_log", SomeColumns("batch_id", "user_id" append)
rdd.saveToCassandra("schema1","user_by_user_id",SomeColumns("user_id","email"address","num"))
rdd.saveToCassandra("schema1","user_by_email_address",SomeColumns("user_id","email"address","num"))
如果您的批处理完成时没有任何崩溃,您可以删除创建的 batch_log
行。
但是,如果系统在中途崩溃,那么一旦一切恢复正常,您可以查阅 batch_log
以获取已更新的用户列表。去查询这些用户的电子邮件地址,然后更新 user_by_email_address
table。完成此修复后,您可以删除 batch_log
.
实际上,您正在实施 "by hand" 一个 Cassandra LOGGED BATCH。