gpt4 book ai didi

java - flink+卡夫卡: getHostnamePort

转载 作者:搜寻专家 更新时间:2023-11-01 03:00:38 25 4
gpt4 key购买 nike

我想从flink读取一个kafka主题

package Toletum.pruebas;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafka {
public static void main(String[] args) throws Exception {
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>("test02",
new SimpleStringSchema(),
parameterTool.getProperties());

DataStream<String> messageStream = env.addSource(kafkaSrc);

messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;

public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();

env.execute("LeeKafka");
}

}

此代码成功运行:

java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

但是,当我尝试使用 flink 时:

flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

我得到一个错误:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.(FlinkKafkaConsumer.java:280)        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.(FlinkKafkaConsumer082.java:49)        at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)        at java.lang.reflect.Method.invoke(Method.java:606)        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)

最佳答案

旧版本库.....

正确的 pom.xml:



<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.10.1</version>
</dependency>

关于java - flink+卡夫卡: getHostnamePort,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35374482/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com