gpt4 book ai didi

java - 将 Spark Streaming 输出写入 HDFS 时跳过数据

转载 作者:行者123 更新时间:2023-12-01 11:04:39 26 4
gpt4 key购买 nike

我每 10 秒运行一个 Spark Streaming 应用程序,它的工作是使用来自 kafka 的数据,将其转换并根据 key 将其存储到 HDFS 中。即每个唯一键一个文件。我使用 Hadoop 的 saveAsHadoopFile() API 来存储输出,我看到为每个唯一键生成了一个文件,但问题是每个唯一键只存储一行,尽管 DStream 有更多行相同的 key 。

例如,考虑以下具有唯一键的 DStream,

  key                  value
===== =====================================
Key_1 183.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0
Key_1 184.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 181.33 70.0 2.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 1.12 1.0 1.0 1.0 11.0 4.0
Key_1 185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

我看到 HDFS 文件中只存储了一行(而不是 5 行),

185.33 70.0 0.12 1.0 1.0 1.0 11.0 4.0

以下代码用于将输出存储到HDFS中,

dStream.foreachRDD(new Function<JavaPairRDD<String, String>, Void> () {
@Override
public Void call(JavaPairRDD<String, String> pairRDD) throws Exception {
long timestamp = System.currentTimeMillis();
int randomInt = random.nextInt();
pairRDD.saveAsHadoopFile("hdfs://localhost:9000/application-" + timestamp +"-"+ randomInt, String.class, String.class, RDDMultipleTextOutputFormat.class);
}
});

其中RDDMultipleTextOutputFormat的实现如下,

public class RDDMultipleTextOutputFormat<K,V> extends    MultipleTextOutputFormat<K,V> {

public K generateActualKey(K key, V value) {
return null;
}

public String generateFileNameForKeyValue(K key, V value, String name) {
return key.toString();
}
}

如果我遗漏了什么,请告诉我?感谢您的帮助。

最佳答案

因为键是相同的,所以值每次都会被替换,因此您将获得提供给 hadoop 的最后一个值。

关于java - 将 Spark Streaming 输出写入 HDFS 时跳过数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33076506/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com