apache-spark - Loading data from Spark Structured Streaming into an ArrayList


I need to send data from Kafka to Kinesis Firehose. I am processing the Kafka data using Spark Structured Streaming. I am not sure how to collect the streaming query's dataset into an ArrayList variable, say recordList, of e.g. 100 records (it could be any other value), and then call the Firehose API's putRecordBatch(recordList) to put the records into Firehose.

Best Answer

I think you want to take a look at Foreach and ForeachBatch, depending on your Spark version. ForeachBatch arrived in v2.4.0, and foreach is available in versions before 2.4.0. If there is no streaming sink implementation available for Kinesis Firehose, then you should implement your own ForeachWriter. Databricks has some nice examples of using foreach to create a custom writer.
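For the ForeachBatch route, a minimal sketch might look like the following. This assumes Spark >= 2.4 and the AWS SDK for Java v1; the client construction, the delivery stream name "my-delivery-stream", and the CSV-style row encoding are all assumptions for illustration, not part of the original answer.

import java.nio.ByteBuffer
import scala.collection.JavaConverters._
import org.apache.spark.sql.DataFrame
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClientBuilder
import com.amazonaws.services.kinesisfirehose.model.{PutRecordBatchRequest, Record}

def sendBatchToFirehose(batchDF: DataFrame, batchId: Long): Unit = {
  batchDF.rdd.foreachPartition { rows =>
    // one client per partition; defaultClient() is an assumed setup choice
    val client = AmazonKinesisFirehoseClientBuilder.defaultClient()
    // chunks of 100, as in the question (PutRecordBatch allows at most 500 per call)
    rows.grouped(100).foreach { group =>
      val records = group.map { row =>
        new Record().withData(ByteBuffer.wrap((row.mkString(",") + "\n").getBytes("UTF-8")))
      }.toList.asJava
      client.putRecordBatch(
        new PutRecordBatchRequest()
          .withDeliveryStreamName("my-delivery-stream") // assumed stream name
          .withRecords(records))
    }
  }
}

val query = streamingSelectDF.writeStream
  .foreachBatch(sendBatchToFirehose _)
  .start()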

I've never used Kinesis, but here's an example of what your custom sink might look like.

import org.apache.spark.sql.ForeachWriter
// e.g. com.amazonaws.services.kinesis.producer.KinesisProducer from the Kinesis Producer Library

case class MyConfigInfo(info1: String, info2: String)

class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
  // var, not val: the producer is assigned in open()
  var kinesisProducer: KinesisProducer = _

  def open(partitionId: Long, version: Long): Boolean = {
    kinesisProducer = ??? // set up the Kinesis producer using configInfo
    true
  }

  def process(value: (String, String)): Unit = {
    // ask kinesisProducer to send data
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the Kinesis producer
  }
}

If you're using the AWS Kinesis Firehose API, you'd maybe do something like this:

import java.util.ArrayList
import org.apache.spark.sql.ForeachWriter
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose
import com.amazonaws.services.kinesisfirehose.model.{PutRecordBatchRequest, Record}

case class MyConfigInfo(info1: String, info2: String)

class KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
  var firehoseClient: AmazonKinesisFirehose = _
  val req = new PutRecordBatchRequest()
  val recordList = new ArrayList[Record]()
  var records = 0
  val recordLimit = 500 // PutRecordBatch accepts at most 500 records per call

  def open(partitionId: Long, version: Long): Boolean = {
    firehoseClient = ??? // set up the firehose client using configInfo
    true
  }

  def process(value: (String, String)): Unit = {
    // batch the records and send them once the limit is reached
    val record: Record = ??? // create a Record out of value
    recordList.add(record)
    records = records + 1
    if (records >= recordLimit) {
      req.setRecords(recordList)
      firehoseClient.putRecordBatch(req)
      recordList.clear()
      records = 0
    }
  }

  def close(errorOrNull: Throwable): Unit = {
    // close the firehose client
    // or instead you could put the remaining batched records to the firehose
    // client here, but I'm not sure if that's good practice
  }
}
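One way to fill in the "create a Record out of value" placeholder above is a simple CSV-style encoding of the key/value pair; the encoding here is an assumption, not something the original answer specifies.

import java.nio.ByteBuffer
import com.amazonaws.services.kinesisfirehose.model.Record

def toRecord(value: (String, String)): Record =
  new Record().withData(ByteBuffer.wrap(s"${value._1},${value._2}\n".getBytes("UTF-8")))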

and then you'd use it like this:

import org.apache.spark.sql.streaming.Trigger

val writer = new KinesisSink(configuration)
val query = streamingSelectDF
  .writeStream
  .foreach(writer)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("25 seconds"))
  .start()
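Note that start() returns immediately; if this is the end of your main program, you'd typically block on the query handle with query.awaitTermination() to keep the stream running.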

Regarding apache-spark - Loading data from Spark Structured Streaming into an ArrayList, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/57417230/
