- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
当我的 Spark 应用程序必须从 S3 访问大量 CSV 文件(每个 ~1000 @ 63MB)并将它们通过管道传输到 Spark RDD 时,它失败了。拆分 CSV 的实际过程似乎可行,但对 S3NativeFileSystem 的额外函数调用似乎导致错误和作业崩溃。
首先,以下是我的 PySpark 应用程序:
from pyspark import SparkContext
sc = SparkContext("local", "Simple App")
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import time
startTime = float(time.time())
dataPath = 's3://PATHTODIRECTORY/'
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", "MYKEY")
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", "MYSECRETKEY")
def buildSchemaDF(tableName, columnList):
currentRDD = sc.textFile(dataPath + tableName).map(lambda line: line.split("|"))
currentDF = currentRDD.toDF(columnList)
return currentDF
loadStartTime = float(time.time())
lineitemDF = buildSchemaDF('lineitem*', ['l_orderkey','l_partkey','l_suppkey','l_linenumber','l_quantity','l_extendedprice','l_discount','l_tax','l_returnflag','l_linestatus','l_shipdate','l_commitdate','l_receiptdate','l_shipinstruct','l_shipmode','l_comment'])
lineitemDF.registerTempTable("lineitem")
loadTimeElapsed = float(time.time()) - loadStartTime
queryStartTime = float(time.time())
qstr = """
SELECT
lineitem.l_returnflag,
lineitem.l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_discount) as sum_disc,
sum(l_tax) as sum_tax,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(l_orderkey) as count_order
FROM
lineitem
WHERE
l_shipdate <= '19981001'
GROUP BY
l_returnflag,
l_linestatus
ORDER BY
l_returnflag,
l_linestatus
"""
tpch1DF = sqlContext.sql(qstr)
queryTimeElapsed = float(time.time()) - queryStartTime
totalTimeElapsed = float(time.time()) - startTime
tpch1DF.show()
queryResults = [qstr, loadTimeElapsed, queryTimeElapsed, totalTimeElapsed]
distData = sc.parallelize(queryResults)
distData.saveAsTextFile(dataPath + 'queryResults.csv')
print 'Load Time: ' + str(loadTimeElapsed)
print 'Query Time: ' + str(queryTimeElapsed)
print 'Total Time: ' + str(totalTimeElapsed)
为了循序渐进,我首先使用以下 AWS CLI 命令启动一个 Spark EMR 集群(添加回车以提高可读性):
aws emr create-cluster --name "Big TPCH Spark cluster2" --release-label emr-4.6.0
--applications Name=Spark --ec2-attributes KeyName=blazing-test-aws
--log-uri s3://aws-logs-132950491118-us-west-2/elasticmapreduce/j-1WZ39GFS3IX49/
--instance-type m3.2xlarge --instance-count 6 --use-default-roles
在 EMR 集群完成配置后,我将 Pyspark 应用程序复制到位于“/home/hadoop/pysparkApp.py”的主节点上。通过复制它,我可以为 spark-submit 添加步骤。
aws emr add-steps --cluster-id j-1DQJ8BDL1394N --steps
Type=spark,Name=SparkTPCHTests,Args=[--deploy-mode,cluster,-
conf,spark.yarn.submit.waitAppCompletion=true,--num-executors,5,--executor
cores,5,--executor memory,20g,/home/hadoop/tpchSpark.py]
,ActionOnFailure=CONTINUE
现在,如果我仅对上述几个 CSV 文件运行此步骤,将生成最终结果,但脚本仍会声称已失败。
我认为它与对 S3NativeFileSystem 的额外调用有关,但我不确定。这些是我得到的 Yarn 日志消息,它们让我得出了这个结论。第一个电话似乎工作得很好:
16/05/15 23:18:00 INFO HadoopRDD: Input split: s3://data-set-builder/splitLineItem2/lineitemad:0+64901757
16/05/15 23:18:00 INFO latency: StatusCode=[200], ServiceName=[Amazon S3], AWSRequestID=[ED8011CE4E1F6F18], ServiceEndpoint=[https://data-set-builder.s3-us-west-2.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, ClientExecuteTime=[77.956], HttpRequestTime=[77.183], HttpClientReceiveResponseTime=[20.028], RequestSigningTime=[0.229], CredentialsRequestTime=[0.003], ResponseProcessingTime=[0.128], HttpClientSendRequestTime=[0.35],
虽然第二个似乎没有正确执行,导致“部分结果”(206 错误):
16/05/15 23:18:00 INFO S3NativeFileSystem: Opening 's3://data-set-builder/splitLineItem2/lineitemad' for reading
16/05/15 23:18:00 INFO latency: StatusCode=[206], ServiceName=[Amazon S3], AWSRequestID=[10BDDE61AE13AFBE], ServiceEndpoint=[https://data-set-builder.s3.amazonaws.com], HttpClientPoolLeasedCount=0, RetryCapacityConsumed=0, RequestCount=1, HttpClientPoolPendingCount=0, HttpClientPoolAvailableCount=2, Client Execute Time=[296.86], HttpRequestTime=[295.801], HttpClientReceiveResponseTime=[293.667], RequestSigningTime=[0.204], CredentialsRequestTime=[0.002], ResponseProcessingTime=[0.34], HttpClientSendRequestTime=[0.337],
16/05/15 23:18:02 INFO ApplicationMaster: Waiting for spark context initialization ...
我不明白为什么当第一个似乎有效响应甚至拆分文件时它甚至对 S3NativeFileSystem 进行第二次调用。这是我的 EMR 配置的产物吗?我知道 S3Native 有文件限制问题,直接 S3 调用是最佳的,这是我尝试做的,但无论我做什么,这个调用似乎都存在。请帮忙!
此外,在我的 Yarn 日志中添加一些其他错误消息,以防它们相关。
1)
16/05/15 23:19:22 ERROR ApplicationMaster: SparkContext did not initialize after waiting for 100000 ms. Please check earlier log output for errors. Failing the application.
16/05/15 23:19:22 INFO ApplicationMaster: Final app status: FAILED, exitCode: 13, (reason: Timed out waiting for SparkContext.)
2)
16/05/15 23:19:22 ERROR DiskBlockObjectWriter: Uncaught exception while reverting partial writes to file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.revertPartialWritesAndClose(DiskBlockObjectWriter.scala:162)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.stop(BypassMergeSortShuffleWriter.java:226)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/15 23:19:22 ERROR BypassMergeSortShuffleWriter: Error while deleting file /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/2b/temp_shuffle_3fe2e09e-f8e4-4e5d-ac96-1538bdc3b401
16/05/15 23:19:22 WARN TaskMemoryManager: leak 32.3 MB memory from org.apache.spark.unsafe.map.BytesToBytesMap@762be8fe
16/05/15 23:19:22 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 14
16/05/15 23:19:22 ERROR Executor: Exception in task 13.0 in stage 1.0 (TID 14)
java.io.FileNotFoundException: /mnt/yarn/usercache/hadoop/appcache/application_1463354019776_0001/blockmgr-f847744b-c87a-442c-9135-57cae3d1f6f0/3a/temp_shuffle_b9001fca-bba9-400d-9bc4-c23c002e0aa9 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
最佳答案
spark 配置的优先顺序是:
SparkContext(代码/应用程序)> Spark-submit > Spark-defaults.conf
这里有几点要指出 -
在您的 spark 提交命令中使用 YARN 集群作为部署模式和 master -
spark-submit --deploy-mode cluster --master yarn ...
或
spark-submit --master yarn-cluster ...
从代码中的 sc = SparkContext("local", "Simple App")
行中删除“local”字符串。使用 conf = SparkConf().setAppName(appName)
初始化 Spark 上下文。
sc = SparkContext(conf=conf)
引用 - http://spark.apache.org/docs/latest/programming-guide.html
关于hadoop - S3NativeFileSystem 调用是否会在 AWS EMR 4.6.0 上杀死我的 Pyspark 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37259403/
如何在终止父进程时关闭我的子文件描述符? 我创建了一个执行以下操作的程序: 派生 2 个子进程。 进程 1 是一个读取器。它从 STDIN_FILENO 读取并使用 scanf/printf 写入 S
我试着写了一个小的暴力破解程序。密码程序在密码正确时返回 1,错误时返回 0。所以它很简单。 在 bruteforce 程序中,我使用 createprocess() 调用 pw 程序。 我的问题是,
谁能帮我解释一下我从一本书中得到的这个脚本。练习是编写一个名为 killalljobs 的脚本来终止所有后台作业。 为此给出的代码是: kill "$@" $( jobs -p) 我确定我在这里真
我正在开发一个包含许多库的应用程序。后来我注意到有几次应用程序进程在关闭应用程序后仍在耗尽 CPU。 我先终止了进程,但它继续运行。我卸载了该应用程序 - 但它仍然存在! (使用开发人员选项中的“显示
有没有办法在无人机完成或超时之前杀死它? 无人机的默认超时时间为 6 小时 ( https://github.com/drone/drone/blob/master/cmd/drone/drone.g
我有几个自动启动的菜单栏程序/进程/应用程序。我希望能够使用单个命令或脚本将它们全部关闭;有时带宽受到限制或受限,它们会导致(或至少导致)旋转的沙滩球死亡。目前,我手动关闭每一个。 关注 answer
当我阅读 learnyousomeerlang.com 上的一篇文章时,我有一个问题。 http://learnyousomeerlang.com/errors-and-processes 它说: E
有什么方法可以通过 OpenCL API 终止正在运行的 OpenCL 内核吗?我没有在规范中找到任何内容。 我能想出的唯一解决方案是 1) 定期检查内核中的标志,当主机希望内核停止时写入该标志,或
我已经对套接字(使用fsockopen()和stream_socket_client())和cURL进行了一些测试,以强制关闭连接(TCP/HTTP)。但是,没有运气。 无论我使用的是1毫秒的超时时间
已关闭。这个问题是 off-topic 。目前不接受答案。 想要改进这个问题吗? Update the question所以它是on-topic用于堆栈溢出。 已关闭10 年前。 Improve th
我以不太优雅的方式杀死了 IRB 提示符(从 heroku run irb 开始),现在我有一个僵尸进程,但我似乎无法杀死它: Process State Co
致kill background process inside Codeship我们需要使用以下命令: #!/bin/bash nohup bash -c "YOUR_COMMAND 2>&1 &"
我第一次在这里发帖,因为我在互联网上找不到干净的解决方案。 我的目标很简单,我需要创建 一个 后台操作 (goroutine 或进程或其他...)我可以 正确杀死 (不要留在后台)。 我尝试了很多事情
我有一个进程调用: p=multiprocessing.Process(target=func_a) 然后func_a启动一个子进程: subprocess.Popen(["nc", "-l", "-
我正在运行一个基本上运行一堆服务器以进行本地测试的脚本。 这些 jar 在不同的 screen 中运行,因为它们需要独立地接受键盘输入。为此,我使用了 screen 。 command1="java
我有一个用 java 编写的应用程序,它在 Unix 上运行,并在启动时启动两个子进程(通过 Runtime.getRuntime().exec())。如果应用程序由于某种原因崩溃,子进程不会被终止。
我想要像 Pushbullet、SmartLockScreen 或 WhatsApp 那样独立运行的服务,它正在等待某个事件的发生。我已经尝试过前台服务,在 onStartCommand 中返回 ST
强制停止应用程序后,是否可以在 Android 应用程序中获取位置更新。在 IOS 中,如果我们强制停止应用程序,则有可能获得位置更新,以类似的方式,是否有任何服务可以为在 android 中被杀死的
我正在调查是否有任何方法可以防止 android 服务因未捕获的异常而被杀死。 我们有 10 个 UI 应用程序与 5-6 个服务通信。该平台是Android 2.2。 由于不可预见的情况,服务中的某
我刚刚将我的 javascript 转移到 jQuery 来实现简单的 AJAX 功能。不过,我尝试将灯箱插件与 jQuery 结合使用,因为我想保留相同的功能,但不想包含 10 个不同的库。如果我删
我是一名优秀的程序员,十分优秀!