NumPy-версия "Экспоненциально-взвешенного скользящего среднего", эквивалентная pandas.ewm(). Mean()
Как мне получить экспоненциально-взвешенное скользящее среднее в NumPy, как в пандах?
import pandas as pd
import pandas_datareader as pdr
from datetime import datetime
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get PANDAS exponential weighted moving average
ewm_pd = pd.DataFrame(ibm).ewm(span=windowSize, min_periods=windowSize).mean().as_matrix()
print(ewm_pd)
Я попробовал следующее с NumPy
import numpy as np
import pandas_datareader as pdr
from datetime import datetime
# From this post: http://stackru.com/a/40085052/3293881 by @Divakar
def strided_app(a, L, S): # Window len = L, Stride len/stepsize = S
nrows = ((a.size - L) // S) + 1
n = a.strides[0]
return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n))
def numpyEWMA(price, windowSize):
weights = np.exp(np.linspace(-1., 0., windowSize))
weights /= weights.sum()
a2D = strided_app(price, windowSize, 1)
returnArray = np.empty((price.shape[0]))
returnArray.fill(np.nan)
for index in (range(a2D.shape[0])):
returnArray[index + windowSize-1] = np.convolve(weights, a2D[index])[windowSize - 1:-windowSize + 1]
return np.reshape(returnArray, (-1, 1))
# Declare variables
ibm = pdr.get_data_yahoo(symbols='IBM', start=datetime(2000, 1, 1), end=datetime(2012, 1, 1)).reset_index(drop=True)['Adj Close']
windowSize = 20
# Get NumPy exponential weighted moving average
ewma_np = numpyEWMA(ibm, windowSize)
print(ewma_np)
Но результаты не такие, как у панд.
Возможно, есть лучший подход для вычисления экспоненциально-взвешенной скользящей средней непосредственно в NumPy и получения того же результата, что и pandas.ewm().mean()
?
На 60 000 запросов на решение панд я получаю около 230 секунд. Я уверен, что с чистым NumPy это может быть значительно уменьшено.
15 ответов
Я думаю, что я наконец взломал это!
Вот векторизованная версия numpy_ewma
функция, которая, как утверждается, дает правильные результаты @RaduS's post
-
def numpy_ewma_vectorized(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
scale = 1/alpha_rev
n = data.shape[0]
r = np.arange(n)
scale_arr = scale**r
offset = data[0]*alpha_rev**(r+1)
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Дальнейшее повышение
Мы можем повысить его еще раз с помощью некоторого повторного использования кода, например:
def numpy_ewma_vectorized_v2(data, window):
alpha = 2 /(window + 1.0)
alpha_rev = 1-alpha
n = data.shape[0]
pows = alpha_rev**(np.arange(n+1))
scale_arr = 1/pows[:-1]
offset = data[0]*pows[1:]
pw0 = alpha*alpha_rev**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = offset + cumsums*scale_arr[::-1]
return out
Испытание во время выполнения
Давайте сопоставим эти два с одной и той же цикличной функцией для большого набора данных.
In [97]: data = np.random.randint(2,9,(5000))
...: window = 20
...:
In [98]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized(data, window))
Out[98]: True
In [99]: np.allclose(numpy_ewma(data, window), numpy_ewma_vectorized_v2(data, window))
Out[99]: True
In [100]: %timeit numpy_ewma(data, window)
100 loops, best of 3: 6.03 ms per loop
In [101]: %timeit numpy_ewma_vectorized(data, window)
1000 loops, best of 3: 665 µs per loop
In [102]: %timeit numpy_ewma_vectorized_v2(data, window)
1000 loops, best of 3: 357 µs per loop
In [103]: 6030/357.0
Out[103]: 16.89075630252101
Ускорение в 17 раз!
Обновлено 27/11/2018
РАБОТАЯ ЧИСТАЯ ЧИСТОТА, БЫСТРОЕ И ВЕКТОРИЗОВАННОЕ РЕШЕНИЕ ДЛЯ БОЛЬШИХ ВХОДОВ
параметр out для вычисления на месте, параметр dtype, параметр порядка индекса
Ответ Дивакара приводит к проблемам с точностью с плавающей запятой, когда ввод слишком велик. Это потому что (1-alpha)**(n+1) -> 0
когда n -> inf
а также alpha -> 1
, что приводит к делению на ноль и NaN
значения выскакивают в расчете.
Вот мое самое быстрое решение без проблем точности, почти полностью векторизованное. Это стало немного сложнее, но производительность отличная, особенно для действительно огромных входов. Без использования вычислений на месте (что возможно при использовании out
параметр, экономящий время выделения памяти): 3,62 секунды для входного вектора элемента 100 М, 3,2 мс для вектора входа элемента 100 К и 293 мкс для вектора входа 5000 элементов на довольно старом ПК (результаты могут отличаться в зависимости от alpha
/row_size
ценности).
# tested with python3 & numpy 1.15.2
import numpy as np
def ewma_vectorized_safe(data, alpha, row_size=None, dtype=None, order='C', out=None):
"""
Reshapes data before calculating EWMA, then iterates once over the rows
to calculate the offset without precision issues
:param data: Input data, will be flattened.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param row_size: int, optional
The row size to use in the computation. High row sizes need higher precision,
low values will impact performance. The optimal value depends on the
platform and the alpha being used. Higher alpha values require lower
row size.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
:return: The flattened result.
"""
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
row_size = int(row_size) if row_size is not None
else get_max_row_size(alpha, dtype)
data = np.array(data, copy=False)
if data.size <= row_size:
# The normal function can handle this input, use that
return ewma_vectorized(data, alpha, dtype=dtype, order=order, out=out)
if data.ndim > 1:
# flatten input
data = np.reshape(data, -1, order=order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
row_n = int(data.size // row_size) # the number of rows to use
trailing_n = int(data.size % row_size) # the amount of data leftover
first_offset = data[0]
if trailing_n > 0:
# set temporary results to slice view of out parameter
out_main_view = np.reshape(out[:-trailing_n], (row_n, row_size))
data_main_view = np.reshape(data[:-trailing_n], (row_n, row_size))
else:
out_main_view = out
data_main_view = data
# get all the scaled cumulative sums with 0 offset
ewma_vectorized_2d(data_main_view, alpha, axis=1, offset=0, dtype=dtype,
order='C', out=out_main_view)
scaling_factors = (1 - alpha) ** (np.arange(1, row_size + 1, dtype=dtype))
last_scaling_factor = scaling_factors[-1]
# create offset array
offsets = np.empty(out_main_view.shape[0], dtype=dtype)
offsets[0] = first_offset
# iteratively calculate offset for each row
for i in range(1, out_main_view.shape[0]):
offsets[i] = offsets[i - 1] * last_scaling_factor + out_main_view[i - 1, -1]
# add the offsets to the result
out_main_view += offsets[:, np.newaxis] * scaling_factors[np.newaxis, :]
if trailing_n > 0:
# process trailing data in the 2nd slice of the out parameter
ewma_vectorized(data[-trailing_n:], alpha, offset=out_main_view[-1, -1],
dtype=dtype, order='C', out=out[-trailing_n:])
return out
def get_max_row_size(alpha, dtype=float):
assert 0. <= alpha < 1.
# This will return the maximum row size possible on
# your platform for the given dtype. I can find no impact on accuracy
# at this value on my machine.
# Might not be the optimal value for speed, which is hard to predict
# due to numpy's optimizations
# Use np.finfo(dtype).eps if you are worried about accuracy
# and want to be extra safe.
epsilon = np.finfo(dtype).tiny
# If this produces an OverflowError, make epsilon larger
return int(np.log(epsilon)/np.log(1-alpha)) + 1
Функция 1D EWMA:
def ewma_vectorized(data, alpha, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a vector.
Will fail for large inputs.
:param data: Input data
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param offset: optional
The offset for the moving average, scalar. Defaults to data[0].
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Defaults to 'C'.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the input. If not provided or `None`,
a freshly-allocated array is returned.
"""
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
data = np.array(data, copy=False)
if data.ndim > 1:
# flatten input
data = data.reshape(-1, order)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if offset is None:
offset = data[0]
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# scaling_factors -> 0 as len(data) gets large
# this leads to divide-by-zeros below
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create cumulative sum array
np.multiply(data, (alpha * scaling_factors[-2]) / scaling_factors[:-1],
dtype=dtype, out=out)
np.cumsum(out, dtype=dtype, out=out)
# cumsums / scaling
out /= scaling_factors[-2::-1]
if offset != 0:
offset = np.array(offset, copy=False).astype(dtype, copy=False)
# add offsets
out += offset * scaling_factors[1:]
return out
Функция 2D EWMA:
def ewma_vectorized_2d(data, alpha, axis=None, offset=None, dtype=None, order='C', out=None):
"""
Calculates the exponential moving average over a given axis.
:param data: Input data, must be 1D or 2D array.
:param alpha: scalar float in range (0,1)
The alpha parameter for the moving average.
:param axis: The axis to apply the moving average on.
If axis==None, the data is flattened.
:param offset: optional
The offset for the moving average. Must be scalar or a
vector with one element for each row of data. If set to None,
defaults to the first value of each row.
:param dtype: optional
Data type used for calculations. Defaults to float64 unless
data.dtype is float32, then it will use float32.
:param order: {'C', 'F', 'A'}, optional
Order to use when flattening the data. Ignored if axis is not None.
:param out: ndarray, or None, optional
A location into which the result is stored. If provided, it must have
the same shape as the desired output. If not provided or `None`,
a freshly-allocated array is returned.
"""
data = np.array(data, copy=False)
assert data.ndim <= 2
if dtype is None:
if data.dtype == np.float32:
dtype = np.float32
else:
dtype = np.float64
else:
dtype = np.dtype(dtype)
if out is None:
out = np.empty_like(data, dtype=dtype)
else:
assert out.shape == data.shape
assert out.dtype == dtype
if data.size < 1:
# empty input, return empty array
return out
if axis is None or data.ndim < 2:
# use 1D version
if isinstance(offset, np.ndarray):
offset = offset[0]
return ewma_vectorized(data, alpha, offset, dtype=dtype, order=order,
out=out)
assert -data.ndim <= axis < data.ndim
# create reshaped data views
out_view = out
if axis < 0:
axis = data.ndim - int(axis)
if axis == 0:
# transpose data views so columns are treated as rows
data = data.T
out_view = data.T
if offset is None:
# use the first element of each row as the offset
offset = np.copy(data[:, 0])
elif np.size(offset) == 1:
offset = np.reshape(offset, (1,))
alpha = np.array(alpha, copy=False).astype(dtype, copy=False)
# calculate the moving average
row_size = data.shape[1]
row_n = data.shape[0]
scaling_factors = np.power(1. - alpha, np.arange(row_size + 1, dtype=dtype),
dtype=dtype)
# create a scaled cumulative sum array
np.multiply(
data,
np.multiply(alpha * scaling_factors[-2], np.ones((row_n, 1), dtype=dtype),
dtype=dtype)
/ scaling_factors[np.newaxis, :-1],
dtype=dtype, out=out_view
)
np.cumsum(out_view, axis=1, dtype=dtype, out=out_view)
out_view /= scaling_factors[np.newaxis, -2::-1]
if not (np.size(offset) == 1 and offset == 0):
offset = offset.astype(dtype, copy=False)
# add the offsets to the scaled cumulative sums
out_view += offset[:, np.newaxis] * scaling_factors[np.newaxis, 1:]
return out
использование:
data_n = 100000000
data = ((0.5*np.random.randn(data_n)+0.5) % 1) * 100
window = 5000
sum_proportion = .875
alpha = 1 - np.exp(np.log(1 - sum_proportion) / window) # or 2/(window+1) for panda's span function
result = ewma_vectorized_safe(data, alpha)
Просто совет
Легко вычислить "размер окна" (технически экспоненциальные средние имеют бесконечные "окна") для данного alpha
в зависимости от вклада данных в этом окне в среднее значение. Это полезно, например, для выбора того, какую часть начала результата следует рассматривать как ненадежную из-за граничных эффектов.
sum_proportion = .99 # window covers 99% of contribution to the moving average
scale_factors = (1 - alpha) ** (np.arange(data.shape[0] + 1))
scale_factor_cumsum = np.cumsum(scale_factors)
# window_size is the index of the first partial sum of scale_factors
# where partial_sum > sum_proportion * total_sum.
# Increases with increased sum_proportion and decreased alpha
window_size = np.argmax(scale_factor_cumsum > sum_proportion * scale_factor_cumsum[-1])
или более математически, но эффективно:
def window_size(alpha, sum_proportion):
# solve (1-alpha)**window_size = (1-sum_proportion) for window_size
return int(np.log(1-sum_proportion) / np.log(1-alpha))
alpha = 2 / (window_size + 1.0)
Отношение, используемое в этой теме (опция 'span' от pandas), является очень грубым приближением обратной функции выше (с sum_proportion~=0.87
). alpha = 1 - np.exp(np.log(1-sum_proportion)/window_size)
является более точным (опция "период полураспада" от панд равна этой формуле sum_proportion=0.5
).
В следующем примере data
представляет непрерывный шумовой сигнал. cutoff_idx
это первая позиция в result
где по крайней мере 99% значения зависит от отдельных значений в data
(т.е. менее 1% зависит от данных [0]). Данные до cutoff_idx
исключен из окончательных результатов, поскольку он слишком зависит от первого значения в data
поэтому возможно искажение среднего.
result = ewma_vectorized_safe(data, alpha, chunk_size)
sum_proportion = .99
cutoff_idx = window_size(alpha, sum_proportion)
result = result[cutoff_idx:]
Чтобы проиллюстрировать проблему, описанную выше, вы можете запустить ее несколько раз, обратите внимание на часто появляющийся фальстарт красной линии, который пропускается после cutoff_idx
:
data_n = 100000
data = np.random.rand(data_n) * 100
window = 1000
chunk_size = 10000
sum_proportion = .99
alpha = 1 - np.exp(np.log(1-sum_proportion)/window)
result = ewma_vectorized_safe(data, alpha, chunk_size)
cutoff_idx = window_size(alpha, sum_proportion)
x = np.arange(start=0, stop=result.size)
import matplotlib.pyplot as plt
plt.plot(x[:cutoff_idx+1], result[:cutoff_idx+1], '-r',
x[cutoff_idx:], result[cutoff_idx:], '-b')
plt.show()
Обратите внимание, что cutoff_idx==window
потому что альфа была установлена с обратной window_size()
функция, с тем же sum_proportion
,
Самый быстрый EWMA 23x pandas
Вопрос строго задает numpy
Решение, однако, кажется, что ОП был на самом деле только после чистого numpy
решение для ускорения времени выполнения.
Я решил похожую проблему, но вместо этого посмотрел в сторону numba.jit
что значительно ускоряет время вычислений
In [24]: a = np.random.random(10**7)
...: df = pd.Series(a)
In [25]: %timeit numpy_ewma(a, 10) # /a/42915307/4013571
...: %timeit df.ewm(span=10).mean() # pandas
...: %timeit numpy_ewma_vectorized_v2(a, 10) # best w/o numba: /a/42926270/4013571
...: %timeit _ewma(a, 10) # fastest accurate (below)
...: %timeit _ewma_infinite_hist(a, 10) # fastest overall (below)
4.14 s ± 116 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
991 ms ± 52.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
396 ms ± 8.39 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
181 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
39.6 ms ± 979 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Сокращение до меньших массивов a = np.random.random(100)
(результаты в том же порядке)
41.6 µs ± 491 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
945 ms ± 12 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
16 µs ± 93.5 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
1.66 µs ± 13.7 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
1.14 µs ± 5.57 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
Также стоит отметить, что мои функции, приведенные ниже, идентичны pandas
(см. примеры в docstr), в то время как некоторые ответы здесь имеют различные приближения. Например,
In [57]: print(pd.DataFrame([1,2,3]).ewm(span=2).mean().values.ravel())
...: print(numpy_ewma_vectorized_v2(np.array([1,2,3]), 2))
...: print(numpy_ewma(np.array([1,2,3]), 2))
[1. 1.75 2.61538462]
[1. 1.66666667 2.55555556]
[1. 1.18181818 1.51239669]
Исходный код, который я задокументировал для моей собственной библиотеки
import numpy as np
from numba import jit
from numba import float64
from numba import int64
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
to provide better adjustments for small windows via:
y[t] = (x[t] + (1-a)*x[t-1] + (1-a)^2*x[t-2] + ... + (1-a)^n*x[t-n]) /
(1 + (1-a) + (1-a)^2 + ... + (1-a)^n).
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=True).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
w = 1
ewma_old = arr_in[0]
ewma[0] = ewma_old
for i in range(1, n):
w += (1-alpha)**i
ewma_old = ewma_old*(1-alpha) + arr_in[i]
ewma[i] = ewma_old / w
return ewma
@jit((float64[:], int64), nopython=True, nogil=True)
def _ewma_infinite_hist(arr_in, window):
r"""Exponentialy weighted moving average specified by a decay ``window``
assuming infinite history via the recursive form:
(2) (i) y[0] = x[0]; and
(ii) y[t] = a*x[t] + (1-a)*y[t-1] for t>0.
This method is less accurate that ``_ewma`` but
much faster:
In [1]: import numpy as np, bars
...: arr = np.random.random(100000)
...: %timeit bars._ewma(arr, 10)
...: %timeit bars._ewma_infinite_hist(arr, 10)
3.74 ms ± 60.2 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
262 µs ± 1.54 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
Parameters
----------
arr_in : np.ndarray, float64
A single dimenisional numpy array
window : int64
The decay window, or 'span'
Returns
-------
np.ndarray
The EWMA vector, same length / shape as ``arr_in``
Examples
--------
>>> import pandas as pd
>>> a = np.arange(5, dtype=float)
>>> exp = pd.DataFrame(a).ewm(span=10, adjust=False).mean()
>>> np.array_equal(_ewma_infinite_hist(a, 10), exp.values.ravel())
True
"""
n = arr_in.shape[0]
ewma = np.empty(n, dtype=float64)
alpha = 2 / float(window + 1)
ewma[0] = arr_in[0]
for i in range(1, n):
ewma[i] = arr_in[i] * alpha + ewma[i-1] * (1 - alpha)
return ewma
Вот реализация, использующая NumPy, которая эквивалентна использованию df.ewm(alpha=alpha).mean()
, После прочтения документации, это всего лишь несколько матричных операций. Хитрость заключается в построении правильных матриц.
Стоит отметить, что, поскольку мы создаем матрицы с плавающей точкой, вы можете быстро перебрать вашу память, если входной массив слишком велик.
import pandas as pd
import numpy as np
def ewma(x, alpha):
'''
Returns the exponentially weighted moving average of x.
Parameters:
-----------
x : array-like
alpha : float {0 <= alpha <= 1}
Returns:
--------
ewma: numpy array
the exponentially weighted moving average
'''
# Coerce x to an array
x = np.array(x)
n = x.size
# Create an initial weight matrix of (1-alpha), and a matrix of powers
# to raise the weights by
w0 = np.ones(shape=(n,n)) * (1-alpha)
p = np.vstack([np.arange(i,i-n,-1) for i in range(n)])
# Create the weight matrix
w = np.tril(w0**p,0)
# Calculate the ewma
return np.dot(w, x[::np.newaxis]) / w.sum(axis=1)
Давайте проверим его:
alpha = 0.55
x = np.random.randint(0,30,15)
df = pd.DataFrame(x, columns=['A'])
df.ewm(alpha=alpha).mean()
# returns:
# A
# 0 13.000000
# 1 22.655172
# 2 20.443268
# 3 12.159796
# 4 14.871955
# 5 15.497575
# 6 20.743511
# 7 20.884818
# 8 24.250715
# 9 18.610901
# 10 17.174686
# 11 16.528564
# 12 17.337879
# 13 7.801912
# 14 12.310889
ewma(x=x, alpha=alpha)
# returns:
# array([ 13. , 22.65517241, 20.44326778, 12.1597964 ,
# 14.87195534, 15.4975749 , 20.74351117, 20.88481763,
# 24.25071484, 18.61090129, 17.17468551, 16.52856393,
# 17.33787888, 7.80191235, 12.31088889])
Очень простое решение, позволяющее избежать numba, но почти такое же быстрое, как решение Александра Макфарлейна, особенно для больших массивов и больших массивов.window
размеры, это использовать scipy's lfilter
функция (поскольку EWMA является линейным фильтром):
from scipy.signal import lfiltic, lfilter
# careful not to mix between scipy.signal and standard python signal
# (https://docs.python.org/3/library/signal.html) if your code handles some processes
def ewma_linear_filter(array, window):
alpha = 2 /(window + 1)
b = [alpha]
a = [1, alpha-1]
zi = lfiltic(b, a, array[0:1], [0])
return lfilter(b, a, array, zi=zi)[0]
Сроки следующие:
n = 10_000_000
window = 100_000
data = np.random.normal(0, 1, n)
%timeit _ewma_infinite_hist(data, window)
%timeit linear_filter(data, window)
86 ms ± 1.1 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
92.6 ms ± 751 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
Этот ответ может показаться неуместным. Но для тех, кому также необходимо рассчитать экспоненциально взвешенную дисперсию (а также стандартное отклонение) с помощью NumPy, будет полезно следующее решение:
import numpy as np
def ew(a, alpha, winSize):
_alpha = 1 - alpha
ws = _alpha ** np.arange(winSize)
w_sum = ws.sum()
ew_mean = np.convolve(a, ws)[winSize - 1] / w_sum
bias = (w_sum ** 2) / ((w_sum ** 2) - (ws ** 2).sum())
ew_var = (np.convolve((a - ew_mean) ** 2, ws)[winSize - 1] / w_sum) * bias
ew_std = np.sqrt(ew_var)
return (ew_mean, ew_var, ew_std)
Дано alpha
а также windowSize
Вот подход для моделирования соответствующего поведения на NumPy:
def numpy_ewm_alpha(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.full(df.shape[0],np.nan)
out[windowSize-1:] = np.convolve(a,wghts,'valid')
return out
Образцы прогонов для проверки -
In [54]: alpha = 0.55
...: windowSize = 20
...:
In [55]: df = pd.DataFrame(np.random.randint(2,9,(100)))
In [56]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
...:
Max. error : 5.10531254605e-07
In [57]: alpha = 0.75
...: windowSize = 30
...:
In [58]: out0 = df.ewm(alpha = alpha, min_periods=windowSize).mean().as_matrix().ravel()
...: out1 = numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
...: print "Max. error : " + str(np.nanmax(np.abs(out0 - out1)))
Max. error : 8.881784197e-16
Испытание во время выполнения на большом наборе данных -
In [61]: alpha = 0.55
...: windowSize = 20
...:
In [62]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [63]: %timeit df.ewm(alpha = alpha, min_periods=windowSize).mean()
1000 loops, best of 3: 851 µs per loop
In [64]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
Дальнейшее повышение
Для дальнейшего повышения производительности мы могли бы избежать инициализации с помощью NaN и вместо этого использовать массив, выведенный из np.convolve
, вот так -
def numpy_ewm_alpha_v2(a, alpha, windowSize):
wghts = (1-alpha)**np.arange(windowSize)
wghts /= wghts.sum()
out = np.convolve(a,wghts)
out[:windowSize-1] = np.nan
return out[:a.size]
Сроки -
In [117]: alpha = 0.55
...: windowSize = 20
...:
In [118]: df = pd.DataFrame(np.random.randint(2,9,(10000)))
In [119]: %timeit numpy_ewm_alpha(df.values.ravel(), alpha = alpha, windowSize = windowSize)
1000 loops, best of 3: 204 µs per loop
In [120]: %timeit numpy_ewm_alpha_v2(df.values.ravel(), alpha = alpha, windowSize = windowSize)
10000 loops, best of 3: 195 µs per loop
Вот еще одно решение, которое O придумала тем временем. Это примерно в четыре раза быстрее, чем решение панд.
def numpy_ewma(data, window):
returnArray = np.empty((data.shape[0]))
returnArray.fill(np.nan)
e = data[0]
alpha = 2 / float(window + 1)
for s in range(data.shape[0]):
e = ((data[s]-e) *alpha ) + e
returnArray[s] = e
return returnArray
Я использовал эту формулу в качестве отправной точки. Я уверен, что это можно улучшить еще больше, но это, по крайней мере, отправная точка.
@ Ответ Дивакара, кажется, вызывает переполнение при работе с
numpy_ewma_vectorized(np.random.random(500000), 10)
То, что я использовал, это:
def EMA(input, time_period=10): # For time period = 10
t_ = time_period - 1
ema = np.zeros_like(input,dtype=float)
multiplier = 2.0 / (time_period + 1)
#multiplier = 1 - multiplier
for i in range(len(input)):
# Special Case
if i > t_:
ema[i] = (input[i] - ema[i-1]) * multiplier + ema[i-1]
else:
ema[i] = np.mean(input[:i+1])
return ema
Однако это намного медленнее, чем решение для панды:
from pandas import ewma as pd_ema
def EMA_fast(X, time_period = 10):
out = pd_ema(X, span=time_period, min_periods=time_period)
out[:time_period-1] = np.cumsum(X[:time_period-1]) / np.asarray(range(1,time_period))
return out
Вывод:
Реализация:
def ema(p:np.ndarray, a:float) -> np.ndarray:
o = np.empty(p.shape, p.dtype)
# (1-α)^0, (1-α)^1, (1-α)^2, ..., (1-α)^n
np.power(1.0 - a, np.arange(0.0, p.shape[0], 1.0, p.dtype), o)
# α*P0, α*P1, α*P2, ..., α*Pn
np.multiply(a, p, p)
# α*P0/(1-α)^0, α*P1/(1-α)^1, α*P2/(1-α)^2, ..., α*Pn/(1-α)^n
np.divide(p, o, p)
# α*P0/(1-α)^0, α*P0/(1-α)^0 + α*P1/(1-α)^1, ...
np.cumsum(p, out=p)
# (α*P0/(1-α)^0)*(1-α)^0, (α*P0/(1-α)^0 + α*P1/(1-α)^1)*(1-α)^1, ...
np.multiply(p, o, o)
return o
Примечание: ввод будет перезаписан.
Благодаря решению @Divakar, и это действительно быстро. Тем не менее, это вызывает проблему переполнения, на которую указал @Danny. Функция не возвращает правильных ответов, когда длина больше 13835 или около того на моем конце.
Следующее - мое решение, основанное на решении Дивакара и pandas.ewm(). Mean().
def numpy_ema(data, com=None, span=None, halflife=None, alpha=None):
"""Summary
Calculate ema with automatically-generated alpha. Weight of past effect
decreases as the length of window increasing.
# these functions reproduce the pandas result when the flag adjust=False is set.
References:
https://stackru.com/questions/42869495/numpy-version-of-exponential-weighted-moving-average-equivalent-to-pandas-ewm
Args:
data (TYPE): Description
com (float, optional): Specify decay in terms of center of mass, alpha=1/(1+com), for com>=0
span (float, optional): Specify decay in terms of span, alpha=2/(span+1), for span>=1
halflife (float, optional): Specify decay in terms of half-life, alpha=1-exp(log(0.5)/halflife), for halflife>0
alpha (float, optional): Specify smoothing factor alpha directly, 0<alpha<=1
Returns:
TYPE: Description
Raises:
ValueError: Description
"""
n_input = sum(map(bool, [com, span, halflife, alpha]))
if n_input != 1:
raise ValueError(
'com, span, halflife, and alpha are mutually exclusive')
nrow = data.shape[0]
if np.isnan(data).any() or (nrow > 13835) or (data.ndim == 2):
df = pd.DataFrame(data)
df_ewm = df.ewm(com=com, span=span, halflife=halflife,
alpha=alpha, adjust=False)
out = df_ewm.mean().values.squeeze()
else:
if com:
alpha = 1 / (1 + com)
elif span:
alpha = 2 / (span + 1.0)
elif halflife:
alpha = 1 - np.exp(np.log(0.5) / halflife)
alpha_rev = 1 - alpha
pows = alpha_rev**(np.arange(nrow + 1))
scale_arr = 1 / pows[:-1]
offset = data[0] * pows[1:]
pw0 = alpha * alpha_rev**(nrow - 1)
mult = data * pw0 * scale_arr
cumsums = np.cumsum(mult)
out = offset + cumsums * scale_arr[::-1]
return out
Разве экспоненциальный фильтр не то же самое, что БИХ-фильтр первого порядка? Почему бы вам не попробовать это:
from scipy import signal
signal.lfilter([alpha], [1, alpha-1], data)
где альфа варьируется от 0 до 1
Вот моя реализация для одномерных входных массивов с бесконечным размером окна. Поскольку он использует большие числа, он работает только с входными массивами с элементами с абсолютным значением < 1e16, когда используется float32, но это обычно должно иметь место.
Идея состоит в том, чтобы преобразовать входной массив в срезы ограниченной длины, чтобы не происходило переполнение, а затем выполнить вычисление ewm для каждого среза отдельно.
def ewm(x, alpha):
"""
Returns the exponentially weighted mean y of a numpy array x with scaling factor alpha
y[0] = x[0]
y[j] = (1. - alpha) * y[j-1] + alpha * x[j], for j > 0
x -- 1D numpy array
alpha -- float
"""
n = int(-100. / np.log(1.-alpha)) # Makes sure that the first and last elements in f are very big and very small (about 1e22 and 1e-22)
f = np.exp(np.arange(1-n, n, 2) * (0.5 * np.log(1. - alpha))) # Scaling factor for each slice
tmp = (np.resize(x, ((len(x) + n - 1) // n, n)) / f * alpha).cumsum(axis=1) * f # Get ewm for each slice of length n
# Add the last value of each previous slice to the next slice with corresponding scaling factor f and return result
return np.resize(tmp + np.tensordot(np.append(x[0], np.roll(tmp.T[n-1], 1)[1:]), f * ((1. - alpha) / f[0]), axes=0), len(x))
Все приведенные ниже ответы не учитывают отсутствующие значения, поэтому я даю свою версию, которая предполагает, что nan, а результат соответствует pandas EWM. Я использую numba для ускорения, и это в десять раз быстрее, чем реализация pandas.
matrix = df.values
@jit(nopython=True, nogil=True, parallel=True)
def ewm_mean(arr_in, com):
'''
calculate the exponential moving average for each column
$y_{t}=\frac{ewm_t}{w_t}$
$ewm_t = ewm_{t-1} (1 - \alpha) + x_t$
$w_t = w_{t-1} (1 - \alpha) + 1$
arr_in->ndarray(dtype=float64): ewm per column
com->int: $\alpha = 1/com + 1$
'''
t, m = arr_in.shape
ewma = np.empty((t, m), dtype=float32)
alpha = 1 / (com + 1)
# the size of blocks depending on the device, number of blocks should match the number of cores on your machine
sizeOfblock = 1000
numberOfblock = m // sizeOfblock
assert sizeOfblock*numberOfblock == m, "wrong split"
# main loop
for nb in prange(numberOfblock): # split columns to blocks
w = np.where(np.isnan(arr_in[0, nb*sizeOfblock: (nb+1)*sizeOfblock]), 0., 1.)
ewma_old = arr_in[0, nb*sizeOfblock: (nb+1)*sizeOfblock]
ewma[0, nb*sizeOfblock: (nb+1)*sizeOfblock] = ewma_old
for i in range(1, t): # accumulate row by row
data_now = arr_in[i, nb*sizeOfblock: (nb+1)*sizeOfblock]
ewma_old = np.where(np.isnan(ewma_old), 0., ewma_old)*(1-alpha) + data_now
if np.isnan(np.sum(data_now)): # check nan
nan_pos = np.isnan(data_now)
w = w * (1 - alpha) + np.where(nan_pos, 0., 1.)
d = ewma_old / w
ewma[i, nb*sizeOfblock: (nb+1)*sizeOfblock] = np.where(nan_pos, ewma[i-1, nb*sizeOfblock: (nb+1)*sizeOfblock], d)
else:
w = w * (1 - alpha) + 1.
d = ewma_old / w
ewma[i, nb*sizeOfblock: (nb+1)*sizeOfblock] = d
return ewma
np.isclose(df.ewm(com=2).mean().values, ewm_mean(matrix, com=2), equal_nan=True).all()
Набор данных выглядит так.
import pandas as pd
import numpy as np
np.random.seed(0)
df = pd.DataFrame(np.random.normal(0, 10, size=(4000, 5000)))
df.iloc[0:800, 1000:2000] = np.nan
df.iloc[800:1600, 2000:3000] = np.nan
df.iloc[1600:2400, 3000:4000] = np.nan
df.iloc[2400:3200, 4000:5000] = np.nan
Примечание. В этом коде много времени уходит на обработку чисел nan, поэтому я разбиваю данные на блоки и выполняю вычисления параллельно. Если ваши данные не имеют значений nan, вы можете внести незначительные изменения и значительно ускорить их. Кроме того, вы можете написать более сложную логику для назначения данных ядрам.
sizeOfblock
- параметр; играть с этим.
Основываясь на великолепном ответе Дивакара, вот реализация, которая соответствует adjust=True
флаг функции панд, то есть использование весов, а не рекурсии.
def numpy_ewma(data, window):
alpha = 2 /(window + 1.0)
scale = 1/(1-alpha)
n = data.shape[0]
scale_arr = (1-alpha)**(-1*np.arange(n))
weights = (1-alpha)**np.arange(n)
pw0 = (1-alpha)**(n-1)
mult = data*pw0*scale_arr
cumsums = mult.cumsum()
out = cumsums*scale_arr[::-1] / weights.cumsum()
return out