gpt4 book ai didi

java - Apache Flink连接elasticsearch的问题

转载 作者:行者123 更新时间:2023-12-01 21:38:56 25 4
gpt4 key购买 nike

我在 Flink 站点内使用了一段代码将 Apache Flink 连接到 Elastic Search。我想通过 Maven 项目从 NetBeans 软件运行这段代码。

public class FlinkElasticCon {

public static void main(String[] args) throws Exception {

final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
port = params.getInt("port");
} catch (Exception e) {
System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
return;
}

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

// parse the data, group it, window it, and aggregate the counts
DataStream<WordWithCount> windowCounts = text

.flatMap((String value, Collector<WordWithCount> out) -> {
for (String word : value.split("\\s")) {
out.collect(new WordWithCount(word, 1L));
}
})

.keyBy("word")
.timeWindow(Time.seconds(5))

.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount a, WordWithCount b) {
return new WordWithCount(a.word, a.count + b.count);
}
});

// print the results with a single thread, rather than in parallel
//windowCounts.print().setParallelism(1);

env.execute("Socket Window WordCount");


List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));

ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
json.put("data", element);

return Requests
.indexRequest()
.index("my-index")
.type("my-type")
.source(json);
}

@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
indexer.add(createIndexRequest(element));
}
}
);


windowCounts.addSink((SinkFunction<WordWithCount>) esSinkBuilder);

}

public static class WordWithCount {

public String word;
public long count;

public WordWithCount() {}

public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}

@Override
public String toString() {
return word + " : " + count;
}
}
}

添加Dependency时,没有识别elasticsearchsink类。鉴于我给它添加了不同的依赖,但问题仍然没有解决。导入时:

import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink

红线在代码中被创建为未知。

我的pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-
instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-
4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.flink</groupId>
<artifactId>mavenproject1</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.8.1</version>
<scope>provided</scope>
<type>jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch-base_2.11</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.0.0-alpha1</version>
<!--<version>6.0.0-alpha1</version>-->
<type>jar</type>
</dependency>




<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.8.1</version>
<type>jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.8.1</version>
</dependency>
</dependencies>



<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

</properties>

Apache Flink 版本:1.8.1 Elasticsearch 版本:7.4.2网 bean 版本:8.2java版本:8

请帮助我。

最佳答案

Flink Elasticsearch Connector 7

请找到我提供的有效且详细的答案 here

关于java - Apache Flink连接elasticsearch的问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58796555/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com