gpt4 book ai didi

扭曲的大文件传输

转载 作者:行者123 更新时间:2023-12-04 22:06:18 25 4
gpt4 key购买 nike

我这样编写客户端-服务器应用程序:
客户端(c#)<-> 服务器(扭曲;ftp 代理和附加功能)<-> ftp 服务器

服务器有两个类:我自己的类协议(protocol)继承自 LineReceiever 协议(protocol)和 FTPClient 来自 twisted.protocols.ftp。

但是当客户端发送或获取大文件(10 Gb - 20 Gb)时,服务器会捕获 MemoryError。我的代码中没有使用任何缓冲区。当调用 transport.write(data) 数据附加到 react 器编写器的内部缓冲区时会发生这种情况(如果我错了,请纠正我)。

我应该用什么来避免这个问题?还是我应该改变解决问题的方法?

我发现对于大流,我应该使用 IConsumer 和 IProducer 接口(interface)。但最后它会调用 transfer.write 方法,效果是一样的。还是我错了?

更新:

这是文件下载/上传的逻辑(从 ftp 通过 Twisted 服务器到 Windows 上的客户端):

客户端向 Twisted 服务器发送一些 header ,然后开始发送文件。 Twisted 服务器接收 header ,然后(如果需要)调用 setRawMode() ,打开 ftp 连接并从/向客户端接收/发送字节,并在所有关闭连接之后。这是上传文件的部分代码:

FTPManager 类

def _ftpCWDSuccees(self, protocol, fileName):
self._ftpClientAsync.retrieveFile(fileName, FileReceiver(protocol))



class FileReceiver(Protocol):
def __init__(self, proto):
self.__proto = proto

def dataReceived(self, data):
self.__proto.transport.write(data)

def connectionLost(self, why = connectionDone):
self.__proto.connectionLost(why)

主要代理服务器类:
class SSDMProtocol(LineReceiver)
...

在 SSDMProtocol 对象(调用 obSSDMProtocol)解析 header 后,它调用打开 ftp 连接的方法( FTPClient 来自 twisted.protocols.ftp)并设置 FTPManager 字段的对象 _ftpClientAsync 并调用 _ftpCWDSuccees(self, protocol, fileName)protocol = obSSDMProtocol当收到文件的字节时调用 dataReceived(self, data) FileReceiver 对象。

self.__proto.transport.write(data)调用时,数据附加到内部缓冲区比发送回客户端更快,因此内存耗尽。当缓冲区达到一定大小时我可以停止读取并在缓冲区全部发送到客户端后恢复读取?或类似的东西?

最佳答案

如果您将 20 GB(千兆位?)字符​​串传递给 transport.write ,您将需要至少 20 GB(千兆位?)的内存 - 可能更像是 40 或 60,因为在 Python 中处理字符串时需要额外的复制。

即使您从未将单个字符串传递给 transport.write如果您反复调用 transport.write,则为 20 GB(千兆位?)如果短字符串的速度比您的网络可以处理的快,发送缓冲区最终会变得太大而无法放入内存,您将遇到 MemoryError .

这两个问题的解决方案是生产者/消费者系统。使用 IProducer 的优点和 IConsumer给你的是你永远不会有一个 20 GB(千兆位?)的字符串,你永远不会用太多较短的字符串填充发送缓冲区。网络将受到限制,因此字节的读取速度不会超过您的应用程序处理它们并忘记它们的速度。您的字符串最终将达到 16kB - 64kB 的大小,应该很容易放入内存中。

您只需要调整您对 FileReceiver 的使用包括将传入连接注册为传出连接的生产者:

class FileReceiver(Protocol):
def __init__(self, outgoing):
self._outgoing = outgoing

def connectionMade(self):
self._outgoing.transport.registerProducer(self.transport, streaming=True)

def dataReceived(self, data):
self._outgoing.transport.write(data)

现在每当 self._outgoing.transport的发送缓冲区已满,它会告诉 self.transport暂停。一旦发送缓冲区清空,它会告诉 self.transport恢复。 self.transport现在了解如何在 TCP 级别执行这些操作,从而使进入服务器的数据也减慢。

关于扭曲的大文件传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12891192/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com