xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

5 M X" \7 d* z9 c: Z4 Q
! C" |# j3 t0 C- |

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

4 K; P: s& J G1 y3 H2 U 0 W, q, f& C/ ^6 G+ G6 T- v

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

L ?0 ]9 U, R8 W8 u9 s" P0 y4 w- H9 B* b

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

" I) W9 v Q* `9 `& G

其中几个关键点解释一下:

; j, H5 c; N! O2 q% H

(1)首先定义拟合正太分布的函数

) d- u; i6 r/ R& I0 A
def norm_fit(data): 0 _" s! i$ e, L; O( Z! t loc, scale = st.norm.fit(data) ; t9 z4 f0 P) n: t return np.array([loc, scale])
! l; p5 D7 J$ J* q" e

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

2 N, G; z F6 o: n4 Y

(2)xarray分块

3 O$ h4 y' t2 \6 C
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])0 ]1 T6 d/ s# V/ ~ x = x.chunk({"lev":5, "lat":100,"lon":100})
" ~3 T/ T) T! d, n% {9 D

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

# K; r6 K2 i* ]" g' L

(3)xarray.apply_ufunc函数

) s2 s( b8 u+ L t1 a8 G
result = xr.apply_ufunc(norm_fit, X) m% w$ W9 W, k3 D' N x,( X P; ]9 h& T8 f input_core_dims=[["time"]],' y8 ]4 d4 Q# r; A7 E) ~ output_core_dims=[["paras"]],# A8 @. H& @1 z! }! J- M' W2 A: z dask="parallelized", $ Y9 T# \9 ^+ k4 p0 W output_dtypes=[np.float], " K% T- X- Y8 W7 V& Z: b dask_gufunc_kwargs={output_sizes:{"paras":2}}2 d3 G X/ }+ _$ a* g# \0 S/ n. x )
' K$ ?) a; e. D* O' B

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

- }1 b+ v$ Z$ O$ r' Q3 e: L% ~" x+ ~/ n0 i/ l+ i, A

以下是全部代码:

( I" E" U& l! V% I- U9 J0 e( ?/ O
from scipy import stats as st4 p2 h! B3 L, q0 c9 @ % m) g' y; _4 Q$ k! K$ v import xarray as xr& k' ]7 u& z- c9 Q import dask% A! `8 Z) N9 `7 P4 Q import numpy as np + L: Q9 `7 ]2 Y9 K: q' N, c- h from dask.diagnostics import ProgressBar 7 {' _% p8 k' o) J+ g; _) i) X$ d3 r- K/ o def norm_fit(data): - A! |7 e+ m H0 U0 s" ? loc, scale = st.norm.fit(data) , a( d& j+ B5 @0 x; Y) F) T5 H return np.array([loc, scale])6 l# j, s! k2 S" E9 C$ b . f- c! ^2 Z' Q" A rs = np.random.RandomState(0)$ s z0 y& J m5 m4 ?- i x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) , j, k) z1 A% v! f. ~! B x = x.chunk({"lev":5, "lat":100,"lon":100}) . G6 d. B' m% p. q2 f: z4 j( \# q; Q R8 | #使用apply_ufunc计算,并用dask的并行计算 , _4 _: d& ^+ q: V4 u* f result = xr.apply_ufunc(norm_fit,% @( C4 J8 u7 J" d1 `2 J6 U x, - }: ?1 I7 o* a: e& w. L input_core_dims=[["time"]], H& `0 E0 p8 I output_core_dims=[["paras"]], # u/ C, H: M( f" S! ~. f) \* q dask="parallelized"," B( n* I. {( {0 p) U- @ output_dtypes=[np.float], ! y2 _5 c& N0 z, \0 }& \+ m dask_gufunc_kwargs={output_sizes:{"paras":2}} + O, C$ J5 [) P9 u ). \$ t* D U r B/ d4 V 3 S, I" C f% P9 Z: j0 [ #compute进行真实的计算并显示进度2 i' t. k! P! H* P' B with ProgressBar():* N: {$ Y4 ~3 f% T& v6 J1 o5 | result = results.compute()7 t$ [( G- l: H2 p ' q, M/ ~" \! p1 P #结果冲命名保存到nc文件 9 _; }% I0 J( T z* K6 J result = result.rename("norm_paras")3 Z; j2 t' R) m result = result.to_netcdf("norm_fit_paras.nc",compute=False)' ^, u% }9 I( J+ Q1 C with ProgressBar(): % {* @/ u' R3 ~- d3 q result.compute()
# k: F7 p) p. A: @

转自:气海同途

6 y! j/ s+ f* {& l

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在昨天 18:16
快速回复 返回顶部 返回列表