Почему NUMPY коррелят и corrcoef возвращают разные значения и как "нормализовать" коррелят в "полном" режиме?
Я пытаюсь использовать анализ временных рядов в Python, используя Numpy.
У меня есть два ряда средних размеров, по 20 тыс. Значений в каждом, и я хочу проверить скользящую корреляцию.
Corrcoef дает мне в качестве выходных данных матрицу автокорреляции / коэффициентов корреляции. Само по себе ничего полезного в моем случае нет, так как одна из серий содержит задержку.
Функция корреляции (в mode="full") возвращает список из 40 тыс. Элементов, который выглядит как результат, к которому я стремлюсь (пиковое значение находится так же далеко от центра списка, как указывалось бы в Lag), но все значения странные - до 500, когда я ожидал что-то от -1 до 1.
Я не могу просто разделить все на максимальное значение; Я знаю, что максимальная корреляция не 1.
Как можно нормализовать "взаимную корреляцию" (корреляцию в "полном" режиме), чтобы возвращаемые значения были корреляцией на каждом шаге запаздывания вместо этих очень больших, странных значений?
3 ответа
Вы ищете нормализованную взаимную корреляцию. Эта опция еще не доступна в Numpy, но патч ждет проверки, которая делает именно то, что вы хотите. Это не должно быть слишком трудно применить это, я думаю. Большая часть патча - просто строковая документация. Единственные строки кода, которые он добавляет,
if normalize:
a = (a - mean(a)) / (std(a) * len(a))
v = (v - mean(v)) / std(v)
где a и v - введенные массивы, для которых вы находите взаимную корреляцию. Нетрудно добавить их в свой собственный дистрибутив Numpy или просто скопировать функцию корреляции и добавить туда строки. Я бы сделал последнее лично, если бы выбрал этот путь.
Другая, возможно, лучшая альтернатива - просто выполнить нормализацию для входных векторов, прежде чем отправлять их для корреляции. Вам решать, каким образом вы хотели бы это сделать.
Между прочим, это, кажется, правильная нормализация согласно странице Википедии о взаимной корреляции, за исключением деления на len(a)
скорее, чем (len(a)-1)
, Я чувствую, что расхождение сродни стандартному отклонению выборки по сравнению со стандартным отклонением выборки и, на мой взгляд, не будет иметь большого значения.
Согласно этому слайду, я бы предложил сделать это следующим образом:
def cross_correlation(a1, a2):
lags = range(-len(a1)+1, len(a2))
cs = []
for lag in lags:
idx_lower_a1 = max(lag, 0)
idx_lower_a2 = max(-lag, 0)
idx_upper_a1 = min(len(a1), len(a1)+lag)
idx_upper_a2 = min(len(a2), len(a2)-lag)
b1 = a1[idx_lower_a1:idx_upper_a1]
b2 = a2[idx_lower_a2:idx_upper_a2]
c = np.correlate(b1, b2)[0]
c = c / np.sqrt((b1**2).sum() * (b2**2).sum())
cs.append(c)
return cs
Для
full
режим, имеет ли смысл вычислять
corrcoef
непосредственно по отстающему сигналу / функции? Код
from dataclasses import dataclass
from typing import Any, Optional, Sequence
import numpy as np
ArrayLike = Any
@dataclass
class XCorr:
cross_correlation: np.ndarray
lags: np.ndarray
def cross_correlation(
signal: ArrayLike, feature: ArrayLike, lags: Optional[Sequence[int]] = None
) -> XCorr:
"""
Computes normalized cross correlation between the `signal` and the `feature`.
Current implementation assumes the `feature` can't be longer than the `signal`.
You can optionally provide specific lags, if not provided `signal` is padded
with the length of the `feature` - 1, and the `feature` is slid/padded (creating lags)
with 0 padding to match the length of the new signal. Pearson product-moment
correlation coefficients is computed for each lag.
See: https://en.wikipedia.org/wiki/Cross-correlation
:param signal: observed signal
:param feature: feature you are looking for
:param lags: optional lags, if not provided equals to (-len(feature), len(signal))
"""
signal_ar = np.asarray(signal)
feature_ar = np.asarray(feature)
if np.count_nonzero(feature_ar) == 0:
raise ValueError("Unsupported - feature contains only zeros")
assert (
signal_ar.ndim == feature_ar.ndim == 1
), "Unsupported - only 1d signal/feature supported"
assert len(feature_ar) <= len(
signal
), "Unsupported - signal should be at least as long as the feature"
padding_sz = len(feature_ar) - 1
padded_signal = np.pad(
signal_ar, (padding_sz, padding_sz), "constant", constant_values=0
)
lags = lags if lags is not None else range(-padding_sz, len(signal_ar), 1)
if np.max(lags) >= len(signal_ar):
raise ValueError("max positive lag must be shorter than the signal")
if np.min(lags) <= -len(feature_ar):
raise ValueError("max negative lag can't be longer than the feature")
assert np.max(lags) < len(signal_ar), ""
lagged_patterns = np.asarray(
[
np.pad(
feature_ar,
(padding_sz + lag, len(signal_ar) - lag - 1),
"constant",
constant_values=0,
)
for lag in lags
]
)
return XCorr(
cross_correlation=np.corrcoef(padded_signal, lagged_patterns)[0, 1:],
lags=np.asarray(lags),
)
Пример:
signal = [0, 0, 1, 0.5, 1, 0, 0, 1]
feature = [1, 0, 0, 1]
xcorr = cross_correlation(signal, feature)
assert xcorr.lags[xcorr.cross_correlation.argmax()] == 4