收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

" r+ q$ _( M% K) [# @
+ n& G5 e$ G* y! F% q

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

% m- N O" T. O 7 q! a9 b. M5 s! h

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

9 ^. _! V8 N+ ?) T" d$ M 9 u, m& W, N* x+ \( v

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

9 \' c A5 h) q% j( m

其中几个关键点解释一下:

6 r% D1 c, _. b% n3 `

(1)首先定义拟合正太分布的函数

9 [6 _0 m' n6 ^% j( z
def norm_fit(data): 5 j& c1 R$ F$ e4 G: X3 H1 ~ i loc, scale = st.norm.fit(data)7 ]' j0 ?: h5 ^* w3 f- d return np.array([loc, scale])
8 f' y6 @0 @2 A" R9 n2 }' H

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

6 Y, _7 b9 w: Y& L. I/ r- W

(2)xarray分块

8 d# q* I+ w @
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])9 x3 h; O/ |& O% m2 I# H! { x = x.chunk({"lev":5, "lat":100,"lon":100})
7 a8 J j1 `( e) o# S7 ?5 V

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

, s# _* ]7 a7 k+ F, _2 j* F

(3)xarray.apply_ufunc函数

: g& E1 l& G8 j0 v
result = xr.apply_ufunc(norm_fit,4 ~0 z' O5 D5 v0 Z% _+ a: l+ B x, 4 C9 j, J- V% E& j5 V# L input_core_dims=[["time"]],$ O3 h! C$ b2 ^' E K output_core_dims=[["paras"]],/ m- J5 M6 e( G/ j& ~/ Q' w dask="parallelized",% B: @+ w3 V+ W5 t output_dtypes=[np.float], : u ]# B, [1 t% I+ ] dask_gufunc_kwargs={output_sizes:{"paras":2}} / i3 a) b) j( W- n, @! R/ L )
, v7 K3 }9 T0 `, }& w4 Z& B% ^

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

5 ^1 o$ d6 o3 V, O& h- y m1 j7 [ + ~7 ?! f! c( }4 |0 @% h& t

以下是全部代码:

* s0 Y# o% _( q1 B" @! }: n( V
from scipy import stats as st: G7 S v9 _" Z' z; L0 r . [# p4 k$ Z$ G8 _/ U& _ import xarray as xr , k; n5 }7 ~4 X. e- F* e import dask 1 M1 ]+ n% B$ U: d7 X import numpy as np. k: z7 S# i w+ r- n# I from dask.diagnostics import ProgressBar7 e4 J: _5 b, ? $ g4 H# r! @- t& s' R def norm_fit(data): 0 G1 E7 |/ W# m loc, scale = st.norm.fit(data)5 x9 S- U9 m6 C8 O, V* ^ return np.array([loc, scale])+ }! Z: z- h) x3 g* u , D( z- R4 c" a% f; ^7 X1 t# A rs = np.random.RandomState(0) ; _% \; L! p: b x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) - h$ F4 W$ I+ u) y5 o, ~) `! C9 l x = x.chunk({"lev":5, "lat":100,"lon":100}) ; C9 w$ g# a+ E- b6 _1 E+ Y3 @8 g8 o+ K$ j$ ~7 b. o% _5 [6 Y #使用apply_ufunc计算,并用dask的并行计算+ Z! x2 O8 u+ ^, g7 {7 y5 b X- Y result = xr.apply_ufunc(norm_fit,+ ~# z9 D8 ^ V& f x, & A4 q( E1 s; B4 m# ^ input_core_dims=[["time"]],1 n/ o: L$ A1 I output_core_dims=[["paras"]], W" ^( _# J) }# D( J; | dask="parallelized", & k1 w7 j4 l+ G, u4 n6 r output_dtypes=[np.float], * g- m& o' L) H/ } dask_gufunc_kwargs={output_sizes:{"paras":2}} . F% _+ s D, r, Z) _' { )0 ~& a" M4 U; p- L- ` & x8 `1 n J- B3 X# ] #compute进行真实的计算并显示进度 ; q& ?. S8 l1 p3 w with ProgressBar():& p3 B% f5 B% G: E1 s# Q4 K result = results.compute() 8 W4 ]" Q( P6 S" R) P 2 ^; _6 P8 ]1 K! Z" H# ]- N5 U #结果冲命名保存到nc文件, H7 m O9 p# E) [ result = result.rename("norm_paras") 8 c% C2 j; G+ V result = result.to_netcdf("norm_fit_paras.nc",compute=False)- A. s1 p) P4 q% J8 `: A with ProgressBar():9 i& W0 Z, [: D result.compute()
) e7 o+ Q4 E1 e( J7 x

转自:气海同途

2 Z6 w2 ?' E* D# }2 i6 u( h

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在半小时前
快速回复 返回顶部 返回列表