
scala - Kafka producer stuck on send


The setup is a streaming job that pulls data from a custom source and has to write it to both Kafka and HDFS.

I wrote a (very) basic Kafka producer to do this, but the entire streaming job gets stuck in the send method.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer => ApacheKafkaProducer, ProducerConfig, ProducerRecord}

class KafkaProducer(val kafkaBootstrapServers: String, val kafkaTopic: String, val sslCertificatePath: String, val sslCertificatePassword: String) {

  val kafkaProps: Properties = new Properties()
  kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers)
  kafkaProps.put("acks", "1")
  kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
  kafkaProps.put("ssl.truststore.location", sslCertificatePath)
  kafkaProps.put("ssl.truststore.password", sslCertificatePassword)

  // Underlying Apache producer (aliased on import to avoid clashing with this wrapper's name)
  val kafkaProducer: ApacheKafkaProducer[Long, Array[String]] = new ApacheKafkaProducer(kafkaProps)

  def sendKafkaMessage(message: Message): Unit = {
    message.data.foreach(list => {
      val producerRecord: ProducerRecord[Long, Array[String]] = new ProducerRecord[Long, Array[String]](kafkaTopic, message.timeStamp.getTime, list.toArray)
      kafkaProducer.send(producerRecord)
    })
  }
}

And the code that calls the producer:
receiverStream.foreachRDD(rdd => {
  val messageRowRDD: RDD[Row] = rdd.mapPartitions(partition => {
    val parser: Parser = new Parser
    val kafkaProducer: KafkaProducer = new KafkaProducer(kafkaBootstrapServers, kafkaTopic, kafkaSslCertificatePath, kafkaSslCertificatePass)
    val newPartition = partition.map(message => {
      Logger.getLogger("importer").error("Writing Message to Kafka...")
      kafkaProducer.sendKafkaMessage(message)
      Logger.getLogger("importer").error("Finished writing Message to Kafka")
      message.data.map(singleMessage => parser.parseMessage(message.timeStamp.getTime, singleMessage))
    })
    newPartition.flatten
  })

  val df = sqlContext.createDataFrame(messageRowRDD, Schema.messageSchema)

  Logger.getLogger("importer").info("Entries-count: " + df.count())
  val row = Try(df.first)

  row match {
    case Success(s) => Persister.writeDataframeToDisk(df, outputFolder)
    case Failure(e) => Logger.getLogger("importer").warn("Resulting DataFrame is empty. Nothing can be written")
  }
})

From the logs I can see that every executor reaches the "Writing Message to Kafka..." line, but none of them get past it. All executors hang at that point, and no exception is thrown.
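Worth noting: send is asynchronous, so broker-side failures are reported through the returned future or a callback rather than as exceptions in the calling thread. A hedged sketch (not part of the original code) of the send call inside sendKafkaMessage with a callback attached, so that broker-side errors, including delivery timeouts, at least show up in the executor logs:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Sketch only: same record as in sendKafkaMessage above, but with a callback
// that logs whether the broker ever acknowledged the record.
kafkaProducer.send(producerRecord, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null)
      Logger.getLogger("importer").error("Kafka send failed", exception)
    else
      Logger.getLogger("importer").error(s"Kafka ack: ${metadata.topic()}-${metadata.partition()}@${metadata.offset()}")
  }
})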

The Message class is a very simple case class with two fields: a timestamp and an array of strings.
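A minimal sketch of what that might look like; the field names are taken from the code above (timeStamp, data), and the nested Seq type is an assumption based on each inner list being turned into a record value via list.toArray:

import java.sql.Timestamp

// Hypothetical reconstruction, not the original definition: a timestamp plus
// the raw string payloads, with each inner Seq becoming one Kafka record.
case class Message(timeStamp: Timestamp, data: Seq[Seq[String]])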

Best Answer

This turned out to be caused by Kafka's acks setting.

With acks set to 1, the send completes quickly.
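In the producer configuration shown above this is the acks property; a minimal sketch of the change using the ProducerConfig constant, where the comment summarises standard Kafka acknowledgement semantics:

// "all" makes the producer wait for every in-sync replica to acknowledge each
// record; "1" waits only for the partition leader. If replicas never respond,
// unacknowledged batches pile up and send() eventually blocks once the
// producer's buffer is full.
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "1")  // same as kafkaProps.put("acks", "1")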

Regarding "scala - Kafka producer stuck on send", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47144994/
