Вещание только с определенными размерами ndarray в python
Рассмотрим TxFxM
ndarray. Я хочу умножить его с его сопряженным, только для M
измерение, оставляя другие измерения такими же, как представлено в следующем коде:
import numpy as np
T=2
F=3
M=4
x=np.random.rand(T,F,M)
result=np.zeros((T,F,M,M))
for i in range(x.shape[0]):
for j in range(x.shape[1]):
result[i,j,:,:]=np.matmul(np.expand_dims(x[i,j,:],axis=1),np.expand_dims(x[i,j,:],axis=0).conj())
Если я просто использую вещание, как в np.matmul(x,x.conj().T)
, Операция широковещания будет продолжаться до более высоких уровней измерений и будет продолжаться. С другой стороны, моя реализация очень медленная из-за двух циклов и очень непонимания для моего понимания.
Есть ли способ реализовать этот ST, он будет работать быстрее?
PS
- Мои размеры явно больше
T=3000,F=1024,M=4
, И эта операция повторяется, отсюда мое требование к быстрой реализации. - Я планирую усреднить это по измерению
T
, так что если бы общая реализация была быстрее, мне было бы очень интересно. *
0 ответов
Нужный вам массив можно вычислить с помощью широковещательной передачи, если вы вставите одноэлементные измерения в два разных места для x
а также x.conj()
. Еслиx
имеет форму (T,F,M)
затем массивы формы (T,F,M,1)
а также (T,F,1,M)
будет транслироваться (T,F,M,M)
именно так, как вы этого хотите. Вот ваш пример со сложными данными, чтобы убедиться, что мы чего-то не упускаем:
import numpy as np
T,F,M = 2,3,4
x = np.random.rand(T,F,M) + np.random.rand(T,F,M)*1j
result = np.zeros((T,F,M,M), dtype=complex)
# loop
for i in range(x.shape[0]):
for j in range(x.shape[1]):
result[i,j,:,:] = np.matmul(np.expand_dims(x[i,j,:],axis=1),
np.expand_dims(x[i,j,:],axis=0).conj())
# broadcasting
result2 = x[..., None] * x[..., None, :].conj()
# proof
print(np.array_equal(result, result2)) # True
Поскольку вы упомянули, что хотите T
-размерное измерение, мы должны решить, стоит ли ставить это измерение последним, чтобы среднее значение использовало как можно более непрерывные блоки памяти. Это означает следующие варианты:
def summed_original(x):
"""Assume x.shape == (T, F, M), return summed array of shape (F, M, M)"""
return (x[..., None] * x[..., None, :].conj()).mean(0)
def summed_transposed(x):
"""Assume x.shape == (F, M, T), return summed array of shape (F, M, M)"""
return (x[..., None, :] * x[:, None, ...].conj()).mean(-1)
x_transposed = x.transpose(1, 2, 0).copy() # ensure contiguous copy
print(np.allclose(summed_original(x), summed_transposed(x_transposed))) # True
Как видите, эти две функции вычисляют одно и то же, но предполагают, что входные данные имеют разный порядок памяти. Причина, по которой это важно, заключается в том, что может оказаться быстрее иметь исходный массив в другом макете памяти (за счет транспонирования и копирования его один раз в начале, если это необходимо).
Итак, давайте рассчитаем время с помощью IPython %timeit
магия и ваши настоящие размеры:
T,F,M = 3000,1024,4
x = np.random.rand(T, F, M) + np.random.rand(T, F, M)*1j
x_transposed = x.transpose(1, 2, 0).copy()
print(np.allclose(summed_original(x), summed_transposed(x_transposed))) # True
%timeit summed_original(x)
# 500 ms ± 16.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit summed_transposed(x_transposed)
# 352 ms ± 2.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Как видите, для ваших конкретных размеров и типов кажется целесообразным изменить размеры вашего массива так, чтобы T
Размер соответствует непрерывным блокам памяти, помогая кэшированию в ЦП. Вы можете сделать это с помощью.transpose(...).copy()
вызовите в начале, или, что еще лучше, вы можете создать свой массив в первую очередь, сделав код оптимальным.