
hadoop - Spark Streaming "ERROR JobScheduler: error in job generator"


I built a Spark Streaming application that continuously receives messages from Kafka and then writes them into an HBase table.

The application runs well for the first 25 minutes. When I enter KV pairs such as 1;name1 and 2;name2 in kafka-console-producer, they are saved in the HBase table:

ROW    COLUMN+CELL
1      column=cf1:column-Name, timestamp=1471905340560, value=name1
2      column=cf1:column-Name, timestamp=1471905348165, value=name2

But after about 25 minutes, my application stops with the error ERROR JobScheduler: Error in job generator. The details of the error are shown below:

16/08/29 18:01:10 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/08/29 18:01:10 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/29 18:01:10 INFO JobGenerator: Stopping JobGenerator immediately

It runs fine for the first 25 minutes, but after that, for some reason I don't understand, the job generator suddenly seems to stop working correctly.

My code looks like this:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapreduce.Job

object ReceiveKafkaAsDstream {

  case class SampleKafkaRecord(id: String, name: String)

  object SampleKafkaRecord extends Serializable {
    // Parse an "id;name" line from Kafka into a record.
    def parseToSampleRecord(line: String): SampleKafkaRecord = {
      val values = line.split(";")
      SampleKafkaRecord(values(0), values(1))
    }

    // Convert a record into an HBase Put keyed by the record id.
    def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
      val rowKey = CSVData.id
      val putOnce = new Put(rowKey.getBytes)

      putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
      (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
    }
  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val topics = "test"
    val brokers = "10.0.2.15:6667"

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
      "zookeeper.connection.timeout.ms" -> "1000")

    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)

    // Hadoop/HBase output configuration for TableOutputFormat.
    val tableName = "KafkaTable"
    val conf = HBaseConfiguration.create()
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    job.setOutputFormatClass(classOf[TableOutputFormat[Text]])

    val records = messages
      .map(_._2)
      .map(SampleKafkaRecord.parseToSampleRecord)

    // Write each micro-batch to HBase.
    records.foreachRDD { rdd =>
      rdd.map(SampleKafkaRecord.SampleToHbasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)
    }
    records.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

It feels like a configuration issue. Any help is appreciated.

Best Answer

I added a property named zookeeper.session.timeout.ms by changing the code to:

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
  "zookeeper.connect" -> "xxxxxx:2181",
  "zookeeper.connection.timeout.ms" -> "10000",
  "zookeeper.session.timeout.ms" -> "10000")

and I set the Spark Streaming batch interval to 10 seconds. With these changes, my Spark Streaming application keeps running for a long time.
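For reference, a minimal sketch of the corresponding batch-interval change (the app name is the one from the question above; only Seconds(10) differs from the original code):

val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
// Batch interval raised from Seconds(1) to Seconds(10).
val ssc = new StreamingContext(sparkConf, Seconds(10))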

However, when I check memory usage, it is still decreasing, and I don't know how to solve that problem.

Regarding hadoop - Spark Streaming "ERROR JobScheduler: error in job generator", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39213254/
