gpt4 book ai didi

python - 访问来自MRjob的hdfs的流输出

转载 作者:行者123 更新时间:2023-12-02 18:06:12 25 4
gpt4 key购买 nike

我正在尝试使用Python驱动程序来运行迭代的MRjob程序。退出标准取决于计数器。

工作本身似乎正在运行。如果从命令行运行单个迭代,则可以hadoop fs -cat /user/myname/myhdfsdir/part-00000并查看单个迭代的预期结果。

但是,我需要使用Python驱动程序来运行代码并从runner访问计数器。这是因为它是一种迭代算法,需要计数器的值来确定退出标准。

OUTPUT_PATH = /user/myname/myhdfsdir
!hadoop fs -rm -r {OUTPUT_PATH}

from my_custom_MRjob import my_custom_MRjob

mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt",
"-r", "hadoop",
"--output-dir=hdfs://"+OUTPUT_PATH,
"--no-output"])

while True:
with mr_job.make_runner() as runner:
print runner.get_opts()
runner.run()
with open('localDir/localTextFile.txt', 'w') as f:
for line in runner.stream_output():
key,value = mr_job.parse_output_line(line)
#
f.write(key +'\t'+ value +'\n')
print "End of MRjob iteration. Counters: {}".format(runner.counters())
# read a particular counter
# use counter value to evaluate exit criteria
if exit_criteria_met:
break

这将产生以下错误:
IOErrorTraceback (most recent call last)
<ipython-input-136-aded8ecaa727> in <module>()
25 runner.run()
26 with open('localDir/localTextFile.txt', 'w') as f:
---> 27 for line in runner.stream_output():
28 key,value = mr_job.parse_output_line(line)
29 #

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/util.pyc in _to_lines(chunks)
391 leftovers = []
392
--> 393 for chunk in chunks:
394 # special case for b'' standing for EOF
395 if chunk == b'':

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/runner.pyc in cat_output(self)
555 yield b'' # EOF of previous file
556
--> 557 for chunk in self.fs._cat_file(filename):
558 yield chunk
559

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/composite.pyc in _cat_file(self, path)
75
76 def _cat_file(self, path):
---> 77 for line in self._do_action('_cat_file', path):
78 yield line
79

/home/myname/.conda/envs/py27/lib/python2.7/site-packages/mrjob/fs/hadoop.pyc in _cat_file(self, filename)
272
273 if returncode != 0:
--> 274 raise IOError("Could not stream %s" % filename)
275
276 def mkdir(self, path):

IOError: Could not stream hdfs://hdfs:/user/myname/myhdfsdir/part-00000

尤其令人困惑和沮丧的是: hdfs://hdfs:/user/myname/myhdfsdir/part-00000。请注意,URL中存在两种 hdfs方案,但在hdfs的第二个实例中只有一个正斜杠。我尝试在mrjob args中添加和删除文字 hdfs://: "--output-dir=hdfs://"+OUTPUT_PATH。在两种情况下,我都会得到相同的错误签名。

如果我以“本地”模式而不是Hadoop运行驱动程序,那么我不会有问题,但显而易见的是,我无法访问Hadoop引擎。这很好用:
mr_job = my_custom_MRjob(args=["localDir/localTextFile.txt"])

我需要始终从本地文件系统(即使在Hadoop模式下)读取初始输入文件。然后运行MRjob迭代,其输出将覆盖本地文件系统输入文件。然后从运行者访问计数器并评估退出标准。如果不符合退出条件,请使用本地文件系统的输入再次运行该作业,这次使用从上一次运行更新的本地输入文件。

最佳答案

只要您的路径包含hdfs:/,您就不会成功,因为那将永远无效。
在您提到的评论中,您尝试手动添加hdfs://,这可能是一个不错的技巧,但是在您的代码中,我看不到您“清理”了错误的hdfs:/。因此,即使添加正确的前缀,下一行也将是错误的前缀,并且代码仍然没有成功的机会。
因此,请清理干净。

实用说明:这个问题是从前不久开始的,如果软件本身存在问题,现在可以解决。如果问题仍然存在,则可能是您尝试使用的代码有些奇怪。也许从可靠来源的一个简单例子开始,以排除这种可能性。

关于python - 访问来自MRjob的hdfs的流输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49472471/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com