Улучшить производительность цикла for в Python (возможно, с помощью numpy или numba)
Я хочу улучшить производительность for
цикл в этой функции.
import numpy as np
import random
def play_game(row, n=1000000):
"""Play the game! This game is a kind of random walk.
Arguments:
row (int[]): row index to use in the p matrix for each step in the
walk. Then length of this array is the same as n.
n (int): number of steps in the random walk
"""
p = np.array([[ 0.499, 0.499, 0.499],
[ 0.099, 0.749, 0.749]])
X0 = 100
Y0 = X0 % 3
X = np.zeros(n)
tempX = X0
Y = Y0
for j in range(n):
tempX = X[j] = tempX + 2 * (random.random() < p.item(row.item(j), Y)) - 1
Y = tempX % 3
return np.r_[X0, X]
Трудность заключается в том, что ценность Y
рассчитывается на каждом шаге на основе значения X
и это Y
затем используется на следующем шаге, чтобы обновить значение для X
,
Интересно, есть ли какая-нибудь хитрая уловка, которая могла бы иметь большое значение. Использование Numba - это честная игра (я пробовал, но без особого успеха). Однако я не хочу использовать Cython.
1 ответ
Быстрое замечание говорит нам, что между итерациями в коде функции есть зависимость от данных. Теперь существуют разные виды зависимостей от данных. Тип зависимости данных, которую вы просматриваете, - это индексирование зависимости, то есть выбор данных на любой итерации зависит от предыдущих итерационных вычислений. Эта зависимость, казалось, трудно проследить между итерациями, поэтому этот пост не является векторизованным решением. Скорее, мы постараемся как можно больше предварительно вычислить значения, которые будут использоваться в цикле. Основная идея заключается в том, чтобы выполнять минимальную работу внутри цикла.
Вот краткое объяснение того, как мы можем приступить к предварительным расчетам и, таким образом, найти более эффективное решение:
Учитывая, относительно небольшую форму
p
из каких элементов строки должны быть извлечены на основе вводаrow
Вы можете предварительно выбрать все эти строки изp
сp[row]
,Для каждой итерации вы вычисляете случайное число. Вы можете заменить это случайным массивом, который вы можете установить перед циклом, и, таким образом, вы также предварительно рассчитали бы эти случайные значения.
Основываясь на предварительно рассчитанных значениях, у вас будут индексы столбцов для всех строк в
p
, Обратите внимание, что эти индексы столбцов будут большим ndarray, содержащим все возможные индексы столбцов, и внутри нашего кода будет выбран только один из них на основе вычислений за каждую итерацию. Используя индексы столбцов для каждой итерации, вы увеличиваете или уменьшаетеX0
чтобы получить вывод за итерацию.
Реализация будет выглядеть так -
randarr = np.random.rand(n)
p = np.array([[ 0.499, 0.419, 0.639],
[ 0.099, 0.749, 0.319]])
def play_game_partvect(row,n,randarr,p):
X0 = 100
Y0 = X0 % 3
signvals = 2*(randarr[:,None] < p[row]) - 1
col_idx = (signvals + np.arange(3)) % 3
Y = Y0
currval = X0
out = np.empty(n+1)
out[0] = X0
for j in range(n):
currval = currval + signvals[j,Y]
out[j+1] = currval
Y = col_idx[j,Y]
return out
Для проверки исходного кода вы должны изменить исходный код следующим образом:
def play_game(row,n,randarr,p):
X0 = 100
Y0 = X0 % 3
X = np.zeros(n)
tempX = X0
Y = Y0
for j in range(n):
tempX = X[j] = tempX + 2 * (randarr[j] < p.item(row.item(j), Y)) - 1
Y = tempX % 3
return np.r_[X0, X]
Обратите внимание, что, поскольку этот код предварительно вычисляет эти случайные значения, то это уже даст вам хорошее ускорение по сравнению с кодом в вопросе.
Испытания во время выполнения и проверка вывода -
In [2]: # Inputs
...: n = 1000
...: row = np.random.randint(0,2,(n))
...: randarr = np.random.rand(n)
...: p = np.array([[ 0.499, 0.419, 0.639],
...: [ 0.099, 0.749, 0.319]])
...:
In [3]: np.allclose(play_game_partvect(row,n,randarr,p),play_game(row,n,randarr,p))
Out[3]: True
In [4]: %timeit play_game(row,n,randarr,p)
100 loops, best of 3: 11.6 ms per loop
In [5]: %timeit play_game_partvect(row,n,randarr,p)
1000 loops, best of 3: 1.51 ms per loop
In [6]: # Inputs
...: n = 10000
...: row = np.random.randint(0,2,(n))
...: randarr = np.random.rand(n)
...: p = np.array([[ 0.499, 0.419, 0.639],
...: [ 0.099, 0.749, 0.319]])
...:
In [7]: np.allclose(play_game_partvect(row,n,randarr,p),play_game(row,n,randarr,p))
Out[7]: True
In [8]: %timeit play_game(row,n,randarr,p)
10 loops, best of 3: 116 ms per loop
In [9]: %timeit play_game_partvect(row,n,randarr,p)
100 loops, best of 3: 14.8 ms per loop
Таким образом, мы видим ускорение около 7.5x+
, неплохо!