gpt4 book ai didi

java - 如何将时间戳附加到 rdd 并推送到 elasticsearch

转载 作者:塔克拉玛干 更新时间:2023-11-02 19:49:06 27 4
gpt4 key购买 nike

我是 spark streaming 和 elasticsearch 的新手,我正在尝试使用 spark 从 kafka 主题读取数据并将数据存储为 rdd。在 rdd 中我想附加时间戳,一旦新数据到来然后推送到 elasticsearch。

lines.foreachRDD(rdd -> {
if(!rdd.isEmpty()){
// rdd.collect().forEach(System.out::println);
String timeStamp = new
SimpleDateFormat("yyyy::MM::dd::HH::mm::ss").format(new Date());
List<String> myList = new ArrayList<String>(Arrays.asList(timeStamp.split("\\s+")));
List<String> f = rdd.collect();


Map<List<String>, ?> rddMaps = ImmutableMap.of(f, 1);
Map<List<String>, ?> myListrdd = ImmutableMap.of(myList, 1);

JavaRDD<Map<List<String>, ?>> javaRDD = sc.parallelize(ImmutableList.of(rddMaps));

JavaEsSpark.saveToEs(javaRDD, "sample/docs");
}
});

最佳答案

Spark ?

据我了解,spark streaming 用于实时流数据计算,如 mapreducejoin窗口。好像没必要用这么强大的工具,我们需要的只是给事件加个时间戳。

Logstash?

如果是这种情况,Logstash 可能更适合我们的情况。 kafka and ES

Logstash 会在事件到来时记录时间戳,它还有persistent queue。和 Dead Letter Queues确保数据弹性。原生支持向ES推送数据(毕竟属于系列产品),推送数据非常方便。

output {
elasticsearch {
hosts => ["localhost:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
}
}

更多

  • 有关 Logstash 的更多信息,here是介绍。
  • here是一个示例 logstash 配置文件。

希望对您有所帮助。

引用

关于java - 如何将时间戳附加到 rdd 并推送到 elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46945441/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com