gpt4 book ai didi

scala - 将 Cassandra 查询数据组合/更新到从 Kafka 接收的结构化流

转载 作者:行者123 更新时间:2023-12-02 19:48:40 24 4
gpt4 key购买 nike

我正在创建一个 Spark 结构化流应用程序,它将每 10 秒计算一次从 Kafka 接收的数据。

为了能够进行一些计算,我需要查找有关 Cassandra 数据库中的传感器和放置情况的一些信息

我有点困惑如何保持 Cassandra 数据在整个集群中可用,并以某种方式不时更新数据,以防我们对数据库表进行了一些更改。

目前,我使用 Datastax Spark-Cassandra-connector 在本地启动 Spark 后立即查询数据库

val cassandraSensorDf = spark
.read
.cassandraFormat("specifications", "sensors")
.load

从这里开始,我可以通过将这个 cassandraSensorDs 与我的结构化流数据集连接来使用它。

.join(
cassandraSensorDs ,
sensorStateDf("plantKey") <=> cassandraSensorDf ("cassandraPlantKey")
)

在运行结构化流时,如何执行其他查询来更新此 Cassandra 数据?如何使查询的数据在集群设置中可用?

最佳答案

使用广播变量,您可以编写一个包装器来定期从 Cassandra 获取数据并更新广播变量。使用广播变量在流上执行映射端连接。我还没有测试过这种方法,我认为这可能是一种矫枉过正,具体取决于您的用例(吞吐量)。

How can I update a broadcast variable in spark streaming?

另一种方法是查询 Cassandra 以获取流中的每个项目,为了优化连接,您应该确保使用连接池并为 JVM/分区仅创建一个连接。这种方法更简单,您不必担心定期加热 Cassandra 数据。

spark-streaming and connection pool implementation

关于scala - 将 Cassandra 查询数据组合/更新到从 Kafka 接收的结构化流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49863343/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com