收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

e$ b7 P+ r! f
) ~ o6 V$ j# _% X; Y

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

+ V1 f) I# Y. b* |/ v' r; j) H# Q& J) ?" e w- q) i0 l

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

) `! X& v/ g+ S' E- Q# ? # b2 ^ i5 \4 Y# f$ ?

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

! L( U+ z6 o2 b* s8 Q

其中几个关键点解释一下:

: q; p5 [% w. I

(1)首先定义拟合正太分布的函数

. s8 C n4 ^ ^$ ~# @
def norm_fit(data):9 b1 R' [* J" a; t& n4 c9 s loc, scale = st.norm.fit(data)' i" W1 V" o! V( w0 J" J return np.array([loc, scale])
8 E0 q" D. O- ?! a0 W$ M

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

, \3 d' E+ Q; ^

(2)xarray分块

1 U' }; n6 O5 j% G& ]* L
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) ; H) ?# U1 ], s w x = x.chunk({"lev":5, "lat":100,"lon":100})
" F1 Y" W3 {2 ^

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

0 F" o; U7 h' u4 K8 v% p

(3)xarray.apply_ufunc函数

) y# X$ ~2 x5 N& \* T1 b
result = xr.apply_ufunc(norm_fit,6 j$ ?3 V* }% |; ?# t6 I- M2 a x,& l; ^' [+ u' d) w6 X input_core_dims=[["time"]], {' k* ]9 Q# x0 O, H. C1 x output_core_dims=[["paras"]],, a; V5 z4 U9 `4 d) c, L dask="parallelized",$ m! e# s5 M# U5 F output_dtypes=[np.float],2 t2 l. c: S- R, W: ^/ R. C+ s; S4 L dask_gufunc_kwargs={output_sizes:{"paras":2}}8 ]' d1 C# K4 K. k8 M )
, H2 T; H1 `3 `

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

: S, T4 X$ L4 e8 ~6 j2 A `: G! H6 @( J" _

以下是全部代码:

& s" R) o' o% |, t* ?
from scipy import stats as st. z4 Q9 {% K$ Y ^; ^& X + ^' o1 x# _/ v4 s2 w* i% G* e9 Y import xarray as xr : |/ V5 m. x6 N9 h) c import dask! O. n# p. Y9 ?: V# m import numpy as np2 b' ?8 d z9 C from dask.diagnostics import ProgressBar [) o) L: s' i" v9 [6 B. d% \ . o* j3 ~1 R1 N def norm_fit(data):5 g/ {6 R2 r8 @/ B# N loc, scale = st.norm.fit(data)4 O: a' E. a5 i& |5 c4 I) l2 | return np.array([loc, scale])/ @" H; ^5 ?, Q/ V* ?# \ 1 S2 K5 q4 V* w1 ^$ Q3 ~4 R. d rs = np.random.RandomState(0) + G) T+ n/ q, w- Y3 k, d$ ^( t x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 6 w5 A: s( o. a4 l2 ~8 ~# r& p x = x.chunk({"lev":5, "lat":100,"lon":100}): }9 C6 @: I3 B1 ~: K 9 L& h! Y0 j0 v1 l6 o #使用apply_ufunc计算,并用dask的并行计算 ( Z1 Q" g9 l( b1 K9 ~0 S5 U8 E result = xr.apply_ufunc(norm_fit, - m. y/ \" J" A) K8 a( b* ?; W x,' n1 M- C' }3 t5 U9 Q* y8 S input_core_dims=[["time"]],* `% O u7 J2 H" p. b1 E& Y6 J- _ output_core_dims=[["paras"]], 5 s3 a4 E' e& e dask="parallelized", 6 j0 A3 @( B m/ S9 i/ j; `: c: Y+ n output_dtypes=[np.float],- Y6 t* I# ~; y6 [+ E. C dask_gufunc_kwargs={output_sizes:{"paras":2}} 7 I9 }( K/ o0 Z+ r" m, w ) & q6 B4 G8 {2 f: \. q" N% _9 P$ g, f8 j$ O& E #compute进行真实的计算并显示进度/ C2 z: R2 |1 I% `; D4 B with ProgressBar(): / q# O6 \2 B4 w& B& j7 Q result = results.compute()1 n4 m% _5 T/ Z4 X3 r8 K \ 1 j: ~( u1 r+ f7 n #结果冲命名保存到nc文件 * E# m9 M! ^# P- m- m$ J result = result.rename("norm_paras") 6 W+ F* `7 V; ?# o% _ result = result.to_netcdf("norm_fit_paras.nc",compute=False)* I: q( C* `1 B5 z' C7 b5 \3 g with ProgressBar(): ! Q9 j+ ^% i9 l* A result.compute()
7 n5 [9 z- P3 I0 l0 T7 ]# _& F4 \

转自:气海同途

& X: {/ f, z+ R& Z: \

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在昨天 18:42
快速回复 返回顶部 返回列表