- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我构建了一个 spark Streaming 应用程序来持续接收来自 Kafka 的消息,然后将它们写入表 HBase。
此应用在前 25 分钟内运行良好。当我在 Kafka-console-producer 中输入 1;name1
, 2;name2
这样的 KV 对时,它们可以保存在 Hbase 表中:
ROW COLUMN+CELL
1 column=cf1:column-Name, timestamp=1471905340560, value=name1
2 column=cf1:column-Name, timestamp=1471905348165, value=name2
但是大约 25 分钟后,我的应用停止并出现错误 ERROR JobSchedular: ERROR in job generator
。此错误的详细信息如下所示:
16/08/29 18:01:10 ERROR JobScheduler: Error in job generator
java.lang.IllegalArgumentException: requirement failed
at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221)
at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.cleanupOldBatches(ReceivedBlockTracker.scala:166)
at org.apache.spark.streaming.scheduler.ReceiverTracker.cleanupOldBlocksAndBatches(ReceiverTracker.scala:223)
at org.apache.spark.streaming.scheduler.JobGenerator.clearMetadata(JobGenerator.scala:272)
at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
16/08/29 18:01:10 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook
16/08/29 18:01:10 INFO JobGenerator: Stopping JobGenerator immediately
它在前 25 分钟内运行良好,但之后由于某种我不知道的原因,作业生成器似乎突然无法正确实例化。
我的代码如下所示:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder
import org.apache.hadoop.io.{LongWritable, Writable, IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
object ReceiveKafkaAsDstream {
case class SampleKafkaRecord(id: String, name: String)
object SampleKafkaRecord extends Serializable {
def parseToSampleRecord(line: String): SampleKafkaRecord = {
val values = line.split(";")
SampleKafkaRecord(values(0), values(1))
}
def SampleToHbasePut(CSVData: SampleKafkaRecord): (ImmutableBytesWritable, Put) = {
val rowKey = CSVData.id
val putOnce = new Put(rowKey.getBytes)
putOnce.addColumn("cf1".getBytes, "column-Name".getBytes, CSVData.name.getBytes)
return (new ImmutableBytesWritable(rowKey.getBytes), putOnce)
}
}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("ReceiveKafkaAsDstream")
val ssc = new StreamingContext(sparkConf, Seconds(1))
val topics = "test"
val brokers = "10.0.2.15:6667"
val topicSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
"zookeeper.connection.timeout.ms" -> "1000")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet)
val tableName = "KafkaTable"
val conf = HBaseConfiguration.create()
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val job = Job.getInstance(conf)
job.setOutputKeyClass(classOf[Text])
job.setOutputValueClass(classOf[Text])
job.setOutputFormatClass(classOf[TableOutputFormat[Text]])
val records = messages
.map(_._2)
.map(SampleKafkaRecord.parseToSampleRecord)
records
.foreachRDD{ rdd => {
rdd.map(SampleKafkaRecord.SampleToHbasePut).saveAsNewAPIHadoopDataset(job.getConfiguration) }
}
records.print()
ssc.start()
ssc.awaitTermination()
}
}
感觉是配置问题。任何帮助表示赞赏。
最佳答案
我添加了一个名为 zookeeper.session.timeout.ms 的属性通过添加代码:
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
"zookeeper.connect" -> xxxxxx:2181",
"zookeeper.connection.timeout.ms" -> "10000",
"zookeeper.session.timeout.ms" -> "10000")
并将 Spark 流的间隔设置为 10 秒。通过这样做,我的 Spark Streaming 应用程序可以保持运行很长时间。
但是当我检查内存时,它仍然在减少,我不知道如何解决这个问题。
关于hadoop - Spark 流 "ERROR JobScheduler: error in job generator",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39213254/
我想做的是分派(dispatch)一个 Job,然后在前一个 Job 完成后继续分派(dispatch)同一个 Job,这样就可以连续循环分派(dispatch) Job。如选项一所示,这以前是与数据
我想知道当一个过程通过一个作业执行时会发生什么,在它完成之前是作业调用该过程的下一次执行的时间。这是我创建的工作: DECLARE X NUMBER; BEGIN SYS.DB
我使用以下代码显示超时为 120 秒的 PowerShell 作业的结果。我想通过合并 Write-Progress(基于完成的作业数)来增强此代码。我尝试使用 this example然而,作为引用
我使用以下代码显示超时为 120 秒的 PowerShell 作业的结果。我想通过合并 Write-Progress(基于完成的作业数)来增强此代码。我尝试使用 this example然而,作为引用
这个关于 ECMAScript 规范(ECMA-262 第 8 版)的问题 这些天,我对作业和作业队列有点困惑。 这里有一些问题。 1:在ECMA-262中,有两种作业队列。一个是 ScriptJob
子进程是作业的一部分,由创建作业的进程启动。父进程尚未设置作业属性以允许脱离作业。需要在 JOB 上设置“JOB_OBJECT_LIMIT_BREAKAWAY_OK”标志以允许子进程脱离作业,但未设置
有没有人有类似于Path's Android Priority Job Queue的iOS作业队列?他们不介意与社区分享?我是 iOS 的新手,所以我不确定平台本身是否提供这样的解决方案。在 Andr
我正在关注 this在 Heroku 上安排我的 Django cron 作业。 程序文件: web: gunicorn tango.wsgi --log-file - clock: python c
UI协同程序指南包含有关如何管理UI协同程序生命周期的section。它说明了我们应该创建一个顶级Job实例,并将复合协程上下文contextJob + UI传递给我们启动的所有协程: launch(
我在 Spark 上创建了一个 Master 和一个 Worker。然后我创建了一个 Spark 流作业并尝试提交它,但在 Master 上它显示了一长串 java 错误 使用此命令启动主控: spa
我必须在 Spring Batch 上设置 jobparemeters,但使用 Spring Boot Batch 则无法轻松做到这一点。 我需要重新运行作业,但如果参数相同,spring-batch
众所周知,Apache Pig 是一种数据流语言。如果我编写了一个 Pig 脚本并且 Pig 决定拆分并运行两个或多个作业来执行手头的任务,那么 Pig 如何存储它从作业 1 传递到作业 2 的数据?
我以为他们指的是 Reducer 但在我的程序中我有 public static class MyMapper extends Mapper 和 public static class MyReduc
我需要创建一个恢复模式。 在我的模式中,我只能在给定的时间窗口内启 Action 业。 如果作业失败,它只会在下一个时间窗口重新启动,完成后我想开始为此窗口提前计划的计划作业。 作业之间的唯一区别是时
使用 play 框架 1.2.4 和 scala。我几乎没有类似的游戏工作 @OnApplicationStart class MyOtherJob extends Job { ... } @Ev
作业通知选项“作业成功时”和“作业完成时”有何区别。从表面上看,我假设“作业完成时”选项包含作业成功和作业失败,而“作业成功时”选项仅包含作业成功运行时。这是正确的吗? 最佳答案 作业成功时作业成功完
我正在尝试创建迁移,但由于以下错误而失败: Error from server (BadRequest): error when creating "kubernetes/migration-job.
Cron Job 和 hybris 中的 Job 有什么区别? 两者的创建/实现之间有什么区别? 最佳答案 下图描述了 Hybris 中 Jobs/Cronjobs 工作原理的完整 View
我正在运行多个作业,并且我希望每个作业都有一个单独的作业存储库(内存中实现)。 请在下面找到我尝试过的 bean 定义。请注意,我尝试指定具有作用域原型(prototype)的 bean。 我收到 j
Quartz 中是否有一种机制可以在启动另一个作业时删除现有作业?我需要暂停其他作业的原因是因为新作业需要所有资源可用,只有当其他作业未运行时才会如此。 这是一个示例: 我有 2 份工作:工作 A 和
我是一名优秀的程序员,十分优秀!