- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我最近一直在对我们的 Spark Streaming 应用程序进行压力测试。压力测试每秒接收大约 20,000 条消息,消息大小在 200 字节 - 1K 之间变化到 Kafka,其中 Spark Streaming 每 4 秒读取一次批次。
我们的 Spark 集群运行在带有独立集群管理器的 1.6.1 版上,我们的代码使用 Scala 2.10.6。
经过大约 15-20 小时的运行,其中一个启动检查点(以 40 秒间隔完成)的执行程序被以下堆栈跟踪卡住并且永远不会完成:
java.net.SocketInputStream.socketRead0(Native Method) java.net.SocketInputStream.socketRead(SocketInputStream.java:116) java.net.SocketInputStream.read(SocketInputStream.java:170) java.net.SocketInputStream.read(SocketInputStream.java:141) sun.security.ssl.InputRecord.readFully(InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record(InputRecord.java:593) sun.security.ssl.InputRecord.read(InputRecord.java:532) sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:973) sun.security.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1375) sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1403) sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:533) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177) org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131) org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120) org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174) sun.reflect.GeneratedMethodAccessor32.invoke(Unknown Source) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native.$Proxy18.retrieveMetadata(Unknown Source) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:472) org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD$.writePartitionToCheckpointFile(ReliableCheckpointRDD.scala:168) org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136) org.apache.spark.rdd.ReliableCheckpointRDD$$anonfun$writeRDDToCheckpointDirectory$1.apply(ReliableCheckpointRDD.scala:136) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) org.apache.spark.scheduler.Task.run(Task.scala:89) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745)
streaming-job-executor-0
下的驱动程序线程转储清楚地表明它正在等待这个任务完成:
java.lang.Object.wait(Native Method) java.lang.Object.wait(Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73) org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:612) org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) org.apache.spark.SparkContext.runJob(SparkContext.scala:1922) org.apache.spark.rdd.ReliableCheckpointRDD$.writeRDDToCheckpointDirectory(ReliableCheckpointRDD.scala:135) org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint(ReliableRDDCheckpointData.scala:58) org.apache.spark.rdd.RDDCheckpointData.checkpoint(RDDCheckpointData.scala:74) org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply$mcV$sp(RDD.scala:1682) org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679) org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1679) org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1678) org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684) org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1$$anonfun$apply$mcV$sp$1.apply(RDD.scala:1684) scala.collection.immutable.List.foreach(List.scala:318)
最佳答案
套接字挂起是由于 HttpClient
中的错误引起的org.jets3t
使用的库其中 SSL 握手不使用指定的超时。您可以找到问题详细信息 here .
此错误在 v4.5.1 以下的 HttpClient 版本中重现,并已修复。不幸的是,Spark 1.6.x 使用 v4.3.2,它没有提供的修复程序。
到目前为止,我想到了三种可能的解决方法:
spark.speculation
使用 Spark 的推测机制配置设置。这有助于解决挂起的边缘情况,因为它很少在负载下重现。请注意,这可能会在流式作业开始时导致一些误报,其中 spark 对运行中值任务的时间没有很好的印象,但这绝对不会导致明显的滞后。If set to "true", performs speculative execution of tasks. This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark-submit \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \
CP=''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done
SPARK_CLASSPATH="$CP" sbin/start-master.sh # on your master machine
SPARK_CLASSPATH="$CP" sbin/start-slave.sh 'spark://master_name:7077'
SPARK_CLASSPATH
在 spark-env.sh
. 关于apache-spark - 长时间正常运行后,Spark 有状态流作业在检查点到 S3 时挂起,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38606653/
这个问题在这里已经有了答案: Why filter() after flatMap() is "not completely" lazy in Java streams? (8 个答案) 关闭 6
我正在创建一个应用程序来从 Instagram 收集数据。我正在寻找像 Twitter 流 API 这样的流 API,这样我就可以自动实时收集数据而无需发送请求。 Instagram 有类似的 API
我正在使用 Apache Commons 在 Google App Engine 中上传一个 .docx 文件,如此链接中所述 File upload servlet .上传时,我还想使用 Apach
我尝试使用 DynamoDB 流和 AWS 提供的 Java DynamoDB 流 Kinesis 适配器捕获 DynamoDB 表更改。我正在 Scala 应用程序中使用 AWS Java 开发工具
我目前有一个采用 H.264 编码的 IP 摄像机流式视频 (RTSP)。 我想使用 FFmpeg 将此 H.264 编码流转换为另一个 RTSP 流,但 MPEG-2 编码。我该怎么做?我应该使用哪
Redis 流是否受益于集群模式?假设您有 10 个流,它们是分布在整个集群中还是都分布在同一节点上?我计划使用 Redis 流来实现真正的高吞吐量(200 万条消息/秒),所以我担心这种规模的 Re
这件事困扰了我一段时间。 所以我有一个 Product 类,它有一个 Image 列表(该列表可能为空)。 我想做 product.getImages().stream().filter(...) 但
是否可以使用 具有持久存储的 Redis 流 还是流仅限于内存数据? 我知道可以将 Redis 与核心数据结构的持久存储一起使用,但我已经能够理解是否也可以使用 Redis 中的流的持久存储。 最佳答
我开始学习 Elixir 并遇到了一个我无法轻松解决的挑战。 我正在尝试创建一个函数,该函数接受一个 Enumerable.t 并返回另一个 Enumerable.t ,其中包含下 n 个项目。它与
我试图从 readLine 调用创建一个无限的字符串流: import java.io.{BufferedReader, InputStreamReader} val in = new Buffere
你能帮我使用 Java 8 流 API 编写以下代码吗? SuperUser superUser = db.getSuperUser; for (final Client client : super
我正在尝试服用补品routeguide tutorial,并将客户端变成rocket服务器。我只是接受响应并将gRPC转换为字符串。 service RouteGuide { rpc GetF
流程代码可以是run here. 使用 flow,我有一个函数,它接受一个键值对对象并获取它的值 - 它获取的值应该是字符串、数字或 bool 值。 type ValueType = string
如果我有一个函数返回一个包含数据库信息的对象或一个空对象,如下所示: getThingFromDB: async function(id:string):Promise{ const from
我正在尝试使用javascript api和FB.ui将ogg音频文件发布到流中, 但是我不知道该怎么做。 这是我给FB.ui的电话: FB.ui( { method: '
我正在尝试删除工作区(或克隆它以使其看起来像父工作区,但我似乎两者都做不到)。但是,当我尝试时,我收到此消息:无法删除工作区 test_workspace,因为它有一个非空的默认组。 据我所知,这意味
可以使用 Stream|Map 来完成此操作,这样我就不需要将结果放入外部 HashMap 中,而是使用 .collect(Collectors.toMap(...)); 收集结果? Map rep
当我们从集合列表中获取 Stream 时,幕后到底发生了什么?我发现很多博客都说Stream不存储任何数据。如果这是真的,请考虑代码片段: List list = new ArrayList(); l
我对流及其工作方式不熟悉,我正在尝试获取列表中添加的特定对象的出现次数。 我找到了一种使用Collections来做到这一点的方法。其过程如下: for (int i = 0; i p.conten
我希望将一个 map 列表转换为另一个分组的 map 列表。 所以我有以下 map 列表 - List [{ "accId":"1", "accName":"TestAcc1", "accNumber
我是一名优秀的程序员,十分优秀!