- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Databricks spark-csv 从 EMR Spark 集群上 S3 上的 CSV 源创建一个 DataFrame
包和 flights dataset :
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true').load('s3n://h2o-airlines-unpacked/allyears.csv')
df.first()
这不会在 4 个 m3.xlarge
的集群上终止。我正在寻找从 PySpark 中 S3 上的 CSV 文件创建 DataFrame
的建议。或者,我尝试将文件放在 HDFS 上并从 HFDS 读取,但这也不会终止。该文件不是太大 (12 GB)。
最佳答案
要读取一个只有 12GB 的性能良好的 csv 文件,您可以将它复制到您的所有工作人员和驱动程序机器上,然后在“,”上手动拆分。这可能无法解析任何 RFC4180 csv,但它解析了我拥有的内容。
c3.2xlarge
。如果您不打算让集群闲置并且能够负担得起更大的费用,那就扩大规模。更大的机器意味着更少的磁盘文件复制开始。我经常在现货市场上看到 c3.8xlarge 低于 0.50 美元/小时。 将文件复制到您的每个工作人员,在每个工作人员的同一目录中。这应该是物理连接的驱动器,即每台机器上的不同物理驱动器。
确保驱动程序机器上也有相同的文件和目录。
raw = sc.textFile("/data.csv")
print "Counted %d lines in /data.csv" % raw.count()
raw_fields = raw.first()
# this regular expression is for quoted fields. i.e. "23","38","blue",...
matchre = r'^"(.*)"$'
pmatchre = re.compile(matchre)
def uncsv_line(line):
return [pmatchre.match(s).group(1) for s in line.split(',')]
fields = uncsv_line(raw_fields)
def raw_to_dict(raw_line):
return dict(zip(fields, uncsv_line(raw_line)))
parsedData = (raw
.map(raw_to_dict)
.cache()
)
print "Counted %d parsed lines" % parsedData.count()
parsedData 将是字典的 RDD,其中字典的键是第一行的 CSV 字段名称,值是当前行的 CSV 值。如果您的 CSV 数据中没有标题行,这可能不适合您,但应该清楚的是,您可以覆盖读取此处第一行的代码并手动设置字段。
请注意,这对于创建数据框或注册 spark SQL 表不是立即有用的。但是其他的就OK了,如果需要转储到spark SQL中,还可以进一步抽取转化成更好的格式。
我在一个 7GB 的文件上使用它没有任何问题,除了我已经删除了一些过滤逻辑来检测有效数据,这些逻辑有一个副作用,即从解析的数据中删除 header 。您可能需要重新实现一些过滤。
关于csv - Spark : spark-csv takes too long,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32265649/
我有一个代表每个年龄段的人口(M,F)的集合。 为了通过时间来预测人口,我必须首先对女性进行计算,以便我可以根据男性出生率的统计常数来计算新出生的男性和女性的百分比。 也就是说,我有一个包含每岁男性和
我正在尝试从队列中获取 n 条消息(使用 langohr)。我有一个工作版本,但我想知道是否有更好的 clojurist 方法来做到这一点: (def not-nil? (complement nil
我有这些结果用于分析一个简单的查询,该查询不会从少于 200 条记录的表中返回超过 150 条记录,因为我有一个存储最新值的表,而其他字段是数据的 FK . 更新:稍后查看同一查询的新结果。该网站未公
我正在使用 .Take() 来获取固定数量的结果。 获取 TotalCountBeforeTake 的最佳方法是什么(即好像我没有使用 .Take())? 我可以在不运行查询两次的情况下获得 Tota
我有一个 BatchConfigurable 类 public class BatchConfigurable() {} 我正在尝试为其编写一个包装器。这将是另一个类,它采用此类或任何扩展 Batch
byte[] result = memStream.ToArray(); memStream.Close(); byte[] temp = r
很简单的问题。我有一个值列表,我想用空值填充这些值,这样我总是会返回 X 个项目。 List list = new List() { 10, 20, 30 }; IEnumerable values
我正在构建一个购物车,并且我使用了一个购物车服务,在该服务中我将数量分配给产品/将产品添加到购物车。除了使用 take 获取可观察项 $ 的第一个实例之外,还有其他方法吗? 我正在正确导入 take
这是欧拉计划的问题 8。 我试图通过数字数组foreach,每次跳过最后一个数字并拉接下来的13个相邻数字数组。 我的代码: for(int x = 0; x product) {
我有 3 个 div 元素,一个是父元素,另外两个是子元素: dinesh pathak and their css are: #table {
我在 Hudson 上发现了异常行为。Hudson 作业大约需要 25 分钟,而当我在本地运行相同的作业时,需要 9 分钟。我在这里缺少什么? 我增加了 JAVA_OPTS、MAVEN_OPTS,甚至
let a = [1;2;3;] for i in (a |> Seq.take 10) do Console.WriteLine(i) for i in (a |> Seq.take 100) do
我正在尝试编写一些 LINQ To SQL 代码来生成类似 SQL 的代码 SELECT t.Name, g.Name FROM Theme t INNER JOIN ( SELECT TOP
给定这样的设置.. class Product { int Cost; // other properties unimportant } var products = new List
我有一个 List 类型的元素 public class FriendList { public List friends { get; set; } // List
给定以下 LINQ 语句,哪个更有效? 一个: public List GetLatestLogEntries() { var logEntries = from entry in db.Lo
我只是在尝试新的 kotlin 语言。我遇到了生成无限列表的序列。我生成了一个序列并尝试打印前 10 个元素。但是下面的代码没有打印任何东西: fun main(args: Array) {
我见过 sagas 以 3 种方式监听 Action : 1。 while(true) take() function* onUserDetailsRequest() { while(true)
假设我有一些神奇的分页黑盒类,它使用 pageIndex 和 pageSize 检索数据,如下所示: public class PaginatedList { // ... // Ch
我有两个 git 分支 b' 和 b" 具有完全相同的 SHA-1 和因此内容。我提交 b ' 并在提交时,我使用 -x 应用 cherry-pick 而不是 merge 或 rebase单个提交到我
我是一名优秀的程序员,十分优秀!