- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在将 800,000 行表读入数据帧。然后,我循环遍历每一列和列中的每一行,以收集统计信息,例如最大长度、最小长度、最大值、不同值等。
我可以使用 SLURM 访问 32 核计算,因此我想使用 pool.map_async 在单独的进程中处理数据帧中的每一列。
它比仅使用 for 循环慢得多。
我尝试将 CPU 数量减少到 8 个、4 个等,看看是否是进程启动导致的。
我怀疑是800,000行的panda系列的序列化导致的?
import cx_Oracle
import csv
import os
import glob
import datetime
import multiprocessing as mp
import get_column_stats as gs
import pandas as pd
import pandas.io.sql as psql
def get_data():
print("Starting Job: " + str(datetime.datetime.now()))
# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())
print("CPU Count: " + str(mp.cpu_count()))
dsn_tns = cx_Oracle.makedsn('myserver.net', '1521', service_name='myservice')
con = cx_Oracle.connect(user='ARIEL', password='zzzzz', dsn=dsn_tns)
stats_results = [["OWNER","TABLE","COLUMN_NAME","RECORD_COUNT","DISTINCT_VALUES","MIN_LENGTH","MAX_LENGTH","MIN_VAL","MAX_VAL"]]
sql = "SELECT * FROM ARIEL.DIM_REGISTRATION_SET"
cur = con.cursor()
print("Start Executing SQL: " + str(datetime.datetime.now()))
df = psql.read_sql(sql, con);
print("End SQL Execution: " + str(datetime.datetime.now()))
col_names = df.columns.values.tolist()
col_index = 0
print("Start In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
# we go through every field
# start at column 0
col_index = 0
# iterate through each column, to gather stats from each column using parallelisation
proc_results = pool.map_async(gs.get_column_stats, df.iteritems()).get()
# Step 3: Don't forget to close
pool.close()
pool.join()
for result in proc_results:
stats_results.append(result)
print("End In-Memory Iteration of Dataset: " + str(datetime.datetime.now()))
# end filename for
cur.close()
outfile = open('C:\jupyter\Experiment\stats_dim_registration_set.csv','w')
writer=csv.writer(outfile,quoting=csv.QUOTE_ALL, lineterminator='\n')
writer.writerows(stats_results)
outfile.close()
print("Ending Job: " + str(datetime.datetime.now()))
get_data()
被称为异步的代码:
import os
import sys
def strip_crlf(value):
return value.replace('\n', ' ').replace('\r', '')
def get_column_stats(args):
# args is a tuple, the first value is the column name of the panda series, the second value is the panda data series
col_name, rs = args
sys.stdout = open("col_" + col_name + ".out", "a")
print("Starting Iteration of Column: " + col_name)
max_length = 0
min_length = 100000 # abitrarily large number!!
max_value = ""
min_value = "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz" # abitrarily large number!!
distinct_value_count = 0
has_values = False # does the column have any non-null values
has_null_values = False
row_count = 0
# create a dictionary into which we can add the individual items present in each row of data
# a dictionary will not let us add the same value more than once, so we can simply count the
# dictionary values at the end
distinct_values = {}
row_index = 0
# go through every row, for the current column being processed to gather the stats
for row_value in rs.values:
row_count += 1
if row_value is None:
value_length = 0
else:
value_length = len(str(row_value))
if value_length > max_length:
max_length = value_length
if value_length < min_length:
if value_length > 0:
min_length = value_length
if row_value is not None:
if str(row_value) > max_value:
max_value = str(row_value)
if str(row_value) < min_value:
min_value = str(row_value)
# capture distinct values
if row_value is None:
row_value = "Null"
has_null_values = True
else:
has_values = True
distinct_values[row_value] = 1
row_index += 1
# end row for
distinct_value_count = len(distinct_values)
if has_values == False:
distinct_value_count = None
min_length = None
max_length = None
min_value = None
max_value = None
elif has_null_values == True and distinct_value_count > 0:
distinct_value_count -= 1
if min_length == 0 and max_length > 0 and has_values == True:
min_length = max_length
print("Ending Iteration of Column: " + col_name)
return ["ARIEL","DIM_REGISTRATION_SET", col_name,row_count, distinct_value_count, min_length, max_length,
strip_crlf(str(min_value)), strip_crlf(str(max_value))]
最佳答案
您可以通过让每个进程查询一列来提高速度,而不是在开始时查询所有列,这需要将它们复制到所有子进程。
一种想法是只查询中央进程中的列名称,然后仅将列名称发送到子进程。然后用 SELECT columnX
而不是 SELECT *
。
关于python - 通过 pool.map_async 进行多处理对于大数据帧来说非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56615216/
我有一个关于 map_async 的有趣问题,我无法弄清楚。 我正在使用带有进程池的 python 多处理库。我正在尝试传递要比较的字符串列表和要与使用 map_async() 的函数进行比较的字符串
我按预期使用map_async - 使用以下方法将可迭代映射到多个处理核心: cores = mp.cpu_count() pool = mp.Pool() r = pool.map_async(fu
我想跳过从 map_async 返回的结果.它们在内存中增长,但我不需要它们。 这是一些代码: def processLine(line): #process something pr
我有一个通过话语解析器运行的 80,000 个字符串的列表,为了提高这个过程的速度,我一直在尝试使用 python 多处理包。 解析器代码需要 python 2.7,我目前正在使用字符串的子集在 2
我使用 iPython 的并行处理工具进行大 map 操作。在等待 map 操作完成时,我想向用户显示有多少作业已完成,有多少正在运行,还有多少剩余。我怎样才能找到这些信息? 这是我的工作。我创建了一
我有以下功能 from multiprocessing import Pool def do_comparison(tupl): x, y = tupl # unpack arguments
调试代码花了我一晚上的时间,终于发现了这个棘手的问题。请看下面的代码。 from multiprocessing import Pool def myfunc(x): return [i fo
使用map和map_async有什么区别?将列表中的项目分配给 4 个进程后,它们是否运行相同的功能? 那么假设两者都异步且并行运行是错误的吗? def f(x): return 2*x p=P
在处理由 pool.map 调用的函数内的数据时,我遇到了非常奇怪的问题。例如,以下代码按预期工作... import csv import multiprocessing import iterto
阅读 multiprocessing.Pool doc我知道 map_async 和 apply_async 是 map 和 appy 的两个版本,应该更快,但是不保证输入的处理顺序与提供的顺序相同。
我正在尝试运行一个与 pool.apply_async 配合得很好的胖函数 现在,我正在尝试 pool.map_async 函数(使用 functools.partial 方法传递了 2 个参数),程
我正在将 800,000 行表读入数据帧。然后,我循环遍历每一列和列中的每一行,以收集统计信息,例如最大长度、最小长度、最大值、不同值等。 我可以使用 SLURM 访问 32 核计算,因此我想使用 p
尝试围绕执行 map_async() 的函数编写一些单元测试手术。更具体地说,我想确认在某个进程中发生异常时某些文件会被清理。下面提供了具有意图的示例伪代码。 foo.py def write_chu
我似乎无法在使用 map_async() 时让我的回调工作。当我使用稍微修改过的代码来循环遍历我的数组时,它会通过 apply_async() 添加任务。从文档看来我应该能够将回调与 map_asyn
我尝试在 python 中将多处理包与池一起使用。 我有一个由 map_async 函数调用的函数 f: from multiprocessing import Pool def f(host, x)
我想要一个长时间运行的进程通过队列(或类似的东西)返回它的进度,我将把它提供给进度条对话框。当过程完成时,我还需要结果。此处的测试示例失败并出现 RuntimeError: Queue objects
我这里有一个奇怪的问题。 我有一个 python 程序,它执行保存在单独的 .py 文件中的代码,这些代码被设计为依次执行,一个接一个。这些代码工作正常,但运行时间太长。我的计划是使用 multipr
因此,我正在开发一个应用程序,每次启动时都必须根据哈希列表检查约 50 GB 的数据。显然这需要并行化,我不希望应用程序在“正在加载...”屏幕上挂起一分半钟。 我正在使用 multiprocessi
我有这样的程序: from multiprocessing import Pool import time def f(x): # I make a heavy code here to take t
下面的代码在 Unix 上完美运行,但在 Windows 7 上生成一个 multiprocessing.TimeoutError(两个操作系统都使用 python 2.7)。 知道为什么吗?谢谢。
我是一名优秀的程序员,十分优秀!