- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 PySpark (Google Dataproc) 解析大约 100 万个 HTML 文件,并将相关字段写入压缩文件。每个 HTML 文件大约 200KB。因此,所有数据约为 200GB。
如果我使用数据的一个子集,下面的代码工作正常,但运行数小时然后在整个数据集上运行时崩溃。此外,未使用工作节点(<5% CPU),所以我知道存在一些问题。
我认为系统在从 GCS 摄取数据时出现问题。有一个更好的方法吗?另外,当我以这种方式使用 wholeTextFiles 时,主人是否会尝试下载所有文件然后将它们发送给执行者,还是让执行者下载它们?
def my_func(keyval):
keyval = (file_name, file_str)
return parser(file_str).__dict__
data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")
最佳答案
为了回答你的问题,master 不会读取所有包含的数据,但它会在开始工作之前获取所有输入文件的状态。默认情况下,Dataproc 将属性“mapreduce.input.fileinputformat.list-status.num-threads”设置为 20 以帮助缩短此查找时间,但在 GCS 中仍会针对每个文件执行 RPC。
您似乎发现了这样一种情况,即使添加线程也无济于事,只会导致驱动程序更快地出现 OOM。
关于如何并行读取,我有两个想法。
但首先,有一点警告:这些解决方案都不是非常稳健的目录被包含在 glob 中。您可能希望防止目录出现在要读取的文件列表中。
第一个是使用 python 和 hadoop 命令行工具完成的(这也可以使用 gsutil 完成)。下面是一个示例,它可能看起来如何并在工作人员上执行文件列表,将文件内容成对读取并最终计算(文件名,文件长度)对:
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
import sys
import subprocess
def hadoop_ls(file_glob):
lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
files = [line.split()[7] for line in lines if len(line) > 0]
return files
def hadoop_cat(file):
return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
# This is just for testing. You'll want to generate a list
# of prefix globs instead of having a list passed in from the
# command line.
globs = sys.argv[1:]
# Desired listing partition count
lpc = 100
# Desired 'cat' partition count, should be less than total number of files
cpc = 1000
files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
我会首先从这个子流程解决方案开始,尝试 hadoop_ls 和 hadoop_cat 调用的分区,看看您是否能得到可接受的东西。
第二个解决方案更复杂,但可能会通过避免很多很多 exec 调用而产生性能更高的管道。
在第二个解决方案中,我们将编译一个特殊用途的帮助程序 jar,使用初始化操作将该 jar 复制到所有工作人员,最后使用驱动程序中的帮助程序。
我们的 scala jar 项目的最终目录结构将如下所示:
helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt
在我们的 PysparkHelper.scala 文件中,我们将有一个小的 Scala 类,其功能与上面的纯 Python 解决方案非常相似。首先,我们将创建文件 glob 的 RDD,然后是文件名的 RDD,最后是文件名和文件内容对的 RDD。
package com.google.cloud.dataproc.support
import collection.JavaConversions._
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}
import java.util.ArrayList
import java.nio.charset.StandardCharsets
class PysparkHelper extends Serializable {
def wholeTextFiles(
context: JavaSparkContext,
paths: ArrayList[String],
partitions: Int): JavaPairRDD[String, String] = {
val globRDD = context.sc.parallelize(paths).repartition(partitions)
// map globs to file names:
val filenameRDD = globRDD.flatMap(glob => {
val path = new Path(glob)
val fs: FileSystem = path.getFileSystem(new Configuration)
val statuses = fs.globStatus(path)
statuses.map(s => s.getPath.toString)
})
// Map file name to (name, content) pairs:
// TODO: Consider adding a second parititon count parameter to repartition before
// the below map.
val fileNameContentRDD = filenameRDD.map(f => {
Pair(f, readPath(f, new Configuration))
})
new JavaPairRDD(fileNameContentRDD)
}
def readPath(file: String, conf: Configuration) = {
val path = new Path(file)
val fs: FileSystem = path.getFileSystem(conf)
val stream = fs.open(path)
try {
IOUtils.toString(stream, StandardCharsets.UTF_8)
} finally {
stream.close()
}
}
}
helper/build.sbt 文件看起来像这样:
organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true
然后我们可以用 sbt 构建助手:
$ cd helper && sbt package
输出助手 jar 应该是 target/scala-2.10/pyspark_support_2.10-0.1.jar
我们现在需要将这个 jar 放到我们的集群上,为此,我们需要做两件事:1) 将 jar 上传到 GCS 和 2) 在 GCS 中创建一个初始化操作以将 jar 复制到集群节点。
为了便于说明,我们假设您的存储桶名为 MY_BUCKET(在此处插入适当的海象相关模因)。
$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar
创建一个初始化操作(我们称它为 pyspark_init_action.sh,根据需要替换 MY_BUCKET):
#!/bin/bash
gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/
最后将初始化 Action 上传到GCS:
$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh
现在可以通过将以下标志传递给 gcloud 来启动集群:
--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh
在构建、上传和安装我们的新库之后,我们最终可以从 pyspark 使用它:
from __future__ import print_function
from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer
import sys
class DataprocUtils(object):
@staticmethod
def wholeTextFiles(sc, glob_list, partitions):
"""
Read whole text file content from GCS.
:param sc: Spark context
:param glob_list: List of globs, each glob should be a prefix for part of the dataset.
:param partitions: number of partitions to use when creating the RDD
:return: RDD of filename, filecontent pairs.
"""
helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
if __name__ == "__main__":
if len(sys.argv) < 2:
print("Provide a list of path globs to read.")
exit(-1)
sc = SparkContext()
globs = sys.argv[1:]
partitions = 10
files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
local = files_and_char_count.collect()
for pair in local:
print("File {} had {} chars".format(pair[0], pair[1]))
关于google-cloud-storage - PySpark + 谷歌云存储(wholeTextFiles),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36204925/
这两个包看起来非常相似: http://www.passportjs.org/packages/passport-google-oauth2/ http://www.passportjs.org/pa
我想在我的网站上添加通过 Google 和 Twitter 登录的按钮。我需要只使用应用程序的客户端而不是服务器端来完成此操作。但我没有找到任何 API。对于我发现的所有内容,我需要使用带有 key
我使用此链接通过 google plus 共享我的页面。 https://plus.google.com/share?url=http%3A%2F%2Fexample.com%2Fcompany%2
我正在尝试学习 google API,并且我的经验是使用 Python,因此我尝试使用 google api python 客户端来访问一些 google 服务,但在构建服务对象时遇到错误。 从 ap
在其实际的实时托管平台上构建实时站点的努力中,有没有办法告诉谷歌不要索引该网站?我发现了以下内容: http://support.google.com/webmasters/bin/answer.py
我正在开发一个 iOS 应用程序。当我运行用于 google+ 登录的程序时,在我点击允许访问按钮后,会显示此消息。 You've reached this page because we have
我有一个非常复杂的网站,每个页面包含 11 个 js 文件。 我最近添加了 google +1 按钮,代码如下: 这会正确显示 +1 按钮,直到我单击它。当我单击它时,出现此错误:https://
我正在尝试使用 google API 创建一个 html 文件,以便在 google MAPS 上显示 KML 文件。 这是 HTML 代码: function initMap() {
我是使用 Google Benchmark 的新手,在本地运行代码与在 Quick-Bench.com 上运行代码时,我收到了运行相同基准测试(下方)的不同结果,该基准测试使用 C++ 检索本地时间.
我已按照 Google 网站上的说明通过添加以下元标记在我的 AngularJS 网站上启用 Ajax 抓取: 呈现的内容有一些链接,如: User 1 User 2 User 3 还有一些呈现动态
通过 Google 手册实现 Google AppInvite - link . 启动 Invite Activity 并在 LogCat 中获取下一步: E/AppInviteAgent: Get
那么有人用过 Google 的 Go 吗?我想知道数学性能(例如触发器)与其他具有垃圾收集器的语言(如 Java 或 .NET)相比如何? 有人调查过吗? 最佳答案 理论性能:纯 Go 程序的理论性能
Stackdriver 测试我的网站启动速度慢 我们使用 cloudflare 作为我们的站点 CDN 提供商。我们使用 stackdriver 从外部测试站点可用性,我们将时间检查间隔设置为 1 分
我正在尝试使用 stax.GeneralConv() ( https://jax.readthedocs.io/en/latest/_modules/jax/experimental/stax.htm
我有一个从谷歌金融中提取日内数据的软件。但是,由于昨天 Google 更新了 API,所以软件报错了 Conversion from string HTML HEAD meta http-equiv=
我们在尝试从 Google 获取 oAuth token 时遇到“redirect_uri_mismatch”错误: [client 127.0.0.1:49892] {\n "error" : "
我的网站正在使用 Google reCAPTCHA 控件,但我听说它被阻止了 中国,反正我看到有人报告说将 API 更改为 https://www.recaptcha.net在中国工作? Anyone
背景 WordPress Google Adsense 谷歌自动插入 anchor 定广告 https://pptmon.com 问题 如下图所示,主播广告的容器高度太大了! 如何调整高度? 这是谷歌
我在使用 Google Colab 时遇到问题。当我想制作一个新的 Python3 Notebook 时,由于我登录了我的 Google 帐户,因此无法加载刚刚打开的新页面。 我该怎么办? 感谢您的帮
我正在使用 facebook和 google oauth2使用 passport js 登录, 有了这个流 用户点击登录按钮 重定向到 facebook/google auth 页面(取决于用户选择的
我是一名优秀的程序员,十分优秀!