- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我在 hdfs 上有文件,我想将其放入 Hive 表中。该操作由 Java 应用程序中的 Spark 批处理执行。执行该任务的代码如下:
[...]
final Dataset<File> fileDs = rawDs.map(record -> {
return FileService.map(record.getList(2));
}, Encoders.bean(File.class));
final Dataset<Row> fileDsWithId = fileDs.withColumn("id", functions.lit(id));
fileDsWithId.repartition(fileDsWithId.col("id"));
fileWithId.write().mode(SaveMode.Append)
.format("orc")
.partitionBy("id")
.option("path", hdfs://..../mydatabase.db/mytable")
.saveAsTable("mydatabase.mytable");
当我使用小文件(1 或 2 行数据)时,应用程序运行正常,作业在 30 秒内成功结束。该表是在 Hive 中创建的,我可以使用 Select * 查询显示数据。当表已经存在时它也可以工作。数据只需添加到现有数据中即可。hive 中的生成表结构看起来不错。它与我的数据相符。
但是当我尝试处理更大的文件(3.7Mo,大约 1000 行数据)时,作业在 15 分钟后失败。相应的orc文件在hdfs中创建,但它是空的,Hive不知道它。
日志文件显示了几个如下错误:
2019-05-31 14:20:07,500 - [ERROR] [ dispatcher-event-loop-3] pache.spark.scheduler.cluster.YarnClusterScheduler - [{}] - Lost executor 31 on XXXXXX: Container marked as failed: container_e71_1559121287708_0019_02_000032 on host: XXXXXXXXX. Exit status: 143. Diagnostics: Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143.
Killed by external signal
[...]
java.lang.RuntimeException: java.io.IOException: Connection reset by peer
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
at org.apache.spark.network.crypto.AuthClientBootstrap.doSparkAuth(AuthClientBootstrap.java:105)
at org.apache.spark.network.crypto.AuthClientBootstrap.doBootstrap(AuthClientBootstrap.java:79)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
[...]
2019-05-31 14:20:17,898 - [ERROR] [ shuffle-client-4-1] org.apache.spark.network.client.TransportClient - [{}] - Failed to send RPC 9035939448873337359 to XXXXXXXX: java.nio.channels.ClosedChannelExceptionsg
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown Source)
2019-05-31 14:20:17,899 - [ERROR] [ Executor task launch worker for task 244] apache.spark.network.client.TransportClientFactory - [{}] - Exception while bootstrapping client after 5999 mssg
java.lang.RuntimeException: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXX: java.nio.channels.ClosedChannelException
at org.spark_project.guava.base.Throwables.propagate(Throwables.java:160)
at org.apache.spark.network.client.TransportClient.sendRpcSync(TransportClient.java:273)
at org.apache.spark.network.sasl.SaslClientBootstrap.doBootstrap(SaslClientBootstrap.java:70)
at org.apache.spark.network.crypto.AuthClientBootstrap.doSaslAuth(AuthClientBootstrap.java:115)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to send RPC 9035939448873337359 to XXXXXXXXXXXX: java.nio.channels.ClosedChannelException
at org.apache.spark.network.client.TransportClient.lambda$sendRpc$2(TransportClient.java:237)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:122)
at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:852)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:738)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1251)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:733)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:725)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:35)
...
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
[...]
2019-05-31 14:20:22,907 - [INFO ] [ Block Fetch Retry-6-1] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Retrying fetch (2/3) for 1 outstanding blocks after 5000 mssg
2019-05-31 14:20:27,909 - [ERROR] [ Block Fetch Retry-6-2] .apache.spark.network.shuffle.RetryingBlockFetcher - [{}] - Exception while beginning fetch of 1 outstanding blocks (after 2 retries)sg
java.io.IOException: Failed to connect to XXXXXXXXX
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:257)
...
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
... 2 more
[...]
2019-05-31 14:20:32,915 - [WARN ] [ Executor task launch worker for task 244] org.apache.spark.storage.BlockManager - [{}] - Failed to fetch remote block broadcast_2_piece0 from BlockManagerId(1, XXXXXXX, 44787, None) (failed attempt 1)sg
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:105)
at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:642)
...
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to connect to XXXXXXXXX
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:182)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
... 1 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connexion refused: XXXXXXXX
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
...
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
... 2 more
我不明白那里发生了什么。我已经检查过内存问题,但看起来没问题。这些机器用于处理更大的文件(通常是几十GB)。为什么连接丢失/被拒绝/重置? Spark 事先创建表方案是否存在任何问题可以解释这一点?
<小时/>在 Ram Ghadiyaram 的回答后更新:
我尝试将 spark.network.timeout
设置为 6000 秒。环境中没有配置其他超时设置。结果似乎是一样的。 10 分钟后作业失败,并在日志文件中显示相同的错误:“连接被对等方重置”、“无法发送 RPC”等
设置spark.core.connection.ack.wait.timeout
、spark.storage.blockManagerSlaveTimeoutMs
、spark.shuffle.io.connectionTimeout
、 spark.rpc.askTimeout
和 spark.rpc.lookupTimeout
设置为相同的值(6000s)似乎也不起作用。
我认为我的数据集太困惑,无论如何都无法正确处理。我将尝试更改数据模型,然后使用这些超时设置再次运行应用程序。
<小时/>更新于 2019 年 1 月 7 日:
我简化了数据模型。该模型很复杂,导致数据集中出现一些空结构,因为系统无法通过继承链接某些字段。我已经扁平化了结构,以便每种可能的类型都作为泛型类的实际属性出现,因此我删除了继承。
总结一下,类似这样的:
文件.class
|-字段1
|-field2
|-field3
|- 通用类
|-Class1
|-2级
|-Class3
我没有使用带有一些子类的抽象类,而是使用其他类作为属性创建了一个通用类。这很脏(我不建议这样做),但这样数据集就干净多了。
执行此更改后,我不再遇到超时问题。我猜之前的模型太乱了,Spark 无法高效编写。
我尝试过以 ORC 和 Avro 格式编写,两者都很好。在 Avro 中,我设法在一分钟内写入大约 300000 行,因此默认超时设置不再是问题。
最佳答案
Q:Why are the connection lost/denied/reset ?
org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
似乎是大型或繁重工作负载的典型超时问题。
我不知道你用的是哪个版本的spark。但它在这里失败了。基本上它的作用是等待一段时间(默认超时)然后失败。请参阅ThreadUtils
/**
* Preferred alternative to `Await.result()`.
*
* This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
* that this thread's stack trace appears in logs.
*
* In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
* `BlockingContext`. Codes running in the user's thread may be in a thread of Scala ForkJoinPool.
* As concurrent executions in ForkJoinPool may see some [[ThreadLocal]] value unexpectedly, this
* method basically prevents ForkJoinPool from running other tasks in the current waiting thread.
* In general, we should use this method because many places in Spark use [[ThreadLocal]] and it's
* hard to debug when [[ThreadLocal]]s leak to other tasks.
*/
@throws(classOf[SparkException])
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
// See SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.result(atMost)(awaitPermission)
} catch {
case e: SparkFatalException =>
throw e.throwable
// TimeoutException is thrown in the current thread, so not need to warp the exception.
case NonFatal(t) if !t.isInstanceOf[TimeoutException] =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
}
您必须增加超时,请参阅 networking docs
<小时/>
spark.network.timeout
120s Default timeout for all network interactions. This config will be used in place ofspark.core.connection.ack.wait.timeout
,spark.storage.blockManagerSlaveTimeoutMs
,spark.shuffle.io.connectionTimeout
,spark.rpc.askTimeout
orspark.rpc.lookupTimeout
if they are not configured.
To sumup : For small workloads timeout is enough for large work loads time out needs to be increasaed.
关于apache-spark - 如何解决向 Hive 表发送大文件时的连接问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56396297/
@Cacheable在同一类中方法调用无效 上述图片中,同一个类中genLiveBullets()方法调用同类中的queryLiveByRoom()方法,这样即便标识了Cacheable标签,
目录 @Transaction注解导致动态切换更改数据库失效 使用场景 遇到问题 解决 @Transaction
@RequestBody不能class类型匹配 在首次第一次尝试使用@RequestBody注解 开始加载字符串使用post提交(貌似只能post),加Json数据格式传输的时候,
目录 @Autowired注入static接口问题 @Autowired自动注入普通service很方便 但是如果注入static修饰的serv
目录 @RequestBody部分属性丢失 问题描述 JavaBean实现 Controller实现
目录 解决@PathVariable参数接收不完整的问题 今天遇到的问题是: 解决办法: @PathVariable接受的参
这几天在项目里面发现我使用@Transactional注解事务之后,抛了异常居然不回滚。后来终于找到了原因。 如果你也出现了这种情况,可以从下面开始排查。 1、特性 先来了解一下@Trans
概述: ? 1
场景: 在处理定时任务时,由于这几个方法都是静态方法,在aop的切面中使用@Around注解,进行监控方法调用是否有异常。 发现aop没有生效。 代码如下:
最近做项目的时候 用户提出要上传大图片 一张图片有可能十几兆 本来用的第三方的上传控件 有限制图片上传大小的设置 以前设置的是2M&nb
我已经实现了这个SCIM reference code在我们的应用程序中。 我实现的代码确实通过了此postman link中存在的所有用户测试集合。 。我的 SCIM Api 也被 Azure 接受
我一直对“然后”不被等待的行为感到困扰,我明白其原因。然而,我仍然需要绕过它。这是我的用例。 doWork(family) { return doWork1(family)
我正在尝试查找 channel 中的消息是否仍然存在,但是,我不确定如何解决 promise ,查看其他答案和文档,我可以看到它可能是通过函数实现的,但我是不完全确定如何去做。我希望能在这方面获得一些
我有以下情况: 同一工作区中的 2 个 Eclipse 项目:Apa 和 Bepa(为简洁起见,使用化名)。 Apa 项目引用(包括)Bepa 项目。 我在 Bepa 有一个类 X,具有公共(publ
这个问题已经有答案了: Why am I getting a NoClassDefFoundError in Java? (31 个回答) 已关闭 6 年前。 我正在努力学习 spring。所以我输入
我正在写一个小游戏,屏幕上有许多圆圈在移动。 我在两个线程中管理圈子,如下所示: public void run() { int stepCount = 0; int dx;
我在使用 Sympy 求解方程时遇到问题。当我运行代码时,例如: 打印(校正(10)) 我希望它打印一个数字 f。相反,它给我错误:执行中止。 def correction(r): from
好吧,我制作的每个页面都有这个问题。我不确定我做错了什么,但我所有的页面都不适用于所有分辨率。可能是因为我使用的是宽屏?大声笑我不确定,但在小于宽屏分辨率的情况下,它永远不会看起来正确。它的某些部分你
我正在尝试像这样进行一个非常简单的文化 srting 检查 if(culture.ToUpper() == "ES-ES" || "IT-IT") { //do something } else
Closed. This question is off-topic. It is not currently accepting answers. Learn more。 想改进这个问题吗?Upda
我是一名优秀的程序员,十分优秀!