gpt4 book ai didi

apache-spark - 如何将JavaInputDStream JSON转换为ElasticSearch JAVA

转载 作者:行者123 更新时间:2023-12-03 00:55:11 25 4
gpt4 key购买 nike

嗨,大家好,我正在与kafka合作> Spark Streaming> Elasticsearch。
但我不让SparkStream将JavaInputDStream JSON传输到elasticsearch。

我的代码:

    SparkConf conf = new SparkConf()
.setAppName("Streaming")
.setMaster("local")
.set("es.nodes","localhost:9200")
.set("es.index.auto.create","true");
JavaStreamingContext streamingContext = new JavaStreamingContext(conf, new Duration(5000));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "exastax");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("loglar");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);

JavaPairDStream<String, String> finisStream = stream.mapToPair(record -> new Tuple2<>("", record.value()));
finisStream.print();
JavaEsSparkStreaming.saveJsonToEs(finisStream,"spark/docs");
streamingContext.start();
streamingContext.awaitTermination();


}

JavaEsSparkStreaming.saveJsonToEs(finisStream,“spark / docs”); >> finisStream无法正常运行,因为它不是JavaDStream。
如何转换JavaDStream的?

最佳答案

JavaEsSparkStreaming.saveJsonToEsJavaDStream一起使用
JavaEsSparkStreaming.saveToEsWithMetaJavaPairDStream一起使用

要修复您的代码:

JavaDStream<String> finisStream = stream.map(new Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> stringStringTuple2) throws Exception {
return stringStringTuple2._2();
}
});

JavaEsSparkStreaming.saveJsonToEs(finisStream,"");

关于apache-spark - 如何将JavaInputDStream JSON转换为ElasticSearch JAVA,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45507324/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com