
java - Apache Flink: Channel received an event before completing the current partial record

Reposted. Author: 行者123. Updated: 2023-11-29 03:09:28

I am using Flink (Java, Maven, version 8.1) to read a CSV file from disk (http://data.gdeltproject.org/events/index.html) and get the following exception:

ERROR operators.DataSinkTask: Error in user code: Channel received an event before completing the current partial record.:  DataSink(Print to System.out) (4/4)
java.lang.IllegalStateException: Channel received an event before completing the current partial record.
at org.apache.flink.runtime.io.network.channels.InputChannel.readRecord(InputChannel.java:158)
at org.apache.flink.runtime.io.network.gates.InputGate.readRecord(InputGate.java:176)
at org.apache.flink.runtime.io.network.api.MutableRecordReader.next(MutableRecordReader.java:51)
at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:53)
at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:170)
at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
at java.lang.Thread.run(Thread.java:745)

My code:

public static void main(String[] args) {
    // set up execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setDegreeOfParallelism(1);
    // get input points
    DataSet<GeoTimeDataTupel> points = getPointDataSet(env);
    points.print();
    // execute program
    try {
        env.execute("KMeans Flink");
    } catch (Exception e) {
        e.printStackTrace();
    }
}

private static DataSet<GeoTimeDataTupel> getPointDataSet(ExecutionEnvironment env) {
    // load properties
    Properties pro = new Properties();
    try {
        pro.load(new FileInputStream("./resources/config.properties"));
    } catch (Exception e) {
        e.printStackTrace();
    }
    String inputFile = pro.getProperty("input");
    // map csv file
    return env.readCsvFile(inputFile)
        .ignoreInvalidLines()
        .fieldDelimiter('\u0009')
        .lineDelimiter("\n")
        .includeFields(true, true, false, false, false, false, false, false, false, false, false,
            false, false, false, false, false, false, false, false, false, false,
            false, false, false, false, false, false, false, false, false, false,
            false, false, false, false, false, false, false, false, true, true,
            false, false, false, false, false, false, false, false, false, false,
            false, false, false, false, false, false, false)
        .types(String.class, Long.class, Double.class, Double.class)
        .map(new TuplePointConverter());
}

Does anyone have a solution?

Best regards, Paul

Best Answer

I am posting the answer from the Apache Flink mailing list here so people don't have to read through the mailing list archives:

The error was caused by custom serialization logic: the deserialization function had a bug and did not consume all of the record's data.

The latest master has an improved error message for this case.

As background:

Flink supports two type interfaces that allow programmers to implement their own serialization routines: Writables (Hadoop's core type interface) and Values (Flink's own custom serialization interface).
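The contract behind both interfaces is the same: read() must consume exactly the bytes that write() produced, otherwise the next record starts mid-stream, which is what Flink reports as a partial record. The following is a minimal, Flink-free sketch of that contract using only java.io; the field layout (String, long, two doubles) is an assumption mirroring the question's tuple, not the actual GeoTimeDataTupel:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the serialization contract: read() must consume every byte
// that write() emitted. A read() that stops short leaves bytes behind,
// which in Flink surfaces as "Channel received an event before
// completing the current partial record."
public class SerializationContract {

    // Hypothetical record layout matching the question's CSV fields:
    // (String, Long, Double, Double)
    static void write(DataOutput out, String id, long time, double lat, double lon)
            throws IOException {
        out.writeUTF(id);
        out.writeLong(time);
        out.writeDouble(lat);
        out.writeDouble(lon);
    }

    // Correct read: consumes every field that write() emitted.
    static void readAll(DataInput in) throws IOException {
        in.readUTF();
        in.readLong();
        in.readDouble();
        in.readDouble();
    }

    // Buggy read: forgets the two trailing doubles (16 bytes stay behind).
    static void readPartial(DataInput in) throws IOException {
        in.readUTF();
        in.readLong();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), "USA", 20150101L, 38.9, -77.0);
        byte[] bytes = buf.toByteArray();

        // A correct deserializer leaves nothing in the stream.
        ByteArrayInputStream ok = new ByteArrayInputStream(bytes);
        readAll(new DataInputStream(ok));
        System.out.println("correct read, bytes left: " + ok.available());

        // A buggy deserializer leaves unread bytes behind.
        ByteArrayInputStream bad = new ByteArrayInputStream(bytes);
        readPartial(new DataInputStream(bad));
        System.out.println("buggy read, bytes left: " + bad.available());
    }
}
```

So if GeoTimeDataTupel (or anything it contains) implements one of these interfaces, check that its read/deserialize method reads back every field in the same order and width as the write side.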

Regarding "java - Apache Flink: Channel received an event before completing the current partial record", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30213321/
