gpt4 book ai didi

python - 看门狗兼容性 : A workaround for "CancelIoEx"

转载 作者:太空宇宙 更新时间:2023-11-03 18:12:26 24 4
gpt4 key购买 nike

使用Python watchdog文件系统事件监视库我注意到,在 Windows Server 2003 下使用时,它会进入“轮询模式”,从而停止使用异步操作系统通知,因此,在大量文件更改下会严重降低系统性能。

我将问题追溯到watchdog/observers/winapi.py文件在哪里 CancelIoEx使用系统调用来停止 ReadDirectoryChangesW当用户想要停止监视被监视的目录或文件时调用lock:

(winapi.py)

CancelIoEx = ctypes.windll.kernel32.CancelIoEx
CancelIoEx.restype = ctypes.wintypes.BOOL
CancelIoEx.errcheck = _errcheck_bool
CancelIoEx.argtypes = (
ctypes.wintypes.HANDLE, # hObject
ctypes.POINTER(OVERLAPPED) # lpOverlapped
)

...
...
...

def close_directory_handle(handle):
try:
CancelIoEx(handle, None) # force ReadDirectoryChangesW to return
except WindowsError:
return

CancelIoEx 的问题调用是它直到 Windows Server 2008 才可用: http://msdn.microsoft.com/en-us/library/windows/desktop/aa363792(v=vs.85).aspx

一种可能的替代方案是更改 close_directory_handle为了使其在受监视的目录中创建一个模拟文件,从而解锁等待 ReadDirectoryChangesW 的线程返回。

但是,我注意到CancelIo系统调用是 in fact available在 Windows Server 2003 中:

Cancels all pending input and output (I/O) operations that are issued by the calling thread for the specified file. The function does not cancel I/O operations that other threads issue for a file handle. To cancel I/O operations from another thread, use the CancelIoEx function.

但是打电话CancelIo不会影响等待线程。

您知道如何解决这个问题吗?可能是threading.enumerate()可以用来发出由每个线程处理的信号 CancelIo从这些处理程序调用?

最佳答案

自然的方法是实现一个完成例程并使用其重叠模式调用ReadDirectoryChangesW。以下示例展示了执行此操作的方法:

RDCW_CALLBACK_F = ctypes.WINFUNCTYPE(None, ctypes.wintypes.DWORD, ctypes.wintypes.DWORD, ctypes.POINTER(OVERLAPPED))

首先,创建一个 WINFUNCTYPE 工厂,它将用于从 python 方法生成(可从 Windows API 调用)类似 C 的函数。本例中没有返回值,对应3个参数

VOID CALLBACK FileIOCompletionRoutine(
_In_ DWORD dwErrorCode,
_In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped
);

FileIOCompletionRoutine header 。

需要将回调引用以及重叠结构添加到 ReadDirectoryChangesW 参数列表中:

ReadDirectoryChangesW = ctypes.windll.kernel32.ReadDirectoryChangesW

ReadDirectoryChangesW.restype = ctypes.wintypes.BOOL
ReadDirectoryChangesW.errcheck = _errcheck_bool
ReadDirectoryChangesW.argtypes = (
ctypes.wintypes.HANDLE, # hDirectory
LPVOID, # lpBuffer
ctypes.wintypes.DWORD, # nBufferLength
ctypes.wintypes.BOOL, # bWatchSubtree
ctypes.wintypes.DWORD, # dwNotifyFilter
ctypes.POINTER(ctypes.wintypes.DWORD), # lpBytesReturned
ctypes.POINTER(OVERLAPPED), # lpOverlapped
RDCW_CALLBACK_F # FileIOCompletionRoutine # lpCompletionRoutine
)

从这里开始,我们准备好执行重叠的系统调用。这是一个简单的 bacl 调用,可用于测试一切是否正常:

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
print("dir_change_callback! PID:" + str(os.getpid()))
print("CALLBACK THREAD: " + str(threading.currentThread()))

准备并执行通话:

event_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
nbytes = ctypes.wintypes.DWORD()
overlapped_read_dir = OVERLAPPED()
call2pass = RDCW_CALLBACK_F(dir_change_callback)

hand = get_directory_handle(os.path.abspath("/test/"))

def docall():
ReadDirectoryChangesW(hand, ctypes.byref(event_buffer),
len(event_buffer), False,
WATCHDOG_FILE_NOTIFY_FLAGS,
ctypes.byref(nbytes),
ctypes.byref(overlapped_read_dir), call2pass)

print("Waiting!")
docall()

如果您将所有这些代码加载并执行到 DreamPie 中交互式 shell 您可以检查系统调用是否完成以及回调是否执行,从而在 c:\test 目录下完成第一次更改后打印线程和 pid 号。此外,您会注意到它们与主线程和进程相同:尽管事件是由单独的线程引发的,但回调在与我们的主程序相同的进程和线程中运行,从而提供了不期望的行为:

lck = threading.Lock()

def dir_change_callback(dwErrorCode,dwNumberOfBytesTransfered,p):
print("dir_change_callback! PID:" + str(os.getpid()))
print("CALLBACK THREAD: " + str(threading.currentThread()))

...
...
...

lck.acquire()
print("Waiting!")
docall()
lck.acquire()

该程序将锁定主线程并且回调将永远不会执行。我尝试了很多同步工具,甚至 Windows API 信号量也总是得到相同的行为,所以最后,我决定在使用 管理和同步的单独进程中使用 ReadDirectoryChangesW 的同步配置来实现异步调用多处理 python 库:

调用 get_directory_handle 不会返回 Windows API 给出的句柄编号,而是返回由 winapi 库管理的句柄编号,为此我实现了一个句柄生成器:

class FakeHandleFactory():
_hl = threading.Lock()
_next = 0
@staticmethod
def next():
FakeHandleFactory._hl.acquire()
ret = FakeHandleFactory._next
FakeHandleFactory._next += 1
FakeHandleFactory._hl.release()
return ret

每个生成的句柄必须与文件系统路径全局关联:

handle2file = {}

每次调用read_directory_changes现在都会生成ReadDirectoryRequest(派生自multiprocessing.Process)对象:

class ReadDirectoryRequest(multiprocessing.Process):

def _perform_and_wait4request(self, path, recursive, event_buffer, nbytes):
hdl = CreateFileW(path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS,
None, OPEN_EXISTING, WATCHDOG_FILE_FLAGS, None)
#print("path: " + path)
aux_buffer = ctypes.create_string_buffer(BUFFER_SIZE)
aux_n = ctypes.wintypes.DWORD()
#print("_perform_and_wait4request! PID:" + str(os.getpid()))
#print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
try:
ReadDirectoryChangesW(hdl, ctypes.byref(aux_buffer),
len(event_buffer), recursive,
WATCHDOG_FILE_NOTIFY_FLAGS,
ctypes.byref(aux_n), None, None)
except WindowsError as e:
print("!" + str(e))
if e.winerror == ERROR_OPERATION_ABORTED:
nbytes = 0
event_buffer = []
else:
nbytes = 0
event_buffer = []
# Python 2/3 compat
nbytes.value = aux_n.value
for i in xrange(self.int_class(aux_n.value)):
event_buffer[i] = aux_buffer[i]
CloseHandle(hdl)
try:
self.lck.release()
except:
pass



def __init__(self, handle, recursive):
buffer = ctypes.create_string_buffer(BUFFER_SIZE)
self.event_buffer = multiprocessing.Array(ctypes.c_char, buffer)
self.nbytes = multiprocessing.Value(ctypes.wintypes.DWORD, 0)
targetPath = handle2file.get(handle, None)
super(ReadDirectoryRequest, self).__init__(target=self._perform_and_wait4request, args=(targetPath, recursive, self.event_buffer, self.nbytes))
self.daemon = True
self.lck = multiprocessing.Lock()
self.result = None
try:
self.int_class = long
except NameError:
self.int_class = int
if targetPath is None:
self.result = ([], -1)

def CancelIo(self):
try:
self.result = ([], 0)
self.lck.release()
except:
pass

def read_changes(self):
#print("read_changes! PID:" + str(os.getpid()))
#print("CALLBACK THREAD: " + str(threading.currentThread()) + "\n----------")
if self.result is not None:
raise Exception("ReadDirectoryRequest object can be used only once!")
self.lck.acquire()
self.start()
self.lck.acquire()
self.result = (self.event_buffer, self.int_class(self.nbytes.value))
return self.result

此类指定Process,提供执行系统调用并等待直到(或)的进程:

  • 已引发更改事件。
  • 主线程通过调用 ReadDirectoryRequest 对象 CancelIo 方法取消请求。

请注意:

  • 获取目录句柄
  • close_directory_handle
  • 读取目录更改

角色现在用于管理请求。为此,需要线程锁和辅助数据结构:

rqIndexLck = threading.Lock() # Protects the access to `rqIndex`
rqIndex = {} # Maps handles to request objects sets.

获取目录句柄

def get_directory_handle(path):
rqIndexLck.acquire()
ret = FakeHandleFactory.next()
handle2file[ret] = path
rqIndexLck.release()
return ret

close_directory_handle

def close_directory_handle(handle):
rqIndexLck.acquire()
rqset4handle = rqIndex.get(handle, None)
if rqset4handle is not None:
for rq in rqset4handle:
rq.CancelIo()
del rqIndex[handle]
if handle in handle2file:
del handle2file[handle]
rqIndexLck.release()

最后但并非最不重要的一点:read_directory_changes

def read_directory_changes(handle, recursive):
rqIndexLck.acquire()
rq = ReadDirectoryRequest(handle, recursive)
set4handle = None
if handle in rqIndex:
set4handle = rqIndex[handle]
else:
set4handle = set()
rqIndex[handle] = set4handle
set4handle.add(rq)
rqIndexLck.release()
ret = rq.read_changes()
rqIndexLck.acquire()
if rq in set4handle:
set4handle.remove(rq)
rqIndexLck.release()
return ret

关于python - 看门狗兼容性 : A workaround for "CancelIoEx",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25669139/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com