|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 / O9 x* u' h9 R; ?
9 U& t" K& |4 O A9 w 大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。
) o# y* r! n2 c
4 O2 }/ l/ p, c8 z7 f 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 8 }9 }, {* K9 k8 x( c, `4 X' R
5 |5 y' L) s6 f- F
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
6 z4 ?" F: n: b( H1 k; G 其中几个关键点解释一下: ' Z" r; q- B7 F
(1)首先定义拟合正太分布的函数 $ o$ i5 h" J3 q- g2 f# K. C
def norm_fit(data):
" d* D1 j" s* ]1 g$ j' _ loc, scale = st.norm.fit(data)
0 h5 w3 M* z+ O. f, \ return np.array([loc, scale]) : R# X- t1 ?7 C
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。 % S. `# q, x. X" w) B1 q
(2)xarray分块
" |- V) r1 y' _% R$ {( Z% _ G x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])# }; k& b h' W3 N+ D
x = x.chunk({"lev":5, "lat":100,"lon":100}) ! [4 U6 h& s9 K* e# M
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
7 x. \( L, y& Q/ T' a6 R5 U (3)xarray.apply_ufunc函数
8 x8 k3 v4 Z# m" e. _ I9 ]0 I& y result = xr.apply_ufunc(norm_fit,
/ S5 }- i! l4 N1 h! a x,
% D$ b; X$ v8 p) e/ T9 x* m5 y input_core_dims=[["time"]],4 Q) v! ?: a- s0 D; I/ R, B. z
output_core_dims=[["paras"]],
4 t1 s4 g0 w4 X5 s- e, p dask="parallelized",2 ?; \( U+ e2 _4 e5 r' t. c& N
output_dtypes=[np.float],5 p# C# C2 [# T" d
dask_gufunc_kwargs={output_sizes:{"paras":2}}" D0 P+ I% {; l: Z' w4 J+ `
) " z4 P3 z9 }5 [) L* Q: j& I
apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
5 o8 ?! O& N: n5 K' W# p
' ?$ e3 g$ K% ^ 以下是全部代码: & B( e1 v/ |* V6 w1 U1 x/ b" P
from scipy import stats as st
! }2 W' o% i6 C" v) A! P0 U9 }) y3 z. m; X& K5 ?; `+ D' ^* a
import xarray as xr
0 M) B# B& w6 {7 O! G import dask
- K, f! U8 |) X% D import numpy as np1 B$ u& r a* N, i3 N5 m6 ~
from dask.diagnostics import ProgressBar3 k1 N' E! Y) y5 p7 ~+ b
/ k% H: T5 f6 G3 s
def norm_fit(data):
8 A5 h( Y0 f" N _ loc, scale = st.norm.fit(data)
( c( f! a# @. q; |# |: x' U3 G return np.array([loc, scale])* Q6 f) Y. `* h5 h. ^
Y" \4 c+ T/ s7 a Y6 G; M rs = np.random.RandomState(0)7 [9 \3 E8 j, B* B- f* t$ q! x
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])$ I( O% M2 u4 q/ E3 ]9 U( Z
x = x.chunk({"lev":5, "lat":100,"lon":100})
3 q" p$ j* K7 O6 e5 d& Y7 f: a$ ]* Z/ V7 X- p+ h
#使用apply_ufunc计算,并用dask的并行计算: b3 I% [; f" j1 j$ w3 [
result = xr.apply_ufunc(norm_fit,9 \3 l1 @# s. c: p
x,
4 Z9 ?2 f( Y. y7 V, p4 v input_core_dims=[["time"]],7 H3 g# W: ]' h0 J' w2 T2 { b
output_core_dims=[["paras"]],. P3 d- C/ C; W5 ` F
dask="parallelized",8 u4 p3 x; ^2 ^) Z! {% }) Y
output_dtypes=[np.float],
: y6 c" t' m8 f8 w4 n dask_gufunc_kwargs={output_sizes:{"paras":2}}7 B( X! g+ Q% m& f( h8 C
)
9 S" X E" L; `! c) V3 ^
. F) R, W# Z7 I; _) O n3 c* j& i #compute进行真实的计算并显示进度
5 s. V7 T- z' o with ProgressBar():
) P' Q( q( }, ]% I3 t; x# S result = results.compute(): W! _4 Q h, u$ J# M
6 t; M9 {; [* |
#结果冲命名保存到nc文件
' s$ C: }# j6 B+ C result = result.rename("norm_paras")
. P8 g8 I2 v4 o result = result.to_netcdf("norm_fit_paras.nc",compute=False)6 i' p$ [2 i, E1 C6 ]# z
with ProgressBar():( f; W0 Y# ]3 U, l) _4 M: w
result.compute() 3 V+ d8 @+ d' ]$ D$ K' ~/ S7 s
转自:气海同途 ; N" v# z4 T( N+ J2 N' O
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |