gpt4 book ai didi

hadoop - Flink 在 HDFS 上写入产生空文件

转载 作者:可可西里 更新时间:2023-11-01 14:33:12 30 4
gpt4 key购买 nike

我有一个 flink 作业,它使用 TextOutputFormat 将数据写入目标。代码是这样的:

   String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);

当使用普通文件系统上的路径作为目标时,这工作得很好。但是,当我将基本路径更改为 hdfs 位置时,它不再按预期工作。发生的情况是,输出文件实际上是在 HDFS 上创建的,但是它的大小为零字节。我在通话期间没有收到任何异常。

我正在使用 Hadoop 2.6.0 和 Flink 0.10.1。使用命令行工具 (hadoop fs -put ...) 将文件复制到 hdfs 是可行的,所以我想我可以排除一些 Hadoop 错误配置。我还启动了 Wireshark 并看到数据正在传输到 Hadoop 服务器,所以我是否需要在实际写入之前以某种方式提交它?

最佳答案

为了将结果刷新到 HDFS,您必须在完成记录写入后调用 TextOutputFormatclose 方法。

// do writing
while (some condition) {
format.writeRecord(record);
}

// finished writing
format.close();

关于hadoop - Flink 在 HDFS 上写入产生空文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34328908/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com