gpt4 book ai didi

mongodb - 如何在结构化流中处理 JSON 文档(来自 MongoDB)并写入 HBase?

转载 作者:行者123 更新时间:2023-12-03 15:59:30 26 4
gpt4 key购买 nike

我正在获取 mongoDB 文档,处理后我想使用 Bson.Document 库将其存储到 Hbase

将流式传输方法从 Spark kafkastreaming 更改为结构化流式传输所以早期使用 kafkaUtils 的方法是生成 Dstream[Document]

在结构化流中,我正在获取数据集[文档]

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

为了进一步处理,我需要从数据集中获取文档

scala> val stream = spark.readStream.format("kafka").option("kafka.bootstrap.servers","brokerList").option("subscribe", s"topic_name").load().selectExpr("CAST(value AS STRING)")
stream: org.apache.spark.sql.DataFrame = [value: string]

scala> val strming_doc = stream.map(record => record.getAs[String]("value")
scala> org.apache.spark.sql.Dataset[String] = [value: string]

我需要从数据集中获取文档,基本上是从 mongoDB 获取数据

最佳答案

看来您需要 foreachforeachBatch 运算符将流式查询的结果写入 HBase。请咨询Using Foreach and ForeachBatch在官方文档中。

关于mongodb - 如何在结构化流中处理 JSON 文档(来自 MongoDB)并写入 HBase?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58773097/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com