- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我每天都会收到一个 zip 存档“2018-06-26.zip”,大小约为。 250 Mb 压缩后,包含 165-170.000 个小 XML 文件 (Kb)。我将 zip 存档加载到 HDFS(避免小文件问题),并使用 SPARK 从 zip 中提取它们(zip 不可拆分),制作配对 RDD,以文件名作为键,以内容作为值并保存它们通过配对的 RDD 作为序列文件。一切都运行顺利,一个小的 zip 存档仅包含 3 个用于测试目的的 XML 文件,但是当我向它提供大存档时,我得到了
java.lang.OutOfMemoryError: GC overhead limit exceeded
at java.util.Arrays.copyOf(Arrays.java:2367)
at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
...
...
我在 Cloudera Quickstart VM 上运行:CDH 5.13.3(HDFS:2.60、JDK:1.7.0.67、SPARK:1.6.0、Scala 2.10)
我还没有在成熟的集群上运行它,因为我想在部署它之前确保我的代码是正确的......
垃圾收集器在超出开销限制的情况下继续运行 OOM。我知道要增加驱动程序和 Java 堆空间的内存量,但我怀疑我的方法占用了太多内存......监视内存使用情况,但没有发现任何内存泄漏......
代码如下:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")
非常感谢任何帮助或想法。
最佳答案
我的最终解决方案:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
.foreach { zip_file : (String, PortableDataStream) =>
val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
var zip_entry : ZipEntry = null
while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
if (!zip_entry.isDirectory) {
val key_file_name = zip_entry.getName
val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
xml_map += ( key_file_name -> value_file_content )
}
zip_stream.closeEntry()
}
zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")
原始 zip 文件 325 Mb,包含 170.000 个 XML 文件结果是 75 个分区,每个分区大约35 MB。总计约 2.5 GB在我的 Windows PC 上本地运行时间:1.2 分钟:-)
关于java - 使用 SPARK 从 zip 到 seq,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52254617/
我是 F# 的新手,目前想知道如何将序列的字节序列转换为序列的浮点序列 seq -> seq 所以我有以下字节序列 let colourList = seq[ seq[10uy;20uy;30uy];
我想在一个序列中聚合兼容的元素,即转换 Seq[T]成Seq[Seq[T]]其中每个子序列中的元素彼此兼容,同时保留原始 seq 顺序,例如从 case class X(i: Int, n: Int)
以下函数files返回seq> 。如何让它返回seq相反? type R = { .... } let files = seqOfStrs |> Seq.choose(fun s -> mat
我正在尝试转换如下所示的数据: val inputData = Seq(("STUDY1", "Follow-up", 1), ("STUDY1", "Off Study", 2),
稍微简化一下,我的问题来自字符串列表 input我想用函数解析 parse返回 Either[String,Int] . 然后list.map(parse)返回 Either 的列表s。程序的下一步是
如标题中所述,我不明白为什么这些函数无法编译并要求 Seq。 def f1[V a + b } error: type mismatch; found : Seq[Int] required:
我有一个类型为 Flow[T, Seq[Seq[String]], NotUsed] 的流。 我想以示例流的方式将其展平 ev1: Seq(Seq("a", "b"), Seq("n", "m") e
我对 Scala 比较陌生,但我想我理解它的类型系统和并行集合,但我无法理解这个错误: 我有一个函数 def myFun(a : Seq[MyType], b : OtherType) : Seq[M
在学习 F# 时,我正在做一个小挑战: Enter a string and the program counts the number of vowels in the text. For adde
------------------------- clojure.core/seq ([coll]) Returns a seq on the collection. If the collec
我担心不知道什么时候可以使用 "Seq", "seq"。你能告诉我有哪些不同之处吗? 这是我的代码。为什么不使用“seq”? let s = ResizeArray() s.Add(1.1) s
我试图返回一个带有直到循环的可变序列,但我有一个不可变的序列作为 (0 until nbGenomes) 的返回: def generateRandomGenome(nbGenomes:Int):
将 Seq(Seq) 分配到多个类型化数组而不先将 Seq 分配给标量的正确语法是什么? Seq 是否会以某种方式变平?这失败了: class A { has Int $.r } my A (@ra1
我正在尝试训练 序列到序列 一个简单的正弦波模型。目标是获得Nin数据点和预测 Nout下一个数据点。任务看起来很简单,模型对大频率的预测很好 freq (y = sin(freq * x))。例如,
我正在努力重构一些使用 Seq 的 Node.js 代码,以及文档和 this answer ,我知道我使用 this() 转到下一个 .seq(),但是如何将变量传递给下一个 .seq( )的功能?
我有一个像这样的字符串序列(文件中的行) [20150101] error a details 1 details 2 [20150101] error b details [20150101] er
给定两个序列 a 和 b,声明如下: var a = @[1, 2, 3] b = @[4, 5, 6] a = b 会创建一个新的 seq 将所有内容从 b 复制到 a 还是重用 a?我有特
type Suit = Spades | Clubs | Hearts | Diamonds type Rank = Ace | Two | Three | Four | Five | Six | S
慢慢地掌握列表匹配和尾递归的窍门,我需要一个函数将列表“缝合”在一起,去掉中间值(更容易显示而不是解释): 合并 [[1;2;3];[3;4;5];[5;6;7]]//-> [1;2;3;4;5;6;
为什么这段代码不起作用? type Test() = static member func (a: seq) = 5. let a = [[4.]] Test.func(a) 它给出以下错误: T
我是一名优秀的程序员,十分优秀!