gpt4 book ai didi

java - 避免对大型数据集使用收集

转载 作者:行者123 更新时间:2023-12-01 19:56:35 27 4
gpt4 key购买 nike

我们使用 Apache Spark 进行处理。我们有几个步骤需要使用collect()将JavaRDD添加到列表中,但我们希望避免这样做,以便对列表进行操作。我们知道我们想要避免这种情况,因为它将一切都带回给驾驶员。最终我们耗尽了内存,因为我们正在处理 500 万到 2 亿条记录。这是我们迄今为止所拥有的示例。

private InputStream createCSVObject(JavaRDD<Object[]> args) {
System.out.println("inside createCSVObject");
try {
StringBuilder value = new StringBuilder(CHUNK_SIZE);

args.collect().forEach(i -> {
value.append(i[0].toString());
for (int j = 1; j < i.length; ++j) {
value.append("," + i[j]);
}
value.append("\n");
});
System.out.println("Out of createCSVObject for loops");
byte[] strBytes = value.toString().getBytes();

InputStream myInputStream = new ByteArrayInputStream(strBytes);
return (myInputStream);
} catch (Exception e) {
System.err.println(String.format("ERROR: FileWriterService - writeFile: %s", e.getMessage()));
return null;
}
}

我在 SO 和谷歌上一遍又一遍地搜索这个问题,但找不到任何明确的结果。有人有什么想法吗???

注意:args.collect() 处的 COLLECT

编辑:

在研究了下面提出的答案后,我们为其设计了一个简单的概念证明,我们得出的结论是每 40 秒进行一次迭代。逻辑并不复杂,为什么这么慢?

        System.out.println("inside createCSVObject");
try {
StringBuilder value = new StringBuilder();
System.out.println("args length " + args.toLocalIterator().next().length);

while (args.toLocalIterator().hasNext()) {
Object[] objects = args.toLocalIterator().next();
System.out.println("Inside iterator");
value.append(objects[0].toString());
for (int j = 1; j < objects.length; ++j) {
value.append("," + objects[j]);
}
value.append("\n");
}

System.out.println("Out of createCSVObject for loops");
byte[] strBytes = value.toString().getBytes();

InputStream myInputStream = new ByteArrayInputStream(strBytes);
return (myInputStream);
} catch (Exception e) {
System.err.println(String.format("ERROR: FileWriterService - writeFile: %s", e.getMessage()));
e.printStackTrace();
return null;
}

最佳答案

您可以使用 JavaRDD.toLocalIterator() 来迭代驱动程序上的整个 RDD,而无需将其全部收集到列表中。相反,它一次将每个分区传送给驱动程序,因此使用的内存不会超过最大分区 ( documentation ) 的大小。

显然,在您给出的示例中,您仍然存在将所有内容收集到一个巨大的字节数组中的问题,这仍然会使用相当多的内存。相反,您可以编写一个自定义 InputStream 类来包装一个 Iterator(由 toLocalIterator 返回),并且一次仅缓冲一个元素,仅当 InputStream.read() 需要更多数据时才在迭代器上调用 next()

关于java - 避免对大型数据集使用收集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59036838/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com