gpt4 book ai didi

apache-spark - 如何在 Spark 中处理增量 S3 文件

转载 作者:行者123 更新时间:2023-12-03 07:47:36 24 4
gpt4 key购买 nike

我制作了以下管道:任务管理器 -> SQS -> scraper worker(我的应用程序) -> AWS Firehose -> S3 文件 -> Spark ->(?) Redshift。

我正在尝试解决/改进的一些问题,我很乐意获得指导:

  1. 抓取工具可能会获取重复的数据,并将它们再次刷新到 Firehose,这将导致 Spark 中出现重复。我应该在开始计算之前使用 Distinct 函数在 Spark 中解决这个问题吗?
  2. 我不会删除 S3 处理的文件,因此数据会变得越来越大。这是一个好的做法吗? (以 s3 作为输入数据库)或者我应该处理每个文件并在 Spark 完成后将其删除吗?目前我正在做 sc.textFile("s3n://...../*/*/*") - 它将收集我的所有存储桶文件并运行计算。
  3. 要将结果放入 Redshift(或 s3)-> 如何逐步执行此操作?也就是说,如果 s3 变得越来越大,红移将会有重复的数据......我应该先刷新它吗?怎么办?

最佳答案

我以前遇到过这些问题,尽管不是在单个管道中。这是我所做的。

  1. 删除重复项

    a.我用过BloomFilter删除本地重复项。请注意,该文档相对不完整,但您可以轻松保存/加载/联合/相交布隆过滤器对象。您甚至可以对过滤器执行 reduce 操作。

    b.如果您直接将数据从 Spark 保存到 RedShift,您很可能需要花费一些时间和精力来更新当前批处理的 BloomFilter、广播它,然后进行过滤以确保全局没有重复。之前我在RDS中使用了UNIQUE约束并忽略了错误,但不幸的是RedShift does not honour the constraint .

  2. 以及 3. 数据变得越来越大

我使用 EMR 集群来运行 s3-dist-cp command移动和合并数据(因为通常有很多小日志文件,这会影响 Spark 的性能)。如果您碰巧使用 EMR 来托管 Spark 集群,只需在分析之前添加一个步骤,将数据从一个存储桶移动到另一个存储桶。该步骤将 command-runner.jar 作为自定义 jar,命令如下所示

s3-dist-cp --src=s3://INPUT_BUCKET/ --dest=s3://OUTPUT_BUCKET_AND_PATH/ --groupBy=".*\.2016-08-(..)T.*" --srcPattern=".*\.2016-08.*" --appendToLastFile --deleteOnSuccess

注意原来的distcp不支持合并文件。

通常,您应该尽量避免将已处理和未处理的数据放在同一个存储桶(或至少路径)中。

关于apache-spark - 如何在 Spark 中处理增量 S3 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38374407/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com