Утомительный цикл поиска улучшений

В моем коде мне нужно много раз вычислить значения вектора, которые являются средними значениями из разных патчей другого массива. Вот пример моего кода, показывающий, как я это делаю, но я обнаружил, что он слишком неэффективен в работе...

import numpy as np
vector_a = np.zeros(10)
array_a = np.random.random((100,100))
for i in range(len(vector_a)):
    vector_a[i] = np.mean(array_a[:,i+20:i+40]

Есть ли способ сделать его более эффективным? Любые комментарии или предложения приветствуются! Большое спасибо!

Да, 20 и 40 фиксированы.

3 ответа

Решение

РЕДАКТИРОВАТЬ:

На самом деле вы можете сделать это намного быстрее. Предыдущая функция может быть улучшена путем работы с суммированными столбцами, например:

def rolling_means_faster1(array_a, n, first, size):
    # Sum each relevant columns
    sum_a = np.sum(array_a[:, first:(first + size + n - 1)], axis=0)
    # Reshape as before
    strides_b = (sum_a.strides[0], sum_a.strides[0])
    array_b = np.lib.stride_tricks.as_strided(sum_a, (n, size), (strides_b))
    # Average
    v = np.sum(array_b, axis=1)
    v /= (len(array_a) * size)
    return v

Другой способ - работать с накопленными суммами, добавляя и удаляя по мере необходимости для каждого элемента вывода.

def rolling_means_faster2(array_a, n, first, size):
    # Sum each relevant columns
    sum_a = np.sum(array_a[:, first:(first + size + n - 1)], axis=0)
    # Add a zero a the beginning so the next operation works fine
    sum_a = np.insert(sum_a, 0, 0)
    # Sum the initial `size` elements and add and remove partial sums as necessary
    v = np.sum(sum_a[:size]) - np.cumsum(sum_a[:n]) + np.cumsum(sum_a[-n:])
    # Average
    v /= (size * len(array_a))
    return v

Сравнительный анализ с предыдущим решением:

import numpy as np

np.random.seed(100)
array_a = np.random.random((1000, 1000))
n = 100
first = 100
size = 200

%timeit rolling_means_orig(array_a, n, first, size)
# 12.7 ms ± 55.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means(array_a, n, first, size)
# 5.49 ms ± 43.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means_faster1(array_a, n, first, size)
# 166 µs ± 874 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit rolling_means_faster2(array_a, n, first, size)
# 182 µs ± 2.04 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

Таким образом, последние два, похоже, очень близки по производительности. Это может зависеть от относительных размеров входов.


Это возможное векторизованное решение:

import numpy as np

# Data
np.random.seed(100)
array_a = np.random.random((100, 100))

# Take all the relevant columns
slice_a = array_a[:, 20:40 + 10]
# Make a "rolling window" with stride tricks
strides_b = (slice_a.strides[1], slice_a.strides[0], slice_a.strides[1])
array_b = np.lib.stride_tricks.as_strided(slice_a, (10, 100, 20), (strides_b))
# Take mean
result = np.mean(array_b, axis=(1, 2))

# Original method for testing correctness
vector_a = np.zeros(10)
idv1 = np.arange(10) + 20
idv2 = np.arange(10) + 40
for i in range(len(vector_a)):
    vector_a[i] = np.mean(array_a[:,idv1[i]:idv2[i]])
print(np.allclose(vector_a, result))
# True

Вот быстрый тест в IPython (размеры увеличены для оценки):

import numpy as np

def rolling_means(array_a, n, first, size):
    slice_a = array_a[:, first:(first + size + n)]
    strides_b = (slice_a.strides[1], slice_a.strides[0], slice_a.strides[1])
    array_b = np.lib.stride_tricks.as_strided(slice_a, (n, len(array_a), size), (strides_b))
    return np.mean(array_b, axis=(1, 2))

def rolling_means_orig(array_a, n, first, size):
    vector_a = np.zeros(n)
    idv1 = np.arange(n) + first
    idv2 = np.arange(n) + (first + size)
    for i in range(len(vector_a)):
        vector_a[i] = np.mean(array_a[:,idv1[i]:idv2[i]])
    return vector_a

np.random.seed(100)
array_a = np.random.random((1000, 1000))
n = 100
first = 100
size = 200

%timeit rolling_means(array_a, n, first, size)
# 5.48 ms ± 26.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means_orig(array_a, n, first, size)
# 32.8 ms ± 762 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

Это решение работает в предположении, что вы пытаетесь вычислить скользящее среднее для подмножества окна столбцов. В качестве примера и игнорирования строк, приведенных [0, 1, 2, 3, 4] и окно 2 средние [0.5, 1.5, 2.5, 3.5]и что вам могут понадобиться только второе и третье средние значения.

Ваше текущее решение неэффективно, так как оно пересчитывает среднее значение для столбца для каждого вывода в vector_a, При условии (a / n) + (b / n) == (a + b) / n мы можем уйти от вычисления среднего значения каждого столбца только один раз, а затем объединить значения столбцов по мере необходимости для получения окончательного результата.

window_first_start = idv1.min() # or idv1[0]
window_last_end = idv2.max() # or idv2[-1]
window_size = idv2[0] - idv1[0]
assert ((idv2 - idv1) == window_size).all(), "sanity check, not needed if assumption holds true"

# a view of the columns we are interested in, no copying is done here
view = array_a[:,window_first_start:window_last_end]

# calculate the means for each column
col_means = view.mean(axis=0)

# cumsum is used to find the rolling sum of means and so the rolling average
# We use an out variable to make sure we have a 0 in the first element of cum_sum. 
# This makes like a little easier in the next step.
cum_sum = np.empty(len(col_means) + 1, dtype=col_means.dtype)
cum_sum[0] = 0
np.cumsum(col_means, out=cum_sum[1:])

result = (cum_sum[window_size:] - cum_sum[:-window_size]) / window_size 

Протестировав это с вашим собственным кодом, описанное выше значительно быстрее (увеличивается с размером входного массива) и немного быстрее, чем решение, предоставляемое jdehesa. С входным массивом 1000x1000 это на два порядка быстрее, чем ваше решение, и на один порядок быстрее, чем у jdehesa.

Попробуй это:

import numpy as np     
array_a = np.random.random((100,100))
vector_a = [np.mean(array_a[:,i+20:i+40]) for i in range(10)]
Другие вопросы по тегам