RNN Vanilla en PyTorch: Clasificación de Secuencias Sintéticas
Implementación completa de una RNN vanilla many-to-one en PyTorch para clasificar señales temporales sintéticas (senos y ondas cuadradas).
Fundamentos de RNN en PyTorch: clasificación de secuencias sintéticas
En este notebook vamos a construir de principio a fin un ejemplo de uso de redes neuronales recurrentes (RNN) con PyTorch, siguiendo la lógica del submódulo de Fundamentos:
- Qué son los datos secuenciales y por qué un MLP no modela bien el orden temporal.
- Cómo una RNN reutiliza un estado oculto para mantener contexto.
- Cómo se formula una tarea many-to-one (entrada secuencial, una salida final).
- Cómo entrenar, evaluar y diagnosticar un modelo recurrente.
Objetivo del notebook
Resolver una tarea de clasificación de secuencias: dada una señal temporal ruidosa de longitud fija, predecir qué tipo de dinámica la generó.
Usaremos un dataset sintético (generado por fórmula + ruido), con tres clases:
- Seno de baja frecuencia
- Seno de alta frecuencia
- Onda cuadrada suavemente ruidosa
Esto nos permite centrarnos en el comportamiento de la RNN sin depender de descargas externas.
Recordatorio matemático (RNN vanilla)
En una RNN estándar, para cada paso temporal (t):
[ \mathbf{h}t = anh(\mathbf{W}{xh}\mathbf{x}t + \mathbf{W}{hh}\mathbf{h}_{t-1} + \mathbf{b}_h) ]
- (\mathbf{x}_t): entrada en el instante (t)
- (\mathbf{h}_{t-1}): memoria (estado oculto anterior)
- (\mathbf{h}_t): nuevo estado oculto
Para clasificación many-to-one, usamos el último estado (\mathbf{h}_T) para producir logits:
[ \mathbf{z} = \mathbf{W}_{hy}\mathbf{h}_T + \mathbf{b}_y ]
Luego aplicamos softmax implícitamente vía CrossEntropyLoss.
Enfoque computacional
- Generar dataset sintético con ruido controlado.
- Hacer un pequeño EDA (distribución de clases y visualización de secuencias).
- Preparar
DataLoaderpara train/val/test. - Definir una arquitectura
nn.RNN+ capa lineal final. - Entrenar registrando loss y accuracy en train y val.
- Evaluar en test con métricas, matriz de confusión y análisis cualitativo.
- Concluir y proponer extensiones.
Nota didáctica: empezamos con una RNN vanilla porque es el modelo base conceptual antes de pasar a LSTM/GRU.
# Imports principales
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
accuracy_score,
confusion_matrix,
classification_report,
)
# Configuración visual
sns.set_theme(style="whitegrid")
plt.rcParams["figure.figsize"] = (10, 4)
# Semilla para reproducibilidad
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
set_seed(42)
# Dispositivo de cómputo
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Usando dispositivo:", DEVICE)
Usando dispositivo: cuda
1) Generación del dataset sintético
Generaremos secuencias de longitud fija (T=60). Cada secuencia pertenece a una de 3 clases y añade ruido gaussiano moderado.
- Clase 0: seno de baja frecuencia.
- Clase 1: seno de alta frecuencia.
- Clase 2: onda cuadrada (señal con cambios bruscos) + ruido.
Esto fuerza a la RNN a usar el patrón temporal global, no solo un punto aislado.
# Parámetros del dataset
N_SAMPLES = 3600
SEQ_LEN = 60
NOISE_STD = 0.20
CLASS_NAMES = ["seno_baja_freq", "seno_alta_freq", "onda_cuadrada"]
N_CLASSES = len(CLASS_NAMES)
# Eje temporal normalizado [0, 1]
t = np.linspace(0, 1, SEQ_LEN)
def generate_sequence(label, seq_len=SEQ_LEN, noise_std=NOISE_STD):
"""Genera una secuencia según la clase indicada y añade ruido."""
phase = np.random.uniform(0, 2 * np.pi)
if label == 0:
# Baja frecuencia
freq = np.random.uniform(1.0, 2.0)
signal = np.sin(2 * np.pi * freq * t + phase)
elif label == 1:
# Alta frecuencia
freq = np.random.uniform(3.0, 5.0)
signal = np.sin(2 * np.pi * freq * t + phase)
elif label == 2:
# Onda cuadrada aproximada por el signo del seno
freq = np.random.uniform(1.5, 3.0)
signal = np.sign(np.sin(2 * np.pi * freq * t + phase)).astype(float)
else:
raise ValueError("Label fuera de rango")
# Amplitud aleatoria suave para aumentar variabilidad
amplitude = np.random.uniform(0.8, 1.2)
signal = amplitude * signal
# Ruido gaussiano
noise = np.random.normal(loc=0.0, scale=noise_std, size=seq_len)
return (signal + noise).astype(np.float32)
# Construcción del dataset completo
X = np.zeros((N_SAMPLES, SEQ_LEN), dtype=np.float32)
y = np.zeros((N_SAMPLES,), dtype=np.int64)
for i in range(N_SAMPLES):
label = i % N_CLASSES # dataset balanceado
X[i] = generate_sequence(label)
y[i] = label
print("Shape X:", X.shape)
print("Shape y:", y.shape)
print("Primeras etiquetas:", y[:10])
Shape X: (3600, 60) Shape y: (3600,) Primeras etiquetas: [0 1 2 0 1 2 0 1 2 0]
2) EDA rápido y visual
# Distribución de clases
unique, counts = np.unique(y, return_counts=True)
plt.figure(figsize=(6, 4))
plt.bar([CLASS_NAMES[u] for u in unique], counts, color=["#4c78a8", "#f58518", "#54a24b"])
plt.title("Distribución de clases")
plt.ylabel("Número de secuencias")
plt.xticks(rotation=15)
plt.show()
print("Conteos por clase:", dict(zip([CLASS_NAMES[u] for u in unique], counts)))
Conteos por clase: {'seno_baja_freq': 1200, 'seno_alta_freq': 1200, 'onda_cuadrada': 1200}
# Visualizamos ejemplos de cada clase para entender el problema
fig, axes = plt.subplots(1, 3, figsize=(15, 3.8), sharey=True)
for class_id in range(N_CLASSES):
idxs = np.where(y == class_id)[0]
chosen = np.random.choice(idxs, size=5, replace=False)
for idx in chosen:
axes[class_id].plot(X[idx], alpha=0.75)
axes[class_id].set_title(CLASS_NAMES[class_id])
axes[class_id].set_xlabel("Paso temporal")
if class_id == 0:
axes[class_id].set_ylabel("Valor")
plt.suptitle("5 secuencias de ejemplo por clase")
plt.tight_layout()
plt.show()
# Estadísticos básicos por clase (media y desviación típica global)
for class_id, class_name in enumerate(CLASS_NAMES):
data = X[y == class_id]
print(f"{class_name:18s} | media={data.mean(): .3f} | std={data.std(): .3f}")
seno_baja_freq | media=-0.005 | std= 0.739 seno_alta_freq | media= 0.000 | std= 0.739 onda_cuadrada | media= 0.000 | std= 1.024
3) Preparación de train/val/test y DataLoaders
Hacemos partición estratificada para mantener proporciones de clase.
Además, la RNN de PyTorch con batch_first=True espera entradas con shape:
[ (batch_size, seq_len, input_size) ]
Como nuestra señal es univariante, input_size = 1, así que añadimos una dimensión final.
# Split train+temp / test
X_train, X_temp, y_train, y_temp = train_test_split(
X, y, test_size=0.30, stratify=y, random_state=42
)
# Split temp en val / test
X_val, X_test, y_val, y_test = train_test_split(
X_temp, y_temp, test_size=0.50, stratify=y_temp, random_state=42
)
print("Train:", X_train.shape, y_train.shape)
print("Val: ", X_val.shape, y_val.shape)
print("Test: ", X_test.shape, y_test.shape)
# Convertimos a tensores y añadimos dimensión de feature (=1)
X_train_t = torch.tensor(X_train).unsqueeze(-1)
X_val_t = torch.tensor(X_val).unsqueeze(-1)
X_test_t = torch.tensor(X_test).unsqueeze(-1)
y_train_t = torch.tensor(y_train)
y_val_t = torch.tensor(y_val)
y_test_t = torch.tensor(y_test)
# TensorDataset + DataLoader
batch_size = 64
train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=batch_size, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val_t, y_val_t), batch_size=batch_size, shuffle=False)
test_loader = DataLoader(TensorDataset(X_test_t, y_test_t), batch_size=batch_size, shuffle=False)
Train: (2520, 60) (2520,) Val: (540, 60) (540,) Test: (540, 60) (540,)
4) Modelo RNN many-to-one en PyTorch
Usaremos:
nn.RNN(input_size=1, hidden_size=32, num_layers=1)- Capa lineal final para clasificar en 3 clases.
Estrategia many-to-one: tomamos la representación del último paso temporal y la pasamos al clasificador.
class SequenceRNNClassifier(nn.Module):
"""RNN vanilla para clasificación many-to-one de secuencias."""
def __init__(self, input_size=1, hidden_size=32, num_layers=1, num_classes=3, dropout=0.0):
super().__init__()
# Capa recurrente
self.rnn = nn.RNN(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
nonlinearity="tanh",
batch_first=True,
dropout=dropout if num_layers > 1 else 0.0,
)
# Capa de salida para clasificación
self.classifier = nn.Linear(hidden_size, num_classes)
def forward(self, x):
# x: (batch, seq_len, input_size)
output, h_n = self.rnn(x)
# h_n: (num_layers, batch, hidden_size)
# Tomamos el estado oculto de la última capa en el último tiempo
last_hidden = h_n[-1] # (batch, hidden_size)
logits = self.classifier(last_hidden) # (batch, num_classes)
return logits
model = SequenceRNNClassifier(
input_size=1,
hidden_size=32,
num_layers=1,
num_classes=N_CLASSES,
).to(DEVICE)
n_params = sum(p.numel() for p in model.parameters())
print(model)
print(f"Parámetros entrenables: {n_params:,}")
SequenceRNNClassifier( (rnn): RNN(1, 32, batch_first=True) (classifier): Linear(in_features=32, out_features=3, bias=True) ) Parámetros entrenables: 1,219
5) Entrenamiento y validación
# Funciones auxiliares de entrenamiento/evaluación
def run_epoch(model, loader, criterion, optimizer=None, device=DEVICE):
"""Ejecuta una época en modo train (si hay optimizer) o eval."""
training = optimizer is not None
model.train() if training else model.eval()
epoch_losses = []
all_preds, all_targets = [], []
for xb, yb in loader:
xb, yb = xb.to(device), yb.to(device)
# Forward
logits = model(xb)
loss = criterion(logits, yb)
if training:
# Backward + actualización de pesos
optimizer.zero_grad()
loss.backward()
# Clipping opcional para estabilidad en RNN
nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
# Registro de métricas
epoch_losses.append(loss.item())
preds = torch.argmax(logits, dim=1)
all_preds.append(preds.detach().cpu().numpy())
all_targets.append(yb.detach().cpu().numpy())
all_preds = np.concatenate(all_preds)
all_targets = np.concatenate(all_targets)
avg_loss = float(np.mean(epoch_losses))
acc = float(accuracy_score(all_targets, all_preds))
return avg_loss, acc
# Hiperparámetros de entrenamiento
lr = 1e-3
epochs = 35
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
history = {
"train_loss": [],
"val_loss": [],
"train_acc": [],
"val_acc": [],
}
for epoch in range(1, epochs + 1):
train_loss, train_acc = run_epoch(model, train_loader, criterion, optimizer=optimizer)
val_loss, val_acc = run_epoch(model, val_loader, criterion, optimizer=None)
history["train_loss"].append(train_loss)
history["val_loss"].append(val_loss)
history["train_acc"].append(train_acc)
history["val_acc"].append(val_acc)
if epoch % 5 == 0 or epoch == 1:
print(
f"Epoch {epoch:02d}/{epochs} | "
f"train_loss={train_loss:.4f} val_loss={val_loss:.4f} | "
f"train_acc={train_acc:.3f} val_acc={val_acc:.3f}"
)
Epoch 01/35 | train_loss=1.1013 val_loss=1.0999 | train_acc=0.310 val_acc=0.280 Epoch 05/35 | train_loss=1.0967 val_loss=1.0983 | train_acc=0.356 val_acc=0.322 Epoch 10/35 | train_loss=0.6805 val_loss=0.5814 | train_acc=0.585 val_acc=0.628 Epoch 15/35 | train_loss=0.5388 val_loss=0.5269 | train_acc=0.662 val_acc=0.633 Epoch 20/35 | train_loss=0.4954 val_loss=0.5105 | train_acc=0.706 val_acc=0.670 Epoch 25/35 | train_loss=0.4998 val_loss=0.5011 | train_acc=0.728 val_acc=0.702 Epoch 30/35 | train_loss=0.1919 val_loss=0.1548 | train_acc=0.946 val_acc=0.954 Epoch 35/35 | train_loss=0.1602 val_loss=0.0854 | train_acc=0.952 val_acc=0.978
# Curvas de entrenamiento: loss y accuracy para train/val
fig, axes = plt.subplots(1, 2, figsize=(13, 4))
axes[0].plot(history["train_loss"], label="Train loss")
axes[0].plot(history["val_loss"], label="Val loss")
axes[0].set_title("Evolución de la pérdida")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()
axes[1].plot(history["train_acc"], label="Train accuracy")
axes[1].plot(history["val_acc"], label="Val accuracy")
axes[1].set_title("Evolución de la accuracy")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()
plt.tight_layout()
plt.show()
6) Evaluación en test: métricas y matriz de confusión
# Predicción en test
model.eval()
all_test_preds, all_test_targets = [], []
with torch.no_grad():
for xb, yb in test_loader:
xb = xb.to(DEVICE)
logits = model(xb)
preds = torch.argmax(logits, dim=1).cpu().numpy()
all_test_preds.append(preds)
all_test_targets.append(yb.numpy())
y_true = np.concatenate(all_test_targets)
y_pred = np.concatenate(all_test_preds)
test_acc = accuracy_score(y_true, y_pred)
print(f"Test accuracy: {test_acc:.4f}")
print("Reporte de clasificación:")
print(classification_report(y_true, y_pred, target_names=CLASS_NAMES, digits=4))
Test accuracy: 0.9778
Reporte de clasificación:
precision recall f1-score support
seno_baja_freq 1.0000 0.9500 0.9744 180
seno_alta_freq 0.9944 0.9889 0.9916 180
onda_cuadrada 0.9421 0.9944 0.9676 180
accuracy 0.9778 540
macro avg 0.9788 0.9778 0.9779 540
weighted avg 0.9788 0.9778 0.9779 540
# Matriz de confusión
cm = confusion_matrix(y_true, y_pred)
plt.figure(figsize=(6, 5))
sns.heatmap(
cm,
annot=True,
fmt="d",
cmap="Blues",
xticklabels=CLASS_NAMES,
yticklabels=CLASS_NAMES,
)
plt.title("Matriz de confusión (test)")
plt.xlabel("Predicción")
plt.ylabel("Etiqueta real")
plt.xticks(rotation=20)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()
7) Inspección cualitativa de aciertos y errores
# Mostramos ejemplos bien y mal clasificados para interpretación
correct_idx = np.where(y_true == y_pred)[0]
wrong_idx = np.where(y_true != y_pred)[0]
print(f"Ejemplos correctos: {len(correct_idx)}")
print(f"Ejemplos incorrectos: {len(wrong_idx)}")
# Seleccionamos algunos ejemplos para visualizar
n_show = 3
sample_correct = np.random.choice(correct_idx, size=min(n_show, len(correct_idx)), replace=False)
sample_wrong = np.random.choice(wrong_idx, size=min(n_show, len(wrong_idx)), replace=False) if len(wrong_idx) > 0 else []
fig, axes = plt.subplots(2, n_show, figsize=(15, 6), sharey=True)
# Fila superior: aciertos
for i, idx in enumerate(sample_correct):
axes[0, i].plot(X_test[idx], color="#2ca02c")
axes[0, i].set_title(f"OK | real={CLASS_NAMES[y_test[idx]]}\npred={CLASS_NAMES[y_pred[idx]]}", fontsize=10)
axes[0, i].set_xlabel("t")
# Fila inferior: errores (si existen)
for i in range(n_show):
if len(sample_wrong) > i:
idx = sample_wrong[i]
axes[1, i].plot(X_test[idx], color="#d62728")
axes[1, i].set_title(f"Error | real={CLASS_NAMES[y_test[idx]]}\npred={CLASS_NAMES[y_pred[idx]]}", fontsize=10)
axes[1, i].set_xlabel("t")
else:
axes[1, i].axis("off")
axes[0, 0].set_ylabel("Aciertos")
axes[1, 0].set_ylabel("Errores")
plt.suptitle("Ejemplos cualitativos de predicciones")
plt.tight_layout()
plt.show()
Ejemplos correctos: 528 Ejemplos incorrectos: 12
8) Mini-experimento: ¿qué pasa si reducimos mucho hidden_size?
Para reforzar la intuición, repetimos entrenamiento rápido con una RNN más pequeña (hidden_size=8) y comparamos curvas.
Esto ayuda a entender el impacto de la capacidad del estado oculto.
# Segundo experimento corto con menor capacidad
small_model = SequenceRNNClassifier(
input_size=1,
hidden_size=8,
num_layers=1,
num_classes=N_CLASSES,
).to(DEVICE)
small_optimizer = torch.optim.Adam(small_model.parameters(), lr=1e-3)
small_criterion = nn.CrossEntropyLoss()
small_history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
small_epochs = 20
for _ in range(small_epochs):
tr_loss, tr_acc = run_epoch(small_model, train_loader, small_criterion, optimizer=small_optimizer)
va_loss, va_acc = run_epoch(small_model, val_loader, small_criterion, optimizer=None)
small_history["train_loss"].append(tr_loss)
small_history["val_loss"].append(va_loss)
small_history["train_acc"].append(tr_acc)
small_history["val_acc"].append(va_acc)
# Comparación de curvas entre modelo base y modelo pequeño
fig, axes = plt.subplots(1, 2, figsize=(13, 4))
axes[0].plot(history["val_loss"], label="Val loss (hidden=32)")
axes[0].plot(small_history["val_loss"], label="Val loss (hidden=8)")
axes[0].set_title("Comparación de pérdida en validación")
axes[0].set_xlabel("Epoch")
axes[0].set_ylabel("Loss")
axes[0].legend()
axes[1].plot(history["val_acc"], label="Val acc (hidden=32)")
axes[1].plot(small_history["val_acc"], label="Val acc (hidden=8)")
axes[1].set_title("Comparación de accuracy en validación")
axes[1].set_xlabel("Epoch")
axes[1].set_ylabel("Accuracy")
axes[1].legend()
plt.tight_layout()
plt.show()
Conclusiones
- Hemos implementado una RNN vanilla many-to-one en PyTorch de forma completa: datos, EDA, entrenamiento y evaluación.
- El modelo aprende patrones temporales y logra una accuracy alta en un problema sintético con ruido.
- Al reducir demasiado
hidden_size, la capacidad de representación suele bajar y las métricas empeoran.
Ideas para seguir practicando
- Sustituir
nn.RNNpornn.LSTMynn.GRUcon el mismo pipeline. - Incrementar longitud de secuencia para observar limitaciones de memoria en RNN vanilla.
- Probar regularización (
dropout, weight decay) y early stopping. - Cambiar el dataset por texto simple (sentimiento) o series reales.
- Experimentar con tareas many-to-many (etiquetado por paso temporal).
Si quieres, en un siguiente notebook podemos extender este ejemplo a LSTM y comparar directamente las tres arquitecturas (RNN/LSTM/GRU) con exactamente los mismos datos y métricas.