Вещание только с определенными размерами ndarray в python

Рассмотрим TxFxM ndarray. Я хочу умножить его с его сопряженным, только для M измерение, оставляя другие измерения такими же, как представлено в следующем коде:

import numpy as np

T=2
F=3
M=4

x=np.random.rand(T,F,M)

result=np.zeros((T,F,M,M))

for i in range(x.shape[0]):
    for j in range(x.shape[1]):
            result[i,j,:,:]=np.matmul(np.expand_dims(x[i,j,:],axis=1),np.expand_dims(x[i,j,:],axis=0).conj())

Если я просто использую вещание, как в np.matmul(x,x.conj().T), Операция широковещания будет продолжаться до более высоких уровней измерений и будет продолжаться. С другой стороны, моя реализация очень медленная из-за двух циклов и очень непонимания для моего понимания.

Есть ли способ реализовать этот ST, он будет работать быстрее?

PS

  • Мои размеры явно больше T=3000,F=1024,M=4, И эта операция повторяется, отсюда мое требование к быстрой реализации.
  • Я планирую усреднить это по измерению T, так что если бы общая реализация была быстрее, мне было бы очень интересно. *

0 ответов

Нужный вам массив можно вычислить с помощью широковещательной передачи, если вы вставите одноэлементные измерения в два разных места для x а также x.conj(). Еслиx имеет форму (T,F,M) затем массивы формы (T,F,M,1) а также (T,F,1,M) будет транслироваться (T,F,M,M)именно так, как вы этого хотите. Вот ваш пример со сложными данными, чтобы убедиться, что мы чего-то не упускаем:

import numpy as np 

T,F,M = 2,3,4 

x = np.random.rand(T,F,M) + np.random.rand(T,F,M)*1j 

result = np.zeros((T,F,M,M), dtype=complex) 

# loop
for i in range(x.shape[0]): 
    for j in range(x.shape[1]):
        result[i,j,:,:] = np.matmul(np.expand_dims(x[i,j,:],axis=1),
                                    np.expand_dims(x[i,j,:],axis=0).conj())

# broadcasting
result2 = x[..., None] * x[..., None, :].conj()

# proof
print(np.array_equal(result, result2))  # True

Поскольку вы упомянули, что хотите T-размерное измерение, мы должны решить, стоит ли ставить это измерение последним, чтобы среднее значение использовало как можно более непрерывные блоки памяти. Это означает следующие варианты:

def summed_original(x): 
    """Assume x.shape == (T, F, M), return summed array of shape (F, M, M)""" 
    return (x[..., None] * x[..., None, :].conj()).mean(0) 

def summed_transposed(x): 
    """Assume x.shape == (F, M, T), return summed array of shape (F, M, M)""" 
    return (x[..., None, :] * x[:, None, ...].conj()).mean(-1) 

x_transposed = x.transpose(1, 2, 0).copy()  # ensure contiguous copy 
print(np.allclose(summed_original(x), summed_transposed(x_transposed)))  # True

Как видите, эти две функции вычисляют одно и то же, но предполагают, что входные данные имеют разный порядок памяти. Причина, по которой это важно, заключается в том, что может оказаться быстрее иметь исходный массив в другом макете памяти (за счет транспонирования и копирования его один раз в начале, если это необходимо).

Итак, давайте рассчитаем время с помощью IPython %timeit магия и ваши настоящие размеры:

T,F,M = 3000,1024,4 
x = np.random.rand(T, F, M) + np.random.rand(T, F, M)*1j 
x_transposed = x.transpose(1, 2, 0).copy() 
print(np.allclose(summed_original(x), summed_transposed(x_transposed)))  # True

%timeit summed_original(x)
# 500 ms ± 16.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

%timeit summed_transposed(x_transposed)
# 352 ms ± 2.31 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Как видите, для ваших конкретных размеров и типов кажется целесообразным изменить размеры вашего массива так, чтобы TРазмер соответствует непрерывным блокам памяти, помогая кэшированию в ЦП. Вы можете сделать это с помощью.transpose(...).copy() вызовите в начале, или, что еще лучше, вы можете создать свой массив в первую очередь, сделав код оптимальным.

Другие вопросы по тегам