gpt4 book ai didi

apache-spark - 通过 Spark SQL 查询 Cassandra UDT

转载 作者:行者123 更新时间:2023-12-04 17:42:46 24 4
gpt4 key购买 nike

我们想通过 SparkSQL 从 Cassandra DB 查询数据。问题是数据作为 UDT 存储在 cassandra 中。UDT 的结构是深度嵌套的,包含可变长度的数组,因此很难将数据分解为扁平结构。我找不到任何工作示例如何通过 SparkSQL 查询此类 UDT - 特别是根据 UDT 值过滤结果。

或者,您能否建议更适合我们用例的不同 ETL 管道(查询引擎、存储引擎...)?

我们的 ETL 管道:

Kafka(重复事件)-> Spark streaming -> Cassandra(去重以仅存储最新事件) <- Spark SQL <- 分析平台(UI)

目前我们尝试过的解决方案:

1) Kafka -> Spark -> Parquet <- Apache Drill

一切正常,我们可以查询和过滤数组和嵌套数据结构。

问题:无法删除重复数据(用最新事件重写 parquet 文件)

2) Kafka -> Spark -> Cassandra <- Presto

通过重复数据删除解决了问题 1)。

问题:Presto 不支持 UDT 类型(presto docpresto issue)

我们的主要要求是:

  • 支持重复数据删除。我们可能会收到许多具有相同 ID 的事件,我们只需要存储最新的一个。
  • 用数组存储深度嵌套的数据结构
  • 分布式存储,可扩展以备将来扩展
  • 具有类似 SQL 查询支持的分布式查询引擎(用于与 Zeppelin、Tableau、Qlik 等的连接)。查询不必实时运行,几分钟的延迟是可以接受的。
  • 支持模式演变(AVRO 风格)

谢谢你的建议

最佳答案

您可以只使用点语法对嵌套元素执行查询。例如,如果我有以下 CQL 定义:

cqlsh> use test;
cqlsh:test> create type t1 (id int, t text);
cqlsh:test> create type t2 (id int, t1 frozen<t1>);
cqlsh:test> create table nudt (id int primary key, t2 frozen<t2>);
cqlsh:test> insert into nudt (id, t2) values (1, {id: 1, t1: {id: 1, t: 't1'}});
cqlsh:test> insert into nudt (id, t2) values (2, {id: 2, t1: {id: 2, t: 't2'}});
cqlsh:test> SELECT * from nudt;

id | t2
----+-------------------------------
1 | {id: 1, t1: {id: 1, t: 't1'}}
2 | {id: 2, t1: {id: 2, t: 't2'}}

(2 rows)

然后我可以按如下方式加载该数据:

scala> val data = spark.read.format("org.apache.spark.sql.cassandra").
options(Map( "table" -> "nudt", "keyspace" -> "test")).load()
data: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.cache
res0: data.type = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> data.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
| 2|[2,[2,t2]]|
+---+----------+

然后查询数据以仅选择UDT中字段的特定值:

scala> val res = spark.sql("select * from test.nudt where t2.t1.t = 't1'")
res: org.apache.spark.sql.DataFrame = [id: int, t2: struct<id: int, t1: struct<id: int, t: string>>]

scala> res.show
+---+----------+
| id| t2|
+---+----------+
| 1|[1,[1,t1]]|
+---+----------+

您可以使用 spark.sql 或相应的 .filter 函数 - 取决于您的编程风格。此技术适用于来自不同来源(如 JSON 等)的任何结构类型数据。

但请注意,您不会像按分区键/集群列查询时那样从 Cassandra 连接器获得优化

关于apache-spark - 通过 Spark SQL 查询 Cassandra UDT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53744677/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com