
java - Flink + Kafka + JSON - Java example

Reposted · Author: 塔克拉玛干 · Updated: 2023-11-01 21:53:38

I am trying to consume JSON from a Kafka topic using the following code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                        new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The problems are:

1) The program fails to run due to:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at line: `messageStream.map(....`

2) The problem above may be related to the `DataStream` having no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

the code does not compile, with cannot resolve constructor FlinkKafkaConsumer09 ...
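As the exception message itself suggests, one workaround for the type-erasure error (a sketch only, not necessarily the right fix for this job) is to give Flink an explicit type hint with `returns(...)` on the result of the transformation:

```java
// Sketch: type hint via returns(...), as suggested by the exception message.
// Note this assumes the source actually emits String elements, which is NOT
// what JSONKeyValueDeserializationSchema produces (it emits ObjectNode) --
// typing the stream to the schema's element type is the cleaner fix.
DataStream<String> withHint = messageStream
        .map(new MapFunction<String, String>() {
            @Override
            public String map(String value) {
                return "Kafka and Flink says: " + value;
            }
        })
        .returns(String.class);
```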

pom.xml (relevant part):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

I have been looking for Flink code that uses a JSON DeserializationSchema, without success. I only found unit tests for JSONKeyValueDeserializationSchema at this link.

Does anyone know the right way to do this?

Thanks

Best answer

I followed Vishnu viswanath's answer, but JSONKeyValueDeserializationSchema threw an exception in the JSON-parsing step, even for a simple JSON such as {"name":"John Doe"}.

The code that throws is:

DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
at java.lang.Thread.run(Thread.java:745)
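The NullPointerException is thrown inside `JsonFactory.createParser`, and this schema parses both the message key and the message value. A plausible cause (an assumption here, not stated in the trace) is that the records were produced without a key, so the schema tries to parse a null key. A minimal keyed-producer sketch with the plain Kafka 0.9 client (broker address and topic name are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedJsonProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Both key and value must be valid JSON for
            // JSONKeyValueDeserializationSchema to parse them.
            producer.send(new ProducerRecord<>("topic",
                    "{\"id\":1}", "{\"name\":\"John Doe\"}"));
        }
    }
}
```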

I succeeded with a different deserialization schema, JSONDeserializationSchema:

DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
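The field access in the map function is plain Jackson, so it can be checked without Kafka or Flink. A small standalone sketch (the field name "key" mirrors the snippet above and assumes the incoming messages actually contain such a field; note that `get(...)` returns null for a missing field, so `.asText()` would then throw):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class JsonFieldDemo {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        // Parse the same kind of payload JSONDeserializationSchema produces
        ObjectNode node = (ObjectNode) mapper.readTree("{\"key\":\"hello\"}");
        System.out.println("Kafka and Flink says: " + node.get("key").asText());
        // prints: Kafka and Flink says: hello
    }
}
```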

Regarding java - Flink + Kafka + JSON - Java example, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39300183/
