DQN agent problem with a custom TensorFlow environment

v1l68za4 posted on 2023-04-21 in Other

I am trying to implement my own DQN following the 1_dqn_tutorial from the official TensorFlow documentation. The shape of my observation_spec is (20, 20) and the action_spec is (3,). The problem is that whenever I try to create a new agent, I get the following error:

ValueError: Expected q_network to emit a floating point Tensor with inner dims (3,); but saw network output spec: TensorSpec(shape=(20, 3), dtype=tf.float32, name=None) In call to configurable 'DqnAgent' (<class 'tf_agents.agents.dqn.dqn_agent.DqnAgent'>)
My custom environment:

import numpy as np
import pyglet
from tf_agents.environments import py_environment
from tf_agents.specs import array_spec
from tf_agents.trajectories import time_step

import main  # the snake game module (not shown); provides next_tick()

BOARD_SIZE = 20  # the board is 20x20, matching the (20, 20) observation shape


class LearningEnvironment(py_environment.PyEnvironment):

    def __init__(self):
        super().__init__()
        self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.int32, minimum=0, maximum=2, name='action')
        self._observation_spec = array_spec.BoundedArraySpec(shape=(BOARD_SIZE, BOARD_SIZE), dtype=np.int32, minimum=0,
                                                             maximum=3, name='observation')

        self._board = np.zeros((BOARD_SIZE, BOARD_SIZE), dtype=np.int32)
        self._snake_length = 1
        self._episode_ended = False

    def observation_spec(self):
        return self._observation_spec

    def action_spec(self):
        return self._action_spec

    def _step(self, action):

        if self._episode_ended:
            return self.reset()

        if action == 0:
            res = main.next_tick(pyglet.window.key.A)
            if res is None:
                self._episode_ended = True
            else:
                self._board = np.array(res[0], dtype=np.int32)
                self._snake_length = res[1]
        elif action == 1:
            res = main.next_tick(pyglet.window.key.D)
            if res is None:
                self._episode_ended = True
            else:
                self._board = np.array(res[0], dtype=np.int32)
                self._snake_length = res[1]
        elif action == 2:
            res = main.next_tick(0)
            if res is None:
                self._episode_ended = True
            else:
                self._board = np.array(res[0], dtype=np.int32)
                self._snake_length = res[1]
        else:
            raise ValueError('action should be 0, 1 or 2')

        if self._episode_ended:
            reward = self._snake_length
            return time_step.termination(self._board, reward)
        else:
            return time_step.transition(self._board, reward=0.0, discount=1.0)

    def _reset(self):
        self._snake_length = 1
        self._episode_ended = False
        return time_step.restart(self._board)

My DQN attempt:

import tensorflow as tf
from tf_agents.agents.dqn import dqn_agent
from tf_agents.networks import sequential
from tf_agents.specs import tensor_spec
from tf_agents.utils import common

# env is the LearningEnvironment above; train_env is its TF-wrapped counterpart
# (e.g. tf_py_environment.TFPyEnvironment(env)), not shown here.
learning_rate = 1e-3  # value not shown in the question; 1e-3 as in the tutorial

fc_layer_params = (100, 50)
action_tensor_spec = tensor_spec.from_spec(env.action_spec())
num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1

def dense_layer(num_units):
    return tf.keras.layers.Dense(
        num_units,
        activation=tf.keras.activations.relu,
        kernel_initializer=tf.keras.initializers.VarianceScaling(
            scale=2.0, mode='fan_in', distribution='truncated_normal'))

dense_layers = [dense_layer(num_units) for num_units in fc_layer_params]
q_values_layer = tf.keras.layers.Dense(
    num_actions,
    activation=None,
    kernel_initializer=tf.keras.initializers.RandomUniform(
        minval=-0.03, maxval=0.03),
    bias_initializer=tf.keras.initializers.Constant(-0.2))

q_net = sequential.Sequential(dense_layers + [q_values_layer])

optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

train_step_counter = tf.Variable(0)

#print(env.action_spec().shape)

agent = dqn_agent.DqnAgent(
    train_env.time_step_spec(),
    train_env.action_spec(),
    q_network=q_net,
    optimizer=optimizer,
    td_errors_loss_fn=common.element_wise_squared_loss,
    train_step_counter=train_step_counter)

agent.initialize()

iklwldmw1#

One possible solution is to extend the QNetwork class. The _observation_spec still has to be a flat (1D) vector in the env, but it is reshaped back into a (2D) matrix inside the CustomQNetwork; the network is then configured to accept multiple inputs and concatenate their outputs.
Here is a very high-level code example:

from tf_agents.networks import q_network

class CustomQNetwork(q_network.QNetwork):
  def __init__(self, input_tensor_spec, action_spec, symbol_layer_params=None,
               combined_layer_params=None, activation_fn=tf.keras.activations.relu,
               kernel_initializer=None, batch_squash=True, dtype=tf.float32, name='CustomQNetwork'):
    super(CustomQNetwork, self).__init__(
          input_tensor_spec=input_tensor_spec,
          action_spec=action_spec,
          activation_fn=activation_fn,
          kernel_initializer=kernel_initializer,
          batch_squash=batch_squash,
          dtype=dtype,
          name=name)

    self.shape = (BOARD_SIZE, BOARD_SIZE)

    # ... and initialize all the needed layers here
    # (e.g. self.pre_combined_block and self.post_combined_block used in call() below)

  def call(self, observation, step_type=None, network_state=(), training=False):
    symbol_outputs = []

    # restore the batch dimension and the 2D board shape from the flat observation
    observation = tf.reshape(observation, (-1,) + self.shape)

    for i in range(observation.shape[-1]):
      symbol_input = observation[:, :, i]
      symbol_output = self.pre_combined_block[i](symbol_input, training=training)  # the pre-combination dense layers
      symbol_outputs.append(symbol_output)

    state = tf.concat(symbol_outputs, axis=-1)

    q_value_logits, _ = self.post_combined_block(state)  # the post-combination dense layers
    q_value_logits = tf.squeeze(q_value_logits, axis=-2)

    return q_value_logits, network_state

Then, when creating the dqn_agent.DqnAgent instance, pass this q_network:

q_network = CustomQNetwork(
  train_env.observation_spec(),
  train_env.action_spec())
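
For completeness, the environment side of this approach might look roughly like the sketch below. This is only a minimal illustration under the answer's assumption that the observation is exposed as a flat vector; BOARD_SIZE, self._board and the time_step calls are taken from the question's code.

# Inside LearningEnvironment: declare the observation as a flat (1D) vector ...
self._observation_spec = array_spec.BoundedArraySpec(
    shape=(BOARD_SIZE * BOARD_SIZE,), dtype=np.int32, minimum=0, maximum=3,
    name='observation')

# ... and flatten the 2D board wherever a time step is emitted:
flat_board = self._board.flatten()
return time_step.transition(flat_board, reward=0.0, discount=1.0)

The CustomQNetwork then reshapes this flat vector back into the (BOARD_SIZE, BOARD_SIZE) board inside call(), as shown above.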
