gpt4 book ai didi

java - JavaRDD 转换为 JavaRDD 时出现问题

转载 作者:行者123 更新时间:2023-12-02 09:35:41 25 4
gpt4 key购买 nike

我正在尝试将推文从 twitter 保存到 MongoDb 数据库。

我有RDD<Status>我正在尝试借助 ObjectMapper 将其转换为 JSON 格式。但是此转换存在一些问题(

public class Main {


//set system credentials for access to twitter
private static void setTwitterOAuth() {
System.setProperty("twitter4j.oauth.consumerKey", TwitterCredentials.consumerKey);
System.setProperty("twitter4j.oauth.consumerSecret", TwitterCredentials.consumerSecret);
System.setProperty("twitter4j.oauth.accessToken", TwitterCredentials.accessToken);
System.setProperty("twitter4j.oauth.accessTokenSecret", TwitterCredentials.accessTokenSecret);
}


public static void main(String [] args) {

setTwitterOAuth();

SparkConf conf = new SparkConf().setMaster("local[2]")
.setAppName("SparkTwitter");
JavaSparkContext sparkContext = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sparkContext, new Duration(1000));
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);

//Stream that contains just tweets in english
JavaDStream<Status> enTweetsDStream=twitterStream.filter((status) -> "en".equalsIgnoreCase(status.getLang()));
enTweetsDStream.persist(StorageLevel.MEMORY_AND_DISK());


enTweetsDStream.print();
jssc.start();
jssc.awaitTermination();
}

static void saveRawTweetsToMondoDb(JavaRDD<Status> rdd,JavaSparkContext sparkContext) {
try {
ObjectMapper objectMapper = new ObjectMapper();
SQLContext sqlContext = new SQLContext(sparkContext);
JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));

DataFrame dataFrame = sqlContext.read().json(tweet);

Map<String, String> writeOverrides = new HashMap<>();
writeOverrides.put("uri", "mongodb://127.0.0.1/forensicdb.LiveRawTweets");
WriteConfig writeConfig = WriteConfig.create(sparkContext).withJavaOptions(writeOverrides);
MongoSpark.write(dataFrame).option("collection", "LiveRawTweets").mode("append").save();

} catch (Exception e) {
System.out.println("Error saving to database");
}
}

JavaRDD<String> tweet = rdd.map(status -> objectMapper.writeValueAsString(status));

这里有一个问题。需要不兼容的类型 JavaRDD<String>但 map 被推断为 javaRDD<R>

最佳答案

不幸的是,Java 类型推断并不总是 super 智能,因此在这些情况下我所做的就是提取 lambda 的所有位作为变量,直到找到 Java 无法为其提供准确类型的类型。然后我给表达式赋予我认为它应该具有的类型,并看看为什么 Java 会提示它。有时这只是编译器中的限制,您必须显式地将表达式“转换”为所需的类型,有时您会发现代码存在问题。就你而言,代码对我来说没问题,所以一定还有其他东西。

但是我有一个评论:这里您需要支付一次 JSON 序列化(从 Status 到 JSON 字符串)然后反序列化(从 JSON 字符串到 Row)的成本。另外,您没有向Dataset提供任何架构,因此它必须对数据(或根据您的配置进行样本)进行两次传递才能推断架构。如果数据很大,所有这些都可能非常昂贵。如果性能是一个问题并且 Status 相对简单,我建议您直接编写从 StatusRow 的转换。

另一个“顺便说一下”:您正在隐式序列化您的ObjectMapper,您很可能不想这样做。该类似乎确实支持 Java 序列化,但带有 special logic 。由于 Spark 的默认配置是使用 Kryo(它的性能比 Java 序列化好得多),我怀疑它在使用默认的 FieldSerializer 时是否会做正确的事情。您有三个选择:

  • 使对象映射器静态以避免序列化
  • 将您的 Kryo 注册器配置为使用 Java 序列化来序列化/反序列化 ObjectMapper 类型的对象。这会起作用,但不值得付出努力。
  • 到处使用Java序列化而不是Kryo。馊主意!它很慢并且占用大量空间(内存和磁盘取决于序列化对象的写入位置)。

关于java - JavaRDD<Status> 转换为 JavaRDD<String> 时出现问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57538359/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com