
java - Exception in thread "streaming-job-executor-11" java.lang.ClassFormatError


I am using Kafka (Scala) and Spark Streaming (Scala) to insert data from multiple CSV files into Cassandra tables, and I have created a producer and a consumer. Here is their respective code. The producer:

import java.sql.Timestamp
import java.util.Properties
import java.io._
import java.io.File
import java.nio.file.{Files, Paths, Path, SimpleFileVisitor, FileVisitResult}

import scala.io.Source

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

class produceMessages(brokers: String, topic: String) extends Actor {

// All helpers needed to send messages
def filecontent(namefile: String){
for (line <- Source.fromFile(namefile).getLines) {
println(line)
}
}

def getListOfFiles(dir: String):List[File] = {
val d = new File(dir)
if (d.exists && d.isDirectory) {
d.listFiles.filter(_.isFile).toList
} else {
List[File]()
}
}

def between(value: String, a:String, b: String):String = {
// Return a substring between the two strings.
val posA = value.indexOf(a)
val posB = value.lastIndexOf(b)
val adjustedPosA = posA + a.length()
val res = value.substring(adjustedPosA, posB)
return res
}

def getTableName(filePath: String):String = {
//return table name from filePath
val fileName = filePath.toString.split("\\\\").last
val tableName = between(fileName,"100001_","_2017")
return tableName
}
// end of helpers

object kafka {
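// Shared Kafka producer, configured once from the broker list passed to the actor (old kafka.producer Scala API)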
val producer = {
val props = new Properties()
props.put("metadata.broker.list", brokers)
//props.put(" max.request.size","5242880")
props.put("serializer.class", "kafka.serializer.StringEncoder")

val config = new ProducerConfig(props)
new Producer[String, String](config)
}
}


def receive = {
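// On "send": for each CSV file in the folder, publish the number of data lines, the table name, the header row, and then every data row as Kafka messages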
case "send" => {
val listeFichiers = getListOfFiles("C:\\Users\\acer\\Desktop\\csvs")
for (i <- 0 until listeFichiers.length)yield{
val chemin = listeFichiers(i).toString
val nomTable = getTableName(chemin)
println(nomTable)
val lines = Source.fromFile(chemin).getLines.toArray
val headerLine = lines(0)
println(headerLine)
val data = lines.slice(1,lines.length)
val messages = for (j <- 0 until data.length) yield{
val str = s"${data(j).toString}"
println(str)
new KeyedMessage[String, String](topic, str)
}
//sending the messages
val numberOfLinesInTable = new KeyedMessage[String, String](topic, data.length.toString)
val table = new KeyedMessage[String, String](topic, nomTable)
val header = new KeyedMessage[String, String](topic, headerLine)
kafka.producer.send(numberOfLinesInTable)
kafka.producer.send(table)
kafka.producer.send(header)
kafka.producer.send(messages: _*)

}
}

/*case "delete" =>{
val listeFichiers = getListOfFiles("C:\\Users\\acer\\Desktop\\csvs")
for (file <- listeFichiers){
if (file.isDirectory)
Option(file.listFiles).map(_.toList).getOrElse(Nil).foreach(Files.delete(_))
file.delete
}


}*/


case _ => println("Not a valid message!")
}
}

// Loads the runtime configuration, creates the producer actor, and repeatedly asks it to publish the CSV files.
object KafkaStreamProducer extends App {

/*
* Get runtime properties from application.conf
*/
val systemConfig = ConfigFactory.load()
val kafkaHost = systemConfig.getString("KafkaStreamProducer.kafkaHost")
println(s"kafkaHost $kafkaHost")
val kafkaTopic = systemConfig.getString("KafkaStreamProducer.kafkaTopic")
println(s"kafkaTopic $kafkaTopic")
val numRecords = systemConfig.getLong("KafkaStreamProducer.numRecords")
println(s"numRecords $numRecords")
val waitMillis = systemConfig.getLong("KafkaStreamProducer.waitMillis")
println(s"waitMillis $waitMillis")

/*
* Set up the Akka Actor
*/
val system = ActorSystem("KafkaStreamProducer")
val messageActor = system.actorOf(Props(new produceMessages(kafkaHost, kafkaTopic)), name="genMessages")

/*
* Message Loop
*/
var numRecsWritten = 0
while(numRecsWritten < numRecords) {
messageActor ! "send"

numRecsWritten += 1

println(s"${numRecsWritten} records written.")

//messageActor ! "delete"

Thread sleep waitMillis
}

}

And this is the consumer:

package com.datastax.demo

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode, Row, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils
import com.datastax.spark.connector._
import kafka.serializer.StringDecoder
import org.apache.spark.rdd.RDD
import java.sql.Timestamp
import java.io.File
import scala.io.Source
import scala.reflect.runtime.universe
import scala.tools.reflect.ToolBox


case class cellmodu(collecttime: Double,sbnid: Double,enodebid: Double,cellid: Double,c373515500: Double,c373515501: Double,c373515502: Double,c373515503: Double,c373515504: Double,c373515505: Double,c373515506: Double,c373515507: Double,c373515508: Double,c373515509: Double,c373515510: Double,c373515511: Double,c373515512: Double,c373515513: Double,c373515514: Double,c373515515: Double,c373515516: Double,c373515517: Double,c373515518: Double,c373515519: Double,c373515520: Double,c373515521: Double,c373515522: Double,c373515523: Double,c373515524: Double,c373515525: Double,c373515526: Double,c373515527: Double,c373515528: Double,c373515529: Double,c373515530: Double,c373515531: Double,c373515532: Double,c373515533: Double,c373515534: Double,c373515535: Double,c373515536: Double,c373515537: Double,c373515538: Double,c373515539: Double,c373515540: Double,c373515541: Double,c373515542: Double,c373515543: Double,c373515544: Double,c373515545: Double,c373515546: Double,c373515547: Double,c373515548: Double,c373515549: Double,c373515550: Double,c373515551: Double,c373515552: Double,c373515553: Double,c373515554: Double,c373515555: Double,c373515556: Double,c373515557: Double,c373515558: Double,c373515559: Double,c373515560: Double,c373515561: Double,c373515562: Double,c373515563: Double,c373515564: Double,c373515565: Double,c373515566: Double,c373515567: Double,c373515568: Double,c373515569: Double,c373515570: Double,c373515571: Double,c373515572: Double,c373515573: Double,c373515574: Double,c373515575: Double,c373515576: Double,c373515577: Double,c373515578: Double,c373515589: Double,c373515590: Double,c373515591: Double,c373515592: Double,c373515593: Double,c373515594: Double,c373515595: Double,c373515596: Double,c373515597: Double,c373515598: Double,c373515601: Double,c373515602: Double,c373515608: Double,c373515609: Double,c373515610: Double,c373515611: Double,c373515616: Double,c373515618: Double,c373515619: Double,c373515620: Double,c373515621: Double,c373515622: Double,c373515623: Double,c373515624: Double,c373515625: Double,c373515626: Double,c373515627: Double,c373515628: Double,c373515629: Double,c373515630: Double,c373515631: Double,c373515632: Double,c373515633: Double,c373515634: Double,c373515635: Double,c373515636: Double,c373515637: Double,c373515638: Double,c373515639: Double,c373515640: Double,c373515641: Double,c373515642: Double,c373515643: Double,c373515644: Double,c373515645: Double,c373515646: Double,c373515647: Double,c373515648: Double,c373515649: Double,c373515650: Double,c373515651: Double,c373515652: Double,c373515653: Double,c373515654: Double,c373515655: Double,c373515656: Double,c373515657: Double,c373515658: Double,c373515659: Double,c373515660: Double,c373515661: Double,c373515662: Double,c373515663: Double,c373515664: Double,c373515665: Double,c373515666: Double,c373515667: Double,c373515668: Double,c373515669: Double,c373515670: Double,c373515671: Double,c373515672: Double,c373515673: Double,c373515674: Double,c373515675: Double,c373515676: Double,c373515677: Double,c373515678: Double,c373515679: Double,c373515680: Double,c373515681: Double,c373515682: Double,c373515683: Double,c373515684: Double,c373515685: Double,c373515686: Double,c373515687: Double,c373515688: Double,c373515689: Double,c373515690: Double,c373515691: Double,c373515692: Double,c373515693: Double,c373515694: Double,c373515695: Double,c373515696: Double,c373515697: Double,c373515698: Double,c373515699: Double,c373515700: Double,c373515701: Double,c373515702: Double,c373515703: 
Double,c373515704: Double,c373515705: Double,c373515706: Double,c373515707: Double,c373515708: Double,c373515709: Double,c373515710: Double,c373515711: Double,c373515712: Double,c373515713: Double,c373515714: Double,c373515715: Double,c373515716: Double,c373515717: Double,c373515718: Double,c373515719: Double,c373515720: Double,c373515721: Double,c373515722: Double,c373515723: Double,c373515724: Double,c373515725: Double,c373515726: Double,c373515727: Double,c373515728: Double,c373515729: Double,c373515730: Double,c373515731: Double,c373515732: Double,c373515733: Double,c373515734: Double,c373515735: Double,c373515736: Double,c373515737: Double,c373515738: Double,c373515739: Double,c373515740: Double,c373515741: Double,c373515742: Double,c373515743: Double,c373515744: Double,c373515745: Double,c373515746: Double,c373515747: Double,c373515748: Double,c373515749: Double,c373515750: Double,c373515751: Double,c373515752: Double,c373515753: Double,c373515754: Double,c373515755: Double,c373515756: Double) {}

object SparkKafkaConsumerCellmodu extends App {

//START OF HELPERS
def isNumeric(str:String): Boolean = str.matches("[-+]?\\d+(\\.\\d+)?")
def printList(args: List[_]): Unit = {args.foreach(println)}
//END OF HELPERS

val appName = "SparkKafkaConsumer"

val conf = new SparkConf()
.set("spark.cores.max", "2")
//.set("spark.executor.memory", "512M")
.set("spark.cassandra.connection.host","localhost")
.setAppName(appName)
val spark: SparkSession = SparkSession.builder.master("local").getOrCreate
val sc = SparkContext.getOrCreate(conf)

val sqlContext = SQLContext.getOrCreate(sc)
import sqlContext.implicits._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


val ssc = new StreamingContext(sc, Milliseconds(1000))
ssc.checkpoint(appName)

val kafkaTopics = Set("test")
//val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
val kafkaParams = Map(
"bootstrap.servers" -> "localhost:9092",
"fetch.message.max.bytes" -> "5242880")

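// Direct Kafka stream (Kafka 0.8 direct API); keys and values are decoded as plain Strings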
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, kafkaTopics)


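// For each micro-batch, collect the messages to the driver and replay them in order: line count, table name, header row, then the data rows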
kafkaStream
.foreachRDD {
(message: RDD[(String, String)]) => {
val rddToArray = message.collect().toList
val msg = rddToArray.map(_._2)
var i = 0
while (i < msg.length){
if(isNumeric(msg(i))){
println("HHHHHHHHHHHHHHHHHHHHHHHHHHHHH")
val numberLines = msg(i).toInt //get number of lines to insert in table
val nameTable = msg(i+1) //get table name
val headerTable = msg(i+2).toLowerCase //get the columns of the table
println(headerTable)

if(msg(i+1)=="CELLMODU"){

val typedCols : Array[String] = headerTable.split(",") // transform headerTable into array to define dataframe dynamically

val listtoinsert:Array[String] = new Array[String](numberLines) // an empty list that will contain the lines to insert in the adequate table
val k = i + 3 //to skip name of table and header


//fill the toinsert array with the lines
for (j <- 0 until numberLines){
listtoinsert(j) = msg(k + j)
println (listtoinsert(j))
}

//convert the array to RDD
val rddtoinsert: RDD[(String)] = sc.parallelize(listtoinsert)
//rddtoinsert.foreach(println)

//convert rdd to dataframe
val df = rddtoinsert.map {
case (v) => v.split(",")
}.map(payload1 => { // instance of dynamic class
cellmodu(payload1(0).toDouble,payload1(1).toDouble,payload1(2).toDouble,payload1(3).toDouble,payload1(4).toDouble,payload1(5).toDouble,payload1(6).toDouble,payload1(7).toDouble,payload1(8).toDouble,payload1(9).toDouble,payload1(10).toDouble,payload1(11).toDouble,payload1(12).toDouble,payload1(13).toDouble,payload1(14).toDouble,payload1(15).toDouble,payload1(16).toDouble,payload1(17).toDouble,payload1(18).toDouble,payload1(19).toDouble,payload1(20).toDouble,payload1(21).toDouble,payload1(22).toDouble,payload1(23).toDouble,payload1(24).toDouble,payload1(25).toDouble,payload1(26).toDouble,payload1(27).toDouble,payload1(28).toDouble,payload1(29).toDouble,payload1(30).toDouble,payload1(31).toDouble,payload1(32).toDouble,payload1(33).toDouble,payload1(34).toDouble,payload1(35).toDouble,payload1(36).toDouble,payload1(37).toDouble,payload1(38).toDouble,payload1(39).toDouble,payload1(40).toDouble,payload1(41).toDouble,payload1(42).toDouble,payload1(43).toDouble,payload1(44).toDouble,payload1(45).toDouble,payload1(46).toDouble,payload1(47).toDouble,payload1(48).toDouble,payload1(49).toDouble,payload1(50).toDouble,payload1(51).toDouble,payload1(52).toDouble,payload1(53).toDouble,payload1(54).toDouble,payload1(55).toDouble,payload1(56).toDouble,payload1(57).toDouble,payload1(58).toDouble,payload1(59).toDouble,payload1(60).toDouble,payload1(61).toDouble,payload1(62).toDouble,payload1(63).toDouble,payload1(64).toDouble,payload1(65).toDouble,payload1(66).toDouble,payload1(67).toDouble,payload1(68).toDouble,payload1(69).toDouble,payload1(70).toDouble,payload1(71).toDouble,payload1(72).toDouble,payload1(73).toDouble,payload1(74).toDouble,payload1(75).toDouble,payload1(76).toDouble,payload1(77).toDouble,payload1(78).toDouble,payload1(79).toDouble,payload1(80).toDouble,payload1(81).toDouble,payload1(82).toDouble,payload1(83).toDouble,payload1(84).toDouble,payload1(85).toDouble,payload1(86).toDouble,payload1(87).toDouble,payload1(88).toDouble,payload1(89).toDouble,payload1(90).toDouble,payload1(91).toDouble,payload1(92).toDouble,payload1(93).toDouble,payload1(94).toDouble,payload1(95).toDouble,payload1(96).toDouble,payload1(97).toDouble,payload1(98).toDouble,payload1(99).toDouble,payload1(100).toDouble,payload1(101).toDouble,payload1(102).toDouble,payload1(103).toDouble,payload1(104).toDouble,payload1(105).toDouble,payload1(106).toDouble,payload1(107).toDouble,payload1(108).toDouble,payload1(109).toDouble,payload1(110).toDouble,payload1(111).toDouble,payload1(112).toDouble,payload1(113).toDouble,payload1(114).toDouble,payload1(115).toDouble,payload1(116).toDouble,payload1(117).toDouble,payload1(118).toDouble,payload1(119).toDouble,payload1(120).toDouble,payload1(121).toDouble,payload1(122).toDouble,payload1(123).toDouble,payload1(124).toDouble,payload1(125).toDouble,payload1(126).toDouble,payload1(127).toDouble,payload1(128).toDouble,payload1(129).toDouble,payload1(130).toDouble,payload1(131).toDouble,payload1(132).toDouble,payload1(133).toDouble,payload1(134).toDouble,payload1(135).toDouble,payload1(136).toDouble,payload1(137).toDouble,payload1(138).toDouble,payload1(139).toDouble,payload1(140).toDouble,payload1(141).toDouble,payload1(142).toDouble,payload1(143).toDouble,payload1(144).toDouble,payload1(145).toDouble,payload1(146).toDouble,payload1(147).toDouble,payload1(148).toDouble,payload1(149).toDouble,payload1(150).toDouble,payload1(151).toDouble,payload1(152).toDouble,payload1(153).toDouble,payload1(154).toDouble,payload1(155).toDouble,payload1(156).toDouble,payload1(157).toDouble,payload1(158).toDouble
,payload1(159).toDouble,payload1(160).toDouble,payload1(161).toDouble,payload1(162).toDouble,payload1(163).toDouble,payload1(164).toDouble,payload1(165).toDouble,payload1(166).toDouble,payload1(167).toDouble,payload1(168).toDouble,payload1(169).toDouble,payload1(170).toDouble,payload1(171).toDouble,payload1(172).toDouble,payload1(173).toDouble,payload1(174).toDouble,payload1(175).toDouble,payload1(176).toDouble,payload1(177).toDouble,payload1(178).toDouble,payload1(179).toDouble,payload1(180).toDouble,payload1(181).toDouble,payload1(182).toDouble,payload1(183).toDouble,payload1(184).toDouble,payload1(185).toDouble,payload1(186).toDouble,payload1(187).toDouble,payload1(188).toDouble,payload1(189).toDouble,payload1(190).toDouble,payload1(191).toDouble,payload1(192).toDouble,payload1(193).toDouble,payload1(194).toDouble,payload1(195).toDouble,payload1(196).toDouble,payload1(197).toDouble,payload1(198).toDouble,payload1(199).toDouble,payload1(200).toDouble,payload1(201).toDouble,payload1(202).toDouble,payload1(203).toDouble,payload1(204).toDouble,payload1(205).toDouble,payload1(206).toDouble,payload1(207).toDouble,payload1(208).toDouble,payload1(209).toDouble,payload1(210).toDouble,payload1(211).toDouble,payload1(212).toDouble,payload1(213).toDouble,payload1(214).toDouble,payload1(215).toDouble,payload1(216).toDouble,payload1(217).toDouble,payload1(218).toDouble,payload1(219).toDouble,payload1(220).toDouble,payload1(221).toDouble,payload1(222).toDouble,payload1(223).toDouble,payload1(224).toDouble,payload1(225).toDouble,payload1(226).toDouble,payload1(227).toDouble,payload1(228).toDouble,payload1(229).toDouble,payload1(230).toDouble,payload1(231).toDouble,payload1(232).toDouble,payload1(233).toDouble,payload1(234).toDouble,payload1(235).toDouble,payload1(236).toDouble,payload1(237).toDouble,payload1(238).toDouble)
}).toDF(typedCols: _*)


//insert dataframe in cassandra table
df
.write
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.options(Map("keyspace" -> "ztedb4g", "table" -> nameTable.toLowerCase)) // tolowercase because the name table comes in uppercase
.save()

df.show(1)
println(s"${df.count()} rows processed.")
}

}
}
}
}

ssc.start()
ssc.awaitTermination()
}

The producer works fine and publishes the messages the way I want, but when I run the consumer to insert into the table called "Cellmodu", I get the following error:

Exception in thread "streaming-job-executor-11" java.lang.ClassFormatError: com/datastax/demo/cellmodu
at com.datastax.demo.SparkKafkaConsumerCellmodu$$anonfun$1.apply(SparkKafkaConsumerCellmodu.scala:90)
at com.datastax.demo.SparkKafkaConsumerCellmodu$$anonfun$1.apply(SparkKafkaConsumerCellmodu.scala:57)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I keep getting this error for different streaming jobs, and then nothing is inserted into my table. Note that I ran exactly the same code for other tables, with different case classes that of course match those tables' schemas, and it worked fine. I don't understand why I get this error only for a few tables like this one.

Best answer

Judging from your exception, this is clearly a java.lang.ClassFormatError. Start by comparing this class with the other case classes whose table inserts work fine. If you use an XML configuration for Cellmodu, check whether there is a problem in it.
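A minimal diagnostic sketch along those lines (the object name CellmoduClassCheck is made up for illustration): load the compiled case class directly, outside of Spark Streaming, and see whether the JVM already rejects the class file. If Class.forName throws the same ClassFormatError here, the problem lies in the generated class itself rather than in the streaming job or the Cassandra write.

// Hypothetical standalone check, run on the same classpath as the consumer.
object CellmoduClassCheck extends App {
  try {
    val cls = Class.forName("com.datastax.demo.cellmodu")
    println(s"Loaded ${cls.getName}; declared fields: ${cls.getDeclaredFields.length}")
  } catch {
    case e: ClassFormatError =>
      // The JVM rejects the class file itself, independently of Spark or Cassandra.
      println(s"ClassFormatError while defining the class: ${e.getMessage}")
    case e: Throwable =>
      println(s"Class could not be loaded for another reason: $e")
  }
}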

About java - Exception in thread "streaming-job-executor-11" java.lang.ClassFormatError, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48806181/
