|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
1 R( s$ z' E& ^1 T* i% I, j1 g
) m6 } Y; |; F& K 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
4 v9 W% a+ W1 W6 C, `& W. m
& y' H9 G6 A$ u! v5 I+ R7 ?8 z 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 5 [( r4 R" g! n+ Q, c+ I
( `1 U' A; _% W$ S. s 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
5 A# d, n! \% Z, A: Z4 F/ M 其中几个关键点解释一下: ! |6 K5 Z. H( a6 d: o4 d
(1)首先定义拟合正太分布的函数 ( k) a# x) g4 S
def norm_fit(data):2 S, R/ G4 Q8 Y
loc, scale = st.norm.fit(data)) I) o0 v6 \* C
return np.array([loc, scale])
( B" w) j" C# X# M 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
! B1 ~3 m* W' O! J0 G5 }! Y- ]4 p+ e (2)xarray分块 ) D, I& W' j7 h1 [
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
: D8 l2 S: W, K! r- B x = x.chunk({"lev":5, "lat":100,"lon":100}) 6 o& E& e) l# A9 t
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
! t# K0 k; i5 U$ k9 O4 P3 u (3)xarray.apply_ufunc函数
& D; p! H. N; ^, A" m+ P9 V result = xr.apply_ufunc(norm_fit,
, n) J1 }+ b4 Y2 v# C x,& n8 y. ]0 R) P) Z
input_core_dims=[["time"]],
! t+ u7 @) r y output_core_dims=[["paras"]],
5 X- R* h7 l2 e& ^; [# V2 q dask="parallelized",- M( n) P+ e0 }0 C5 K* ~3 c
output_dtypes=[np.float],5 h$ y( \( U* n
dask_gufunc_kwargs={output_sizes:{"paras":2}}7 u; v: ^$ a; `- k4 @. p2 V
)
* N& w5 ^& U+ c+ s3 \ apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
8 i k F" w% M0 u" c; k$ ]; N- [/ l
以下是全部代码:
% u- ]9 ~ U' `7 N# P from scipy import stats as st
_/ d. Z8 T& u" @. T" G4 s# y7 ]. r- X: j1 }
import xarray as xr1 t' P/ k4 I y9 f8 ^& e4 G
import dask# d4 [7 }+ a; O g( A
import numpy as np4 i- _7 @1 w6 K5 X8 j, W. J/ s# K# {1 [
from dask.diagnostics import ProgressBar
8 q9 o+ e$ j2 t: k% W; E2 ]5 r$ K/ F: D. z9 @. u2 |7 ~
def norm_fit(data):
3 k: n& ?% r8 b0 }6 D' h loc, scale = st.norm.fit(data)- k! p. a' i: j. g: `' e
return np.array([loc, scale])
& O. G1 k0 d, h: K1 t$ u! Y
2 Q; a1 Z* ^& Q3 F y o. ` rs = np.random.RandomState(0): N4 B7 N' u |
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
: m& y% Y- H- W. y, @3 }* E x = x.chunk({"lev":5, "lat":100,"lon":100})" \% ~2 P6 w4 P! ?$ |9 A" `& J8 A
, l0 o |% r3 p: P #使用apply_ufunc计算,并用dask的并行计算
3 @5 h) G+ S/ _) x; _1 j9 _+ N result = xr.apply_ufunc(norm_fit,
' i7 w+ ?. B8 L0 _+ i% d" u x,
/ U! B# X& n6 | input_core_dims=[["time"]],
! x3 B$ s4 A6 n9 o0 \ l output_core_dims=[["paras"]],1 S0 ~$ y) \2 f; X5 C
dask="parallelized",
& [+ U$ J* K8 D output_dtypes=[np.float],
7 n8 C' u& o% Z) p6 P+ @ m dask_gufunc_kwargs={output_sizes:{"paras":2}}) C" H& F* T: E4 r5 @
)
5 B3 m7 P" j( `- K1 W4 i4 I2 o* @5 Q* Q" X- ~* a
#compute进行真实的计算并显示进度
" c& G, S4 Z$ J6 a with ProgressBar():
+ J% O* ~7 w4 a5 a) Q& y4 | result = results.compute()
3 G) J. ^1 \8 t4 b8 L N1 |) `& s' w; }( z$ h
#结果冲命名保存到nc文件
% Q/ F; m/ c% K4 H result = result.rename("norm_paras") b/ c& ^6 q0 m; k
result = result.to_netcdf("norm_fit_paras.nc",compute=False); c9 J$ n2 u8 g9 c/ q
with ProgressBar():+ J+ D7 z' e0 S* m% ~1 X5 n
result.compute() 1 n+ Y ]# j- {9 o
转自:气海同途
) z2 u0 O4 |- Y* l$ E- J, n 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |