Передача символического theano.tensor в скомпилированную theano.function

Я пытаюсь реорганизовать свой код, чтобы легче было легко менять архитектуру. В настоящее время я строю рекуррентную нейронную сеть следующим образом.

# input (where first dimension is time)
x = T.matrix()
# target (where first dimension is time)
t = T.matrix()

# recurrent weights as a shared variable
W_hh = theano.shared(numpy.random.uniform(size=(n, n), low=-.01, high=.01))
# input to hidden layer weights
W_hx = theano.shared(numpy.random.uniform(size=(n, nin), low=-.01, high=.01))
# hidden to output layer weights
W_yh = theano.shared(numpy.random.uniform(size=(nout, n), low=-.01, high=.01))
# hidden layer bias weights
b_h = theano.shared(numpy.zeros((n)))
# output layer bias weights
b_y = theano.shared(numpy.zeros((nout)))
# initial hidden state of the RNN
h0 = theano.shared(numpy.zeros((n)))

# recurrent function
def step(x_t, h_tm1):
    h_t = T.nnet.sigmoid(T.dot(W_hx, x_t) + T.dot(W_hh, h_tm1) + b_h)
    y_t = T.nnet.sigmoid(T.dot(W_yh, h_t) + b_y)
    return h_t, y_t

# loop over the recurrent function for the entire sequence
[h, y], _ = theano.scan(step,
                        sequences=x,
                        outputs_info=[h0, None])

# predict function outputs y for a given x
predict = theano.function(inputs=[x,], outputs=y)

Это работает просто отлично. Но проблема с этой реализацией заключается в том, что мне приходится жестко кодировать веса и проверять правильность математики каждый раз, когда я меняю архитектуру. Вдохновленный учебником Multilayer Perceptron, я попытался реорганизовать свой код, представив класс Layer.

class Layer:
    def __init__(self, inputs=[], nins=[], nout=None, Ws=[], b=None, activation=T.tanh):
        """
        inputs:               an array of theano symbolic vectors
        activation:           the activation function for the hidden layer
        nins, nouts, Ws, bs:  either pass the dimensions of the inputs and outputs, or pass
                              the shared theano tensors for the weights and bias.
        """
        n = len(inputs)
        assert(n is not 0)

        self.inputs = inputs
        self.activation = activation

        # create the shared weights if necessary
        if len(Ws) is 0:
            assert(len(nins) is n)
            assert(nout is not None)
            for i in range(n):
                input = inputs[i]
                nin = nins[i]
                W = theano.shared(
                    numpy.random.uniform(
                        size=(nout, nin),
                        low=-numpy.sqrt(6. / (nin + nout)),
                        high=numpy.sqrt(6. / (nin + nout))
                    ),
                )
                Ws.append(W)

        # create the shared biases if necessary
        if b is None:
            assert(nout is not None)
            b = theano.shared(numpy.zeros((nout,)))

        self.Ws = Ws
        self.b = b
        self.params = self.Ws + [b]
        self.weights = Ws

        linear = self.b
        for i in range(n):
            linear += T.dot(self.Ws[i], self.inputs[i])

        if self.activation:
            self.output = self.activation(linear)
        else:
            self.output = linear

Это позволяет мне писать код RNN намного чище, менее подвержен ошибкам и намного проще менять архитектуру.

# one step of the input
x = T.vector()
# the previous hidden layer
h_tm1 = T.vector()

# the input and the hidden layer go into the input layer
hiddenLayer = Layer(inputs=[x, h_tm1],
                    nins=[nin, n],
                    nout=n,
                    activation=T.nnet.sigmoid)

# the hidden layer vector
h = hiddenLayer.output

# the hidden layer output goes to the output
outputLayer = Layer(inputs=[h],
                    nins=[n],
                    nout=nout,
                    activation=T.nnet.sigmoid)

# the output layer vector
y = outputLayer.output

# recurrent function
step = theano.function(inputs=[x, h_tm1],
                       outputs=[h, y])

# next we need to scan over all steps for a given array of observations
# input (where first dimension is time)
Xs = T.matrix()
# initial hidden state of the RNN
h0 = theano.shared(numpy.zeros((n)))

# loop over the recurrent function for the entire sequence
[Hs, Ys], _ = theano.scan(step,
                        sequences=Xs,
                        outputs_info=[h0, None])

# predict function outputs y for a given x
predict = theano.function(inputs=[Xs,], outputs=Ys)

Однако, когда я запускаю свою программу, я получаю сообщение об ошибке

TypeError: ('Bad input argument to theano function at index 0(0-based)', 'Expected an array-like object, but found a Variable: maybe you are trying to call a function on a (possibly shared) variable instead of a numeric array?')

Проблема здесь в том, что операция сканирования передает символическую переменную (подстенор X) в скомпилированную функцию шага.

Весь смысл рефакторинга моего кода заключался в том, чтобы мне не пришлось определять все вычисления внутри функции шага. Теперь у меня осталось 4 символических переменных (x, h_tm1, h, y), которые определяют сегмент вычислительного графа, который мне нужно просмотреть Xs, Тем не менее, я не уверен, как это сделать, потому что theano.function не может принимать символическую переменную.

Вот упрощенный пример того, что я пытаюсь сделать, используя пример возведения в степень.

import theano
import theano.tensor as T
import numpy

# one step of the computation is defined elsewhere, perhaps in a class.
A = T.vector("A")
prior_result = T.vector('p')
next_result = prior_result*A

# Now we compile a function to represent one step of the computation given
# some symbolic variables
step = theano.function(inputs=[prior_result, A], outputs=next_result)

# Finally, we have to loop through that step k times
k = T.iscalar("k")

result, updates = theano.scan(step, 
                              outputs_info=T.ones_like(A),
                              n_steps=k,
                              non_sequences=[A])

final_result = result[-1]

# And now we can define out power function
power = theano.function(inputs=[A,k], outputs=final_result)

print power(range(10),2)
print power(range(10),4)

Есть идеи как обойти эту ошибку?

2 ответа

Решение

Таким образом, решение заключается в использовании theano.clone с replaces Ключевой аргумент. Например, в примере возведения в степень вы можете определить пошаговую функцию следующим образом:

def step(p, a):
    replaces = {prior_result: p, A: a}
    n = theano.clone(next_result, replace=replaces)
    return n

Вы просто не можете использовать скомпилированную функцию Theano как опцию сканирования.

Чтобы обойти это, нужно, чтобы ваши классы Layer имели функцию, которая возвращает функцию, которая строит ваше дерево вычислений, которое вы затем можете использовать для компиляции операции сканирования.

Другие вопросы по тегам