
python - Multiprocessing with Xarray and NumPy arrays


So I'm trying to implement a solution that has already been described here, but I'm changing it slightly. Instead of just modifying the array with an operation, I'm trying to read from NetCDF files with xarray and then write to a shared numpy array using the multiprocessing module.

I feel like I'm getting close, but something is going wrong. I've pasted a reproducible, simple copy/paste example below. As you can see, when I run the processes, they can all read the files I created, but they don't correctly update the shared numpy array I'm trying to write to. Any help would be appreciated.

Code

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

    ds.close()

    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with zeros
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_  # must be inherited, not passed as an argument


def tonumpyarray(mp_arr, dtype=np.float64):
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr)
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()

Best Answer

What went wrong?

1. You forgot to reshape the view after tonumpyarray, so arr[params[1], :] tries to index a 1-D array with two indices.
2. You used the wrong dtype in tonumpyarray: the shared buffer holds c_float (float32) values, but the np.float64 default reads the same bytes as half as many 64-bit numbers.

The first mistake raises an IndexError inside every worker, but map_async only surfaces worker exceptions when .get() is called on its result, so the failure was silently swallowed and the shared array simply appeared never to update.
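
A minimal sketch of both pitfalls in isolation (the buffer length of 8 and the 2x4 shape are arbitrary, chosen only for illustration):

import ctypes
import multiprocessing as mp
import numpy as np

shared = mp.Array(ctypes.c_float, 8)   # 8 * 4 = 32 bytes of float32 storage

# Pitfall 2: reading the float32 buffer as float64 fuses every pair of
# adjacent 4-byte values into one meaningless 8-byte number.
wrong = np.frombuffer(shared.get_obj(), dtype=np.float64)
print(wrong.shape)                     # (4,) -- half the expected length

# Pitfall 1: even with the right dtype the view is flat, so a row-wise
# write like flat[0, :] = ... raises "too many indices for array".
flat = np.frombuffer(shared.get_obj(), dtype=np.float32)
print(flat.shape)                      # (8,)
arr = flat.reshape(2, 4)               # reshape first; now arr[0, :] works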

Code

import ctypes
import logging
import multiprocessing as mp
import xarray as xr

from contextlib import closing

import numpy as np

info = mp.get_logger().info


def main():

    data = np.arange(10)

    for i in range(4):
        ds = xr.Dataset({'x': data})
        ds.to_netcdf('test_{}.nc'.format(i))

    ds.close()

    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 4, 10
    shared_arr = mp.Array(ctypes.c_float, N * M)
    arr = tonumpyarray(shared_arr, dtype=np.float32)
    arr = arr.reshape((N, M))

    # Fill with zeros
    arr[:, :] = np.zeros((N, M))
    arr_orig = arr.copy()

    files = ['test_0.nc', 'test_1.nc', 'test_2.nc', 'test_3.nc']

    parameter_tuples = [
        (files[0], 0),
        (files[1], 1),
        (files[2], 2),
        (files[3], 3)
    ]

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr, N, M))) as p:
        # many processes access different slices of the same array
        p.map_async(g, parameter_tuples)
    p.join()

    print(arr_orig)
    print(tonumpyarray(shared_arr, np.float32).reshape(N, M))


def init(shared_arr_, N_, M_):  # pass the shape to the workers as well
    global shared_arr
    global N, M
    shared_arr = shared_arr_  # must be inherited, not passed as an argument
    N = N_
    M = M_


def tonumpyarray(mp_arr, dtype=np.float32):  # default dtype now matches c_float
    return np.frombuffer(mp_arr.get_obj(), dtype)


def g(params):
    """no synchronization."""
    print("Current File Name: ", params[0])

    tmp_dataset = xr.open_dataset(params[0])

    print(tmp_dataset["x"].data[:])

    arr = tonumpyarray(shared_arr).reshape(N, M)  # reshape the flat view
    arr[params[1], :] = tmp_dataset["x"].data[:]

    tmp_dataset.close()


if __name__ == '__main__':
    mp.freeze_support()
    main()
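
The docstring on g still reads "no synchronization", which is safe here only because every worker writes a distinct row of the shared array. If worker slices could ever overlap, the lock that mp.Array already provides can guard the write. A minimal sketch of a locked variant of g, reusing the globals set up by init above (not something this particular fix requires):

def g(params):
    """row writes serialized through the lock built into mp.Array."""
    tmp_dataset = xr.open_dataset(params[0])
    with shared_arr.get_lock():  # block other workers while writing
        arr = tonumpyarray(shared_arr).reshape(N, M)
        arr[params[1], :] = tmp_dataset["x"].data[:]
    tmp_dataset.close()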

Regarding python - Multiprocessing with Xarray and NumPy arrays, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54083743/
