Spark:每天从 Cassandra 读取并保存到镶木地板,如何只读取新行?
Spark: daily read from Cassandra and save to parquets, how to read only new rows?
我正在尝试使用 Spark 构建 ETL 过程。我的目标是阅读
Cassandra table 并保存到 parquet 文件中。
到目前为止我设法做的是使用 Cassandra 连接器(在 pyspark 中)从 Cassandra 读取整个 table:
df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
.option("table", my_table)\
.option("keyspace",my_keyspace)\
.load()
问题是我的数据增长很快,我想每天重复 ETL 过程,从 Cassandra 读取新添加的行并将它们保存到新的镶木地板文件中。
如果我的 Cassandra table 中没有排序,我将无法根据时间读取,有什么办法可以从 Spark 端做到这一点吗?
只有当您有 time-based 第一个聚类列时,基于时间的有效过滤才是真正可能的,如下所示:
create table test.test (
pk1 <type>,
pk2 <type>,
cl1 timestamp,
cl2 ...,
primary key ((pk1, pk2), cl1, cl2));
在这种情况下,条件为 cl1
,如下所示:
import org.apache.spark.sql.cassandra._
val data = { spark.read.cassandraFormat("test", "test").load()}
val filtered = data.filter("cl1 >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")
将被有效地推送到 Cassandra 中,过滤将在服务器端进行,只检索必要的数据——这很容易用解释来检查——它应该生成这样的东西(推送过滤器表示为 *
):
// *Filter ((cl1#23 >= 1552228894373000))
// +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk1#21,pk2#22L,cl1#23,...]
PushedFilters: [*GreaterThanOrEqual(cl1,2019-03-10 14:41:34.373)],
ReadSchema: struct<pk1:int,pk2:int,cl1:timestamp,...
在所有其他情况下,过滤将发生在 Spark 端,从 Cassandra 检索所有数据。
我正在尝试使用 Spark 构建 ETL 过程。我的目标是阅读 Cassandra table 并保存到 parquet 文件中。
到目前为止我设法做的是使用 Cassandra 连接器(在 pyspark 中)从 Cassandra 读取整个 table:
df = app.sqlSparkContext.read.format("org.apache.spark.sql.cassandra")\
.option("table", my_table)\
.option("keyspace",my_keyspace)\
.load()
问题是我的数据增长很快,我想每天重复 ETL 过程,从 Cassandra 读取新添加的行并将它们保存到新的镶木地板文件中。
如果我的 Cassandra table 中没有排序,我将无法根据时间读取,有什么办法可以从 Spark 端做到这一点吗?
只有当您有 time-based 第一个聚类列时,基于时间的有效过滤才是真正可能的,如下所示:
create table test.test (
pk1 <type>,
pk2 <type>,
cl1 timestamp,
cl2 ...,
primary key ((pk1, pk2), cl1, cl2));
在这种情况下,条件为 cl1
,如下所示:
import org.apache.spark.sql.cassandra._
val data = { spark.read.cassandraFormat("test", "test").load()}
val filtered = data.filter("cl1 >= cast('2019-03-10T14:41:34.373+0000' as timestamp)")
将被有效地推送到 Cassandra 中,过滤将在服务器端进行,只检索必要的数据——这很容易用解释来检查——它应该生成这样的东西(推送过滤器表示为 *
):
// *Filter ((cl1#23 >= 1552228894373000))
// +- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [pk1#21,pk2#22L,cl1#23,...]
PushedFilters: [*GreaterThanOrEqual(cl1,2019-03-10 14:41:34.373)],
ReadSchema: struct<pk1:int,pk2:int,cl1:timestamp,...
在所有其他情况下,过滤将发生在 Spark 端,从 Cassandra 检索所有数据。