Утомительный цикл поиска улучшений
В моем коде мне нужно много раз вычислить значения вектора, которые являются средними значениями из разных патчей другого массива. Вот пример моего кода, показывающий, как я это делаю, но я обнаружил, что он слишком неэффективен в работе...
import numpy as np
vector_a = np.zeros(10)
array_a = np.random.random((100,100))
for i in range(len(vector_a)):
vector_a[i] = np.mean(array_a[:,i+20:i+40]
Есть ли способ сделать его более эффективным? Любые комментарии или предложения приветствуются! Большое спасибо!
Да, 20 и 40 фиксированы.
3 ответа
РЕДАКТИРОВАТЬ:
На самом деле вы можете сделать это намного быстрее. Предыдущая функция может быть улучшена путем работы с суммированными столбцами, например:
def rolling_means_faster1(array_a, n, first, size):
# Sum each relevant columns
sum_a = np.sum(array_a[:, first:(first + size + n - 1)], axis=0)
# Reshape as before
strides_b = (sum_a.strides[0], sum_a.strides[0])
array_b = np.lib.stride_tricks.as_strided(sum_a, (n, size), (strides_b))
# Average
v = np.sum(array_b, axis=1)
v /= (len(array_a) * size)
return v
Другой способ - работать с накопленными суммами, добавляя и удаляя по мере необходимости для каждого элемента вывода.
def rolling_means_faster2(array_a, n, first, size):
# Sum each relevant columns
sum_a = np.sum(array_a[:, first:(first + size + n - 1)], axis=0)
# Add a zero a the beginning so the next operation works fine
sum_a = np.insert(sum_a, 0, 0)
# Sum the initial `size` elements and add and remove partial sums as necessary
v = np.sum(sum_a[:size]) - np.cumsum(sum_a[:n]) + np.cumsum(sum_a[-n:])
# Average
v /= (size * len(array_a))
return v
Сравнительный анализ с предыдущим решением:
import numpy as np
np.random.seed(100)
array_a = np.random.random((1000, 1000))
n = 100
first = 100
size = 200
%timeit rolling_means_orig(array_a, n, first, size)
# 12.7 ms ± 55.8 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means(array_a, n, first, size)
# 5.49 ms ± 43.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means_faster1(array_a, n, first, size)
# 166 µs ± 874 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)
%timeit rolling_means_faster2(array_a, n, first, size)
# 182 µs ± 2.04 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
Таким образом, последние два, похоже, очень близки по производительности. Это может зависеть от относительных размеров входов.
Это возможное векторизованное решение:
import numpy as np
# Data
np.random.seed(100)
array_a = np.random.random((100, 100))
# Take all the relevant columns
slice_a = array_a[:, 20:40 + 10]
# Make a "rolling window" with stride tricks
strides_b = (slice_a.strides[1], slice_a.strides[0], slice_a.strides[1])
array_b = np.lib.stride_tricks.as_strided(slice_a, (10, 100, 20), (strides_b))
# Take mean
result = np.mean(array_b, axis=(1, 2))
# Original method for testing correctness
vector_a = np.zeros(10)
idv1 = np.arange(10) + 20
idv2 = np.arange(10) + 40
for i in range(len(vector_a)):
vector_a[i] = np.mean(array_a[:,idv1[i]:idv2[i]])
print(np.allclose(vector_a, result))
# True
Вот быстрый тест в IPython (размеры увеличены для оценки):
import numpy as np
def rolling_means(array_a, n, first, size):
slice_a = array_a[:, first:(first + size + n)]
strides_b = (slice_a.strides[1], slice_a.strides[0], slice_a.strides[1])
array_b = np.lib.stride_tricks.as_strided(slice_a, (n, len(array_a), size), (strides_b))
return np.mean(array_b, axis=(1, 2))
def rolling_means_orig(array_a, n, first, size):
vector_a = np.zeros(n)
idv1 = np.arange(n) + first
idv2 = np.arange(n) + (first + size)
for i in range(len(vector_a)):
vector_a[i] = np.mean(array_a[:,idv1[i]:idv2[i]])
return vector_a
np.random.seed(100)
array_a = np.random.random((1000, 1000))
n = 100
first = 100
size = 200
%timeit rolling_means(array_a, n, first, size)
# 5.48 ms ± 26.6 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
%timeit rolling_means_orig(array_a, n, first, size)
# 32.8 ms ± 762 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
Это решение работает в предположении, что вы пытаетесь вычислить скользящее среднее для подмножества окна столбцов. В качестве примера и игнорирования строк, приведенных [0, 1, 2, 3, 4]
и окно 2
средние [0.5, 1.5, 2.5, 3.5]
и что вам могут понадобиться только второе и третье средние значения.
Ваше текущее решение неэффективно, так как оно пересчитывает среднее значение для столбца для каждого вывода в vector_a
, При условии (a / n) + (b / n) == (a + b) / n
мы можем уйти от вычисления среднего значения каждого столбца только один раз, а затем объединить значения столбцов по мере необходимости для получения окончательного результата.
window_first_start = idv1.min() # or idv1[0]
window_last_end = idv2.max() # or idv2[-1]
window_size = idv2[0] - idv1[0]
assert ((idv2 - idv1) == window_size).all(), "sanity check, not needed if assumption holds true"
# a view of the columns we are interested in, no copying is done here
view = array_a[:,window_first_start:window_last_end]
# calculate the means for each column
col_means = view.mean(axis=0)
# cumsum is used to find the rolling sum of means and so the rolling average
# We use an out variable to make sure we have a 0 in the first element of cum_sum.
# This makes like a little easier in the next step.
cum_sum = np.empty(len(col_means) + 1, dtype=col_means.dtype)
cum_sum[0] = 0
np.cumsum(col_means, out=cum_sum[1:])
result = (cum_sum[window_size:] - cum_sum[:-window_size]) / window_size
Протестировав это с вашим собственным кодом, описанное выше значительно быстрее (увеличивается с размером входного массива) и немного быстрее, чем решение, предоставляемое jdehesa. С входным массивом 1000x1000 это на два порядка быстрее, чем ваше решение, и на один порядок быстрее, чем у jdehesa.
Попробуй это:
import numpy as np
array_a = np.random.random((100,100))
vector_a = [np.mean(array_a[:,i+20:i+40]) for i in range(10)]