почему методы итерации политики и итерации значений дают разные результаты для оптимальных значений и оптимальной политики?

В настоящее время я изучаю динамическое программирование в обучении с подкреплением, в котором я столкнулся с двумя концепциями Value-Iteration и Policy-Iteration. Чтобы понять то же самое, я реализую пример gridworld из Sutton, который говорит:

Нетерминальные состояния: S = {1, 2, . . ., 14}. В каждом состоянии возможны четыре действия, A = {вверх, вниз, вправо, влево}, которые детерминистически вызывают соответствующие переходы между состояниями, за исключением того, что действия, которые выводят агента из сети, на самом деле оставляют состояние неизменным. Так, например, p(6, 1|5, right) = 1, p(7, 1|7, right) = 1 и p(10, r |5, right) = 0 для всех r, принадлежащих R Это эпизодическая задача без скидок. Вознаграждение равно -1 за все переходы, пока не будет достигнуто конечное состояние. Конечное состояние на рисунке заштриховано (хотя оно показано в двух местах, формально это одно состояние). Таким образом, ожидаемая функция вознаграждения r(s, a, s') = 1 для всех состояний s, s' и действий a. Предположим, агент следует равновероятной случайной политике (все действия равновероятны).

Ниже представлена ​​реализация двух методов

class GridWorld:
    def __init__(self,grid_size = 5, gamma = 0.9, penalty = -1.0, theta = 1e-2):
        self.grid_size = grid_size
        self.discount = gamma
        self.actions = [np.array([0, -1]),
                   np.array([-1, 0]),
                   np.array([0, 1]),
                   np.array([1, 0])]
        self.action_prob = 1/len(self.actions)
        self.theta = theta
        print('action prob : ',self.action_prob)
        self.penalty_reward = penalty
        self.re_init()

    def re_init(self):
        self.values = np.zeros((self.grid_size, self.grid_size))
        self.policy = np.zeros(self.values.shape, dtype=np.int)

    def checkTerminal(self,state):
        x, y = state
        if x == 0 and y == 0:
            return 1
        elif (x == self.grid_size - 1 and y == self.grid_size - 1):
            return 1
        else : 
            return 0


    def step(self, state, action):
        #print(state)
        if self.checkTerminal(state):
            next_state = state
            reward = 0
        else:
            next_state = (np.array(state) + action).tolist()
            x, y = next_state
            if x < 0 or x >= self.grid_size or y < 0 or y >= self.grid_size:
                next_state = state
            reward = self.penalty_reward     

        return next_state, reward


    def compValueIteration(self):
        new_state_values = np.zeros((self.grid_size, self.grid_size))
        policy = np.zeros((self.grid_size, self.grid_size))
        iter_cnt = 0
        while True:
            #delta = 0
            state_values = new_state_values.copy()
            old_state_values = state_values.copy()

            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    values = []
                    for action in self.actions:
                        (next_i, next_j), reward = self.step([i, j], action)
                        values.append(reward + self.discount*state_values[next_i, next_j])
                    new_state_values[i, j] = np.max(values)
                    policy[i, j] = np.argmax(values)
                    #delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))
            delta = np.abs(old_state_values - new_state_values).max()        
            print(f'Difference: {delta}')
            if delta < self.theta:
                break
            iter_cnt += 1

        return new_state_values, policy, iter_cnt



    def policyEvaluation(self,policy,new_state_values):
        #new_state_values = np.zeros((self.grid_size, self.grid_size))
        iter_cnt = 0
        while True:
            delta = 0 
            state_values = new_state_values.copy()
            old_state_values = state_values.copy()
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    action = policy[i, j]
                    (next_i, next_j), reward = self.step([i, j], action)
                    value = self.action_prob * (reward + self.discount * state_values[next_i, next_j])
                    new_state_values[i, j] = value
                    delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))

            print(f'Difference: {delta}')
            if delta < self.theta:
                break
            iter_cnt += 1

        return new_state_values


    def policyImprovement(self, policy, values, actions):

        #expected_action_returns = np.zeros((self.grid_size, self.grid_size, np.size(actions)))
        policy_stable = True

        for i in range(self.grid_size):
            for j in range(self.grid_size):
                old_action = policy[i, j]
                act_cnt = 0
                expected_rewards = []
                for action in self.actions:
                    (next_i, next_j), reward = self.step([i, j], action)
                    expected_rewards.append(self.action_prob * (reward + self.discount * values[next_i, next_j]))
                #max_reward = np.max(expected_rewards)
                #new_action = np.random.choice(np.where(expected_rewards == max_reward)[0])
                new_action = np.argmax(expected_rewards)
                #print('new_action : ',new_action)
                #print('old_action : ',old_action)
                if old_action != new_action:
                    policy_stable = False
                policy[i, j] = new_action

        return policy, policy_stable



    def compPolicyIteration(self):
        iterations = 0
        total_start_time = time.time()

        while True:
            start_time = time.time()
            self.values = self.policyEvaluation(self.policy, self.values)
            elapsed_time = time.time() - start_time
            print(f'PE => Elapsed time {elapsed_time} seconds')
            start_time = time.time()

            self.policy, policy_stable = self.policyImprovement(self.policy,self.values, self.actions)
            elapsed_time = time.time() - start_time
            print(f'PI => Elapsed time {elapsed_time} seconds')

            if policy_stable:
                break

            iterations += 1

        total_elapsed_time = time.time() - total_start_time
        print(f'Optimal policy is reached after {iterations} iterations in {total_elapsed_time} seconds')
        return self.values, self.policy

Но обе мои реализации дают разные оптимальные политики и оптимальные значения. Я в точности следовал алгоритмам, приведенным в книге.

Результат с итерацией политики:

values :
[[ 0.         -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781  0.        ]]
****************************************************************************************************
****************************************************************************************************
policy :
[[0 0 0 0]
 [1 0 0 0]
 [0 0 0 3]
 [0 0 2 0]]

Результат с итерацией значений:

values :
[[ 0.0 -1.0 -2.0 -3.0]
 [-1.0 -2.0 -3.0 -2.0]
 [-2.0 -3.0 -2.0 -1.0]
 [-3.0 -2.0 -1.0  0.0]]
****************************************************************************************************
****************************************************************************************************
[[0. 0. 0. 0.]
 [1. 0. 0. 3.]
 [1. 0. 2. 3.]
 [1. 2. 2. 0.]]

Кроме того, итерация значения сходится после 4 итераций, а итерация политики - после 2 итераций.

Где я неправ? Могут ли они дать разные оптимальные политики? Но я считаю, что то, что написано в книге, после третьей итерации, значения, которые мы получаем, являются оптимальными. Тогда должны быть некоторые проблемы с итерацией политики моего кода, которые я не могу увидеть. В принципе, как мне исправить политику?

1 ответ

Думаю, проблема в этих строках:

(1) value = self.action_prob * (reward + self.discount * state_values ​​[next_i, next_j])
(2) new_state_values ​​[i, j] = значение

Здесь вы напрямую присваиваете значение, которое получаете только от одного действия. Если вы посмотрите на уравнение ожидания Беллмана, в начале есть сумма для всех действий. Вы должны рассмотреть все действия из состояния, выполнить расчет (1) для всех возможных действий, просуммировать их и присвоить эту сумму в (2) новому значению для (i,j).

Другие вопросы по тегам