收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

3 X' `" C& j2 k. T
1 k, q5 j c- O( Y

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

/ y) l8 E3 ?% }9 g + n8 m! X$ w. u; @

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

* r+ V) ?, C6 K7 N! P" D, y 0 i2 D0 J* {: f) i; `3 F

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

' O9 G% G4 E6 v) i& I% d

其中几个关键点解释一下:

+ ?. x$ r& k- j" y3 A/ D* H# z- T' l

(1)首先定义拟合正太分布的函数

" [, V+ D+ B) Y. g9 T8 b7 E
def norm_fit(data):& J9 g- Q" o) Y8 K: Z! u! c loc, scale = st.norm.fit(data)+ W: p! Q; R+ O return np.array([loc, scale])
O; r8 @1 L7 }3 q* d& f/ D

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

5 t* k7 K1 I7 ?3 T

(2)xarray分块

) m) W- f: F4 s& F1 `& _
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 7 [9 w% x' |9 Z0 p' S x = x.chunk({"lev":5, "lat":100,"lon":100})
# q. n9 V2 Y P& o) v+ [0 H6 P+ B" e

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

2 A3 k; l- k0 d. U* E

(3)xarray.apply_ufunc函数

$ |* \/ Z2 Z U3 w: r7 Z4 E, R% c
result = xr.apply_ufunc(norm_fit,( f* K" n, @2 G6 S B# [- } x,/ E1 h4 c* K+ g$ K0 C* C" F6 b. Y input_core_dims=[["time"]],- C: @# z9 b) g/ U output_core_dims=[["paras"]], . q; v. z& s3 m! x3 ?- a dask="parallelized",1 G6 c" e- c9 M& h5 T% h) b. U9 W) M output_dtypes=[np.float],. F. B+ i- F' F. s) ]+ F dask_gufunc_kwargs={output_sizes:{"paras":2}}! T& k( B% l {4 B* w )
9 l- P3 ?6 O1 O6 M' {

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

! z/ g7 X/ U. R: e: B/ t6 G / ^# F4 r7 F# `1 S

以下是全部代码:

! D) C% u9 w; Z
from scipy import stats as st+ K. V# D; a: C: M7 x ( R9 n1 q* E5 @. ]: @3 V, F( r, ` import xarray as xr4 s7 Z, m* f' b; S! [/ ?6 b import dask7 u: a8 _- O7 U( o import numpy as np# U. O6 ? L7 {! ^ from dask.diagnostics import ProgressBar7 \7 m2 w7 G) ^9 M* ` ! m' q% y& c$ P( g2 x' u def norm_fit(data):+ \ C$ \" x- h2 H loc, scale = st.norm.fit(data) % U% R1 } m) j' k return np.array([loc, scale]), p" I: D9 q" i# e " w1 [9 n ]' k* b% a5 I rs = np.random.RandomState(0) 9 s, V: A( M- ]" M9 K x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 4 D9 U2 ]+ i3 K0 p* j1 ]' ]% t x = x.chunk({"lev":5, "lat":100,"lon":100}) ' e# ]+ R; C" e" K5 e) v$ @# D7 O$ v #使用apply_ufunc计算,并用dask的并行计算' {0 m. J8 m9 U: ?2 n result = xr.apply_ufunc(norm_fit, 5 V9 F' ~2 h0 O( R R x,+ M( Y+ \2 F, f/ a6 z6 e) I input_core_dims=[["time"]], 1 `3 l% W j5 Z. F6 ~3 C7 {8 C output_core_dims=[["paras"]],- ^8 @8 N. g9 r- d& S dask="parallelized", 3 p* s$ ?6 g. T! k& b output_dtypes=[np.float], 6 B5 {7 a* O7 G, ^' E# P+ w4 v4 [ dask_gufunc_kwargs={output_sizes:{"paras":2}} ; |0 T \# Z; k3 H1 C4 S )1 m( e* r% k q# z 9 ]4 @- |, @# m) x7 i$ ^ #compute进行真实的计算并显示进度 3 w1 K8 y3 v7 I. @" A; D/ u: |5 X with ProgressBar():) d. U ~# _ K9 P3 ^ result = results.compute() - [6 r1 K0 W1 w8 ]( c , N$ P" g/ e- Q7 R+ p #结果冲命名保存到nc文件 0 b% I3 p1 G1 \7 n. q( L/ `3 V" x result = result.rename("norm_paras") + Z! j+ g0 ?1 i* K result = result.to_netcdf("norm_fit_paras.nc",compute=False)7 K. q8 @) Q. P, S% _# d" l with ProgressBar():0 n' O3 g. ~, u: L: R7 o result.compute()
' M" j( E" |3 M7 L. x

转自:气海同途

7 ]+ O+ F+ W. Y, S( H1 ]/ k2 ]

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2026-4-7
快速回复 返回顶部 返回列表