apache-flink - Streaming a file from an HDFS address in Apache Flink


In my Flink code, I stream a file that sits in a folder on HDFS, and I get the error "(No such file or directory)". However, I am sure the file name and address are correct, because I used the same name and address in the batch approach and everything worked fine. Does anyone know what the problem might be? Here is my code:

DataStream<MyObject> myStream =
    env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));

And its related class:

import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Calendar;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo;
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long dataStartTime;
        rowNo++;
        // Emit the first record, then stream the remaining lines.
        if (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            if (myObject != null)
                sourceContext.collect(myObject);
        } else {
            return;
        }
        while (reader.ready() && (line = reader.readLine()) != null) {
            MyObject myObject = MyObject.fromString(line);
            sourceContext.collect(myObject);
        }
        this.reader.close();
        this.reader = null;
        this.inputStream.close();
        this.inputStream = null;
    }

    @Override
    public void cancel() {
        try {
            if (this.reader != null) {
                this.reader.close();
            }
            if (this.inputStream != null) {
                this.inputStream.close();
            }
        } catch (IOException ioe) {
            //
        } finally {
            this.reader = null;
            this.inputStream = null;
        }
    }
}

Best Answer

You are trying to access a file in HDFS with Java's regular FileInputStream. FileInputStream can only access the local file system; it does not know anything about talking to HDFS. You need to use the HDFS client to read files from HDFS. See Flink's FileInputFormat as an example.
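As a rough illustration only (the helper name and wiring are hypothetical, and it assumes the hadoop-client dependency is on the classpath), the stream could be opened through the Hadoop FileSystem API instead of FileInputStream:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsOpenSketch {

    // Hypothetical helper: opens an hdfs:// path via the Hadoop client.
    // FileSystem.get() inspects the URI scheme and talks to the NameNode,
    // which a plain FileInputStream cannot do.
    static BufferedReader openHdfsFile(String dataFilePath) throws Exception {
        Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
        FileSystem fs = FileSystem.get(URI.create(dataFilePath), conf);
        return new BufferedReader(new InputStreamReader(fs.open(new Path(dataFilePath))));
    }
}

A reader obtained this way could then replace the FileInputStream-based one in run() above.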

However, I would try to avoid implementing this myself if possible. You could use Flink's FileInputFormat to read the file line by line (giving a DataStream<String>) followed by a (flat) mapper that parses each line.
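A minimal sketch of that simpler approach (the job class and HDFS address are placeholders, and MyObject.fromString is taken from the question; readTextFile and flatMap are standard DataStream API calls):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadHdfsLinesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // readTextFile goes through Flink's file system abstraction,
        // so an hdfs:// path is served by the HDFS client.
        DataStream<String> lines =
            env.readTextFile("hdfs://namenode:8020/Data/Dataset1.csv"); // placeholder address

        // Parse each line into a MyObject, silently dropping unparseable lines.
        DataStream<MyObject> objects = lines.flatMap(new FlatMapFunction<String, MyObject>() {
            @Override
            public void flatMap(String line, Collector<MyObject> out) {
                MyObject parsed = MyObject.fromString(line);
                if (parsed != null) {
                    out.collect(parsed);
                }
            }
        });

        objects.print();
        env.execute("Read CSV lines from HDFS");
    }
}

This keeps all file-system handling inside Flink's own source, so no custom SourceFunction is needed.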

This question about apache-flink - Streaming a file from an HDFS address in Apache Flink is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/37416179/
