gpt4 book ai didi

python - 具有实时输入和多个控制台的Python子进程

转载 作者:行者123 更新时间:2023-12-03 03:34:54 25 4
gpt4 key购买 nike

主要问题

简而言之:我需要两个用于控制台的控制台。一种用于 Activity 用户输入。另一个用于纯日志输出。 (包括接受的答案的工作代码在问题的文本中的“Edit-3”部分下。并且在“Edit-1”和“Edit-2”部分下是可行的解决方法。)

为此,我有一个主命令行Python脚本,该脚本应该打开一个附加控制台,仅用于日志输出。为此,我打算将日志输出重定向到第二个控制台的stdin,该日志输出将显示在主脚本的控制台上,该控制台作为子进程启动。 (我使用子进程,因为我没有找到其他方法来打开第二个控制台。)

问题是,看来我能够发送到第二个控制台的标准输入-但是,第二个控制台上没有任何内容。

以下是我用于实验的代码(在Windows 10下的PyDev上使用Python 3.4)。函数writing(input, pipe, process)包含以下部分:通过子进程打开的控制台,将生成的字符串复制到通过stdin传递为pipe的部分。函数writeing(...)通过类writetest(Thread)运行。 (我留下了一些代码,我将其注释掉了。)

import os
import sys
import io
import time
import threading
from cmd import Cmd
from queue import Queue
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE


REPETITIONS = 3


# Position of "The class" (Edit-2)


# Position of "The class" (Edit-1)


class generatetest(threading.Thread):

def __init__(self, queue):
self.output = queue
threading.Thread.__init__(self)

def run(self):
print('run generatetest')
generating(REPETITIONS, self.output)
print('generatetest done')

def getout(self):
return self.output


class writetest(threading.Thread):

def __init__(self, input=None, pipe=None, process=None):
if (input == None): # just in case
self.input = Queue()
else:
self.input = input

if (pipe == None): # just in case
self.pipe = PIPE
else:
self.pipe = pipe

if (process == None): # just in case
self.process = subprocess.Popen('C:\Windows\System32\cmd.exe', universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
else:
self.process = proc

threading.Thread.__init__(self)

def run(self):
print('run writetest')
writing(self.input, self.pipe, self.process)
print('writetest done')


# Position of "The function" (Edit-2)


# Position of "The function" (Edit-1)


def generating(maxint, outline):
print('def generating')
for i in range(maxint):
time.sleep(1)
outline.put_nowait(i)


def writing(input, pipe, process):
print('def writing')
while(True):
try:
print('try')
string = str(input.get(True, REPETITIONS)) + "\n"
pipe = io.StringIO(string)
pipe.flush()
time.sleep(1)
# print(pipe.readline())
except:
print('except')
break
finally:
print('finally')
pass


data_queue = Queue()
data_pipe = sys.stdin
# printer = sys.stdout
# data_pipe = os.pipe()[1]


# The code of 'C:\\Users\\Public\\Documents\\test\\test-cmd.py'
# can be found in the question's text further below under "More code"


exe = 'C:\Python34\python.exe'
# exe = 'C:\Windows\System32\cmd.exe'
arg = 'C:\\Users\\Public\\Documents\\test\\test-cmd.py'
arguments = [exe, arg]
# proc = Popen(arguments, universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
proc = Popen(arguments, stdin=data_pipe, stdout=PIPE, stderr=PIPE,
universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)


# Position of "The call" (Edit-2 & Edit-1) - file init (proxyfile)


# Position of "The call" (Edit-2) - thread = sockettest()
# Position of "The call" (Edit-1) - thread0 = logtest()
thread1 = generatetest(data_queue)
thread2 = writetest(data_queue, data_pipe, proc)
# time.sleep(5)


# Position of "The call" (Edit-2) - thread.start()
# Position of "The call" (Edit-1) - thread0.start()
thread1.start()
thread2.start()


# Position of "The call" (Edit-2) - thread.join()
# Position of "The call" (Edit-1) - thread.join()
thread1.join(REPETITIONS * REPETITIONS)
thread2.join(REPETITIONS * REPETITIONS)

# data_queue.join()
# receiver = proc.communicate(stdin, 5)
# print('OUT:' + receiver[0])
# print('ERR:' + receiver[1])

print("1st part finished")

略有不同的方法

以下附加代码段适用于从子流程中提取标准输出。但是,先前发送的stdin仍然无法在第二个控制台上打印。另外,第二个控制台将立即关闭。
proc2 = Popen(['C:\Python34\python.exe', '-i'],
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
creationflags=CREATE_NEW_CONSOLE)
proc2.stdin.write(b'2+2\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
proc2.stdin.write(b'len("foobar")\n')
proc2.stdin.flush()
print(proc2.stdout.readline())
time.sleep(1)
proc2.stdin.close()
proc2.terminate()
proc2.wait(timeout=0.2)

print("Exiting Main Thread")

更多信息

一旦我使用参数 stdin=data_pipe, stdout=PIPE, stderr=PIPE之一启动子进程,生成的第二个控制台将不处于 Activity 状态,并且不接受键盘输入(虽然这可能是有用的信息,但不希望这样做)。

子进程方法 communicate()不能用于此目的,因为它等待进程结束。

更多代码

最后是文件的代码,它是第二个控制台的代码。

C:\ Users \ Public \ Documents \ test \ test-cmd.py
from cmd import Cmd
from time import sleep
from datetime import datetime

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
"""Custom console"""

def __init__(self, intro=INTRO, prompt=PROMPT):
Cmd.__init__(self)
self.intro = intro
self.prompt = prompt
self.doc_header = intro
self.running = False

def do_dummy(self, args):
"""Runs a dummy method."""
print("Do the dummy.")
self.running = True
while(self.running == True):
print(datetime.now())
sleep(5)

def do_stop(self, args):
"""Stops the dummy method."""
print("Stop the dummy, if you can.")
self.running = False

def do_exit(self, args):
"""Exits this console."""
print("Do console exit.")
exit()

if __name__ == '__main__':
cl = CommandLine()
cl.prompt = PROMPT
cl.cmdloop(INTRO)

思想

到目前为止,我什至不确定Windows命令行界面是否可以接受键盘输入以外的其他输入(而不是所需的stdin管道或类似输入)。不过,我希望它具有某种被动模式。

为什么这不起作用?

编辑1:通过文件的解决方法(概念证明)

正如 Working multiple consoles in python的答案所建议的那样,使用文件作为变通办法以显示其新内容,通常可以正常工作。但是,由于日志文件将增长到许多GB,因此在这种情况下,这不是实际的解决方案。至少需要文件分割和适当的处理。

类(class):
class logtest(threading.Thread):

def __init__(self, file):
self.file = file
threading.Thread.__init__(self)

def run(self):
print('run logtest')
logging(self.file)
print('logtest done')

功能:
def logging(file):
pexe = 'C:\Python34\python.exe '
script = 'C:\\Users\\Public\\Documents\\test\\test-004.py'
filek = '--file'
filev = file

file = open(file, 'a')
file.close()
time.sleep(1)

print('LOG START (outer): ' + script + ' ' + filek + ' ' + filev)
proc = Popen([pexe, script, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)
print('LOG FINISH (outer): ' + script + ' ' + filek + ' ' + filev)

time.sleep(2)

电话:
# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\\Users\\Public\\Documents\\test\\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread0 = logtest(proxyfile)
thread0.start()
thread0.join(REPETITIONS * REPETITIONS)

尾脚本(“test-004.py”):

由于Windows不提供tail命令,因此我改用了以下脚本(基于 How to implement a pythonic equivalent of tail -F?的答案),该脚本可用于此目的。额外但又不必要的一种 class CommandLine(Cmd)最初是试图使第二个控制台保持打开状态(因为缺少脚本文件参数)。但是,它也证明自己对保持控制台流畅地打印新日志文件内容很有用。否则,输出是不确定的/不可预测的。
import time
import sys
import os
import threading
from cmd import Cmd
from argparse import ArgumentParser


def main(args):
parser = ArgumentParser(description="Parse arguments.")
parser.add_argument("-f", "--file", type=str, default='', required=False)
arguments = parser.parse_args(args)

if not arguments.file:
print('LOG PRE-START (inner): file argument not found. Creating new default entry.')
arguments.file = 'C:\\Users\\Public\\Documents\\test\\tempdata'

print('LOG START (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)

f = open(arguments.file, 'a')
f.close()
time.sleep(1)

words = ['word']
console = CommandLine(arguments.file, words)
console.prompt = ''

thread = threading.Thread(target=console.cmdloop, args=('', ))
thread.start()
print("\n")

for hit_word, hit_sentence in console.watch():
print("Found %r in line: %r" % (hit_word, hit_sentence))

print('LOG FINISH (inner): ' + os.path.abspath(os.path.dirname(__file__)) + ' ' + arguments.file)


class CommandLine(Cmd):
"""Custom console"""

def __init__(self, fn, words):
Cmd.__init__(self)
self.fn = fn
self.words = words

def watch(self):
fp = open(self.fn, 'r')
while True:
time.sleep(0.05)
new = fp.readline()
print(new)
# Once all lines are read this just returns ''
# until the file changes and a new line appears

if new:
for word in self.words:
if word in new:
yield (word, new)

else:
time.sleep(0.5)


if __name__ == '__main__':
print('LOG START (inner - as main).')
main(sys.argv[1:])

编辑1:更多想法

我没有尝试过并且可能可行的三种解决方法是套接字(也在此答案 Working multiple consoles in python中建议),通过进程ID获取进程对象以进行更多控制,以及使用ctypes库直接访问Windows控制台API,从而允许设置屏幕缓冲区,因为控制台可以有多个缓冲区,但是只有一个 Activity 缓冲区(在 CreateConsoleScreenBuffer function的文档注释中指出)。

但是,使用套接字可能是最简单的一种。至少日志的大小与这种方式无关紧要。不过,连接问题可能是这里的问题。

编辑2:通过套接字的解决方法(概念验证)

使用套接字作为解决方法以便显示新的日志实体,正如 Working multiple consoles in python的答案中所建议的那样,通常也可以正常工作。但是,对于某些事情来说似乎太费力了,应该将其简单地发送到接收控制台的过程中。

类(class):
class sockettest(threading.Thread):

def __init__(self, host, port, file):
self.host = host
self.port = port
self.file = file
threading.Thread.__init__(self)

def run(self):
print('run sockettest')
socketing(self.host, self.port, self.file)
print('sockettest done')

功能:
def socketing(host, port, file):
pexe = 'C:\Python34\python.exe '
script = 'C:\\Users\\Public\\Documents\\test\test-005.py'
hostk = '--address'
hostv = str(host)
portk = '--port'
portv = str(port)
filek = '--file'
filev = file

file = open(file, 'a')
file.close()
time.sleep(1)

print('HOST START (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)
proc = Popen([pexe, script, hostk, hostv, portk, portv, filek, filev], universal_newlines=True, creationflags=CREATE_NEW_CONSOLE)

print('HOST FINISH (outer): ' + pexe + script + ' ' + hostk + ' ' + hostv + ' ' + portk + ' ' + portv + ' ' + filek + ' ' + filev)

time.sleep(2)

电话:
# The file tempdata is filled with several strings of "0\n1\n2\n"
# Looking like this:
# 0
# 1
# 2
# 0
# 1
# 2

proxyfile = 'C:\\Users\\Public\\Documents\\test\\tempdata'
f = open(proxyfile, 'a')
f.close()
time.sleep(1)

thread = sockettest('127.0.0.1', 8888, proxyfile)
thread.start()
thread.join(REPETITIONS * REPETITIONS)

套接字脚本(“test-005.py”):

以下脚本基于 Python: Socket programming server-client application using threads。在这里,我只是保留 class CommandLine(Cmd)作为日志条目生成器。在这一点上,将客户端放入主脚本中(这将调用第二个控制台,然后使用真正的日志实体而不是(新)文件行来馈入队列)应该不是问题。 (服务器是打印机。)
import socket
import sys
import threading
import time
from cmd import Cmd
from argparse import ArgumentParser
from queue import Queue

BUFFER_SIZE = 5120

class CommandLine(Cmd):
"""Custom console"""

def __init__(self, fn, words, queue):
Cmd.__init__(self)
self.fn = fn
self.words = words
self.queue = queue

def watch(self):
fp = open(self.fn, 'r')
while True:
time.sleep(0.05)
new = fp.readline()

# Once all lines are read this just returns ''
# until the file changes and a new line appears
self.queue.put_nowait(new)


def main(args):
parser = ArgumentParser(description="Parse arguments.")
parser.add_argument("-a", "--address", type=str, default='127.0.0.1', required=False)
parser.add_argument("-p", "--port", type=str, default='8888', required=False)
parser.add_argument("-f", "--file", type=str, default='', required=False)
arguments = parser.parse_args(args)

if not arguments.address:
print('HOST PRE-START (inner): host argument not found. Creating new default entry.')
arguments.host = '127.0.0.1'
if not arguments.port:
print('HOST PRE-START (inner): port argument not found. Creating new default entry.')
arguments.port = '8888'
if not arguments.file:
print('HOST PRE-START (inner): file argument not found. Creating new default entry.')
arguments.file = 'C:\\Users\\Public\\Documents\\test\\tempdata'

file_queue = Queue()

print('HOST START (inner): ' + ' ' + arguments.address + ':' + arguments.port + ' --file ' + arguments.file)

# Start server
thread = threading.Thread(target=start_server, args=(arguments.address, arguments.port, ))
thread.start()
time.sleep(1)

# Start client
thread = threading.Thread(target=start_client, args=(arguments.address, arguments.port, file_queue, ))
thread.start()

# Start file reader
f = open(arguments.file, 'a')
f.close()
time.sleep(1)

words = ['word']
console = CommandLine(arguments.file, words, file_queue)
console.prompt = ''

thread = threading.Thread(target=console.cmdloop, args=('', ))
thread.start()
print("\n")

for hit_word, hit_sentence in console.watch():
print("Found %r in line: %r" % (hit_word, hit_sentence))

print('HOST FINISH (inner): ' + ' ' + arguments.address + ':' + arguments.port)


def start_client(host, port, queue):
host = host
port = int(port) # arbitrary non-privileged port
queue = queue

soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

try:
soc.connect((host, port))
except:
print("Client connection error" + str(sys.exc_info()))
sys.exit()

print("Enter 'quit' to exit")
message = ""

while message != 'quit':
time.sleep(0.05)
if(message != ""):
soc.sendall(message.encode("utf8"))
if soc.recv(BUFFER_SIZE).decode("utf8") == "-":
pass # null operation

string = ""
if (not queue.empty()):
string = str(queue.get_nowait()) + "\n"

if(string == None or string == ""):
message = ""
else:
message = string

soc.send(b'--quit--')


def start_server(host, port):
host = host
port = int(port) # arbitrary non-privileged port

soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# SO_REUSEADDR flag tells the kernel to reuse a local socket in TIME_WAIT state, without waiting for its natural timeout to expire
soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
print("Socket created")

try:
soc.bind((host, port))
except:
print("Bind failed. Error : " + str(sys.exc_info()))
sys.exit()

soc.listen(5) # queue up to 5 requests
print("Socket now listening")

# infinite loop- do not reset for every requests
while True:
connection, address = soc.accept()
ip, port = str(address[0]), str(address[1])
print("Connected with " + ip + ":" + port)

try:
threading.Thread(target=client_thread, args=(connection, ip, port)).start()
except:
print("Thread did not start.")
traceback.print_exc()

soc.close()


def client_thread(connection, ip, port, max_buffer_size=BUFFER_SIZE):
is_active = True

while is_active:
client_input = receive_input(connection, max_buffer_size)

if "--QUIT--" in client_input:
print("Client is requesting to quit")
connection.close()
print("Connection " + ip + ":" + port + " closed")
is_active = False
elif not client_input == "":
print("{}".format(client_input))
connection.sendall("-".encode("utf8"))
else:
connection.sendall("-".encode("utf8"))


def receive_input(connection, max_buffer_size):
client_input = connection.recv(max_buffer_size)
client_input_size = sys.getsizeof(client_input)

if client_input_size > max_buffer_size:
print("The input size is greater than expected {}".format(client_input_size))

decoded_input = client_input.decode("utf8").rstrip() # decode and strip end of line
result = process_input(decoded_input)

return result


def process_input(input_str):
return str(input_str).upper()


if __name__ == '__main__':
print('HOST START (inner - as main).')
main(sys.argv[1:])

编辑2:进一步的想法

直接控制子进程的控制台输入管道/缓冲区将是解决此问题的首选方法。因为这是500声望的赏金。

不幸的是我没时间了。因此,我现在可能会使用其中一种解决方法,并在以后用适当的解决方案替换它们。或者,也许我必须使用无核选项,只有一个控制台,在该控制台中,正在进行的日志输出会在任何用户键盘输入期间暂停,然后再打印。当然,当用户决定只键入一半内容时,这可能会导致缓冲区问题。

编辑3:包含接受的答案的代码(一个文件)

有了James Kent的回答,当我通过Windows命令行(cmd)或PowerShell使用代码启动脚本时,我得到了预期的行为。但是,当我使用“Python run”通过Eclipse / PyDev启动相同的脚本时,输出总是打印在主Eclipse / PyDev控制台上,而子流程的第二个控制台仍然为空且保持不 Activity 状态。不过,我想这是另一个系统/环境专业,并且是另一个问题。
from sys import argv, stdin, stdout
from threading import Thread
from cmd import Cmd
from time import sleep
from datetime import datetime
from subprocess import Popen, PIPE, CREATE_NEW_CONSOLE

INTRO = 'command line'
PROMPT = '> '


class CommandLine(Cmd):
"""Custom console"""

def __init__(self, subprocess, intro=INTRO, prompt=PROMPT):
Cmd.__init__(self)
self.subprocess = subprocess
self.intro = intro
self.prompt = prompt
self.doc_header = intro
self.running = False

def do_date(self, args):
"""Prints the current date and time."""
print(datetime.now())
sleep(1)

def do_exit(self, args):
"""Exits this command line application."""
print("Exit by user command.")
if self.subprocess is not None:
try:
self.subprocess.terminate()
except:
self.subprocess.kill()
exit()


class Console():

def __init__(self):
if '-r' not in argv:
self.p = Popen(
['python.exe', __file__, '-r'],
stdin=PIPE,
creationflags=CREATE_NEW_CONSOLE
)
else:
while True:
data = stdin.read(1)
if not data:
# break
sleep(1)
continue
stdout.write(data)

def write(self, data):
self.p.stdin.write(data.encode('utf8'))
self.p.stdin.flush()

def getSubprocess(self):
if self.p:
return self.p
else:
return None


class Feeder (Thread):

def __init__(self, console):
self.console = console
Thread.__init__(self)

def run(self):
feeding(self.console)


def feeding(console):
for i in range(0, 100):
console.write('test %i\n' % i)
sleep(1)


if __name__ == '__main__':
p = Console()
if '-r' not in argv:
thread = Feeder(p)
thread.setDaemon(True)
thread.start()

cl = CommandLine(subprocess=p.getSubprocess())
cl.use_rawinput = False
cl.prompt = PROMPT
cl.cmdloop('\nCommand line is waiting for user input (e.g. help).')

编辑3:荣誉奖

在上面的问题文本中,我提到了使用ctypes库直接访问Windows控制台API作为另一轮工作(在“Edit-1:更多思想”下)。或仅以某种方式使用一个控制台,输入提示始终作为解决整个问题的核心选择始终位于底部。 (在“编辑2:进一步的想法”下)

对于使用ctypes库,我将自己对 Change console font in Windows的以下回答进行了定位。如果只使用一个控制台,我会尝试以下对 Keep console input line below output的回答。我认为这两个答案都可以提供有关此问题的潜在优点,并且也许对其他人如何帮助这篇文章有所帮助。另外,如果我有时间,我会尝试他们是否以某种方式工作。

最佳答案

您要解决的问题是Windows上的控制台子系统的体系结构,您通常看到的控制台窗口不是由cmd.exe托管,而是由conhost.exe托管,conhost窗口的子进程只能连接到一个conhost实例,这意味着每个进程只能使用一个窗口。

然后,这会导致您希望拥有的每个控制台窗口都有一个额外的过程,然后为了查看在该窗口中显示的内容,您需要查看stdin和stdout的正常处理方式,因为它们是从中读写的通过conhost实例,除非您将stdin转换为管道(以便您可以写入进程),否则它不再来自conhost,而是来自父进程,因此conhost没有可见性。这意味着写入标准输入的任何内容只能由子进程读取,因此不会由conhost显示。

据我所知,没有办法共享这样的管道。

副作用是,如果将stdin设置为管道,则发送到新控制台窗口的所有键盘输入都将消失,因为stdin未连接到该窗口。

对于仅输出功能,这意味着您可以生成一个新进程,该进程通过到stdin的管道与父进程进行通信,并将所有内容回显到stdout。

继承人的尝试:

#!python3

import sys, subprocess, time

class Console():
def __init__(self):
if '-r' not in sys.argv:
self.p = subprocess.Popen(
['python.exe', __file__, '-r'],
stdin=subprocess.PIPE,
creationflags=subprocess.CREATE_NEW_CONSOLE
)
else:
while True:
data = sys.stdin.read(1)
if not data:
break
sys.stdout.write(data)

def write(self, data):
self.p.stdin.write(data.encode('utf8'))
self.p.stdin.flush()

if (__name__ == '__main__'):
p = Console()
if '-r' not in sys.argv:
for i in range(0, 100):
p.write('test %i\n' % i)
time.sleep(1)

因此,两个进程之间有一个很好的简单管道,并将输入作为子进程回显到输出,我使用了-r来表示实例是否是一个进程,但是还有其他方式取决于实现方式。

需要注意的几件事:
  • 写入标准输入后需要刷新,因为python通常使用缓冲。
  • 编写此方法的方式旨在放在自己的模块中,因此使用__file__
  • 由于使用了 __file__,所以
  • 如果使用cx_Freeze或类似方法冻结,则可能需要修改此方法。

  • 编辑1

    对于可以使用cx_Freeze冻结的版本:

    Console.py
    import sys, subprocess

    class Console():
    def __init__(self, ischild=True):
    if not ischild:
    if hasattr(sys, 'frozen'):
    args = ['Console.exe']
    else:
    args = [sys.executable, __file__]
    self.p = subprocess.Popen(
    args,
    stdin=subprocess.PIPE,
    creationflags=subprocess.CREATE_NEW_CONSOLE
    )
    else:
    while True:
    data = sys.stdin.read(1)
    if not data:
    break
    sys.stdout.write(data)

    def write(self, data):
    self.p.stdin.write(data.encode('utf8'))
    self.p.stdin.flush()

    if (__name__ == '__main__'):
    p = Console()

    test.py
    from Console import Console
    import sys, time

    if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
    p.write('test %i\n' % i)
    time.sleep(1)

    setup.py
    from cx_Freeze import setup, Executable

    setup(
    name = 'Console-test',
    executables = [
    Executable(
    'Console.py',
    base=None,
    ),
    Executable(
    'test.py',
    base=None,
    )
    ]
    )

    编辑2

    应该在开发工具(如IDLE)下工作的新版本

    Console.py
    #!python3

    import ctypes, sys, subprocess

    Kernel32 = ctypes.windll.Kernel32

    class Console():
    def __init__(self, ischild=True):
    if ischild:
    # try allocate new console
    result = Kernel32.AllocConsole()
    if result > 0:
    # if we succeed open handle to the console output
    sys.stdout = open('CONOUT$', mode='w')
    else:
    # if frozen we assume its names Console.exe
    # note that when frozen 'Win32GUI' must be used as a base
    if hasattr(sys, 'frozen'):
    args = ['Console.exe']
    else:
    # otherwise we use the console free version of python
    args = ['pythonw.exe', __file__]
    self.p = subprocess.Popen(
    args,
    stdin=subprocess.PIPE
    )
    return
    while True:
    data = sys.stdin.read(1)
    if not data:
    break
    sys.stdout.write(data)

    def write(self, data):
    self.p.stdin.write(data.encode('utf8'))
    self.p.stdin.flush()

    if (__name__ == '__main__'):
    p = Console()

    test.py
    from Console import Console
    import sys, time

    if (__name__ == '__main__'):
    p = Console(False)
    for i in range(0, 100):
    p.write('test %i\n' % i)
    time.sleep(1)

    setup.py
    from cx_Freeze import setup, Executable

    setup(
    name = 'Console-test',
    executables = [
    Executable(
    'Console.py',
    base='Win32GUI',
    ),
    Executable(
    'test.py',
    base=None,
    )
    ]
    )

    这可以变得更加健壮,即在创建新控制台之前始终检查现有控制台并分离它(如果找到),并且可能会更好地进行错误处理。

    关于python - 具有实时输入和多个控制台的Python子进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53548865/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com