收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

/ O9 x* u' h9 R; ?
9 U& t" K& |4 O A9 w

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

) o# y* r! n2 c 4 O2 }/ l/ p, c8 z7 f

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

8 }9 }, {* K9 k8 x( c, `4 X' R 5 |5 y' L) s6 f- F

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

6 z4 ?" F: n: b( H1 k; G

其中几个关键点解释一下:

' Z" r; q- B7 F

(1)首先定义拟合正太分布的函数

$ o$ i5 h" J3 q- g2 f# K. C
def norm_fit(data): " d* D1 j" s* ]1 g$ j' _ loc, scale = st.norm.fit(data) 0 h5 w3 M* z+ O. f, \ return np.array([loc, scale])
: R# X- t1 ?7 C

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

% S. `# q, x. X" w) B1 q

(2)xarray分块

" |- V) r1 y' _% R$ {( Z% _ G
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])# }; k& b h' W3 N+ D x = x.chunk({"lev":5, "lat":100,"lon":100})
! [4 U6 h& s9 K* e# M

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

7 x. \( L, y& Q/ T' a6 R5 U

(3)xarray.apply_ufunc函数

8 x8 k3 v4 Z# m" e. _ I9 ]0 I& y
result = xr.apply_ufunc(norm_fit, / S5 }- i! l4 N1 h! a x, % D$ b; X$ v8 p) e/ T9 x* m5 y input_core_dims=[["time"]],4 Q) v! ?: a- s0 D; I/ R, B. z output_core_dims=[["paras"]], 4 t1 s4 g0 w4 X5 s- e, p dask="parallelized",2 ?; \( U+ e2 _4 e5 r' t. c& N output_dtypes=[np.float],5 p# C# C2 [# T" d dask_gufunc_kwargs={output_sizes:{"paras":2}}" D0 P+ I% {; l: Z' w4 J+ ` )
" z4 P3 z9 }5 [) L* Q: j& I

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

5 o8 ?! O& N: n5 K' W# p ' ?$ e3 g$ K% ^

以下是全部代码:

& B( e1 v/ |* V6 w1 U1 x/ b" P
from scipy import stats as st ! }2 W' o% i6 C" v) A! P0 U9 }) y3 z. m; X& K5 ?; `+ D' ^* a import xarray as xr 0 M) B# B& w6 {7 O! G import dask - K, f! U8 |) X% D import numpy as np1 B$ u& r a* N, i3 N5 m6 ~ from dask.diagnostics import ProgressBar3 k1 N' E! Y) y5 p7 ~+ b / k% H: T5 f6 G3 s def norm_fit(data): 8 A5 h( Y0 f" N _ loc, scale = st.norm.fit(data) ( c( f! a# @. q; |# |: x' U3 G return np.array([loc, scale])* Q6 f) Y. `* h5 h. ^ Y" \4 c+ T/ s7 a Y6 G; M rs = np.random.RandomState(0)7 [9 \3 E8 j, B* B- f* t$ q! x x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])$ I( O% M2 u4 q/ E3 ]9 U( Z x = x.chunk({"lev":5, "lat":100,"lon":100}) 3 q" p$ j* K7 O6 e5 d& Y7 f: a$ ]* Z/ V7 X- p+ h #使用apply_ufunc计算,并用dask的并行计算: b3 I% [; f" j1 j$ w3 [ result = xr.apply_ufunc(norm_fit,9 \3 l1 @# s. c: p x, 4 Z9 ?2 f( Y. y7 V, p4 v input_core_dims=[["time"]],7 H3 g# W: ]' h0 J' w2 T2 { b output_core_dims=[["paras"]],. P3 d- C/ C; W5 ` F dask="parallelized",8 u4 p3 x; ^2 ^) Z! {% }) Y output_dtypes=[np.float], : y6 c" t' m8 f8 w4 n dask_gufunc_kwargs={output_sizes:{"paras":2}}7 B( X! g+ Q% m& f( h8 C ) 9 S" X E" L; `! c) V3 ^ . F) R, W# Z7 I; _) O n3 c* j& i #compute进行真实的计算并显示进度 5 s. V7 T- z' o with ProgressBar(): ) P' Q( q( }, ]% I3 t; x# S result = results.compute(): W! _4 Q h, u$ J# M 6 t; M9 {; [* | #结果冲命名保存到nc文件 ' s$ C: }# j6 B+ C result = result.rename("norm_paras") . P8 g8 I2 v4 o result = result.to_netcdf("norm_fit_paras.nc",compute=False)6 i' p$ [2 i, E1 C6 ]# z with ProgressBar():( f; W0 Y# ]3 U, l) _4 M: w result.compute()
3 V+ d8 @+ d' ]$ D$ K' ~/ S7 s

转自:气海同途

; N" v# z4 T( N+ J2 N' O

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在前天 18:39
快速回复 返回顶部 返回列表