- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
当我使用 Manager 对象跨进程共享列表时,此代码的非并行版本如何比并行版本运行得更快。我这样做是为了避免任何序列化,而且我不需要编辑列表。
我从 Oracle 返回一个 800,000 行的数据集,将其转换为列表并使用 Manager.list() 将其存储在共享内存中。
我并行地遍历查询结果中的每一列,以获得一些统计信息(我知道我可以在 SQL 中完成)。
主要代码:
import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs;
import pandas as pd
import pandas.io.sql as psql
def get_data():
print("Starting Job: " + str(datetime.datetime.now()));
manager = mp.Manager()
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
print("CPU Count: " + str(mp.cpu_count()))
dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='PARIELGX');
con = cx_Oracle.connect(user='fred', password='password123', dsn=dsn_tns);
stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]];
sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"
cur = con.cursor();
print("Start Executing SQL: " + str(datetime.datetime.now()));
cur.execute(sql);
print("End SQL Execution: " + str(datetime.datetime.now()));
print("Start SQL Fetch: " + str(datetime.datetime.now()));
rs = cur.fetchall();
print("End SQL Fetch: " + str(datetime.datetime.now()));
print("Start Creation of Shared Memory List: " + str(datetime.datetime.now()));
lrs = manager.list(list(rs)) # shared memory list
print("End Creation of Shared Memory List: " + str(datetime.datetime.now()));
col_names = [];
for field in cur.description:
col_names.append(field[0]);
#print(col_names)
#print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
#print(rs)
#print('-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-')
#print(lrs)
col_index = 0;
print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));
# we go through every field
for field in cur.description:
col_names.append(field[0]);
# start at column 0
col_index = 0;
# iterate through each column, to gather stats from each column using parallelisation
pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()
for i in pool_results:
stats_results.append(i)
# Step 3: Don't forget to close
pool.close()
print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()));
# end filename for
cur.close();
outfile = open('C:\jupyter\Experiment\stats_dim_registration_set.csv','w');
writer=csv.writer(outfile,quoting=csv.QUOTE_ALL, lineterminator='\n');
writer.writerows(stats_results);
outfile.close()
print("Ending Job: " + str(datetime.datetime.now()));
get_data();
并行调用的代码:
def get_column_stats_rs(args):
# rs is a list recordset of the results
rs, col_name, col_names = args
col_index = col_names.index(col_name)
sys.stdout = open("col_" + col_name + ".out", "a")
print("Starting Iteration of Column: " + col_name)
max_length = 0
min_length = 100000 # abitrarily large number!!
max_value = ""
min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" # abitrarily large number!!
distinct_value_count = 0
has_values = False # does the column have any non-null values
has_null_values = False
row_count = 0
# create a dictionary into which we can add the individual items present in each row of data
# a dictionary will not let us add the same value more than once, so we can simply count the
# dictionary values at the end
distinct_values = {}
row_index = 0
# go through every row, for the current column being processed to gather the stats
for val in rs:
row_value = val[col_index]
row_count += 1
if row_value is None:
value_length = 0
else:
value_length = len(str(row_value))
if value_length > max_length:
max_length = value_length
if value_length < min_length:
if value_length > 0:
min_length = value_length
if row_value is not None:
if str(row_value) > max_value:
max_value = str(row_value)
if str(row_value) < min_value:
min_value = str(row_value)
# capture distinct values
if row_value is None:
row_value = "Null"
has_null_values = True
else:
has_values = True
distinct_values[row_value] = 1
row_index += 1
# end row for
distinct_value_count = len(distinct_values)
if has_values == False:
distinct_value_count = None
min_length = None
max_length = None
min_value = None
max_value = None
elif has_null_values == True and distinct_value_count > 0:
distinct_value_count -= 1
if min_length == 0 and max_length > 0 and has_values == True:
min_length = max_length
print("Ending Iteration of Column: " + col_name)
return ["ARIEL", "DIM_REGISTRATION_SET", col_name, row_count, distinct_value_count, min_length, max_length,
strip_crlf(str(min_value)), strip_crlf(str(max_value))]
辅助函数:
def strip_crlf(value):
return value.replace('\n', ' ').replace('\r', '')
我正在使用 Manager.list() 对象在进程之间共享状态:
lrs = manager.list(list(rs)) # shared memory list
并在 map_async() 方法中传递列表:
pool_results = pool.map_async(gs.get_column_stats_rs, [(lrs, col_name, col_names) for col_name in col_names]).get()
最佳答案
管理器开销会增加您的运行时间。此外,您并未在此处直接使用共享内存。您只使用多处理管理器,它比共享内存或单线程实现慢。如果您的代码不需要同步,这意味着您没有修改共享数据,只需跳过管理器并直接使用共享内存对象。
https://docs.python.org/3.7/library/multiprocessing.html
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
关于Python 多处理处理列表的速度较慢,即使使用共享内存也是如此,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56623176/
有人可以解释一下,在 DOM 中搜索元素时,为什么 Xpath 被认为比 CSS 选择器慢。不同的选择器是否有不同的引擎(例如 Xpath、CSS 选择器等) 谢谢 最佳答案 Xpath 并不是被认为
在我们的一个 MVC 页面中尝试加速某些 ajax 调用时,我遇到了一些我无法真正解释的奇怪行为。我每隔 N 秒就会进行一些 ajax 调用,以轮询一些统计数据。 似乎在物理上不同的文件中对 Cont
Background 尝试进行一个简单的实验,看看传统的 if 语句检查 null 是否比 Apache Commons Lang StringUtils isEmpty/isBlank 更快。 为了
我正在从 Android 设备调用 rest api,并且看到与 PC 相比的速度差异,我感到非常惊讶。下面是来自 PC 上的休息工具的图像。 我尝试了几个库,如 Retrofit、Volley 和常
为什么 scipy.distance.cdist 使用 float32 和 float64 时性能差异很大? from scipy.spatial import distance import num
我是一名优秀的程序员,十分优秀!