gpt4 book ai didi

python - 如何使用线程进行客户端服务器应用程序的功能测试?

转载 作者:行者123 更新时间:2023-11-28 21:27:23 26 4
gpt4 key购买 nike

我有客户端和服务器模块,每一个都可以由一个函数启动。我只需要找到一种并行运行 booth 的方法:

  1. 如果客户端/服务器发生异常,将停止另一个,这样测试运行器就不会卡住

  2. 如果客户端/服务器出现异常,将打印异常或将其传播给运行器,这样我就可以看到它并使用测试套件调试客户端/服务器

  3. 出于性能原因,最好使用线程

当在线程的运行方法中捕获异常时(这会杀死测试运行器...),第一个简单线程的尝试以丑陋的 os._exit(1) 结束线程包

第二个尝试(试图避免 os._exit())是使用 concurrent.futures.ThreadPoolExecutor。它允许从线程中获取异常,但我仍然找不到中止另一个线程的方法。

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
server_future = executor.submit(server)
client_future = executor.submit(client)

concurrent.futures.wait([server_future, client_future],
return_when=concurrent.futures.FIRST_EXCEPTION)

if client_future.done() && client_future.exception():
# we can handle the client exception here
# but how to stop the server from waiting the client?
# also, raise is blocking

if server_future.done() && server_future.exception():
# same here
  • 有没有办法用线程实现这一点?
  • 如果不使用线程,是否有一种简单的方法来测试客户端服务器应用程序? (我认为前两个要求足以有一个可用的解决方案)

编辑:客户端或服务器会在 accept() 或 receive() 调用时被阻塞,因此我无法定期收集标志以决定退出。(停止线程的经典方法之一)

最佳答案

您可以使用 threading包裹。请注意,强制终止线程不是一个好主意,as discussed here .似乎没有正式的方法可以在 Python 中杀死 Thread,但您可以按照链接帖子中给出的示例之一进行操作。

现在您需要等待一个线程退出,然后再停止另一个线程,避免您的测试运行程序卡住。您可以使用线程包装您的服务器/客户端启动,并让您的主线程等待客户端/服务器线程退出,然后再杀死另一个线程。

您可以像这样定义您的客户端/服务器线程:

# Server thread (replace 
class testServerThread (threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
# Do stuff if required

def run(self):
try:
startServer() # Or startClient() for your client thread
except: Exception
# Print your exception here, so you can debug

然后,启动客户端和服务器线程,并等待其中一个退出。一旦其中一个不再活着,您可以杀死另一个并继续进行测试。

# Create and start client/server
serverThread = testServerThread ()
clientThread = testClientThread ()

serverThread.start()
clientThread.start()

# Wait at most 5 seconds for them to exit, and loop if they're still both alive
while(serverThread.is_alive() and clientThread.is_alive()):
serverThread.join(5)
clientThread.join(5)

# Either client or server exited. Kill the other one.
# Note: the kill function you'll have to define yourself, as said above
if(serverThread.is_alive()):
serverThread.kill()

if(clientThread.islive()):
clientThread.kill()

# Done! Your Test runner can continue its work

核心代码是join()功能:

Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception –, or until the optional timeout occurs.

所以在我们的例子中,它会为客户端等待 5 秒,为服务器等待 5 秒,如果之后它们都还活着,它将再次循环。每当其中一个线程退出时,循环就会停止,剩下的线程就会被杀死。

关于python - 如何使用线程进行客户端服务器应用程序的功能测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36212132/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com