Почему NUMPY коррелят и corrcoef возвращают разные значения и как "нормализовать" коррелят в "полном" режиме?

Я пытаюсь использовать анализ временных рядов в Python, используя Numpy.

У меня есть два ряда средних размеров, по 20 тыс. Значений в каждом, и я хочу проверить скользящую корреляцию.

Corrcoef дает мне в качестве выходных данных матрицу автокорреляции / коэффициентов корреляции. Само по себе ничего полезного в моем случае нет, так как одна из серий содержит задержку.

Функция корреляции (в mode="full") возвращает список из 40 тыс. Элементов, который выглядит как результат, к которому я стремлюсь (пиковое значение находится так же далеко от центра списка, как указывалось бы в Lag), но все значения странные - до 500, когда я ожидал что-то от -1 до 1.

Я не могу просто разделить все на максимальное значение; Я знаю, что максимальная корреляция не 1.

Как можно нормализовать "взаимную корреляцию" (корреляцию в "полном" режиме), чтобы возвращаемые значения были корреляцией на каждом шаге запаздывания вместо этих очень больших, странных значений?

3 ответа

Решение

Вы ищете нормализованную взаимную корреляцию. Эта опция еще не доступна в Numpy, но патч ждет проверки, которая делает именно то, что вы хотите. Это не должно быть слишком трудно применить это, я думаю. Большая часть патча - просто строковая документация. Единственные строки кода, которые он добавляет,

if normalize:
    a = (a - mean(a)) / (std(a) * len(a))
    v = (v - mean(v)) /  std(v)

где a и v - введенные массивы, для которых вы находите взаимную корреляцию. Нетрудно добавить их в свой собственный дистрибутив Numpy или просто скопировать функцию корреляции и добавить туда строки. Я бы сделал последнее лично, если бы выбрал этот путь.

Другая, возможно, лучшая альтернатива - просто выполнить нормализацию для входных векторов, прежде чем отправлять их для корреляции. Вам решать, каким образом вы хотели бы это сделать.

Между прочим, это, кажется, правильная нормализация согласно странице Википедии о взаимной корреляции, за исключением деления на len(a) скорее, чем (len(a)-1), Я чувствую, что расхождение сродни стандартному отклонению выборки по сравнению со стандартным отклонением выборки и, на мой взгляд, не будет иметь большого значения.

Согласно этому слайду, я бы предложил сделать это следующим образом:

def cross_correlation(a1, a2):
        lags = range(-len(a1)+1, len(a2))
        cs = []
        for lag in lags:
            idx_lower_a1 = max(lag, 0)
            idx_lower_a2 = max(-lag, 0)
            idx_upper_a1 = min(len(a1), len(a1)+lag)
            idx_upper_a2 = min(len(a2), len(a2)-lag)
            b1 = a1[idx_lower_a1:idx_upper_a1]
            b2 = a2[idx_lower_a2:idx_upper_a2]
            c = np.correlate(b1, b2)[0]
            c = c / np.sqrt((b1**2).sum() * (b2**2).sum())
            cs.append(c)
        return cs

Для full режим, имеет ли смысл вычислять corrcoefнепосредственно по отстающему сигналу / функции? Код

      from dataclasses import dataclass
from typing import Any, Optional, Sequence

import numpy as np

ArrayLike = Any


@dataclass
class XCorr:
    cross_correlation: np.ndarray
    lags: np.ndarray


def cross_correlation(
    signal: ArrayLike, feature: ArrayLike, lags: Optional[Sequence[int]] = None
) -> XCorr:
    """
    Computes normalized cross correlation between the `signal` and the `feature`.
    Current implementation assumes the `feature` can't be longer than the `signal`.
    You can optionally provide specific lags, if not provided `signal` is padded
    with the length of the `feature` - 1, and the `feature` is slid/padded (creating lags)
    with 0 padding to match the length of the new signal. Pearson product-moment
    correlation coefficients is computed for each lag.

    See: https://en.wikipedia.org/wiki/Cross-correlation

    :param signal: observed signal
    :param feature: feature you are looking for
    :param lags: optional lags, if not provided equals to (-len(feature), len(signal))
    """
    signal_ar = np.asarray(signal)
    feature_ar = np.asarray(feature)
    if np.count_nonzero(feature_ar) == 0:
        raise ValueError("Unsupported - feature contains only zeros")
    assert (
        signal_ar.ndim == feature_ar.ndim == 1
    ), "Unsupported - only 1d signal/feature supported"
    assert len(feature_ar) <= len(
        signal
    ), "Unsupported - signal should be at least as long as the feature"
    padding_sz = len(feature_ar) - 1
    padded_signal = np.pad(
        signal_ar, (padding_sz, padding_sz), "constant", constant_values=0
    )
    lags = lags if lags is not None else range(-padding_sz, len(signal_ar), 1)
    if np.max(lags) >= len(signal_ar):
        raise ValueError("max positive lag must be shorter than the signal")
    if np.min(lags) <= -len(feature_ar):
        raise ValueError("max negative lag can't be longer than the feature")
    assert np.max(lags) < len(signal_ar), ""
    lagged_patterns = np.asarray(
        [
            np.pad(
                feature_ar,
                (padding_sz + lag, len(signal_ar) - lag - 1),
                "constant",
                constant_values=0,
            )
            for lag in lags
        ]
    )
    return XCorr(
        cross_correlation=np.corrcoef(padded_signal, lagged_patterns)[0, 1:],
        lags=np.asarray(lags),
    )

Пример:

      signal = [0, 0, 1, 0.5, 1, 0, 0, 1]
feature = [1, 0, 0, 1]
xcorr = cross_correlation(signal, feature)
assert xcorr.lags[xcorr.cross_correlation.argmax()] == 4
Другие вопросы по тегам