收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

1 R( s$ z' E& ^1 T* i% I, j1 g
) m6 } Y; |; F& K

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

4 v9 W% a+ W1 W6 C, `& W. m & y' H9 G6 A$ u! v5 I+ R7 ?8 z

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

5 [( r4 R" g! n+ Q, c+ I ( `1 U' A; _% W$ S. s

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

5 A# d, n! \% Z, A: Z4 F/ M

其中几个关键点解释一下:

! |6 K5 Z. H( a6 d: o4 d

(1)首先定义拟合正太分布的函数

( k) a# x) g4 S
def norm_fit(data):2 S, R/ G4 Q8 Y loc, scale = st.norm.fit(data)) I) o0 v6 \* C return np.array([loc, scale])
( B" w) j" C# X# M

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

! B1 ~3 m* W' O! J0 G5 }! Y- ]4 p+ e

(2)xarray分块

) D, I& W' j7 h1 [
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) : D8 l2 S: W, K! r- B x = x.chunk({"lev":5, "lat":100,"lon":100})
6 o& E& e) l# A9 t

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

! t# K0 k; i5 U$ k9 O4 P3 u

(3)xarray.apply_ufunc函数

& D; p! H. N; ^, A" m+ P9 V
result = xr.apply_ufunc(norm_fit, , n) J1 }+ b4 Y2 v# C x,& n8 y. ]0 R) P) Z input_core_dims=[["time"]], ! t+ u7 @) r y output_core_dims=[["paras"]], 5 X- R* h7 l2 e& ^; [# V2 q dask="parallelized",- M( n) P+ e0 }0 C5 K* ~3 c output_dtypes=[np.float],5 h$ y( \( U* n dask_gufunc_kwargs={output_sizes:{"paras":2}}7 u; v: ^$ a; `- k4 @. p2 V )
* N& w5 ^& U+ c+ s3 \

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

8 i k F" w% M0 u" c; k$ ]; N- [/ l

以下是全部代码:

% u- ]9 ~ U' `7 N# P
from scipy import stats as st _/ d. Z8 T& u" @. T" G4 s# y7 ]. r- X: j1 } import xarray as xr1 t' P/ k4 I y9 f8 ^& e4 G import dask# d4 [7 }+ a; O g( A import numpy as np4 i- _7 @1 w6 K5 X8 j, W. J/ s# K# {1 [ from dask.diagnostics import ProgressBar 8 q9 o+ e$ j2 t: k% W; E2 ]5 r$ K/ F: D. z9 @. u2 |7 ~ def norm_fit(data): 3 k: n& ?% r8 b0 }6 D' h loc, scale = st.norm.fit(data)- k! p. a' i: j. g: `' e return np.array([loc, scale]) & O. G1 k0 d, h: K1 t$ u! Y 2 Q; a1 Z* ^& Q3 F y o. ` rs = np.random.RandomState(0): N4 B7 N' u | x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) : m& y% Y- H- W. y, @3 }* E x = x.chunk({"lev":5, "lat":100,"lon":100})" \% ~2 P6 w4 P! ?$ |9 A" `& J8 A , l0 o |% r3 p: P #使用apply_ufunc计算,并用dask的并行计算 3 @5 h) G+ S/ _) x; _1 j9 _+ N result = xr.apply_ufunc(norm_fit, ' i7 w+ ?. B8 L0 _+ i% d" u x, / U! B# X& n6 | input_core_dims=[["time"]], ! x3 B$ s4 A6 n9 o0 \ l output_core_dims=[["paras"]],1 S0 ~$ y) \2 f; X5 C dask="parallelized", & [+ U$ J* K8 D output_dtypes=[np.float], 7 n8 C' u& o% Z) p6 P+ @ m dask_gufunc_kwargs={output_sizes:{"paras":2}}) C" H& F* T: E4 r5 @ ) 5 B3 m7 P" j( `- K1 W4 i4 I2 o* @5 Q* Q" X- ~* a #compute进行真实的计算并显示进度 " c& G, S4 Z$ J6 a with ProgressBar(): + J% O* ~7 w4 a5 a) Q& y4 | result = results.compute() 3 G) J. ^1 \8 t4 b8 L N1 |) `& s' w; }( z$ h #结果冲命名保存到nc文件 % Q/ F; m/ c% K4 H result = result.rename("norm_paras") b/ c& ^6 q0 m; k result = result.to_netcdf("norm_fit_paras.nc",compute=False); c9 J$ n2 u8 g9 c/ q with ProgressBar():+ J+ D7 z' e0 S* m% ~1 X5 n result.compute()
1 n+ Y ]# j- {9 o

转自:气海同途

) z2 u0 O4 |- Y* l$ E- J, n

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在前天 18:39
快速回复 返回顶部 返回列表