gpt4 book ai didi

hadoop - 如何在 Hadoop 中创建从 1..N 开始的迭代器?

转载 作者:可可西里 更新时间:2023-11-01 16:50:38 25 4
gpt4 key购买 nike

我想使用 Hadoop 作为管理网格作业的简单系统。 (我之前使用 SGE 和 pbs/Torque 执行此操作,但我们正在转向 Hadoop。)我有 1000 个 ZIP 文件,每个文件包含 1000 个文件,总共 1M 个文件。我想将它们全部上传到 Amazon S3。理想情况下,我想在不将文件放入 HDFS 的情况下执行此操作。所有文件都可以在 WWW 上访问。

我想做的是:

  1. 有一个从 0..999 开始的迭代器
  2. 对于每个 map 作业,获取迭代器并:
    • 获取 ZIP 文件(大约 500MB,因此它将被写入临时存储)
    • 阅读 ZIP 目录。
    • 提取每个文件并将其上传到 Amazon S3。

我知道如何在 Java 和 Python 中施展 ZIP 文件魔法。我的问题是:如何创建迭代器以便映射器获得数字 0..999?

reducer 的输出将是每次上传所花费的时间。然后我想要第二个映射/减少步骤来生成时间直方图。所以我想正确的做法是将时间和故障代码写入 HDFS(尽管将它们写入 SQL 数据库似乎更有意义)。

我有兴趣在传统的 MapReduce 中(最好是在 Python 中,但如果必须的话,我会在 Java 或 Scala 中进行)和 Spark(为此我需要在 Scala 中进行,对吧?)。尽管我可以看出在 Spark 中这样做并没有真正的优势。

最佳答案

在 Spark 中,您可以简单地并行化范围:

  • python

    n = ...  # Desired parallelism 

    rdd = sc.parallelize(range(1000), n)

    def do_something_for_side_effects(i): ...
    rdd.foreach(do_something_for_side_effects)

    def do_something(i): ...
    rdd.map(do_something).saveAsTextFile(...) # Or another save* method
  • 斯卡拉

    val n: Int = ???  // Desired parallelism 

    val rdd = sc.parallelize(1 until 1000, n)

    def doSomethingForSideEffects(i: Int): Unit = ???
    rdd.foreach(doSomethingForSideEffects)

    def doSomething(i: Int) = ???
    rdd.foreach(doSomething).saveAsTextFile(...) // Or another save* method

关于hadoop - 如何在 Hadoop 中创建从 1..N 开始的迭代器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33601312/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com