gpt4 book ai didi

python - 如何使用 dask 和 xarray 加载和处理 zarr 文件

转载 作者:行者123 更新时间:2023-12-05 00:56:58 37 4
gpt4 key购买 nike

我在 s3 中有每月 zarr 文件,其中包含网格温度数据。我想为一个纬度/经度提取多个月的数据并创建该时间序列的数据框。一些伪代码:

datasets=[]
for file in files:
s3 = s3fs.S3FileSystem()
zarr_store = s3fs.S3Map(file, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)
ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)
datasets.append(ds)

con = xr.concat(datasets, dim='time')
df = con.to_dataframe()

所以这段代码可以工作,但速度非常慢。我希望使用 dask 来加快速度。我的计划是更改一次处理一个文件并返回数据帧的方法。然后我会调用 client.map() 并生成所有 dfs,然后在最后将它们连接在一起。所以我总结了类似的东西:

def load(file, lat: float, long: float, start_date, end_date):

s3 = s3fs.S3FileSystem()
s3_path = file['s3_bucket'] + '/' + file['zarr_s3_key']
zarr_store = s3fs.S3Map(s3_path, s3=s3)
zarr = xr.open_zarr(store=zarr_store, consolidated=True)

ds = zarr.sel(latitude=lat,
longitude=long,
time=slice(start_date.strftime("%Y-%m-%d"),
end_date.strftime("%Y-%m-%d"))
)

tmp = x.result().to_array().values
df_time = zarr.coords['time'].sel(time=slice(start_date.strftime("%Y-%m-%d"), end_date.strftime("%Y-%m-%d"))).values
df = pd.DataFrame({'time': df_time, 'lat': lat, 'long': long, 'dat': tmp})
df.set_index(['time', 'lat', 'long'], inplace=True)

return df

if __name__ == '__main__':
client = Client('tcp://xxx')

start_date = date(2000, 1, 7)
end_date = date(2000, 10, 20)
lat = 2
lon = 10

# get the s3 locations of the zarr files from the db
files = get_files()

# try just running with one file
res = client.submit(load, files[0], lat, lon, start_date, end_date)

# run them all
future = client.map(load, files,
repeat(lat), repeat(lon),
repeat(start_date), repeat(end_date))
x = client.gather(future)

当我将客户端连接到本地计算机时,此代码运行良好。但是当我尝试连接到远程集群时,我在 xr.open_zarr 调用中收到以下错误:

KeyError: 'XXX/data.zarr/.zmetadata'

我尝试更改代码并在方法调用之外加载 zarrs 并将它们传递进来,但结果只给了我 nans。有什么我想念的吗?这不是解决我想要做的事情的正确方法吗?

最佳答案

如果您只想提取某个时间点的时间序列,您可以创建一个 Dask 客户端,然后让 xarray 并行执行此操作。在下面的示例中,我们只有一个 zarr 数据集,但只要工作人员忙于处理每个 Zarr 文件中的 block ,您就不会从并行解析 Zarr 文件中获得任何 yield 。

import xarray as xr
import fsspec
import hvplot.xarray

from dask.distributed import Client

url = 's3://mur-sst/zarr' # Amazon Public Data

ds = xr.open_zarr(fsspec.get_mapper(url, anon=True), consolidated=True)

timeseries = ds['analysed_sst'].sel(time=slice('2015-01-01','2020-01-01'),
lat=43,
lon=-70).persist()

timeseries.hvplot()

产生:

enter image description here

这里是 Full Jupyter Notebook

关于python - 如何使用 dask 和 xarray 加载和处理 zarr 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61260950/

37 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com