gpt4 book ai didi

python - 自动更新 CSV 文件

转载 作者:行者123 更新时间:2023-12-01 06:06:30 25 4
gpt4 key购买 nike

我最近开始学习Python(5小时前)。这是我的场景。

我每 4 小时就会收到来自远程测量站点的包含测量值的邮件。文件采用 *.csv 格式,文件名为 XX-2011-00001.csvYY-2011-00001.csv。这些是两台仪器以不同采样间隔连续运行的数据。文件存储在本地文件夹中。

我想开发一个脚本来读取文件(例如:XX-2011-00001.csv)并写入具有相同数据的新 csv 文件。 4 小时后,脚本应再次运行,现在仅读取新文件 XX-2011-00002.csv 并将此数据附加到创建的新 csv 文件中。我想让这个脚本无限循环运行,以便脚本检查新文件并将其添加到 CSV 文件中。

该文件包含“日期”、“时间”和“值”字段。

您能帮我告诉我编写此脚本时应该考虑的模块吗?如果您有任何例子,我将非常感激。

最佳答案

csv module将有助于读取/写入您的文件。您将需要使用带有 sleep 的无限循环 - 类似于:

while True:
process_new_file() # does nothing if no new file
time.sleep(60)

process_new_file 需要检查新文件,这可能很棘手——您不想在文件写入完成之前尝试使用它!像这样的东西应该有效:

def check_for_new_file(directory=INCOMING, files={}):
for file in os.listdir(directory):
if file in files:
break
size = os.stat(file)[stat.ST_SIZE]
files[file] = (datetime.time.now(), size)
now = datetime.time.now()
for file, last_time, last_size in files.items():
current_size = os.stat(file)[stat.ST_SIZE]
if current_size != last_size:
files[file] = (now, current_size)
continue
if now - last_time <= TIME_WITH_NO_WRITES:
return file
raise NoneReady()

现在我们有一个函数可以跟踪 INCOMING 目录中的所有文件,并在文件休眠足够长的时间以确保其完整时返回文件名,我们需要一个函数来实际处理该文件,然后将其移动到某个地方进行妥善保管。

def process_new_file():
try:
filename = check_for_new_file() # raises ValueError if no file ready
except NoneReady:
return
in_file = open(filename, 'rb')
csv_file_in = csv.reader(in_file)
out_file = open(MASTER_CSV, 'rb+')
csv_file_out = csv.writer(out_file)
for row in csv_file_in:
csv_file_out.write(row)
csv_file_out.close()
csv_file_in.close()
shutil.move(filename, PROCESSED)

将所有内容放在一起,包括导入和全局变量:

import os
import stat
import shutil

INCOMING = '/some/path/with/new/files/'
PROCESSED = '/some/path/for/processed/files/'
TIME_WITH_NO_WRITES = 600 # 10 minutes

def check_for_new_file(directory=INCOMING, files={}):
for file in os.listdir(directory):
if file in files:
break
size = os.stat(file)[stat.ST_SIZE]
files[file] = (datetime.time.now(), size)
now = datetime.time.now()
for file, last_time, last_size in files.items():
current_size = os.stat(file)[stat.ST_SIZE]
if current_size != last_size:
files[file] = (now, current_size)
continue
if now - last_time <= TIME_WITH_NO_WRITES:
return file
raise NoneReady()

def process_new_file():
try:
filename = check_for_new_file() # raises ValueError if no file ready
except NoneReady:
return
in_file = open(filename, 'rb')
csv_file_in = csv.reader(in_file)
out_file = open(MASTER_CSV, 'rb+')
csv_file_out = csv.writer(out_file)
for row in csv_file_in:
csv_file_out.write(row)
csv_file_out.close()
csv_file_in.close()
shutil.move(filename, PROCESSED)

if __name__ == '__main__':
while True:
process_new_file() # does nothing if no new file
time.sleep(60)

此代码目前未经测试,因此其中可能存在一两个错误,如果某处出现错误,它将停止运行。希望这会帮助您继续前进。

关于python - 自动更新 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/7772966/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com