- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Scala
Spark Streaming
应用程序,它从 3 个不同的 Kafka 生产者
接收来自同一主题的数据。
Spark 流应用程序位于主机 0.0.0.179
的计算机上,Kafka 服务器位于主机 0.0.0.178
的计算机上,Kafka 生产者
code> 在机器上,0.0.0.180
、0.0.0.181
、0.0.0.182
。
当我尝试运行 Spark Streaming
应用程序时出现以下错误
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 19.0 failed 1 times, most recent failure: Lost task 0.0 in stage 19.0 (TID 19, localhost): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1625) at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1198) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1204) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1203) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1211) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1190) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748)
现在我阅读了数千篇不同的帖子,但似乎没有人能够找到这个问题的解决方案。
我该如何在我的应用程序中处理这个问题?我是否需要修改 Kakfa 上的某些参数(目前 num.partition
参数设置为 1)?
以下是我的应用程序的代码:
// Create the context with a 5 second batch size
val sparkConf = new SparkConf().setAppName("SparkScript").set("spark.driver.allowMultipleContexts", "true").set("spark.streaming.concurrentJobs", "3").setMaster("local[4]")
val sc = new SparkContext(sparkConf)
val ssc = new StreamingContext(sc, Seconds(3))
case class Thema(name: String, metadata: String)
case class Tempo(unit: String, count: Int, metadata: String)
case class Spatio(unit: String, metadata: String)
case class Stt(spatial: Spatio, temporal: Tempo, thematic: Thema)
case class Location(latitude: Double, longitude: Double, name: String)
case class Datas1(location : Location, timestamp : String, windspeed : Double, direction: String, strenght : String)
case class Sensors1(sensor_name: String, start_date: String, end_date: String, data1: Datas1, stt: Stt)
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "0.0.0.178:9092",
"key.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"value.deserializer" -> classOf[StringDeserializer].getCanonicalName,
"group.id" -> "test_luca",
"auto.offset.reset" -> "earliest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics1 = Array("topics1")
val s1 = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(record => {
implicit val formats = DefaultFormats
parse(record.value).extract[Sensors1]
}
)
s1.print()
s1.saveAsTextFiles("results/", "")
ssc.start()
ssc.awaitTermination()
谢谢
最佳答案
您的问题在这里:
s1.print()
s1.saveAsTextFiles("results/", "")
由于 Spark 创建了一个流图,并且您在此处定义了两个流:
Read from Kafka -> Print to console
Read from Kafka -> Save to text file
Spark 将尝试同时运行这两个图,因为它们彼此独立。由于 Kafka 使用缓存消费者方法,因此它实际上尝试对两个流执行使用相同的消费者。
您可以做的是在运行两个查询之前缓存DStream
:
val dataFromKafka = KafkaUtils.createDirectStream[String, String](ssc, PreferConsistent, Subscribe[String, String](topics1, kafkaParams)).map(/* stuff */)
val cachedStream = dataFromKafka.cache()
cachedStream.print()
cachedStream.saveAsTextFiles("results/", "")
关于multithreading - java.util.ConcurrentModificationException : KafkaConsumer is not safe for multi-threaded access,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48265714/
我目前正在制作一个将订阅作为 Multi-Tenancy 应用程序出售的 web 应用程序。我使用的技术是导轨。 但是,它不仅仅是使用当前应用程序的孤立租户。 每个租户创建产品并将其发布到他们的个人应
我们计划将 Azure Service Fabric 用于面向数据的 Multi-Tenancy 应用程序。通常有 100 多个客户,每个客户有 5 - 100 个用户。 查看文档,我得出的结论是,最
我们正在为我们正在构建的自定义 Saas 应用程序评估 Shiro。似乎一个伟大的框架可以完成我们想要的 90% 的工作,开箱即用。我对 Shiro 的理解是基本的,这就是我想要完成的。 我们有多个客
希望使用 NestJS 6 的新请求注入(inject)范围功能实现 Multi-Tenancy NestJS 解决方案。 对于任何给定的服务,我认为我可以做这样的事情: @Injectable({s
我正在寻找一个基于 PHP 的框架,该框架已准备好具有以下功能 1.带有登录/注销的简单仪表板 2. 多个数据库,每个数据库代表一个客户端 只是基本框架。 3.简单的注册支持 用例: 我从 githu
我正在尝试对这个已经回答的问题进行一些跟进...... Service Fabric multi-tenant 如果我要将我的租户设置为 Azure Service Fabric 无状态服务(他们将获
首先,我很清楚 Keycloak 中的多领域 Multi-Tenancy 方法。我接手了一个没有人想到 Multi-Tenancy 的遗留项目。现在,两年后,突然,客户需要这个功能。实际上,微服务已经
我正在使用 Apache Nifi 开发基于云的应用程序,为此我们需要支持 Multi-Tenancy 。但是当前的 Nifi 实现只支持基于角色的用户访问,对于单个流。 我可以理解流状态被保存为 N
对于我积极维护的客户基于 Web 的 CRM 的分支机构数量不断增加的 Multi-Tenancy ,我需要做出一个艰难的数据库设计决策。 我很早就决定为每个分支使用具有单独数据库的单独应用程序,因为
关闭。这个问题是opinion-based .它目前不接受答案。 想要改进这个问题? 更新问题,以便 editing this post 可以用事实和引用来回答它. 关闭 9 年前。 Improve
很抱歉我的英语不好,希望你能看到我说的。 在Lucene3 Junit测试代码中:org.apache.lucene.queryParser.TestMultiAnalyzer.testMultiAn
假设我们有一个多维数组。 multi[3][10] 那么&multi[0][0]将是multi 如果我们想访问这个数组中的任何元素。我们只需要一次解除引用。因为它位于连续的位置。我无法理解双重取消引用
表结构和示例数据 Wall_Update [INT VARCHAR VARCHAR TIMESTAMP TinyText]
我们需要构建一个软件框架(或中间件),以便在一台机器上运行的不同软件组件(或模块)之间实现消息传递。该框架将提供以下功能: 模块之间的通信是通过“消息传递”。 每个模块都有自己的消息队列和消息处理线程
我正在开发一个在多个域上运行的应用程序。 我想对所有这些都使用 Google 自定义搜索。但是 GCS 需要提供要搜索的网站域。 有没有办法动态指定域?理论上,我可以拥有数千个域,但我不喜欢手动添加所
在 here.com map 类 MapMarker 中,此方法 showInfoBubble () 无法在多 map 标记上显示多信息气泡,对此有任何解决方案吗? 最佳答案 来自 showInfoB
我正在开发一个 Multi-Tenancy 解决方案,我想使用最新的 ASP.NET Identity框架特别是Entity Framework执行。 基本上,我需要允许两个用户使用相同的用户名,尽管
我有 50 台可用台式计算机(配备 i5),每台都运行 Ubuntu 14.04 LTS。我需要通过 C 代码计算某些事件的概率,样本大小至少为 2^45。显然,在一台计算机上运行 C 代码不是一种选
我正在按照页面上的示例进行操作:Multi-input and multi-output models 用于预测新闻标题将收到多少转发和点赞的模型设置。那么 main_output 正在预测有多少
硬件:我们使用 24 核(2*12 核)机器。 SSD 磁盘和 SAS-RAID 0 磁盘有 2 个独立的 Controller 。操作系统:Windows 8.1。超线程已禁用。 软件: 2.1。有
我是一名优秀的程序员,十分优秀!