|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
" r+ q$ _( M% K) [# @ + n& G5 e$ G* y! F% q
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 % m- N O" T. O
7 q! a9 b. M5 s! h 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 9 ^. _! V8 N+ ?) T" d$ M
9 u, m& W, N* x+ \( v 数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 9 \' c A5 h) q% j( m
其中几个关键点解释一下:
6 r% D1 c, _. b% n3 ` (1)首先定义拟合正太分布的函数 9 [6 _0 m' n6 ^% j( z
def norm_fit(data):
5 j& c1 R$ F$ e4 G: X3 H1 ~ i loc, scale = st.norm.fit(data)7 ]' j0 ?: h5 ^* w3 f- d
return np.array([loc, scale]) 8 f' y6 @0 @2 A" R9 n2 }' H
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
6 Y, _7 b9 w: Y& L. I/ r- W (2)xarray分块 8 d# q* I+ w @
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])9 x3 h; O/ |& O% m2 I# H! {
x = x.chunk({"lev":5, "lat":100,"lon":100}) 7 a8 J j1 `( e) o# S7 ?5 V
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
, s# _* ]7 a7 k+ F, _2 j* F (3)xarray.apply_ufunc函数
: g& E1 l& G8 j0 v result = xr.apply_ufunc(norm_fit,4 ~0 z' O5 D5 v0 Z% _+ a: l+ B
x,
4 C9 j, J- V% E& j5 V# L input_core_dims=[["time"]],$ O3 h! C$ b2 ^' E K
output_core_dims=[["paras"]],/ m- J5 M6 e( G/ j& ~/ Q' w
dask="parallelized",% B: @+ w3 V+ W5 t
output_dtypes=[np.float],
: u ]# B, [1 t% I+ ] dask_gufunc_kwargs={output_sizes:{"paras":2}}
/ i3 a) b) j( W- n, @! R/ L ) , v7 K3 }9 T0 `, }& w4 Z& B% ^
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) 5 ^1 o$ d6 o3 V, O& h- y m1 j7 [
+ ~7 ?! f! c( }4 |0 @% h& t
以下是全部代码: * s0 Y# o% _( q1 B" @! }: n( V
from scipy import stats as st: G7 S v9 _" Z' z; L0 r
. [# p4 k$ Z$ G8 _/ U& _ import xarray as xr
, k; n5 }7 ~4 X. e- F* e import dask
1 M1 ]+ n% B$ U: d7 X import numpy as np. k: z7 S# i w+ r- n# I
from dask.diagnostics import ProgressBar7 e4 J: _5 b, ?
$ g4 H# r! @- t& s' R
def norm_fit(data):
0 G1 E7 |/ W# m loc, scale = st.norm.fit(data)5 x9 S- U9 m6 C8 O, V* ^
return np.array([loc, scale])+ }! Z: z- h) x3 g* u
, D( z- R4 c" a% f; ^7 X1 t# A rs = np.random.RandomState(0)
; _% \; L! p: b x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
- h$ F4 W$ I+ u) y5 o, ~) `! C9 l x = x.chunk({"lev":5, "lat":100,"lon":100})
; C9 w$ g# a+ E- b6 _1 E+ Y3 @8 g8 o+ K$ j$ ~7 b. o% _5 [6 Y
#使用apply_ufunc计算,并用dask的并行计算+ Z! x2 O8 u+ ^, g7 {7 y5 b X- Y
result = xr.apply_ufunc(norm_fit,+ ~# z9 D8 ^ V& f
x,
& A4 q( E1 s; B4 m# ^ input_core_dims=[["time"]],1 n/ o: L$ A1 I
output_core_dims=[["paras"]], W" ^( _# J) }# D( J; |
dask="parallelized",
& k1 w7 j4 l+ G, u4 n6 r output_dtypes=[np.float],
* g- m& o' L) H/ } dask_gufunc_kwargs={output_sizes:{"paras":2}}
. F% _+ s D, r, Z) _' { )0 ~& a" M4 U; p- L- `
& x8 `1 n J- B3 X# ]
#compute进行真实的计算并显示进度
; q& ?. S8 l1 p3 w with ProgressBar():& p3 B% f5 B% G: E1 s# Q4 K
result = results.compute()
8 W4 ]" Q( P6 S" R) P
2 ^; _6 P8 ]1 K! Z" H# ]- N5 U #结果冲命名保存到nc文件, H7 m O9 p# E) [
result = result.rename("norm_paras")
8 c% C2 j; G+ V result = result.to_netcdf("norm_fit_paras.nc",compute=False)- A. s1 p) P4 q% J8 `: A
with ProgressBar():9 i& W0 Z, [: D
result.compute()
) e7 o+ Q4 E1 e( J7 x 转自:气海同途 2 Z6 w2 ?' E* D# }2 i6 u( h
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |