收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

: p7 w) G2 V" `' w8 O/ S' N
: W/ s/ `' L/ f! K/ R% r1 U

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

* Z" E1 m. |0 A/ k 1 [1 l$ T5 Q9 C {

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

7 e, i) N1 Q4 ^; F' w! ?+ @4 ^* c ) j/ O1 \0 Q. a6 }, d: ]8 ]& I7 Q

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

2 G" T, r# J2 D

其中几个关键点解释一下:

& v# Z) C0 y7 g6 k

(1)首先定义拟合正太分布的函数

) ~* a I2 b6 S
def norm_fit(data): " A) j8 M8 n* Q" ?. J7 ~; U& C8 l loc, scale = st.norm.fit(data) 7 O! L2 j0 b9 N# V7 a return np.array([loc, scale])
J9 h6 Q: N( A+ j# X0 G

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

% f; D8 H3 O8 J& G. a

(2)xarray分块

* y1 Z+ ]* ^$ s1 o
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 3 ]- v7 a; d' U1 X x = x.chunk({"lev":5, "lat":100,"lon":100})
a4 c3 q+ C6 G- x$ w& O

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

) m0 {- k% ~& B8 f/ p2 l) y/ }9 f8 ~% m

(3)xarray.apply_ufunc函数

+ T E3 _3 d. Z: L2 r
result = xr.apply_ufunc(norm_fit, . z) |8 j! h+ b: N H x,9 ~8 t/ C/ ]5 _: g+ w. X input_core_dims=[["time"]], ' s! I' \, d* T# \ output_core_dims=[["paras"]],7 E) d. l8 \# M& W* O! h dask="parallelized",* d& ?/ [# t) x% _; U3 }/ q7 ? output_dtypes=[np.float], ( V& R# _8 @" |. } dask_gufunc_kwargs={output_sizes:{"paras":2}} + j( u" z$ J+ Q: }' Z5 \ )
/ h; I+ v+ }/ l5 q

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

- i7 o" @& q8 g( {2 y9 P# c3 ~% h( _! _ - f6 ?. P, E# N( J3 R6 K: y3 A# w1 h

以下是全部代码:

0 z$ G" [6 T; r8 Y; ~0 g$ C( }7 D
from scipy import stats as st2 j2 I* |( O1 x, p3 w* g1 F- E 2 C* a6 ]* ^( i9 r& D1 m; o import xarray as xr( d: h& }0 ?/ B) P2 }# ] import dask: U3 _- ~* L2 L+ L2 `$ y& Z% G7 N8 j3 j/ g import numpy as np ; \0 F9 |3 F. E& k from dask.diagnostics import ProgressBar : a4 w7 j1 l4 R0 v8 b 1 C* _' J: H1 b: u/ n/ S3 m5 `) ? def norm_fit(data): # o! z2 X( N) _) O! k loc, scale = st.norm.fit(data)9 j4 ~% G) c; d; w! y return np.array([loc, scale]) ' U' B+ O5 c: P( o y; N0 I; }; W+ g, c5 W" a3 z- p8 I5 ~' y rs = np.random.RandomState(0), }0 A1 z1 Y( k- v$ S4 \3 o1 k% ^ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 0 f+ u" s5 q/ {0 p; ^ x = x.chunk({"lev":5, "lat":100,"lon":100}); v* N+ g) n. @% R6 w5 | 4 V z) V. ^; b #使用apply_ufunc计算,并用dask的并行计算* h9 {3 t3 _9 `. P result = xr.apply_ufunc(norm_fit, * l( a+ } {) r- P! f& p# r# l x,8 r1 O) } `. U5 P, u0 s4 U9 O input_core_dims=[["time"]]," Q; b0 x, M R3 U3 r output_core_dims=[["paras"]], / P0 C" Q$ w0 P" f( y: W dask="parallelized", 5 |% V& j( R6 `) s# L5 V output_dtypes=[np.float],3 B# G% I% p# z6 H' L& s! a) j dask_gufunc_kwargs={output_sizes:{"paras":2}}4 V+ t7 c0 x( |- o: M6 ? ) ) D1 d0 ?' b' @" q7 U8 ? 4 ] W8 e1 K0 I2 H2 m% A$ j6 j: n. u7 q #compute进行真实的计算并显示进度: k# S& b% l f with ProgressBar():0 ]2 V# P# l, N& x' o4 A result = results.compute() 5 c4 x4 I$ y. E- V; I$ f$ }. [3 S& K# F #结果冲命名保存到nc文件 0 i6 V, c2 L) |$ g# v result = result.rename("norm_paras") * s! t" b) {7 [. l result = result.to_netcdf("norm_fit_paras.nc",compute=False); f! x* p' L/ s6 S with ProgressBar():; W# ~9 O0 K8 j, W result.compute()
$ v Y6 W$ o; D; }1 T5 a

转自:气海同途

8 i" y. n! W9 a# B2 P

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表