
java - How to fix Flink 'error: cannot infer type arguments for FlinkKafkaConsumer011<>'

Reposted. Author: 行者123. Updated: 2023-11-30 12:05:28

I am following an example of using Flink with Kafka. The only results I can find, such as this page, do not compile correctly and produce an error message that is hard to look up.

Basically, when I try to compile this code I get the error:

import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public final class Main {

    public static FlinkKafkaConsumer011<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer011<String> consumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);

        return consumer;
    }
}

These are the dependencies in my build.gradle file:

group 'myapp'
version '1.0-SNAPSHOT'

apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
    jcenter()
}

dependencies {
    ecj 'org.eclipse.jdt.core.compiler:ecj:4.6.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.2.0'
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.5.0'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.11_2.11', version: '1.5.0'

    compile group: 'org.apache.kafka', name: 'kafka_2.11', version: '1.1.0'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.1.0'

    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}

This is the error when running the build tool:

$ gradle build
> Task :compileJava FAILED
/Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/Main.java:55: error: cannot infer type arguments for FlinkKafkaConsumer011<>
new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
^
Note: /Users/john/dev/john/flink-example/src/main/java/com/company/opi/flinkexample/EnvironmentConfig.java uses unchecked or unsafe operations.
Note: Recompile with -Xlint:unchecked for details.
1 error


FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':compileJava'.
> Compilation failed; see the compiler error output for details.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

* Get more help at https://help.gradle.org

BUILD FAILED in 3s
1 actionable task: 1 executed

Here is a link to his source code.

Best Answer

One problem: all of the Flink libraries you use should have the same version number, but you appear to be mixing versions 1.2.0, 1.5.0, and 1.8.0. Because the mixed artifacts pull in incompatible definitions of the serialization interfaces, no constructor of FlinkKafkaConsumer011 matches your arguments, and javac reports it as a type-inference failure. Below are updated dependencies and source code that compile correctly.

(build.gradle)

group 'myapp'
version '1.0-SNAPSHOT'

apply plugin: 'java'
sourceCompatibility = 1.8

repositories {
    jcenter()
}

dependencies {
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-java', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-clients_2.11', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-avro', version: '1.8.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.8.0'
    // Keep the Scala suffix consistent (_2.11) with the other artifacts above.
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.8.0'

    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
}
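To keep every Flink artifact on the same version (and the same Scala suffix) going forward, the version strings can be factored into variables; a minimal sketch of the same dependencies, assuming the artifact names above:

```groovy
// build.gradle fragment: a single variable keeps all Flink artifacts aligned,
// so a future upgrade only touches one line.
ext {
    flinkVersion = '1.8.0'
    scalaBinary  = '2.11'
}

dependencies {
    compile group: 'org.apache.flink', name: "flink-streaming-java_${scalaBinary}", version: flinkVersion
    compile group: 'org.apache.flink', name: 'flink-java', version: flinkVersion
    compile group: 'org.apache.flink', name: "flink-clients_${scalaBinary}", version: flinkVersion
    compile group: 'org.apache.flink', name: "flink-connector-kafka_${scalaBinary}", version: flinkVersion
}
```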

(workingCode.java)

// In Flink 1.8, SimpleStringSchema lives in org.apache.flink.api.common.serialization;
// the old org.apache.flink.streaming.util.serialization location is deprecated.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public final class Main {

    public static FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), props);

        return consumer;
    }
}

Also, though it is unrelated to the compilation error: since you are using Kafka 1.1, you are better off using the newer universal version of Flink's Kafka connector rather than the one built for Kafka 0.11. FlinkKafkaConsumer (the class without a version number in its name) is the connector for Kafka 1.0.0 and later.
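For completeness, here is a sketch of how the returned consumer would typically be wired into a job. The topic, broker address, and group id are placeholder values, and running this requires the Flink dependencies above plus a reachable Kafka broker:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder topic/broker/group values, for illustration only.
        DataStream<String> lines = env.addSource(
                Main.createStringConsumerForTopic("my-topic", "localhost:9092", "my-group"));

        // Print each consumed record and start the job.
        lines.print();
        env.execute("Kafka read example");
    }
}
```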

Regarding java - How to fix Flink 'error: cannot infer type arguments for FlinkKafkaConsumer011<>', we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56279601/
