|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 : p7 w) G2 V" `' w8 O/ S' N
: W/ s/ `' L/ f! K/ R% r1 U
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
* Z" E1 m. |0 A/ k
1 [1 l$ T5 Q9 C { 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 7 e, i) N1 Q4 ^; F' w! ?+ @4 ^* c
) j/ O1 \0 Q. a6 }, d: ]8 ]& I7 Q
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
2 G" T, r# J2 D 其中几个关键点解释一下: & v# Z) C0 y7 g6 k
(1)首先定义拟合正太分布的函数 ) ~* a I2 b6 S
def norm_fit(data):
" A) j8 M8 n* Q" ?. J7 ~; U& C8 l loc, scale = st.norm.fit(data)
7 O! L2 j0 b9 N# V7 a return np.array([loc, scale])
J9 h6 Q: N( A+ j# X0 G 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
% f; D8 H3 O8 J& G. a (2)xarray分块 * y1 Z+ ]* ^$ s1 o
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
3 ]- v7 a; d' U1 X x = x.chunk({"lev":5, "lat":100,"lon":100})
a4 c3 q+ C6 G- x$ w& O xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
) m0 {- k% ~& B8 f/ p2 l) y/ }9 f8 ~% m (3)xarray.apply_ufunc函数
+ T E3 _3 d. Z: L2 r result = xr.apply_ufunc(norm_fit,
. z) |8 j! h+ b: N H x,9 ~8 t/ C/ ]5 _: g+ w. X
input_core_dims=[["time"]],
' s! I' \, d* T# \ output_core_dims=[["paras"]],7 E) d. l8 \# M& W* O! h
dask="parallelized",* d& ?/ [# t) x% _; U3 }/ q7 ?
output_dtypes=[np.float],
( V& R# _8 @" |. } dask_gufunc_kwargs={output_sizes:{"paras":2}}
+ j( u" z$ J+ Q: }' Z5 \ )
/ h; I+ v+ }/ l5 q apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) - i7 o" @& q8 g( {2 y9 P# c3 ~% h( _! _
- f6 ?. P, E# N( J3 R6 K: y3 A# w1 h
以下是全部代码: 0 z$ G" [6 T; r8 Y; ~0 g$ C( }7 D
from scipy import stats as st2 j2 I* |( O1 x, p3 w* g1 F- E
2 C* a6 ]* ^( i9 r& D1 m; o
import xarray as xr( d: h& }0 ?/ B) P2 }# ]
import dask: U3 _- ~* L2 L+ L2 `$ y& Z% G7 N8 j3 j/ g
import numpy as np
; \0 F9 |3 F. E& k from dask.diagnostics import ProgressBar
: a4 w7 j1 l4 R0 v8 b
1 C* _' J: H1 b: u/ n/ S3 m5 `) ? def norm_fit(data):
# o! z2 X( N) _) O! k loc, scale = st.norm.fit(data)9 j4 ~% G) c; d; w! y
return np.array([loc, scale])
' U' B+ O5 c: P( o y; N0 I; }; W+ g, c5 W" a3 z- p8 I5 ~' y
rs = np.random.RandomState(0), }0 A1 z1 Y( k- v$ S4 \3 o1 k% ^
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
0 f+ u" s5 q/ {0 p; ^ x = x.chunk({"lev":5, "lat":100,"lon":100}); v* N+ g) n. @% R6 w5 |
4 V z) V. ^; b #使用apply_ufunc计算,并用dask的并行计算* h9 {3 t3 _9 `. P
result = xr.apply_ufunc(norm_fit,
* l( a+ } {) r- P! f& p# r# l x,8 r1 O) } `. U5 P, u0 s4 U9 O
input_core_dims=[["time"]]," Q; b0 x, M R3 U3 r
output_core_dims=[["paras"]],
/ P0 C" Q$ w0 P" f( y: W dask="parallelized",
5 |% V& j( R6 `) s# L5 V output_dtypes=[np.float],3 B# G% I% p# z6 H' L& s! a) j
dask_gufunc_kwargs={output_sizes:{"paras":2}}4 V+ t7 c0 x( |- o: M6 ?
)
) D1 d0 ?' b' @" q7 U8 ?
4 ] W8 e1 K0 I2 H2 m% A$ j6 j: n. u7 q #compute进行真实的计算并显示进度: k# S& b% l f
with ProgressBar():0 ]2 V# P# l, N& x' o4 A
result = results.compute()
5 c4 x4 I$ y. E- V; I$ f$ }. [3 S& K# F
#结果冲命名保存到nc文件
0 i6 V, c2 L) |$ g# v result = result.rename("norm_paras")
* s! t" b) {7 [. l result = result.to_netcdf("norm_fit_paras.nc",compute=False); f! x* p' L/ s6 S
with ProgressBar():; W# ~9 O0 K8 j, W
result.compute() $ v Y6 W$ o; D; }1 T5 a
转自:气海同途
8 i" y. n! W9 a# B2 P 关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |