
python - How to use subprocess with multiple stdins from zcat

Reposted. Author: 太空宇宙. Updated: 2023-11-03 15:13:40

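I want to translate the following shell command into Python code using subprocess. In particular, how do I convert the multiple <(zcat ...) to stdin? Ideally the code should use subprocess.call, but subprocess.Popen is fine too.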

rsem-calculate-expression \
-p 2 \
--some-other-option \
--paired-end \
<(zcat a_1.fastq.gz b_1.fastq.gz c_1.fastq.gz) \
<(zcat b_2.fastq.gz b_2.fastq.gz c_2.fastq.gz) \
~/index/hg19 \
output_dir \
1>stdin.log \
2>stderr.log

Best answer

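Another way to run the bash command is to use shell=True. Process substitution, <(...), is a bash feature, so the shell has to be set explicitly with executable='/bin/bash':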

from subprocess import check_call

check_call('rsem-calculate-expression -p 2 --some-other-option '
           '--paired-end '
           '<(zcat a_1.fastq.gz b_1.fastq.gz c_1.fastq.gz) '
           '<(zcat b_2.fastq.gz b_2.fastq.gz c_2.fastq.gz) '
           '~/index/hg19 '
           'output_dir '
           '1>stdin.log '
           '2>stderr.log', shell=True, executable='/bin/bash')
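If the file names come from variables rather than literals, the command string has to be quoted before it reaches bash. The following is a minimal sketch of one way to do that with shlex.quote. The helper names process_substitution and build_command and the file name containing a space are hypothetical and not part of the original answer. Note that quoting also disables tilde expansion, so the sketch expands ~ in Python instead.

```python
import os
import shlex


def process_substitution(files, tool='zcat'):
    """Return a bash process substitution such as <(zcat a.gz b.gz)."""
    words = [tool] + list(files)
    return '<(' + ' '.join(shlex.quote(word) for word in words) + ')'


def build_command(program_args, input_groups, trailing_args):
    """Join quoted arguments and one <(...) per group of input files."""
    parts = [shlex.quote(arg) for arg in program_args]
    parts += [process_substitution(group) for group in input_groups]
    parts += [shlex.quote(arg) for arg in trailing_args]
    return ' '.join(parts)


command = build_command(
    ['rsem-calculate-expression', '-p', '2', '--paired-end'],
    [['a_1.fastq.gz', 'my reads_1.fastq.gz'],   # hypothetical file names
     ['a_2.fastq.gz', 'my reads_2.fastq.gz']],
    # a quoted ~ is not expanded by bash, so expand it here
    [os.path.expanduser('~/index/hg19'), 'output_dir'],
)
print(command)
```

The resulting string can then be passed to check_call(command, shell=True, executable='/bin/bash') as above.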

For comparison, here is how to do it without running a shell:

#!/usr/bin/env python3
import os
import shlex
from contextlib import ExitStack  # $ pip install contextlib2 on Python 2
from shutil import rmtree
from subprocess import Popen
from tempfile import mkdtemp

# convert command-line into a list (program arguments)
args = 'rsem-calculate-expression -p2 --some-other-option --paired-end'.split()
zcat_args = [shlex.split('zcat a_1.fastq.gz b_1.fastq.gz c_1.fastq.gz'),
             shlex.split('zcat b_2.fastq.gz b_2.fastq.gz c_2.fastq.gz')]
npipes = len(zcat_args)
with ExitStack() as stack:
    # create named pipes
    pipenames = []
    dirname = mkdtemp()  # create temporary directory for named pipes
    stack.callback(rmtree, dirname)  # clean up at the end
    for i in range(npipes):
        pipename = os.path.join(dirname, 'zcat_named_pipe' + str(i))
        os.mkfifo(pipename)
        args.append(pipename)  # append to the program args
        pipenames.append(pipename)

    # run rsem-calculate-expression that reads from the named pipes
    args.append(os.path.expanduser('~/index/hg19'))
    args.append('output_dir')
    with open('stdout.log', 'wb', 0) as f1, open('stderr.log', 'wb', 0) as f2:
        rsem = Popen(args, stdout=f1, stderr=f2)
    stack.callback(rsem.wait)

    # run zcat commands to populate the named pipes
    for pipename, cmd in zip(pipenames, zcat_args):
        if rsem.poll() is None:  # it is still running and reading from pipes
            with open(pipename, 'wb', 0) as pipe:  # may block if no readers
                stack.callback(Popen(cmd, stdout=pipe).wait)
exit(rsem.returncode != 0)
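As a variation on the no-shell approach, the sketch below uses anonymous pipes instead of named pipes, which is closer to what bash typically does for <(...) on Linux. It is not part of the original answer. It assumes a platform that exposes /dev/fd (for example Linux or macOS) and a consumer program that accepts such paths as ordinary file arguments. The helper name run_with_fd_inputs is hypothetical, and echo and cat stand in for zcat and rsem-calculate-expression so that the example runs without those tools.

```python
import os
import subprocess


def run_with_fd_inputs(consumer, producers, trailing_args=()):
    """Run consumer with one /dev/fd/N argument per producer command."""
    procs = []     # producer processes (zcat in the question)
    read_fds = []  # read ends of the pipes, handed to the consumer
    try:
        for cmd in producers:
            r, w = os.pipe()
            read_fds.append(r)
            try:
                procs.append(subprocess.Popen(cmd, stdout=w))
            finally:
                # the parent must not keep the write end open,
                # otherwise the consumer never sees end-of-file
                os.close(w)
        args = list(consumer)
        args += ['/dev/fd/%d' % fd for fd in read_fds]
        args += list(trailing_args)
        # pass_fds keeps the read ends open in the consumer under the same numbers
        return subprocess.run(args, pass_fds=read_fds, stdout=subprocess.PIPE)
    finally:
        for fd in read_fds:
            os.close(fd)
        for proc in procs:
            proc.wait()


# stand-in commands: each echo plays the role of one zcat invocation
result = run_with_fd_inputs(['cat'], [['echo', 'left'], ['echo', 'right']])
print(result.stdout)
```

For the command in the question, consumer would be the rsem-calculate-expression argument list, producers the two zcat command lists, and trailing_args the index path and output directory. No temporary directory needs to be cleaned up, at the cost of portability to systems without /dev/fd.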

Regarding "python - How to use subprocess with multiple stdins from zcat", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/23522539/
