gpt4 book ai didi

java - 使用 Spark 的 MapReduce 调用不同的函数并聚合

转载 作者:行者123 更新时间:2023-11-30 10:07:35 24 4
gpt4 key购买 nike

我对 spark 非常不熟悉,但我很确定有一种好方法可以比我现在做的更快地完成我想做的事情。

本质上,我有一个 S3 存储桶,其中包含大量 JSON 推特数据。我想浏览所有这些文件,从 JSON 中获取文本,对文本进行情绪分析(目前使用 Stanford NLP),然后将 Tweet + Sentiment 上传到数据库(现在我正在使用 dynamo,但这不是成败)

我目前的代码是

        /**
* Per thread:
* 1. Download a file
* 2. Do sentiment on the file -> output Map<String, List<Float>>
* 3. Upload to Dynamo: (a) sentiment (b) number of tweets (c) timestamp
*
*/

List<String> keys = s3Connection.getKeys();

ThreadPoolExecutor threads = new ThreadPoolExecutor(40, 40, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
threads.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

for (String key : keys) {
threads.submit(new Thread(() -> {
try {
S3Object s3Object = s3Connection.getObject(key);
Map<String, List<Float>> listOfTweetsWithSentiment = tweetSentimentService.getTweetsFromJsonFile(s3Object.getObjectContent());
List<AggregatedTweets> aggregatedTweets = tweetSentimentService.createAggregatedTweetsFromMap(listOfTweetsWithSentiment, key);

for (AggregatedTweets aggregatedTweet : aggregatedTweets) {
System.out.println(aggregatedTweet);
tweetDao.putItem(aggregatedTweet);
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}));
}

这有效而且很好。通过在特定日期范围内运行此代码(即 getKeys 仅获取特定日期范围内的 key )并在不同的 EC2 上启动此过程的许多实例,我能够将流程加快到仅约 2 小时,每个实例都作用于不同的日期范围。

但是,必须有一种更快的方法来使用良好的 ole map-reduce 来执行此操作,但我什至不知道如何开始研究它。是否可以在我的 map 中进行情绪分析,然后根据时间戳进行缩减?

此外,我正在考虑使用 AWS Glue,但我没有看到在那里使用 Stanford NLP 库的好方法。

我们将不胜感激任何帮助。

最佳答案

是的,您可以使用 Apache Spark 做到这一点。有很多方法可以设计您的应用程序、配置基础设施等。我提出一个简单的设计:

  1. 您在 AWS 上,因此使用 Spark 创建一个 EMR 集群。包含用于交互式调试的 Zeppelin 会很有用。

  2. Spark 使用多种数据抽象。你的 friend 是 RDD 和数据集(阅读关于它们的文档)。将数据读取到数据集的代码可能是相同的:

    SparkSession ss = SparkSession.builder().getOrCreate();
    Dataset<Row> dataset = ss.read("s3a://your_bucket/your_path");
  3. 现在你有一个 Dataset<Row> .这对于类似 SQL 的操作很有用。对于您的分析,您需要将其转换为 Spark RDD:

    JavaRDD<Tweet> analyticRdd = dataset.toJavaRDD().map(row -> {
    return TweetsFactory.tweetFromRow(row);
    });
  4. 所以,用 analyticRdd你可以做你的分析人员。只是不要忘记让所有处理数据的服务都可序列化。

关于java - 使用 Spark 的 MapReduce 调用不同的函数并聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54175062/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com