通过sparksql查询cassandra udt

qoefvg9y  于 2021-06-14  发布在  Cassandra
关注(0)|答案(1)|浏览(590)

我们想通过sparksql从cassandradb查询数据。问题是数据作为udt存储在cassandra中。udt的结构嵌套很深,并且包含可变长度的数组,因此很难将数据分解为平面结构。我找不到任何工作示例如何通过sparksql查询这样的udt-尤其是根据udt值过滤结果。
或者,您可以建议不同的etl管道(查询引擎、存储引擎等),哪种更适合我们的用例?
我们的etl管道:
kafka(重复事件)->spark streaming->cassandra(重复数据消除仅存储最新事件)<-spark sql<-analytics platform(ui)
我们迄今为止尝试的解决方案:
1) Kafka->Spark->Parquet
一切正常,我们可以查询和过滤数组和嵌套数据结构。
问题:无法消除重复数据(用最新事件重写Parquet文件)
2) Kafka->Spark->Cassandra<-presto
通过重复数据消除解决了问题1)。
问题:presto不支持udt类型(presto doc,presto issue)
我们的主要要求是:
支持重复数据消除。我们可能会收到许多具有相同id的事件,并且只需要存储最新的事件。
用数组存储深度嵌套的数据结构
分布式存储,可扩展以供将来扩展
分布式查询引擎,支持类似sql的查询(用于连接zeppelin、tableau、qlik等)。查询不必实时运行,几分钟的延迟是可以接受的。
支持模式演化(avro风格)
谢谢你的建议

hyrbngr7

hyrbngr71#

只需使用点语法就可以对嵌套元素执行查询。例如,如果我有以下cql定义:

cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;

 id | t2
----+-------------------------------
  1 | {id: 1, t1: {id: 1, t: 't1'}}
  2 | {id: 2, t1: {id: 2, t: 't2'}}

(2 rows)

然后我可以按如下方式加载数据:

scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
     options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
|  2|[2,[2,t2]]|
+---+----------+

然后查询数据,只选择udt中字段的特定值:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id|        t2|
+---+----------+
|  1|[1,[1,t1]]|
+---+----------+

你可以用任何一个 spark.sql ,或相应的 .filter 函数-取决于您的编程风格。这种技术适用于任何结构类型的数据,这些数据来自不同的源,如json等。
但要考虑到,您不会像按分区键/群集列查询时那样从cassandra连接器获得优化

相关问题