gpt4 book ai didi

python - 在没有其他低级库的情况下使用 Python 监视文件系统事件

转载 作者:塔克拉玛干 更新时间:2023-11-03 01:09:54 26 4
gpt4 key购买 nike

例如,我需要在 linux 操作系统的某个目录上捕获删除和添加文件事件。我为它们找到了像 inotify 和 python 包装器这样的库,但是如果我想使用清晰的 python 代码,我应该每秒观察一次 os.listdir(path) 输出,还是有一些方法可以完成这样的任务?

最佳答案

来源:http://code.activestate.com/recipes/215418-watching-a-directory-tree-on-unix/

watch_directories() 函数获取一个路径列表和一个可调用对象,然后重复遍历以这些路径为根的目录树,监视被删除或修改时间被更改的文件。然后向可调用对象传递两个列表,其中包含已更改的文件和已删除的文件。

from __future__ import nested_scopes

import os, time

def watch_directories (paths, func, delay=1.0):
"""(paths:[str], func:callable, delay:float)
Continuously monitors the paths and their subdirectories
for changes. If any files or directories are modified,
the callable 'func' is called with a list of the modified paths of both
files and directories. 'func' can return a Boolean value
for rescanning; if it returns True, the directory tree will be
rescanned without calling func() for any found changes.
(This is so func() can write changes into the tree and prevent itself
from being immediately called again.)
"""

# Basic principle: all_files is a dictionary mapping paths to
# modification times. We repeatedly crawl through the directory
# tree rooted at 'path', doing a stat() on each file and comparing
# the modification time.

all_files = {}
def f (unused, dirname, files):
# Traversal function for directories
for filename in files:
path = os.path.join(dirname, filename)

try:
t = os.stat(path)
except os.error:
# If a file has been deleted between os.path.walk()
# scanning the directory and now, we'll get an
# os.error here. Just ignore it -- we'll report
# the deletion on the next pass through the main loop.
continue

mtime = remaining_files.get(path)
if mtime is not None:
# Record this file as having been seen
del remaining_files[path]
# File's mtime has been changed since we last looked at it.
if t.st_mtime > mtime:
changed_list.append(path)
else:
# No recorded modification time, so it must be
# a brand new file.
changed_list.append(path)

# Record current mtime of file.
all_files[path] = t.st_mtime

# Main loop
rescan = False
while True:
changed_list = []
remaining_files = all_files.copy()
all_files = {}
for path in paths:
os.path.walk(path, f, None)
removed_list = remaining_files.keys()
if rescan:
rescan = False
elif changed_list or removed_list:
rescan = func(changed_list, removed_list)

time.sleep(delay)

if __name__ == '__main__':
def f (changed_files, removed_files):
print changed_files
print 'Removed', removed_files

watch_directories(['.'], f, 1)

如果您希望以某种方式将作业发送到守护进程,但又不想使用某些 IPC 机制(例如套接字或管道),则此秘诀很有用。相反,守护进程可以坐下来观察提交目录,并且可以通过将文件或目录放入提交目录来提交作业。

不考虑锁定。 watch_directories() 函数本身并不真正需要进行锁定;如果它错过了一次修改,它会在下一次通过时注意到它。但是,如果将作业直接写入监视目录,则可调用对象可能会在作业文件仅写入一半时开始运行。为了解决这个问题,你可以使用一个锁文件;可调用对象在运行时必须获取锁,提交者在希望添加新作业时必须获取锁。一种更简单的方法是依赖 rename() 系统调用是原子的:将作业写入一个不被监视的临时目录,一旦文件完成,使用 os.rename() 将其移动到提交目录。

关于python - 在没有其他低级库的情况下使用 Python 监视文件系统事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7849117/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com