通过 Spark 查询 Cassandra UDT SQL
Query Cassandra UDT via Spark SQL
我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据在 cassandra 中存储为 UDT。
UDT 的结构是深度嵌套的,包含可变长度的数组,因此很难将数据分解为扁平结构。
我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。
或者,您能否建议更适合我们用例的不同 ETL 管道(查询引擎、存储引擎...)?
我们的 ETL 管道:
Kafka(重复事件)-> Spark streaming -> Cassandra(重复数据删除仅存储最新事件) <- Spark SQL <- 分析平台(UI)
目前我们尝试过的解决方案:
1) Kafka -> Spark -> Parquet <- Apache Drill
一切正常,我们可以查询和过滤数组和嵌套数据结构。
问题:无法删除重复数据(用最新事件重写 parquet 文件)
2) Kafka -> Spark -> Cassandra <- Presto
通过重复数据删除解决了问题 1)。
问题:Presto 不支持 UDT 类型 (presto doc, presto issue)
我们的主要要求是:
- 支持重复数据删除。我们可能会收到许多具有相同 ID 的事件,我们只需要存储最新的一个。
- 用数组存储深度嵌套的数据结构
- 分布式存储,可扩展以备将来扩展
- 具有 SQL 类查询支持的分布式查询引擎(用于与 Zeppelin、Tableau、Qlik 等的连接)。查询不必实时运行,延迟几分钟是可以接受的。
- 支持模式演变(AVRO 风格)
感谢您的任何建议
您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:
cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;
id | t2
----+-------------------------------
1 | {id: 1, t1: {id: 1, t: 't1'}}
2 | {id: 2, t1: {id: 2, t: 't2'}}
(2 rows)
然后我可以按如下方式加载该数据:
scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
| 2|[2,[2,t2]]|
+---+----------+
然后查询数据到select只有UDT中字段的特定值:
scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> res.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
+---+----------+
您可以使用 spark.sql
或相应的 .filter
函数 - 取决于您的编程风格。此技术适用于来自不同来源的任何结构类型数据,例如 JSON 等
但请注意,您不会像按分区键/集群列查询时那样从 Cassandra 连接器获得优化
我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据在 cassandra 中存储为 UDT。 UDT 的结构是深度嵌套的,包含可变长度的数组,因此很难将数据分解为扁平结构。 我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。
或者,您能否建议更适合我们用例的不同 ETL 管道(查询引擎、存储引擎...)?
我们的 ETL 管道:
Kafka(重复事件)-> Spark streaming -> Cassandra(重复数据删除仅存储最新事件) <- Spark SQL <- 分析平台(UI)
目前我们尝试过的解决方案:
1) Kafka -> Spark -> Parquet <- Apache Drill
一切正常,我们可以查询和过滤数组和嵌套数据结构。
问题:无法删除重复数据(用最新事件重写 parquet 文件)
2) Kafka -> Spark -> Cassandra <- Presto
通过重复数据删除解决了问题 1)。
问题:Presto 不支持 UDT 类型 (presto doc, presto issue)
我们的主要要求是:
- 支持重复数据删除。我们可能会收到许多具有相同 ID 的事件,我们只需要存储最新的一个。
- 用数组存储深度嵌套的数据结构
- 分布式存储,可扩展以备将来扩展
- 具有 SQL 类查询支持的分布式查询引擎(用于与 Zeppelin、Tableau、Qlik 等的连接)。查询不必实时运行,延迟几分钟是可以接受的。
- 支持模式演变(AVRO 风格)
感谢您的任何建议
您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:
cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;
id | t2
----+-------------------------------
1 | {id: 1, t1: {id: 1, t: 't1'}}
2 | {id: 2, t1: {id: 2, t: 't2'}}
(2 rows)
然后我可以按如下方式加载该数据:
scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> data.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
| 2|[2,[2,t2]]|
+---+----------+
然后查询数据到select只有UDT中字段的特定值:
scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]
scala> res.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
+---+----------+
您可以使用 spark.sql
或相应的 .filter
函数 - 取决于您的编程风格。此技术适用于来自不同来源的任何结构类型数据,例如 JSON 等
但请注意,您不会像按分区键/集群列查询时那样从 Cassandra 连接器获得优化