- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
Python 3.6.2 。
Win 10 内存 8G,CPU I5 1.6 GHz 。
这个作品来源于一个日志解析工具的开发,这个开发过程中遇到的一个痛点,就是日志文件多,日志数据量大,解析耗时长。在这种情况下,寻思一种高效解析数据解析方案.
1、采用多线程读取文件 。
2、采用按块读取文件替代按行读取文件 。
由于日志文件都是文本文件,需要读取其中每一行进行解析,所以一开始会很自然想到采用按行读取,后面发现合理配置下,按块读取,会比按行读取更高效.
按块读取来的问题就是,可能导致完整的数据行分散在不同数据块中,那怎么解决这个问题呢?解答如下:
将数据块按换行符 \n 切分得到日志行列表,列表第一个元素可能是一个完整的日志行,也可能是上一个数据块末尾日志行的组成部分,列表最后一个元素可能是不完整的日志行(即下一个数据块开头日志行的组成部分),也可能是空字符串(日志块中的日志行数据全部是完整的),根据这个规律,得出以下公式,通过该公式,可以得到一个新的数据块,对该数据块二次切分,可以得到数据完整的日志行 。
上一个日志块首部日志行 +\n + 尾部日志行 + 下一个数据块首部日志行 + \n + 尾部日志行 + ...
3、将数据解析操作拆分为可并行解析部分和不可并行解析部分 。
数据解析往往涉及一些不可并行的操作,比如数据求和,最值统计等,如果不进行拆分,并行解析时势必需要添加互斥锁,避免数据覆盖,这样就会大大降低执行的效率,特别是不可并行操作占比较大的情况下.
对数据解析操作进行拆分后,可并行解析操作部分不用加锁。考虑到Python GIL的问题,不可并行解析部分替换为单进程解析.
4、采用多进程解析替代多线程解析 。
采用多进程解析替代多线程解析,可以避开Python GIL全局解释锁带来的执行效率问题,从而提高解析效率.
5、采用队列实现“协同”效果 。
引入队列机制,实现一边读取日志,一边进行数据解析:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import re
import time
from datetime import datetime
from joblib import Parallel, delayed, parallel_backend
from collections import deque
from multiprocessing import cpu_count
import threading
class LogParser(object):
def __init__(self, chunk_size=1024*1024*10, process_num_for_log_parsing=cpu_count()):
self.log_unparsed_queue = deque() # 用于存储未解析日志
self.log_line_parsed_queue = deque() # 用于存储已解析日志行
self.is_all_files_read = False # 标识是否已读取所有日志文件
self.process_num_for_log_parsing = process_num_for_log_parsing # 并发解析日志文件进程数
self.chunk_size = chunk_size # 每次读取日志的日志块大小
self.files_read_list = [] # 存放已读取日志文件
self.log_parsing_finished = False # 标识是否完成日志解析
def read_in_chunks(self, filePath, chunk_size=1024*1024):
"""
惰性函数(生成器),用于逐块读取文件。
默认区块大小:1M
"""
with open(filePath, 'r', encoding='utf-8') as f:
while True:
chunk_data = f.read(chunk_size)
if not chunk_data:
break
yield chunk_data
def read_log_file(self, logfile_path):
'''
读取日志文件
这里假设日志文件都是文本文件,按块读取后,可按换行符进行二次切分,以便获取行日志
'''
temp_list = [] # 二次切分后,头,尾行日志可能是不完整的,所以需要将日志块头尾行日志相连接,进行拼接
for chunk in self.read_in_chunks(logfile_path, self.chunk_size):
log_chunk = chunk.split('\n')
temp_list.extend([log_chunk[0], '\n'])
temp_list.append(log_chunk[-1])
self.log_unparsed_queue.append(log_chunk[1:-1])
self.log_unparsed_queue.append(''.join(temp_list).split('\n'))
self.files_read_list.remove(logfile_path)
def start_processes_for_log_parsing(self):
'''启动日志解析进程'''
with parallel_backend("multiprocessing", n_jobs=self.process_num_for_log_parsing):
Parallel(require='sharedmem')(delayed(self.parse_logs)() for i in range(self.process_num_for_log_parsing))
self.log_parsing_finished = True
def parse_logs(self):
'''解析日志'''
method_url_re_pattern = re.compile('(HEAD|POST|GET)\s+([^\s]+?)\s+',re.DOTALL)
url_time_taken_extractor = re.compile('HTTP/1\.1.+\|(.+)\|\d+\|', re.DOTALL)
while self.log_unparsed_queue or self.files_read_list:
if not self.log_unparsed_queue:
continue
log_line_list = self.log_unparsed_queue.popleft()
for log_line in log_line_list:
#### do something with log_line
if not log_line.strip():
continue
res = method_url_re_pattern.findall(log_line)
if not res:
print('日志未匹配到请求URL,已忽略:\n%s' % log_line)
continue
method = res[0][0]
url = res[0][1].split('?')[0] # 去掉了 ?及后面的url参数
# 提取耗时
res = url_time_taken_extractor.findall(log_line)
if res:
time_taken = float(res[0])
else:
print('未从日志提取到请求耗时,已忽略日志:\n%s' % log_line)
continue
# 存储解析后的日志信息
self.log_line_parsed_queue.append({'method': method,
'url': url,
'time_taken': time_taken,
})
def collect_statistics(self):
'''收集统计数据'''
def _collect_statistics():
while self.log_line_parsed_queue or not self.log_parsing_finished:
if not self.log_line_parsed_queue:
continue
log_info = self.log_line_parsed_queue.popleft()
# do something with log_info
with parallel_backend("multiprocessing", n_jobs=1):
Parallel()(delayed(_collect_statistics)() for i in range(1))
def run(self, file_path_list):
# 多线程读取日志文件
for file_path in file_path_list:
thread = threading.Thread(target=self.read_log_file,
name="read_log_file",
args=(file_path,))
thread.start()
self.files_read_list.append(file_path)
# 启动日志解析进程
thread = threading.Thread(target=self.start_processes_for_log_parsing, name="start_processes_for_log_parsing")
thread.start()
# 启动日志统计数据收集进程
thread = threading.Thread(target=self.collect_statistics, name="collect_statistics")
thread.start()
start = datetime.now()
while threading.active_count() > 1:
print('程序正在努力解析日志...')
time.sleep(0.5)
end = datetime.now()
print('解析完成', 'start', start, 'end', end, '耗时', end - start)
if __name__ == "__main__":
log_parser = LogParser()
log_parser.run(['access.log', 'access2.log'])
注意:
需要合理的配置单次读取文件数据块的大小,不能过大,或者过小,否则都可能会导致数据读取速度变慢。笔者实践环境下,发现10M~15M每次是一个比较高效的配置.
最后此篇关于Python大数据量文本文件高效解析方案代码实现的文章就讲到这里了,如果你想了解更多关于Python大数据量文本文件高效解析方案代码实现的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我应该编写一个函数来打印一组给定的三个数字中两个较大数字的平方和。 我对这种情况的处理相当笨拙。我没有编写返回一组 3 中最大的两个数字的函数,而是编写了函数,以便表达式减少到两个所需的数字。 # S
如果有人可以提供帮助,我将不胜感激。我一直在敲我的头一天试图让这个工作。我已经在互联网上搜索并重新阅读了手册,但我就是不明白。 guile << __EOF__ ( define heading-li
目前我正在处理一个方案问题,其中我们正在使用方案列表表示一个图。我们使用的第一个变体是表示为 的边列表图 '((x y) (y z) (x z)) 我们正在使用的图的第二个变体被称为 x 图,表示为
我正在尝试创建一个函数,该函数将两个函数作为参数并执行它们。 我尝试使用 cond ,但它只执行 action1 . (define seq-action (lambda (action1 act
我提前为我的原始英语道歉;我会尽量避免语法错误等。 两周前,我决定更新我对 Scheme(及其启示)的知识,同时实现我在手上获得的一些数学 Material ,特别是我注册的自动机理论和计算类(cla
Scheme中有没有函数支持分数的“div”操作? 意思是 - 11 格 2.75 = 4。 最佳答案 我认为你的问题的答案是:没有,但你可以定义它: #lang racket (define (di
我在scheme中实现合并排序,我必须通过定义两个辅助方法来实现:merge和split。 Merge 需要两个列表(已经按递增顺序)并将它们合并在一起。我这样做了如下: (define merge
尝试从终端加载方案文件。我创建了一个名为 test.scm 的文件,其中包含以下代码: (define (square x) (* x x)) (define (sum-of-squares x y)
我有以下代码: (define (howMany list) (if (null? list) 0 (+ 1 (howMany (cdr list))))) 如果我们执行以
我有点了解如何将基本函数(例如算术)转换为Scheme中的连续传递样式。 但如果函数涉及递归怎么办?例如, (define funname (lambda (arg0 arg1)
我正在尝试附加两个字符串列表 但我不知道如何在两个单词之间添加空格。 (define (string-concat lst1 lst2) (map string-append lst1
这个问题已经有答案了: How do I pass a list as a list of arguments in racket? (2 个回答) 已关闭 8 年前。 我有一个函数,它需要无限数量的
我对这段代码的工作方式感到困惑: (define m (list 1 2 3 '(5 8))) (let ((l (cdr m))) (set! l '(28 88))) ==>(1 2 3 (5 8
我正在为学校做一项计划作业,有一个问题涉及我们定义记录“类型”(作为列表实现)(代表音乐记录)。 我遇到的问题是我被要求创建一个过程来创建这些记录的列表,然后创建一个将记录添加到该列表的函数。这很简单
我有以下代码: (define (howMany list) (if (null? list) 0 (+ 1 (howMany (cdr list))))) 如果我们执行以
我正在尝试附加两个字符串列表 但我不知道如何在两个单词之间添加空格。 (define (string-concat lst1 lst2) (map string-append lst1
如何使用抽象列表函数(foldr、foldl、map 和 filter 编写函数),无需递归,消耗数字列表 (list a1 a2 a3 ...) 并产生交替和 a1 - a2 + a3 ...? 最
我试图找出在 Scheme 中发生的一些有趣的事情: (define last-pair (lambda (x) (if (null? (cdr x))
这个问题在这里已经有了答案: Count occurrence of element in a list in Scheme? (4 个答案) 关闭 8 年前。 我想实现一个函数来计算列表中元素出现
我正在尝试使用下面的代码获取方案中的导数。谁能告诉我哪里出错了?我已经尝试了一段时间了。 (define d3 (λ (e) (cond ((number? e) 0) ((e
我是一名优秀的程序员,十分优秀!