gpt4 book ai didi

java - 是否有从 Spark Streaming 到 google big query 的连接器?

转载 作者:太空宇宙 更新时间:2023-11-04 10:30:38 25 4
gpt4 key购买 nike

我正在寻找可以将数据从 Spark Streaming 流式传输到 Google Big Query 的开源连接器,有吗?

据我所知,有one from Spotify ,但它并未得到积极维护,并且仅允许发送 Avro 格式的记录。

最佳答案

我也需要它,但我找不到任何东西,所以我直接在我的依赖项中添加了google-cloud-bigquery,然后:

implicit class RichDStreamMyClass(dstream: DStream[MyClass]) {
/** Writes the [[DStream]] with [[MyClass]]s to BigQuery.
* All the records are inserted at once per RDD (= per partition per window).
*/
def saveToBigQuery(tableRef: Table) =
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partition =>
val rowsToInsert = partition.map(toRowToInsert).toSeq.asJava
if (!rowsToInsert.isEmpty) {
val insertResponse = tableRef.insert(rowsToInsert)
if (insertResponse.hasErrors)
logger.error(s"${insertResponse.getInsertErrors.values()}")
}
}
}
}

/** Creates [[RowToInsert]] for BigQuery by mapping the field of a
* [[MyClass]]. */
def toRowToInsert(myClass: MyClass): RowToInsert = {
val fields = Map(
"timestamp" -> myClass.timestamp,
"name" -> myClass.name
).asJava
RowToInsert.of(s"${myClass.key}", fields)
}

请注意 insert 方法一次不能插入超过 10k 个元素,所以我也有这个:

val conf = new SparkConf()
.set("spark.streaming.kafka.maxRatePerPartition",
(10000 / config.spark.window).toString)

tableRefcom.google.cloud.bigquery.Table 的实例。

关于java - 是否有从 Spark Streaming 到 google big query 的连接器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50044631/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com