通过 Spark 查询 Cassandra UDT SQL

Query Cassandra UDT via Spark SQL

我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据在 cassandra 中存储为 UDT。 UDT 的结构是深度嵌套的,包含可变长度的数组,因此很难将数据分解为扁平结构。 我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。

或者,您能否建议更适合我们用例的不同 ETL 管道(查询引擎、存储引擎...)?

我们的 ETL 管道:

Kafka(重复事件)-> Spark streaming -> Cassandra(重复数据删除仅存储最新事件) <- Spark SQL <- 分析平台(UI)

目前我们尝试过的解决方案:

1) Kafka -> Spark -> Parquet <- Apache Drill

一切正常,我们可以查询和过滤数组和嵌套数据结构。

问题:无法删除重复数据(用最新事件重写 parquet 文件)

2) Kafka -> Spark -> Cassandra <- Presto

通过重复数据删除解决了问题 1)。

问题:Presto 不支持 UDT 类型 (presto doc, presto issue)

我们的主要要求是:

感谢您的任何建议

您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:

cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;

 id | t2
----+-------------------------------
  1 | {id: 1, t1: {id: 1, t: 't1'}}
  2 | {id: 2, t1: {id: 2, t: 't2'}}

(2 rows)

然后我可以按如下方式加载该数据:

scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
     options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
|  2|[2,[2,t2]]|
+---+----------+

然后查询数据到select只有UDT中字段的特定值:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
+---+----------+

您可以使用 spark.sql 或相应的 .filter 函数 - 取决于您的编程风格。此技术适用于来自不同来源的任何结构类型数据,例如 JSON 等

但请注意,您不会像按分区键/集群列查询时那样从 Cassandra 连接器获得优化