|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 3 X' `" C& j2 k. T
1 k, q5 j c- O( Y 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 / y) l8 E3 ?% }9 g
+ n8 m! X$ w. u; @
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
* r+ V) ?, C6 K7 N! P" D, y
0 i2 D0 J* {: f) i; `3 F 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
' O9 G% G4 E6 v) i& I% d 其中几个关键点解释一下: + ?. x$ r& k- j" y3 A/ D* H# z- T' l
(1)首先定义拟合正太分布的函数 " [, V+ D+ B) Y. g9 T8 b7 E
def norm_fit(data):& J9 g- Q" o) Y8 K: Z! u! c
loc, scale = st.norm.fit(data)+ W: p! Q; R+ O
return np.array([loc, scale])
O; r8 @1 L7 }3 q* d& f/ D 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 5 t* k7 K1 I7 ?3 T
(2)xarray分块
) m) W- f: F4 s& F1 `& _ x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
7 [9 w% x' |9 Z0 p' S x = x.chunk({"lev":5, "lat":100,"lon":100}) # q. n9 V2 Y P& o) v+ [0 H6 P+ B" e
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 2 A3 k; l- k0 d. U* E
(3)xarray.apply_ufunc函数
$ |* \/ Z2 Z U3 w: r7 Z4 E, R% c result = xr.apply_ufunc(norm_fit,( f* K" n, @2 G6 S B# [- }
x,/ E1 h4 c* K+ g$ K0 C* C" F6 b. Y
input_core_dims=[["time"]],- C: @# z9 b) g/ U
output_core_dims=[["paras"]],
. q; v. z& s3 m! x3 ?- a dask="parallelized",1 G6 c" e- c9 M& h5 T% h) b. U9 W) M
output_dtypes=[np.float],. F. B+ i- F' F. s) ]+ F
dask_gufunc_kwargs={output_sizes:{"paras":2}}! T& k( B% l {4 B* w
)
9 l- P3 ?6 O1 O6 M' { apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
! z/ g7 X/ U. R: e: B/ t6 G
/ ^# F4 r7 F# `1 S 以下是全部代码:
! D) C% u9 w; Z from scipy import stats as st+ K. V# D; a: C: M7 x
( R9 n1 q* E5 @. ]: @3 V, F( r, `
import xarray as xr4 s7 Z, m* f' b; S! [/ ?6 b
import dask7 u: a8 _- O7 U( o
import numpy as np# U. O6 ? L7 {! ^
from dask.diagnostics import ProgressBar7 \7 m2 w7 G) ^9 M* `
! m' q% y& c$ P( g2 x' u def norm_fit(data):+ \ C$ \" x- h2 H
loc, scale = st.norm.fit(data)
% U% R1 } m) j' k return np.array([loc, scale]), p" I: D9 q" i# e
" w1 [9 n ]' k* b% a5 I rs = np.random.RandomState(0)
9 s, V: A( M- ]" M9 K x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
4 D9 U2 ]+ i3 K0 p* j1 ]' ]% t x = x.chunk({"lev":5, "lat":100,"lon":100})
' e# ]+ R; C" e" K5 e) v$ @# D7 O$ v
#使用apply_ufunc计算,并用dask的并行计算' {0 m. J8 m9 U: ?2 n
result = xr.apply_ufunc(norm_fit,
5 V9 F' ~2 h0 O( R R x,+ M( Y+ \2 F, f/ a6 z6 e) I
input_core_dims=[["time"]],
1 `3 l% W j5 Z. F6 ~3 C7 {8 C output_core_dims=[["paras"]],- ^8 @8 N. g9 r- d& S
dask="parallelized",
3 p* s$ ?6 g. T! k& b output_dtypes=[np.float],
6 B5 {7 a* O7 G, ^' E# P+ w4 v4 [ dask_gufunc_kwargs={output_sizes:{"paras":2}}
; |0 T \# Z; k3 H1 C4 S )1 m( e* r% k q# z
9 ]4 @- |, @# m) x7 i$ ^ #compute进行真实的计算并显示进度
3 w1 K8 y3 v7 I. @" A; D/ u: |5 X with ProgressBar():) d. U ~# _ K9 P3 ^
result = results.compute()
- [6 r1 K0 W1 w8 ]( c
, N$ P" g/ e- Q7 R+ p #结果冲命名保存到nc文件
0 b% I3 p1 G1 \7 n. q( L/ `3 V" x result = result.rename("norm_paras")
+ Z! j+ g0 ?1 i* K result = result.to_netcdf("norm_fit_paras.nc",compute=False)7 K. q8 @) Q. P, S% _# d" l
with ProgressBar():0 n' O3 g. ~, u: L: R7 o
result.compute() ' M" j( E" |3 M7 L. x
转自:气海同途 7 ]+ O+ F+ W. Y, S( H1 ]/ k2 ]
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |