💻 Tutorial paso a paso

Entrenando un agente de Deep Reinforcement Learning

Construiremos un agente DQN desde cero con PyTorch que aprende a jugar a Pong. Cubriremos la arquitectura de la red, el replay buffer, la política epsilon-greedy, la red target, el training loop y la evaluación. Cada línea de código está explicada.

⏱️ ~90 min 📊 Nivel: intermedio
PyTorch · DQN · Gymnasium · Pong · Replay Buffer · Target Network

Requisitos previos

  • Python 3.9+ y PyTorch 2.x instalados
  • Conceptos básicos de redes neuronales (forward pass, loss, backpropagation)
  • Haber leído la teoría de Aprendizaje por Refuerzo (MDP, Q-value, Bellman)
  • Familiaridad con PyTorch (tutorial de PyTorch)
  • Opcional: GPU con CUDA (acelera el entrenamiento, pero CPU funciona para Pong)
1

Visión general: RL y DQN

En aprendizaje supervisado el modelo aprende de pares (entrada, etiqueta). En Reinforcement Learning no hay etiquetas: un agente interactúa con un entorno, recibe recompensas y aprende qué acciones maximizan la recompensa acumulada a largo plazo.

Agente (Red DQN) Entorno (Pong) acción aₜ sₜ₊₁ , rₜ

El ciclo es simple: en cada step el agente observa el estado sₜ, escoge una acción aₜ, el entorno le devuelve el nuevo estado sₜ₊₁ y una recompensa rₜ. El objetivo del agente es aprender la función de valor Q(s, a), que estima la recompensa futura descontada partiendo de (s, a):

Q(s, a) = rₜ + γ · maxa' Q(sₜ₊₁, a')

Esta es la ecuación de Bellman — la base de todo lo que haremos. Si tenemos un buen Q, la política óptima es trivial: en cada estado, elige la acción con mayor Q.

¿Qué es DQN?

Deep Q-Network (DQN) fue introducida por DeepMind en 2015 (Mnih et al., Nature) y demostró que una red neuronal puede aproximar Q(s,a) directamente desde los píxeles de un videojuego Atari, alcanzando nivel sobrehumano en varios juegos.

Las dos innovaciones clave que estabilizaron el entrenamiento fueron:

  • Experience Replay Buffer — almacenar transiciones y muestrearlas de forma aleatoria en mini-batches, rompiendo las correlaciones temporales.
  • Target Network — una copia retrasada de la red Q que se usa para calcular el target de Bellman, reduciendo la inestabilidad.

Qué vamos a construir

En este tutorial implementaremos DQN desde cero con PyTorch y lo entrenaremos para jugar a Pong (Atari) usando Gymnasium (antes OpenAI Gym). Al terminar tendrás:

¿Por qué Pong? — Es uno de los entornos más sencillos de Atari: el agente tiene solo 3 acciones útiles (subir, bajar, quieto), el espacio de estados es visual pero relativamente simple, y un DQN básico puede resolverlo en ~500 episodios. Perfecto para aprender sin necesitar un clúster de GPUs.

En Q-learning tabular mantenemos una tabla con una entrada para cada par (estado, acción). Esto funciona bien cuando el espacio de estados es pequeño y discreto (como un tablero 4×4).

Pero en Pong, un solo frame de 210×160 píxeles con 3 canales de color supone ~100 000 dimensiones. El número de estados posibles es astronómico — una tabla no cabe ni tiene sentido. La red neuronal generaliza: al entrenar sobre un subconjunto de estados, extrapola valores Q sensatos para estados nunca vistos.

2

Setup: entorno, imports y configuración

Antes de escribir código, necesitamos instalar las dependencias e importar los módulos. Usaremos Gymnasium (el sucesor oficial de OpenAI Gym) para los entornos Atari y PyTorch como framework de deep learning.

Instalación

bash
# Crear un entorno virtual (recomendado)
python -m venv dqn_env
source dqn_env/bin/activate   # Linux/Mac
# dqn_env\Scripts\activate    # Windows

# Instalar dependencias
pip install torch torchvision
pip install "gymnasium[atari,accept-rom-license]"
pip install ale-py
pip install matplotlib numpy
Nota sobre ROMs: Gymnasium necesita las ROMs de Atari. El flag accept-rom-license las descarga automáticamente. Si tienes problemas, consulta la documentación oficial.

Imports

python
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import collections
import matplotlib.pyplot as plt
from collections import deque
L1 gymnasium — API unificada para entornos RL. Usaremos PongNoFrameskip-v4.
L3-4 torch y nn — para la red neuronal DQN y el optimizador.
L6-7 collections.deque — estructura ideal para el replay buffer circular (O(1) en push/pop).

Hiperparámetros

Centralizar los hiperparámetros al inicio facilita la experimentación. Estos son los valores que funcionan bien para Pong con DQN:

python
# ── Hiperparámetros ──────────────────────────────────────
BATCH_SIZE       = 32        # Tamaño del mini-batch
GAMMA            = 0.99      # Factor de descuento
LR               = 1e-4      # Learning rate del optimizador
BUFFER_SIZE      = 100_000   # Capacidad del replay buffer
MIN_BUFFER       = 10_000    # Transiciones antes de empezar a entrenar
EPSILON_START    = 1.0       # Epsilon inicial (100% exploración)
EPSILON_END      = 0.02      # Epsilon mínimo
EPSILON_DECAY    = 100_000   # Steps para decaer de start a end
TARGET_UPDATE    = 1_000     # Cada cuántos steps sincronizar target net
TAU              = 0.005     # Coeficiente para soft update (alternativa)
NUM_EPISODES     = 800       # Episodios de entrenamiento
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {DEVICE}")
L3 GAMMA = 0.99 — el factor de descuento. Cuanto más cercano a 1, más peso le da el agente a recompensas futuras.
L5 BUFFER_SIZE — 100k transiciones es suficiente para Pong. Juegos más complejos pueden necesitar 1M.
L6 MIN_BUFFER — no empezamos a entrenar hasta tener 10k transiciones. Esto evita overfitting a las primeras experiencias.
L9 TARGET_UPDATE — cada 1000 steps copiamos los pesos a la red target (hard update). También implementaremos soft update con TAU.

No existe una fórmula mágica, pero hay buenas heurísticas:

  • LR (1e-4): Un valor estándar para Adam con DQN. Si el entrenamiento es inestable, prueba 5e-5. Si es demasiado lento, prueba 3e-4.
  • GAMMA (0.99): Para la mayoría de juegos, 0.99 funciona bien. En entornos con horizonte corto, prueba 0.95.
  • BATCH_SIZE (32): El paper original usa 32. Valores de 32-128 son habituales.
  • BUFFER_SIZE: Debe ser grande (100k-1M) para diversidad. Demasiado pequeño = el agente olvida experiencias útiles.
  • EPSILON_DECAY: ~100k steps da suficiente exploración inicial. En entornos más complejos se puede aumentar a 500k-1M.

Un buen punto de partida: empieza con los valores del paper original y ajusta uno por uno. Nunca cambies varios a la vez — no sabrás cuál fue el responsable de la mejora/empeoramiento.

Semilla para reproducibilidad

python
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)
⚠️ Atención: Los entornos Atari tienen aleatoriedad inherente (frameskip, resets). La reproducibilidad perfecta frame-a-frame no es posible, pero fijar las semillas asegura que los pesos iniciales y el muestreo del buffer sean consistentes.
3

El entorno: Gymnasium y preprocesamiento

Pong es un juego de Atari 2600 donde controlas una pala y compites contra la IA del juego. Cada punto ganado da +1, cada punto perdido da −1. El primer jugador en llegar a 21 puntos gana. Un agente que aún no ha aprendido obtiene una recompensa total de aproximadamente −21 (pierde todos los puntos).

Creando el entorno base

python
# Crear entorno sin frameskip (lo manejaremos nosotros)
env = gym.make("PongNoFrameskip-v4", render_mode="rgb_array")

print(f"Espacio de observación: {env.observation_space.shape}")  # (210, 160, 3)
print(f"Espacio de acciones:    {env.action_space.n}")           # 6 acciones
Salida esperada Espacio de observación: (210, 160, 3) Espacio de acciones: 6

El entorno devuelve frames RGB de 210×160×3 y acepta 6 acciones discretas (NOOP, FIRE, RIGHT, LEFT, RIGHTFIRE, LEFTFIRE). Para Pong solo 3 son útiles: NOOP (quieto), RIGHT (subir) y LEFT (bajar).

Wrappers de preprocesamiento

Los frames crudos son demasiado grandes y redundantes. Aplicamos una cadena de wrappers estándar de la literatura DQN:

Frame RGB 210×160×3 Grayscale 210×160 Resize 84×84 Frame Stack 4×84×84 Input DQN 4×84×84
python
import cv2

class PreprocessFrame(gym.ObservationWrapper):
    """Convierte a escala de grises, recorta y redimensiona a 84×84."""

    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(84, 84, 1), dtype=np.uint8
        )

    def observation(self, obs):
        # Escala de grises
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        # Recortar zona de juego (eliminar marcador superior)
        cropped = gray[34:194, :]   # 160×160
        # Redimensionar a 84×84
        resized = cv2.resize(cropped, (84, 84), interpolation=cv2.INTER_AREA)
        return resized.reshape(84, 84, 1)


class FrameStack(gym.Wrapper):
    """Apila los últimos `k` frames como canales (captura movimiento)."""

    def __init__(self, env, k=4):
        super().__init__(env)
        self.k = k
        self.frames = deque(maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255,
            shape=(k, shp[0], shp[1]),
            dtype=np.uint8
        )

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        for _ in range(self.k):
            self.frames.append(obs.squeeze(-1))
        return np.array(self.frames), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs.squeeze(-1))
        return np.array(self.frames), reward, terminated, truncated, info


class MaxAndSkip(gym.Wrapper):
    """Repite cada acción `skip` frames y devuelve el máximo de los
    últimos 2 frames (elimina parpadeo de sprites Atari)."""

    def __init__(self, env, skip=4):
        super().__init__(env)
        self.skip = skip
        self._obs_buffer = deque(maxlen=2)

    def step(self, action):
        total_reward = 0.0
        terminated = truncated = False
        for _ in range(self.skip):
            obs, reward, terminated, truncated, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if terminated or truncated:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, terminated, truncated, info


class ScaleObservation(gym.ObservationWrapper):
    """Normaliza píxeles de [0, 255] a [0.0, 1.0]."""

    def observation(self, obs):
        return np.array(obs, dtype=np.float32) / 255.0
L6-19 PreprocessFrame — Convierte a gris, recorta el marcador y redimensiona. Pasamos de ~100k píxeles a 7056 (84²).
L22-43 FrameStack — Apilamos 4 frames. Sin esto, la red no puede distinguir la dirección ni la velocidad de la pelota (un frame estático no tiene información de movimiento).
L46-60 MaxAndSkip — Repite cada acción 4 veces (reduce la frecuencia de decisiones) y toma el máximo de los últimos 2 frames (elimina parpadeo típico de Atari).
L63-66 ScaleObservation — Normalizar a [0,1] mejora la estabilidad numérica del entrenamiento.

Función para crear el entorno completo

python
def make_env(env_name="PongNoFrameskip-v4", render_mode=None):
    """Crea el entorno con toda la cadena de preprocesamiento."""
    env = gym.make(env_name, render_mode=render_mode)
    env = MaxAndSkip(env, skip=4)
    env = PreprocessFrame(env)
    env = FrameStack(env, k=4)
    env = ScaleObservation(env)
    return env

# Crear el entorno de entrenamiento
env = make_env()
obs, info = env.reset(seed=SEED)
print(f"Observación preprocesada: shape={obs.shape}, dtype={obs.dtype}")
print(f"Rango de valores: [{obs.min():.2f}, {obs.max():.2f}]")
Salida esperada Observación preprocesada: shape=(4, 84, 84), dtype=float32 Rango de valores: [0.00, 1.00]

84×84 es el tamaño que usó el paper original de DQN y se ha convertido en el estándar de facto. Es un buen equilibrio entre:

  • Información visual suficiente — la pelota y las palas son claramente visibles.
  • Eficiencia computacional — el tensor 4×84×84 cabe cómodamente en memoria y la CNN procesa rápido.
  • Comparabilidad — usar el mismo tamaño permite comparar resultados con la literatura.

Podrías usar 64×64 (más rápido, menor resolución) o 96×96 (más detalle, más lento). Para Pong la diferencia es mínima.

Todo el código de este tutorial funciona con cualquier entorno Atari de Gymnasium. Algunos populares:

EntornoDificultadEpisodios DQN
PongNoFrameskip-v4⭐ Fácil~500
BreakoutNoFrameskip-v4⭐⭐ Media~2000
SpaceInvadersNoFrameskip-v4⭐⭐ Media~3000
SeaquestNoFrameskip-v4⭐⭐⭐ Difícil~5000+
MontezumaRevengeNoFrameskip-v4⭐⭐⭐⭐ ExtremaDQN no resuelve

Para probar rápido, CartPole-v1 entrena en segundos (pero no necesita preprocesamiento visual, así que el wrapper chain cambia).

💡 Tip: Si tu máquina no tiene GPU, puedes usar CartPole-v1 sin wrappers visuales para verificar que todo funciona (entrena en 2 minutos en CPU). Más adelante daremos una versión adaptada.
4

La red neuronal: arquitectura DQN

La red DQN toma como entrada 4 frames apilados (4×84×84) y produce como salida un Q-value por acción. El diseño sigue el paper original de DeepMind: tres capas convolucionales seguidas de dos capas fully-connected.

Input 4×84×84 Conv2d 32×8×8, s=4 + ReLU Conv2d 64×4×4, s=2 + ReLU Conv2d 64×3×3, s=1 + ReLU Linear 3136 → 512 + ReLU Linear 512 → n_actions Frames Feature extraction Q(s,a) para cada acción
python
class DQN(nn.Module):
    """
    Red Deep Q-Network.
    Entrada: tensor (batch, 4, 84, 84) — 4 frames en escala de grises.
    Salida:  tensor (batch, n_actions)  — Q-value para cada acción.
    """

    def __init__(self, n_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4),  # → (32, 20, 20)
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), # → (64, 9, 9)
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), # → (64, 7, 7)
            nn.ReLU(),
        )
        # Tras las convoluciones: 64 × 7 × 7 = 3136
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 512),
            nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x):
        features = self.conv(x)
        return self.fc(features)
L11 Conv2d(4, 32, 8, stride=4) — Primer filtro grande (8×8) con stride 4: captura patrones gruesos y reduce la resolución rápidamente de 84 a 20.
L13 Conv2d(32, 64, 4, stride=2) — Filtros medios (4×4): captura bordes, posiciones de la pelota y las palas. Output 9×9.
L15 Conv2d(64, 64, 3, stride=1) — Filtros finos (3×3) sin stride: extrae relaciones espaciales detalladas. Output 7×7.
L21 Linear(3136, 512) — Capa densa que mapea los features espaciales a una representación compacta.
L23 Linear(512, n_actions) — Capa final sin activación: produce los Q-values «crudos». Q puede ser positivo o negativo, por eso no usamos ReLU aquí.

Instanciando la red

python
n_actions = env.action_space.n

# Red principal (la que se entrena)
policy_net = DQN(n_actions).to(DEVICE)

# Red target (copia para estabilidad — la veremos en el Paso 8)
target_net = DQN(n_actions).to(DEVICE)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()  # No se entrena directamente

# Optimizador
optimizer = optim.Adam(policy_net.parameters(), lr=LR)

# Función de pérdida: Huber Loss (más robusta que MSE ante outliers)
loss_fn = nn.SmoothL1Loss()

# Contar parámetros
total_params = sum(p.numel() for p in policy_net.parameters())
print(f"Parámetros totales: {total_params:,}")
Salida esperada Parámetros totales: 1,686,534
¿Por qué Huber Loss? — La pérdida MSE puede producir gradientes muy grandes cuando el error de predicción es alto (especialmente al inicio del entrenamiento). La Huber Loss (alias SmoothL1Loss) se comporta como MSE para errores pequeños y como L1 para errores grandes, recortando gradientes extremos. Esto estabiliza el entrenamiento notablemente.

La arquitectura «vanilla» que usamos es la más simple. Existen variantes populares:

  • Dueling DQN (Wang et al., 2016): Separa la capa final en dos ramas — V(s) (valor del estado) y A(s,a) (ventaja de cada acción). Luego: Q(s,a) = V(s) + A(s,a) - mean(A). Esto permite que la red aprenda qué estados son valiosos sin necesitar evaluar cada acción.
  • Noisy Networks (Fortunato et al., 2018): Añade ruido aprendible a las capas lineales. Sustituye la exploración epsilon-greedy por exploración paramétrica — la red aprende cuánta exploración necesita.
  • Distributional DQN (C51, Bellemare et al., 2017): En lugar de estimar el valor esperado Q, modela la distribución completa de retornos. Más informativo y estable.

Para un primer tutorial, la versión vanilla es más que suficiente y resuelve Pong sin problemas.

Uno de los errores más frecuentes es calcular mal el tamaño tras las convoluciones. Si cambias el tamaño del frame (e.g., 64×64 en vez de 84×84), el Linear(64*7*7, 512) fallará con un error de dimensiones.

Puedes calcular el tamaño automáticamente con un forward pass de prueba:

python
# Truco: calcular tamaño post-convolución automáticamente
dummy = torch.zeros(1, 4, 84, 84)
with torch.no_grad():
    out = policy_net.conv(dummy)
print(f"Shape post-conv: {out.shape}")  # → (1, 64, 7, 7)
print(f"Flatten size:    {out.numel()}")  # → 3136
5

Replay Buffer: experiencias pasadas

Sin un replay buffer, la red se entrenaría con las transiciones en el orden en que ocurren. Esto causa dos problemas graves:

  • Correlación temporal — estados consecutivos son casi idénticos. El mini-batch tendría datos casi iguales → gradientes sesgados.
  • Olvido catastrófico — al aprender solo de la experiencia actual, la red olvida lo que aprendió de situaciones anteriores.

El Experience Replay Buffer resuelve ambos problemas almacenando transiciones (s, a, r, s', done) y muestreando mini-batches aleatorios para el entrenamiento.

t=0 (s,a,r,s') t=1 (s,a,r,s') t=2 (s,a,r,s') ··· t=N-1 (s,a,r,s') nuevo ← push sample(batch) Mini-batch aleatorio (32 transiciones) → calcular loss → backprop → actualizar pesos
python
class ReplayBuffer:
    """
    Buffer circular de experiencias para Experience Replay.
    Almacena transiciones (state, action, reward, next_state, done)
    y permite muestreo aleatorio de mini-batches.
    """

    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        """Almacena una transición."""
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        """Muestrea un mini-batch aleatorio y lo convierte a tensores."""
        batch = random.sample(self.buffer, batch_size)

        states, actions, rewards, next_states, dones = zip(*batch)

        states      = torch.FloatTensor(np.array(states)).to(DEVICE)
        actions     = torch.LongTensor(actions).to(DEVICE)
        rewards     = torch.FloatTensor(rewards).to(DEVICE)
        next_states = torch.FloatTensor(np.array(next_states)).to(DEVICE)
        dones       = torch.FloatTensor(dones).to(DEVICE)

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)
L9 deque(maxlen=capacity) — Cuando está lleno, los elementos más antiguos se eliminan automáticamente. Operaciones O(1).
L17 random.sample — Muestreo uniforme sin reemplazo. Cada transición tiene la misma probabilidad de ser seleccionada.
L21-25 Convertimos a tensores de PyTorch y los movemos al dispositivo (CPU/GPU). np.array() primero para evitar copias lentas elemento a elemento.

Crear el buffer

python
buffer = ReplayBuffer(BUFFER_SIZE)
print(f"Buffer creado con capacidad: {BUFFER_SIZE:,}")
Memoria: Cada transición almacena dos estados de 4×84×84 floats. Con 100k transiciones: ~100k × 2 × 4 × 84 × 84 × 4 bytes ≈ ~21 GB si almacenamos floats. Para Pong esto es excesivo. Una optimización común es almacenar los estados como uint8 (1 byte) y convertir a float solo en sample(). Nuestro buffer simple funciona para fines educativos, pero ten esto en cuenta para proyectos reales.

En nuestro buffer muestreamos uniformemente — todas las transiciones tienen la misma probabilidad. Pero no todas son igual de útiles: las transiciones donde el agente cometió un gran error (alto TD-error) contienen más información.

Prioritized Experience Replay (Schaul et al., 2016) asigna probabilidades de muestreo proporcionales al TD-error:

P(i) ∝ |δᵢ|α + ε

Donde δᵢ es el error TD de la transición i, α controla cuánta priorización (0 = uniforme, 1 = total), y ε es un valor pequeño para que ninguna transición tenga probabilidad cero.

La contrapartida: requiere pesos de importancia (importance sampling weights) para corregir el sesgo del muestreo no uniforme. Es más complejo de implementar pero puede acelerar el aprendizaje 2-3×.

Para proyectos reales con buffers grandes (>100k), esta versión es mucho más eficiente:

python
class EfficientReplayBuffer:
    """Buffer que almacena frames como uint8 para ahorrar 4× memoria."""

    def __init__(self, capacity, obs_shape):
        self.capacity = capacity
        self.idx = 0
        self.size = 0
        # Almacenar como uint8 (0-255) en lugar de float32
        self.states      = np.zeros((capacity, *obs_shape), dtype=np.uint8)
        self.next_states = np.zeros((capacity, *obs_shape), dtype=np.uint8)
        self.actions     = np.zeros(capacity, dtype=np.int64)
        self.rewards     = np.zeros(capacity, dtype=np.float32)
        self.dones       = np.zeros(capacity, dtype=np.float32)

    def push(self, state, action, reward, next_state, done):
        # Convertir float [0,1] a uint8 [0,255] para almacenar
        self.states[self.idx]      = (state * 255).astype(np.uint8)
        self.next_states[self.idx] = (next_state * 255).astype(np.uint8)
        self.actions[self.idx]     = action
        self.rewards[self.idx]     = reward
        self.dones[self.idx]       = float(done)
        self.idx = (self.idx + 1) % self.capacity
        self.size = min(self.size + 1, self.capacity)

    def sample(self, batch_size):
        indices = np.random.choice(self.size, batch_size, replace=False)
        # Convertir de vuelta a float32 al muestrear
        return (
            torch.FloatTensor(self.states[indices] / 255.0).to(DEVICE),
            torch.LongTensor(self.actions[indices]).to(DEVICE),
            torch.FloatTensor(self.rewards[indices]).to(DEVICE),
            torch.FloatTensor(self.next_states[indices] / 255.0).to(DEVICE),
            torch.FloatTensor(self.dones[indices]).to(DEVICE),
        )

Esta versión usa ~5.3 GB para 100k transiciones de 4×84×84 vs ~21 GB de la versión con float32. La conversión a float ocurre solo para el mini-batch (32 muestras), que es despreciable.

6

Política epsilon-greedy: explorar vs explotar

El dilema exploración-explotación es central en RL: ¿debería el agente repetir acciones que ya sabe que funcionan (explotación) o probar acciones nuevas para descubrir algo mejor (exploración)?

La solución más simple y efectiva es la política ε-greedy: con probabilidad ε el agente escoge una acción aleatoria, y con probabilidad 1 − ε escoge la acción con mayor Q-value. Empezamos con ε alto (mucha exploración) y lo decaemos gradualmente.

Steps ε 1.0 0.0 0.02 Mucha exploración Máxima explotación 0 100k
python
def select_action(state, step_count):
    """
    Política epsilon-greedy con decaimiento lineal.

    Args:
        state:      observación actual (numpy array 4×84×84)
        step_count: número total de steps de entrenamiento

    Returns:
        acción (int)
    """
    # Calcular epsilon actual (decaimiento lineal)
    epsilon = max(
        EPSILON_END,
        EPSILON_START - step_count / EPSILON_DECAY
    )

    if random.random() < epsilon:
        # Exploración: acción aleatoria
        return env.action_space.sample(), epsilon
    else:
        # Explotación: acción con mayor Q-value
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            q_values = policy_net(state_tensor)
        return q_values.argmax(dim=1).item(), epsilon
L13-16 Decaimiento lineal: ε baja de 1.0 a 0.02 en 100k steps. Es la forma más simple y funcional. Alternativas: decaimiento exponencial, cosine annealing.
L18-20 Exploración: env.action_space.sample() devuelve una acción uniformemente aleatoria entre 0 y n_actions-1.
L23-26 Explotación: Hacemos forward pass por la red y elegimos argmax. Usamos torch.no_grad() porque no necesitamos gradientes aquí — solo inferencia.

¿Por qué el decaimiento importa tanto?

ProblemaCausaSíntoma
ε demasiado alto siempre El agente sigue explorando y nunca explota lo aprendido Reward estancado en valores bajos
ε decae demasiado rápido El agente deja de explorar antes de descubrir buenas estrategias Converge a una política subóptima
ε final = 0 Sin exploración residual, el agente nunca corrige errores Performance inestable, posible divergencia
ε final = 0.02, no 0: Mantener un 2% de exploración residual asegura que el agente siga descubriendo nuevas situaciones durante todo el entrenamiento. Es un truco simple que previene que la política se «encierre» en un comportamiento subóptimo.

Existen políticas de exploración más sofisticadas:

  • Boltzmann (Softmax): Selecciona acciones con probabilidad proporcional a exp(Q(s,a)/τ) donde τ es una «temperatura». Asigna más probabilidad a acciones con mayor Q, en lugar del todo-o-nada de ε-greedy.
  • Upper Confidence Bound (UCB): Añade un bonus de exploración a las acciones menos visitadas: Q(s,a) + c·√(ln(N)/n(a)). Exploración más dirigida pero difícil de aplicar con estados continuos.
  • Noisy Networks: Ruido paramétrico en los pesos de la red. La red «aprende» cuándo explorar — elimina la necesidad de un schedule de ε.
  • Curiosity-driven: Recompensa intrínseca basada en la novedad del estado. Útil para entornos con recompensas sparse.

Para DQN clásico, ε-greedy con decaimiento lineal es suficiente y es lo que usan la mayoría de implementaciones de referencia.

7

Training loop: el algoritmo DQN completo

Ahora tenemos todas las piezas: la red Q, el replay buffer y la política ε-greedy. Falta la pieza central: el paso de optimización que actualiza los pesos de la red.

La función de optimización

En cada paso de entrenamiento, muestreamos un mini-batch del buffer y calculamos la pérdida usando la ecuación de Bellman:

Loss = Huber( Q(s, a) − [r + γ · maxa' Qtarget(s', a') · (1 − done)] )
Replay Buffer sample(32) Mini-batch (s, a, r, s', done) s s' Policy Net Q(s, a) Target Net max Q(s', a') r + γ·max·(1-done) Huber Loss ← backprop
python
def optimize_model():
    """
    Un paso de optimización DQN:
    1. Muestrear mini-batch del buffer
    2. Calcular Q(s, a) con la policy net
    3. Calcular el target: r + γ · max Q_target(s', a') · (1 - done)
    4. Calcular la pérdida (Huber) y hacer backprop
    """
    if len(buffer) < MIN_BUFFER:
        return None  # No entrenar hasta tener suficientes experiencias

    # 1. Muestrear mini-batch
    states, actions, rewards, next_states, dones = buffer.sample(BATCH_SIZE)

    # 2. Q(s, a) — seleccionamos solo el Q-value de la acción tomada
    #    policy_net(states) → shape (batch, n_actions)
    #    .gather(1, ...) selecciona el Q de cada acción
    q_values = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)

    # 3. Target: r + γ · max_a' Q_target(s', a') · (1 - done)
    with torch.no_grad():
        # max Q-value del siguiente estado, según la target net
        next_q_values = target_net(next_states).max(dim=1).values
        targets = rewards + GAMMA * next_q_values * (1 - dones)

    # 4. Calcular pérdida y actualizar pesos
    loss = loss_fn(q_values, targets)

    optimizer.zero_grad()
    loss.backward()
    # Gradient clipping para estabilidad
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), max_norm=10.0)
    optimizer.step()

    return loss.item()
L17 .gather(1, actions) — Selecciona el Q-value correspondiente a la acción que realmente se tomó. Es como indexar: Q_values[i, action[i]] para cada elemento del batch.
L21 torch.no_grad() — El target de Bellman no debe generar gradientes. Solo la policy net se actualiza por backprop.
L23 .max(dim=1).values — Para cada estado s' del batch, toma el máximo Q-value entre todas las acciones posibles.
L24 (1 - dones) — Si el episodio terminó, no hay futuro: el target es solo la recompensa. Multiplicar por 0 cuando done=True.
L30 clip_grad_norm_ — Limita la magnitud de los gradientes a 10. Previene actualizaciones explosivas al inicio del entrenamiento.

El bucle de entrenamiento completo

python
def train():
    """Bucle principal de entrenamiento DQN."""
    episode_rewards = []
    step_count = 0
    best_reward = -float('inf')

    for episode in range(NUM_EPISODES):
        state, info = env.reset()
        episode_reward = 0
        done = False

        while not done:
            # 1. Seleccionar acción con ε-greedy
            action, epsilon = select_action(state, step_count)

            # 2. Ejecutar acción en el entorno
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            # 3. Almacenar transición en el buffer
            buffer.push(state, action, reward, next_state, float(done))

            # 4. Optimizar la red (si hay suficientes datos)
            loss = optimize_model()

            # 5. Actualizar la target network (hard update)
            if step_count % TARGET_UPDATE == 0:
                target_net.load_state_dict(policy_net.state_dict())

            state = next_state
            episode_reward += reward
            step_count += 1

        episode_rewards.append(episode_reward)

        # ── Logging cada 10 episodios ─────────────────────────
        if (episode + 1) % 10 == 0:
            avg_reward = np.mean(episode_rewards[-10:])
            print(
                f"Ep {episode+1:4d} | "
                f"Reward: {episode_reward:6.1f} | "
                f"Avg(10): {avg_reward:6.1f} | "
                f"ε: {epsilon:.3f} | "
                f"Buffer: {len(buffer):,} | "
                f"Steps: {step_count:,}"
            )

            # Guardar mejor modelo
            if avg_reward > best_reward:
                best_reward = avg_reward
                torch.save(policy_net.state_dict(), "dqn_pong_best.pth")

    print(f"\n✅ Entrenamiento completado. Mejor reward promedio: {best_reward:.1f}")
    return episode_rewards
L14 Selección de acción: la función que implementamos en el Paso 6.
L17 Gymnasium devuelve terminated (fin natural) y truncated (timeout). Ambos terminan el episodio.
L27-28 Hard update: cada 1000 steps copiamos todos los pesos de policy_net a target_net. En el próximo paso veremos la alternativa «soft».
L48-49 Guardamos el modelo con mejor rendimiento promedio. Así, si el entrenamiento se degrada al final, tenemos la mejor versión.

¡Entrenar!

python
# Lanzar el entrenamiento
rewards_history = train()
Salida esperada (resumen) Ep 10 | Reward: -21.0 | Avg(10): -21.0 | ε: 0.997 | Buffer: 3,240 | Steps: 3,240 Ep 50 | Reward: -20.0 | Avg(10): -20.4 | ε: 0.840 | Buffer: 16,100 | Steps: 16,100 Ep 100 | Reward: -19.0 | Avg(10): -19.1 | ε: 0.670 | Buffer: 32,500 | Steps: 32,500 Ep 200 | Reward: -15.0 | Avg(10): -16.2 | ε: 0.340 | Buffer: 65,200 | Steps: 65,200 Ep 300 | Reward: -8.0 | Avg(10): -9.5 | ε: 0.040 | Buffer: 97,800 | Steps: 97,800 Ep 500 | Reward: 5.0 | Avg(10): 3.2 | ε: 0.020 | Buffer: 100,000 | Steps: 163,000 Ep 700 | Reward: 18.0 | Avg(10): 16.8 | ε: 0.020 | Buffer: 100,000 | Steps: 228,000 Ep 800 | Reward: 20.0 | Avg(10): 19.1 | ε: 0.020 | Buffer: 100,000 | Steps: 260,500 ✅ Entrenamiento completado. Mejor reward promedio: 19.3
⏱️ Tiempo estimado: En una GPU moderna (RTX 3060+), Pong converge en ~2-4 horas (800 episodios). En CPU, puede tardar 10-20 horas. Para probar rápido, usa CartPole-v1 (converge en 2 minutos en CPU).
SíntomaPosible causaSolución
Reward no sube de −21 LR demasiado bajo o buffer muy pequeño Aumentar LR a 2.5e-4; aumentar MIN_BUFFER
Loss se dispara a NaN LR demasiado alto o sin gradient clipping Reducir LR; añadir clip_grad_norm_
Reward sube y luego cae Target net se actualiza demasiado frecuente Aumentar TARGET_UPDATE o usar soft update (tau=0.005)
Entrenamiento muy lento Preprocesamiento en CPU Verificar que se está usando GPU (DEVICE = cuda)
El agente «se atasca» en una acción Epsilon decayó demasiado rápido Aumentar EPSILON_DECAY a 200k-500k
8

Target network y soft updates

Ya mencionamos la target network en el Paso 7. Ahora profundicemos en por qué es necesaria y en las dos formas de mantenerla sincronizada.

El problema de la inestabilidad

Imagina que solo tienes una red para estimar Q y para calcular el target de Bellman. En cada update:

  1. Usas la red para predecir Q(s, a)
  2. Usas la misma red para calcular max Q(s', a') (el target)
  3. Calculas la pérdida y ajustas los pesos

Pero al ajustar los pesos en el paso 3, el target del paso 2 también cambia. Es como intentar dar a una diana que se mueve cada vez que disparas. El resultado: el entrenamiento oscila y puede diverger.

❌ Sin target net Misma red Q(s,a) Y max Q(s',a') Target inestable 📉 ✅ Con target net Policy Q(s,a) Target max Q(s',a') Target estable ✅ sync periódico

Hard update vs Soft update

Hay dos estrategias para mantener la target network actualizada:

MétodoFórmulaCuándoPros/Contras
Hard update θ_target = θ_policy Cada N steps Simple, pero cambio abrupto del target
Soft update (Polyak) θ_target = τ·θ_policy + (1−τ)·θ_target Cada step Transición suave, más estable

El soft update (también llamado Polyak averaging) mezcla lentamente los pesos nuevos con los antiguos. Con τ = 0.005, en cada paso la target net incorpora un 0.5% de los pesos actualizados. El resultado es una evolución mucho más suave del target.

python
def soft_update(policy_net, target_net, tau=TAU):
    """
    Soft update (Polyak averaging) de la target network.
    θ_target = τ · θ_policy + (1 - τ) · θ_target
    """
    for target_param, policy_param in zip(
        target_net.parameters(), policy_net.parameters()
    ):
        target_param.data.copy_(
            tau * policy_param.data + (1.0 - tau) * target_param.data
        )
L6-8 zip(target, policy) — Iteramos sobre cada par de parámetros correspondientes de ambas redes.
L9-11 .data.copy_ — Modificamos los datos in-place, sin crear tensores nuevos ni afectar al grafo de gradientes.

Modificando el training loop

Para usar soft update en lugar de hard update, simplemente reemplazamos la sincronización periódica por un soft update en cada step:

python
# ── En el training loop, reemplazar: ─────────────────────
# ANTES (hard update):
# if step_count % TARGET_UPDATE == 0:
#     target_net.load_state_dict(policy_net.state_dict())

# DESPUÉS (soft update — en cada step):
# soft_update(policy_net, target_net, tau=TAU)

# ── Versión híbrida (recomendada para empezar): ─────────
# Usar hard update cada N steps + soft update continuo

def train_with_soft_update():
    """Training loop con soft update."""
    episode_rewards = []
    step_count = 0
    best_reward = -float('inf')

    for episode in range(NUM_EPISODES):
        state, info = env.reset()
        episode_reward = 0
        done = False

        while not done:
            action, epsilon = select_action(state, step_count)
            next_state, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated

            buffer.push(state, action, reward, next_state, float(done))
            loss = optimize_model()

            # ★ Soft update cada step en vez de hard update periódico
            soft_update(policy_net, target_net, tau=TAU)

            state = next_state
            episode_reward += reward
            step_count += 1

        episode_rewards.append(episode_reward)

        if (episode + 1) % 10 == 0:
            avg_reward = np.mean(episode_rewards[-10:])
            print(
                f"Ep {episode+1:4d} | "
                f"Reward: {episode_reward:6.1f} | "
                f"Avg(10): {avg_reward:6.1f} | "
                f"ε: {epsilon:.3f} | "
                f"Steps: {step_count:,}"
            )
            if avg_reward > best_reward:
                best_reward = avg_reward
                torch.save(policy_net.state_dict(), "dqn_pong_best.pth")

    return episode_rewards
¿Cuál elegir? — Ambos métodos funcionan para Pong. El hard update (paper original, 2015) es más simple y más fácil de debuggear. El soft update (popularizado por DDPG/TD3/SAC) produce un entrenamiento más suave y es el estándar en algoritmos modernos. Para tu primer DQN, empieza con hard update; experimenta con soft update después.
  • τ = 1.0 → hard update (copia directa). La target net se convierte inmediatamente en la policy net.
  • τ = 0.005 → el valor estándar. La target net tarda ~1000 steps en «absorber» un cambio significativo.
  • τ = 0.001 → más lento, más estable. Bueno para entornos complejos.
  • τ = 0.01 → más rápido, puede ser inestable. Bueno para entornos simples como CartPole.

Relación con hard update: si haces soft update cada step con τ = 0.005, y un hard update cada 1000 steps, el efecto acumulado es similar (1 − (1−0.005)1000 ≈ 99.3% de los pesos nuevos).

DQN estándar tiene un sesgo: max Q(s', a') tiende a sobreestimar los Q-values. ¿Por qué? Si la red tiene errores de estimación (inevitables), el operador max selecciona sistemáticamente la acción con mayor error positivo.

Double DQN (van Hasselt et al., 2016) desacopla la selección de acción de la evaluación:

python
# DQN estándar (sobreestima):
# target = r + γ · max_a' Q_target(s', a')

# Double DQN (más preciso):
# a* = argmax_a' Q_policy(s', a')   ← policy_net selecciona la acción
# target = r + γ · Q_target(s', a*) ← target_net la evalúa

with torch.no_grad():
    # Policy net elige la mejor acción
    best_actions = policy_net(next_states).argmax(dim=1)
    # Target net evalúa esa acción
    next_q = target_net(next_states).gather(
        1, best_actions.unsqueeze(1)
    ).squeeze(1)
    targets = rewards + GAMMA * next_q * (1 - dones)

En la práctica, Double DQN mejora el rendimiento especialmente en entornos donde las sobreestimaciones causan acciones subóptimas. Es un cambio de ~3 líneas en optimize_model().

9

Evaluación y visualización

El entrenamiento ha terminado. Ahora necesitamos dos cosas: evaluar el agente cuantitativamente y visualizar cómo juega. También graficaremos las curvas de aprendizaje para ver la evolución del entrenamiento.

Curvas de entrenamiento

python
def plot_training_curves(rewards, window=20):
    """Grafica la recompensa por episodio y su media móvil."""
    fig, ax = plt.subplots(1, 1, figsize=(10, 4))

    episodes = range(1, len(rewards) + 1)

    # Reward crudo (transparente)
    ax.plot(episodes, rewards, alpha=0.3, color='#00b894', linewidth=0.8,
            label='Reward por episodio')

    # Media móvil
    if len(rewards) >= window:
        moving_avg = np.convolve(
            rewards, np.ones(window) / window, mode='valid'
        )
        ax.plot(
            range(window, len(rewards) + 1), moving_avg,
            color='#00b894', linewidth=2,
            label=f'Media móvil ({window} episodios)'
        )

    # Líneas de referencia
    ax.axhline(y=0, color='#74b9ff', linestyle='--', alpha=0.5, label='Empate')
    ax.axhline(y=21, color='#55efc4', linestyle=':', alpha=0.5, label='Victoria perfecta')
    ax.axhline(y=-21, color='#ff7675', linestyle=':', alpha=0.5, label='Derrota total')

    ax.set_xlabel('Episodio')
    ax.set_ylabel('Reward total')
    ax.set_title('Curva de aprendizaje — DQN en Pong')
    ax.legend(loc='lower right', fontsize=8)
    ax.grid(True, alpha=0.2)
    fig.tight_layout()
    plt.savefig('training_curve.png', dpi=150, bbox_inches='tight')
    plt.show()

plot_training_curves(rewards_history)
¿Qué deberías ver? — La curva debería empezar en ~−21 (derrota total), mantenerse baja durante los primeros ~100-200 episodios (fase de exploración), y luego subir gradualmente hasta ~+18-21. La media móvil suaviza el ruido y muestra la tendencia real.

Evaluación cuantitativa

python
def evaluate(model, env_name="PongNoFrameskip-v4", n_episodes=10):
    """
    Evalúa el modelo entrenado sin exploración (ε=0).
    Devuelve la recompensa media y un resumen por episodio.
    """
    eval_env = make_env(env_name)
    model.eval()
    rewards = []

    for ep in range(n_episodes):
        state, _ = eval_env.reset()
        episode_reward = 0
        done = False

        while not done:
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(DEVICE)
            with torch.no_grad():
                q_values = model(state_tensor)
            action = q_values.argmax(dim=1).item()

            state, reward, terminated, truncated, _ = eval_env.step(action)
            done = terminated or truncated
            episode_reward += reward

        rewards.append(episode_reward)
        print(f"  Evaluación ep {ep+1}: reward = {episode_reward:.0f}")

    eval_env.close()

    mean_r = np.mean(rewards)
    std_r = np.std(rewards)
    print(f"\n📊 Resultado: {mean_r:.1f} ± {std_r:.1f} (sobre {n_episodes} episodios)")
    return rewards

# Cargar el mejor modelo y evaluar
policy_net.load_state_dict(torch.load("dqn_pong_best.pth", weights_only=True))
eval_rewards = evaluate(policy_net)
Salida esperada Evaluación ep 1: reward = 20 Evaluación ep 2: reward = 21 Evaluación ep 3: reward = 19 Evaluación ep 4: reward = 21 Evaluación ep 5: reward = 18 Evaluación ep 6: reward = 20 Evaluación ep 7: reward = 21 Evaluación ep 8: reward = 19 Evaluación ep 9: reward = 20 Evaluación ep 10: reward = 21 📊 Resultado: 20.0 ± 1.0 (sobre 10 episodios)

Visualizar al agente jugando

python
def record_game(model, env_name="PongNoFrameskip-v4", filename="pong_agent.mp4"):
    """Graba un episodio completo del agente jugando."""
    import imageio

    # Crear entorno con render_mode para capturar frames
    raw_env = gym.make(env_name, render_mode="rgb_array")
    eval_env = MaxAndSkip(raw_env, skip=4)
    eval_env = PreprocessFrame(eval_env)
    eval_env = FrameStack(eval_env, k=4)
    eval_env = ScaleObservation(eval_env)

    model.eval()
    state, _ = eval_env.reset()
    frames = []
    done = False

    while not done:
        # Capturar frame RGB del juego original
        frame = raw_env.render()
        frames.append(frame)

        # Decidir acción
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            action = model(state_tensor).argmax(dim=1).item()

        state, _, terminated, truncated, _ = eval_env.step(action)
        done = terminated or truncated

    eval_env.close()

    # Guardar como video MP4
    imageio.mimsave(filename, frames, fps=30)
    print(f"🎬 Video guardado: {filename} ({len(frames)} frames)")

# Grabar un partido
# record_game(policy_net)  # Descomentar para ejecutar
Dependencia extra: Para grabar video necesitas pip install imageio imageio-ffmpeg. Si no quieres instalarlas, puedes usar render_mode="human" para ver el juego en ventana directamente (no graba video).

Widget interactivo: simulador de entrenamiento DQN

Este widget simula el proceso de entrenamiento para diferentes hiperparámetros. Ajusta los sliders y pulsa Simular para ver cómo cambian la curva de recompensa y el decaimiento de epsilon:

🎮 Simulador de entrenamiento DQN

1e-4
0.99
100
-
Reward actual
-
Epsilon
-
Episodio

Señales de que el entrenamiento va bien:

  • Reward sube gradualmente — señal principal de aprendizaje.
  • Varianza disminuye — el agente es más consistente.
  • Loss baja y se estabiliza — la red cada vez predice mejor los Q-values.

Señales de alerta:

  • Reward plano en −21 — el agente no está aprendiendo nada. Verifica la arquitectura, el LR y el preprocesamiento.
  • Reward sube y luego cae bruscamente — posible sobreajuste o inestabilidad de la target net.
  • Loss se dispara — gradientes explosivos. Reduce LR o añade gradient clipping.
  • Loss baja pero reward no sube — la red predice bien los Q incorrectos. Revisa el cálculo del target (¿olvidaste el factor done?).

Si no tienes GPU o quieres verificar rápidamente que el código funciona, CartPole-v1 es perfecto: entrena en 2 minutos en CPU y no necesita preprocesamiento visual.

python
# ── DQN simplificado para CartPole ───────────────────────

class DQN_CartPole(nn.Module):
    """Red simple para estados vectoriales (no imágenes)."""
    def __init__(self, n_obs, n_actions):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_obs, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, n_actions),
        )

    def forward(self, x):
        return self.net(x)

# Crear entorno CartPole (sin wrappers visuales)
cart_env = gym.make("CartPole-v1")
n_obs = cart_env.observation_space.shape[0]    # 4
n_act = cart_env.action_space.n                # 2

# Instanciar redes
cp_policy = DQN_CartPole(n_obs, n_act).to(DEVICE)
cp_target = DQN_CartPole(n_obs, n_act).to(DEVICE)
cp_target.load_state_dict(cp_policy.state_dict())

# Entrenar (500 episodios, ~2 min en CPU)
# ... mismo loop que antes, pero con el entorno CartPole
# Reward objetivo: 500 (máximo)

CartPole es ideal para debuggear: si tu DQN no resuelve CartPole en ~300 episodios, algo está mal en el código.

10

Referencias y próximos pasos

¡Felicidades! Has implementado un agente DQN completo desde cero que aprende a jugar a Pong. Repasemos qué hemos construido y hacia dónde ir después.

Resumen de lo aprendido

Mejoras inmediatas a nuestro DQN

Nuestro DQN «vanilla» funciona, pero hay mejoras bien documentadas que puedes implementar:

MejoraIdea principalGanancia típicaDificultad
Double DQN Desacoplar selección y evaluación de acciones ~10-20% mejor ⭐ Fácil (3 líneas)
Dueling DQN Separar V(s) y A(s,a) en la arquitectura ~15-25% mejor ⭐⭐ Media
Prioritized Replay Muestrear más las transiciones con mayor error TD ~20-40% más rápido ⭐⭐ Media
Noisy Networks Exploración paramérica en los pesos Elimina schedule ε ⭐⭐ Media
Multi-step Returns Usar n-step return en lugar de 1-step Bellman ~15% mejor ⭐ Fácil
Rainbow DQN Combinar TODAS las mejoras anteriores 2-3× mejor que vanilla ⭐⭐⭐ Difícil
Rainbow DQN (Hessel et al., 2018) combina las 6 mejoras y demostró que cada una aporta valor. Si quieres la mejor versión de DQN posible, implementa Rainbow paso a paso.

Más allá de DQN: algoritmos modernos

DQN fue revolucionario en 2015, pero el campo ha avanzado mucho. Los algoritmos modernos más usados son:

AlgoritmoTipoVentajasCaso de uso
PPO (Proximal Policy Optimization) Policy gradient Simple, estable, funciona en casi todo Estándar actual para la mayoría de problemas
SAC (Soft Actor-Critic) Actor-Critic Eficiente en muestras, acciones continuas Robótica, control continuo
TD3 (Twin Delayed DDPG) Actor-Critic Mejora DDPG con 3 trucos anti-sobreestimación Control continuo, simulación
A3C / A2C Actor-Critic Entrenamiento paralelo en múltiples entornos Cuando tienes muchos CPUs
IMPALA / R2D2 Distributed Escalable a cientos de actores Producción a gran escala (Google DeepMind)

Sí, por varias razones:

  • Fundamento pedagógico — DQN introduce todos los conceptos clave (replay buffer, target net, exploración) que aparecen en algoritmos más avanzados.
  • Acciones discretas — PPO y SAC brillan en acciones continuas, pero para espacios de acciones discretos (juegos, routing, scheduling), DQN sigue siendo competitivo.
  • Simplicidad — DQN con Rainbow es sorprendentemente eficiente y fácil de debuggear comparado con actor-critic.
  • Base de otros algoritmos — MuZero, Agent57, y otros algoritmos SOTA siguen usando ideas de DQN.

Papers fundamentales

  • Playing Atari with Deep Reinforcement Learning (Mnih et al., 2013) — arXiv:1312.5602 — El paper original de DQN (versión workshop).
  • Human-level control through deep reinforcement learning (Mnih et al., 2015) — Nature 518, 529–533 — La versión Nature con resultados completos en 49 juegos Atari.
  • Deep Reinforcement Learning with Double Q-learning (van Hasselt et al., 2016) — arXiv:1509.06461 — Double DQN para reducir sobreestimación.
  • Prioritized Experience Replay (Schaul et al., 2016) — arXiv:1511.05952 — Muestreo proporcional al TD-error.
  • Dueling Network Architectures (Wang et al., 2016) — arXiv:1511.06581 — Separar V(s) y A(s,a).
  • Rainbow: Combining Improvements in Deep Reinforcement Learning (Hessel et al., 2018) — arXiv:1710.02298 — Combinación de 6 mejoras de DQN.
  • Proximal Policy Optimization Algorithms (Schulman et al., 2017) — arXiv:1707.06347 — PPO, el algoritmo policy gradient más popular.

Librerías y recursos

  • Gymnasium — Entornos RL estándar (Atari, MuJoCo, Classic Control).
  • Stable-Baselines3 — Implementaciones de referencia de DQN, PPO, SAC, TD3 en PyTorch. Ideal para comparar tu implementación con la «oficial».
  • Tianshou — Librería RL modular y rápida, soporta más algoritmos que SB3.
  • CleanRL — Implementaciones single-file de algoritmos RL. Perfecto para leer código limpio y entender cada algoritmo.
  • Spinning Up (OpenAI) — Guía educativa de RL con implementaciones y explicaciones excelentes.
  • Hugging Face Deep RL Course — Curso gratuito y práctico de RL, con notebooks interactivos.

Código completo

python
"""
DQN completo para Pong — AprendeProfundo
==========================================
Ejecutar: python dqn_pong.py
Requisitos: torch, gymnasium[atari,accept-rom-license], ale-py, cv2, numpy, matplotlib
"""
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
import cv2
from collections import deque
import matplotlib.pyplot as plt

# ━━━ Hiperparámetros ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
BATCH_SIZE     = 32
GAMMA          = 0.99
LR             = 1e-4
BUFFER_SIZE    = 100_000
MIN_BUFFER     = 10_000
EPSILON_START  = 1.0
EPSILON_END    = 0.02
EPSILON_DECAY  = 100_000
TARGET_UPDATE  = 1_000
TAU            = 0.005
NUM_EPISODES   = 800
SEED           = 42
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# ━━━ Wrappers ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class MaxAndSkip(gym.Wrapper):
    def __init__(self, env, skip=4):
        super().__init__(env)
        self.skip = skip
        self._obs_buffer = deque(maxlen=2)

    def step(self, action):
        total_reward = 0.0
        terminated = truncated = False
        for _ in range(self.skip):
            obs, reward, terminated, truncated, info = self.env.step(action)
            self._obs_buffer.append(obs)
            total_reward += reward
            if terminated or truncated:
                break
        max_frame = np.max(np.stack(self._obs_buffer), axis=0)
        return max_frame, total_reward, terminated, truncated, info

class PreprocessFrame(gym.ObservationWrapper):
    def __init__(self, env):
        super().__init__(env)
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(84, 84, 1), dtype=np.uint8
        )

    def observation(self, obs):
        gray = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
        cropped = gray[34:194, :]
        resized = cv2.resize(cropped, (84, 84), interpolation=cv2.INTER_AREA)
        return resized.reshape(84, 84, 1)

class FrameStack(gym.Wrapper):
    def __init__(self, env, k=4):
        super().__init__(env)
        self.k = k
        self.frames = deque(maxlen=k)
        shp = env.observation_space.shape
        self.observation_space = gym.spaces.Box(
            low=0, high=255, shape=(k, shp[0], shp[1]), dtype=np.uint8
        )

    def reset(self, **kwargs):
        obs, info = self.env.reset(**kwargs)
        for _ in range(self.k):
            self.frames.append(obs.squeeze(-1))
        return np.array(self.frames), info

    def step(self, action):
        obs, reward, terminated, truncated, info = self.env.step(action)
        self.frames.append(obs.squeeze(-1))
        return np.array(self.frames), reward, terminated, truncated, info

class ScaleObservation(gym.ObservationWrapper):
    def observation(self, obs):
        return np.array(obs, dtype=np.float32) / 255.0

def make_env(env_name="PongNoFrameskip-v4", render_mode=None):
    env = gym.make(env_name, render_mode=render_mode)
    env = MaxAndSkip(env, skip=4)
    env = PreprocessFrame(env)
    env = FrameStack(env, k=4)
    env = ScaleObservation(env)
    return env

# ━━━ Red DQN ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class DQN(nn.Module):
    def __init__(self, n_actions):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(4, 32, kernel_size=8, stride=4), nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2), nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1), nn.ReLU(),
        )
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(64 * 7 * 7, 512), nn.ReLU(),
            nn.Linear(512, n_actions),
        )

    def forward(self, x):
        return self.fc(self.conv(x))

# ━━━ Replay Buffer ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        s, a, r, ns, d = zip(*batch)
        return (
            torch.FloatTensor(np.array(s)).to(DEVICE),
            torch.LongTensor(a).to(DEVICE),
            torch.FloatTensor(r).to(DEVICE),
            torch.FloatTensor(np.array(ns)).to(DEVICE),
            torch.FloatTensor(d).to(DEVICE),
        )

    def __len__(self):
        return len(self.buffer)

# ━━━ Funciones auxiliares ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def select_action(state, step_count, env, policy_net):
    epsilon = max(EPSILON_END, EPSILON_START - step_count / EPSILON_DECAY)
    if random.random() < epsilon:
        return env.action_space.sample(), epsilon
    state_t = torch.FloatTensor(state).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        return policy_net(state_t).argmax(1).item(), epsilon

def soft_update(policy_net, target_net, tau=TAU):
    for tp, pp in zip(target_net.parameters(), policy_net.parameters()):
        tp.data.copy_(tau * pp.data + (1.0 - tau) * tp.data)

def optimize(buffer, policy_net, target_net, optimizer, loss_fn):
    if len(buffer) < MIN_BUFFER:
        return None
    s, a, r, ns, d = buffer.sample(BATCH_SIZE)
    q = policy_net(s).gather(1, a.unsqueeze(1)).squeeze(1)
    with torch.no_grad():
        nq = target_net(ns).max(1).values
        target = r + GAMMA * nq * (1 - d)
    loss = loss_fn(q, target)
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 10.0)
    optimizer.step()
    return loss.item()

# ━━━ Entrenamiento ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
def main():
    env = make_env()
    n_act = env.action_space.n
    policy = DQN(n_act).to(DEVICE)
    target = DQN(n_act).to(DEVICE)
    target.load_state_dict(policy.state_dict())
    target.eval()
    opt = optim.Adam(policy.parameters(), lr=LR)
    loss_fn = nn.SmoothL1Loss()
    buf = ReplayBuffer(BUFFER_SIZE)
    rewards_hist = []
    step = 0
    best = -float('inf')

    for ep in range(NUM_EPISODES):
        state, _ = env.reset()
        ep_reward = 0
        done = False
        while not done:
            action, eps = select_action(state, step, env, policy)
            ns, reward, term, trunc, _ = env.step(action)
            done = term or trunc
            buf.push(state, action, reward, ns, float(done))
            optimize(buf, policy, target, opt, loss_fn)
            soft_update(policy, target)
            state = ns
            ep_reward += reward
            step += 1
        rewards_hist.append(ep_reward)
        if (ep + 1) % 10 == 0:
            avg = np.mean(rewards_hist[-10:])
            print(f"Ep {ep+1:4d} | R: {ep_reward:6.1f} | Avg: {avg:6.1f} | ε: {eps:.3f}")
            if avg > best:
                best = avg
                torch.save(policy.state_dict(), "dqn_pong_best.pth")
    env.close()
    print(f"\n✅ Mejor reward promedio: {best:.1f}")
    return rewards_hist

if __name__ == "__main__":
    rewards = main()
🎓 Ejercicio final: Implementa Double DQN modificando la función optimize(). Solo necesitas cambiar 3 líneas (ver el desplegable del Paso 8). Compara las curvas de entrenamiento con y sin Double DQN — ¿ves diferencia en la velocidad de convergencia o en la estabilidad?