
scala - Writing data to Cassandra with Spark


I have a Spark job written in Scala, and all I want to do is write a single comma-separated line, coming from a Kafka producer, to a Cassandra database. But I cannot get the saveToCassandra call to work. I have seen several word-count examples that write a map structure to a Cassandra table with two columns, and those seem to work fine. But I have many columns, and I found that the data structure needs to be parallelized. Here is a sample of my code:

object TestPushToCassandra extends SparkStreamingJob {
  def validate(ssc: StreamingContext, config: Config): SparkJobValidation = SparkJobValid

  def runJob(ssc: StreamingContext, config: Config): Any = {

    val bp_conf = BpHooksUtils.getSparkConf()
    val brokers = bp_conf.get("bp_kafka_brokers", "unknown_default")

    val input_topics = config.getString("topics.in").split(",").toSet
    val output_topic = config.getString("topic.out")

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, input_topics)

    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(","))

    val li = words.par

    li.saveToCassandra("testspark", "table1", SomeColumns("col1", "col2", "col3"))
    li.print()

    words.foreachRDD(rdd =>
      rdd.foreachPartition(partition =>
        partition.foreach {
          case x: String => {
            val props = new HashMap[String, Object]()
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
              "org.apache.kafka.common.serialization.StringSerializer")

            val outMsg = x + " from spark"
            val producer = new KafkaProducer[String, String](props)
            val message = new ProducerRecord[String, String](output_topic, null, outMsg)
            producer.send(message)
          }
        }
      )
    )

    ssc.start()
    ssc.awaitTermination()
  }
}

I think my Scala syntax is incorrect. Thanks in advance.

Best Answer

You need to change your words DStream into something the connector can handle.

Such as a tuple:

val words = lines
  .map(_.split(","))
  .map(wordArr => (wordArr(0), wordArr(1), wordArr(2)))
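With the DStream shaped as tuples, the connector can save it directly. A minimal sketch, assuming the spark-cassandra-connector streaming implicits are on the classpath and reusing the keyspace, table, and column names from the question:

import com.datastax.spark.connector._            // SomeColumns
import com.datastax.spark.connector.streaming._  // adds saveToCassandra to DStreams

// Each tuple element is written to the column listed at the same position.
words.saveToCassandra("testspark", "table1", SomeColumns("col1", "col2", "col3"))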

Or a case class:

case class YourRow(col1: String, col2: String, col3: String)

val words = lines
  .map(_.split(","))
  .map(wordArr => YourRow(wordArr(0), wordArr(1), wordArr(2)))
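With a case class, the connector maps fields to columns by name, so SomeColumns can be omitted when the field names match the table's column names. A sketch under that assumption:

import com.datastax.spark.connector.streaming._

// The col1/col2/col3 fields are matched to the identically named table columns.
words.saveToCassandra("testspark", "table1")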

Or a CassandraRow:
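If you prefer not to define a case class, a CassandraRow can be built from a column-name-to-value map. A sketch, assuming the connector version in use exposes CassandraRow.fromMap:

import com.datastax.spark.connector._
import com.datastax.spark.connector.streaming._

val rows = lines
  .map(_.split(","))
  .map(arr => CassandraRow.fromMap(
    Map("col1" -> arr(0), "col2" -> arr(1), "col3" -> arr(2))))

rows.saveToCassandra("testspark", "table1")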

This is needed because if you put a bare array in there, the connector may treat it as a single array value you are trying to insert into C*, rather than as 3 separate columns.

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md

About "scala - Writing data to Cassandra with Spark": we found a similar question on Stack Overflow: https://stackoverflow.com/questions/35211587/
