🏭 Caso de Uso

LSTM PyTorch — Predicción de temperatura (Melbourne)

Predicción de series temporales con LSTM en PyTorch: temperatura mínima diaria en Melbourne, ventanas deslizantes y evaluación con MAE/R².

🐍 Python 📓 Jupyter Notebook

LSTM en PyTorch para predicción de series temporales (sin NLP)

Objetivo del notebook

En este notebook vamos a construir un flujo completo y didáctico para entrenar una red LSTM (Long Short-Term Memory) con PyTorch sobre un problema real de series temporales: la predicción de la temperatura mínima diaria.

La idea es conectar directamente con la teoría del submódulo de LSTM:

  • Por qué una RNN vanilla sufre con dependencias largas.
  • Cómo el cell state y las gates (forget, input, output) ayudan a retener información relevante.
  • Cómo entrenar y evaluar una LSTM en un caso práctico no basado en texto.

Dataset y modelo que usaremos

  • Dataset: Daily Minimum Temperatures in Melbourne (1981–1990), una serie temporal univariante clásica.
  • Tarea: dado un historial de window_size días, predecir la temperatura del día siguiente (regresión).
  • Modelo: LSTM apilada (num_layers=2) + capa lineal final para la predicción escalar.

Este enfoque es coherente con la teoría de LSTM en series temporales: entrada secuencial numérica, memoria de corto/medio plazo y salida continua.


Fundamento matemático/computacional

Para cada paso temporal (t), una celda LSTM calcula:

[ f_t = \sigma(W_f [h_{t-1}, x_t] + b_f) ] [ i_t = \sigma(W_i [h_{t-1}, x_t] + b_i) ] [ ilde{C}t = anh(W_c [h{t-1}, x_t] + b_c) ] [ C_t = f_t \odot C_{t-1} + i_t \odot ilde{C}t ] [ o_t = \sigma(W_o [h{t-1}, x_t] + b_o) ] [ h_t = o_t \odot anh(C_t) ]

Donde:

  • (x_t): entrada en el instante (t).
  • (h_t): estado oculto (información que “sale” de la celda).
  • (C_t): estado de celda (memoria acumulada).
  • (\sigma): sigmoide (valores entre 0 y 1, útil como puerta).
  • (\odot): producto elemento a elemento.

Intuición de las puertas

  • Forget gate (f_t): decide qué parte del pasado olvidar.
  • Input gate (i_t): decide cuánta información nueva escribir.
  • Output gate (o_t): decide qué parte de la memoria exponer como salida.

Gracias a esta ruta aditiva en (C_t), la LSTM mitiga el vanishing gradient mejor que una RNN simple.


Plan del notebook

  1. Carga y revisión inicial de datos (EDA breve).
  2. Preprocesado: partición temporal, escalado y ventanas deslizantes.
  3. Definición de Dataset/DataLoader en PyTorch.
  4. Construcción de la arquitectura LSTM.
  5. Entrenamiento con validación por época.
  6. Curvas de entrenamiento (loss y MAE) y evaluación final.
  7. Visualización de predicciones vs valores reales.
  8. Conclusiones y siguientes experimentos.
[1]
# Importaciones principales
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Configuración visual para gráficos
sns.set_theme(style="whitegrid")
plt.rcParams["figure.figsize"] = (12, 4)

# Semillas para reproducibilidad
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Selección automática de dispositivo
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Usando dispositivo: {DEVICE}")
Usando dispositivo: cuda

1) Carga del dataset

[2]
# Cargamos el dataset desde una fuente pública
# El archivo contiene dos columnas: Date y Temp
url = "https://raw.githubusercontent.com/jbrownlee/Datasets/master/daily-min-temperatures.csv"
df = pd.read_csv(url)

# Convertimos la fecha al tipo datetime para facilitar análisis temporal
df["Date"] = pd.to_datetime(df["Date"])

# Mostramos las primeras filas
print(df.head())
print(f"\nNúmero de filas: {len(df)}")
        Date  Temp
0 1981-01-01  20.7
1 1981-01-02  17.9
2 1981-01-03  18.8
3 1981-01-04  14.6
4 1981-01-05  15.8

Número de filas: 3650

2) EDA breve (exploración de datos)

Vamos a validar calidad básica del dataset y observar su comportamiento temporal.

[3]
# Información general y nulos
print(df.info())
print("\nValores nulos por columna:")
print(df.isnull().sum())

# Estadísticos descriptivos
print("\nResumen estadístico de temperatura:")
print(df["Temp"].describe())
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3650 entries, 0 to 3649
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype         
---  ------  --------------  -----         
 0   Date    3650 non-null   datetime64[ns]
 1   Temp    3650 non-null   float64       
dtypes: datetime64[ns](1), float64(1)
memory usage: 57.2 KB
None

Valores nulos por columna:
Date    0
Temp    0
dtype: int64

Resumen estadístico de temperatura:
count    3650.000000
mean       11.177753
std         4.071837
min         0.000000
25%         8.300000
50%        11.000000
75%        14.000000
max        26.300000
Name: Temp, dtype: float64
[4]
# Evolución temporal de la temperatura
plt.figure(figsize=(14, 4))
plt.plot(df["Date"], df["Temp"], color="tab:blue", linewidth=1)
plt.title("Temperatura mínima diaria en Melbourne")
plt.xlabel("Fecha")
plt.ylabel("Temperatura mínima (°C)")
plt.show()
Output
[5]
# Distribución de la variable objetivo
plt.figure(figsize=(7, 4))
sns.histplot(df["Temp"], kde=True, bins=30, color="tab:green")
plt.title("Distribución de temperaturas mínimas")
plt.xlabel("Temperatura (°C)")
plt.show()
Output
[6]
# Estacionalidad aproximada: temperatura promedio por mes
monthly_mean = df.assign(month=df["Date"].dt.month).groupby("month")["Temp"].mean()

plt.figure(figsize=(8, 4))
monthly_mean.plot(marker="o", color="tab:orange")
plt.title("Temperatura media por mes (promedio histórico)")
plt.xlabel("Mes")
plt.ylabel("Temp media (°C)")
plt.xticks(range(1, 13))
plt.show()
Output

3) Preprocesado para LSTM

Decisiones de diseño

  • Split temporal (sin barajar): entrenamos con el pasado y validamos/test con el futuro.
  • Escalado Min-Max: mejora estabilidad numérica durante el entrenamiento.
  • Ventanas deslizantes: cada muestra será una secuencia de window_size días y su etiqueta el día siguiente.
[7]
# Extraemos la serie como array 2D (requisito del escalador)
values = df[["Temp"]].values.astype(np.float32)

# Definimos proporciones de partición temporal
n_total = len(values)
train_end = int(0.7 * n_total)
val_end = int(0.85 * n_total)

train_values = values[:train_end]
val_values = values[train_end:val_end]
test_values = values[val_end:]

print(f"Train: {train_values.shape}, Val: {val_values.shape}, Test: {test_values.shape}")

# Ajustamos el escalador SOLO con train para evitar data leakage
scaler = MinMaxScaler()
train_scaled = scaler.fit_transform(train_values)
val_scaled = scaler.transform(val_values)
test_scaled = scaler.transform(test_values)
Train: (2555, 1), Val: (547, 1), Test: (548, 1)
[8]
# Función para crear ventanas deslizantes
# X: secuencia de longitud window_size, y: valor inmediatamente posterior

def make_windows(series, window_size=30):
    X, y = [], []
    for i in range(len(series) - window_size):
        X.append(series[i:i + window_size])
        y.append(series[i + window_size])
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)

WINDOW_SIZE = 30

X_train, y_train = make_windows(train_scaled, WINDOW_SIZE)
X_val, y_val = make_windows(val_scaled, WINDOW_SIZE)
X_test, y_test = make_windows(test_scaled, WINDOW_SIZE)

print("Shapes tras ventaneo:")
print(f"X_train: {X_train.shape}, y_train: {y_train.shape}")
print(f"X_val:   {X_val.shape}, y_val:   {y_val.shape}")
print(f"X_test:  {X_test.shape}, y_test:  {y_test.shape}")
Shapes tras ventaneo:
X_train: (2525, 30, 1), y_train: (2525, 1)
X_val:   (517, 30, 1), y_val:   (517, 1)
X_test:  (518, 30, 1), y_test:  (518, 1)

4) Dataset y DataLoaders de PyTorch

[9]
class TempSequenceDataset(Dataset):
    """Dataset sencillo para pares (secuencia, target) en series temporales."""
    def __init__(self, X, y):
        # Convertimos a tensores torch
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32)

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Creamos datasets
train_ds = TempSequenceDataset(X_train, y_train)
val_ds = TempSequenceDataset(X_val, y_val)
test_ds = TempSequenceDataset(X_test, y_test)

# DataLoaders (no shuffle para mantener orden temporal interno por batch)
BATCH_SIZE = 64
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=False)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False)

# Inspección rápida de un batch
xb, yb = next(iter(train_loader))
print(f"Batch X: {xb.shape}  (batch, seq_len, features)")
print(f"Batch y: {yb.shape}")
Batch X: torch.Size([64, 30, 1])  (batch, seq_len, features)
Batch y: torch.Size([64, 1])

5) Arquitectura LSTM

Usamos una LSTM de 2 capas para aumentar capacidad de representación temporal.

  • input_size=1 (temperatura diaria)
  • hidden_size=64
  • num_layers=2
  • dropout=0.2 entre capas recurrentes
  • capa Linear final para regresión (1 valor)
[10]
class LSTMRegressor(nn.Module):
    def __init__(self, input_size=1, hidden_size=64, num_layers=2, dropout=0.2):
        super().__init__()

        # Capa LSTM principal
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout,
            batch_first=True
        )

        # Capa fully-connected para mapear hidden -> predicción escalar
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        # out shape: (batch, seq_len, hidden_size)
        out, (h_n, c_n) = self.lstm(x)

        # Tomamos la salida del último paso temporal
        last_out = out[:, -1, :]  # (batch, hidden_size)

        # Predicción final
        pred = self.fc(last_out)  # (batch, 1)
        return pred

model = LSTMRegressor().to(DEVICE)
print(model)
LSTMRegressor(
  (lstm): LSTM(1, 64, num_layers=2, batch_first=True, dropout=0.2)
  (fc): Linear(in_features=64, out_features=1, bias=True)
)

6) Entrenamiento y validación por época

[11]
# Definimos función de pérdida y optimizador
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Función auxiliar para calcular MAE de forma consistente
def mae_torch(y_true, y_pred):
    return torch.mean(torch.abs(y_true - y_pred))

# Hiperparámetros de entrenamiento
EPOCHS = 35

history = {
    "train_loss": [],
    "val_loss": [],
    "train_mae": [],
    "val_mae": []
}

for epoch in range(1, EPOCHS + 1):
    # ====== Entrenamiento ======
    model.train()
    train_loss_accum = 0.0
    train_mae_accum = 0.0

    for X_batch, y_batch in train_loader:
        X_batch = X_batch.to(DEVICE)
        y_batch = y_batch.to(DEVICE)

        # Zero grad
        optimizer.zero_grad()

        # Forward
        preds = model(X_batch)
        loss = criterion(preds, y_batch)
        mae = mae_torch(y_batch, preds)

        # Backward
        loss.backward()

        # Gradient clipping para estabilidad (coherente con teoría de RNNs)
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

        # Paso de optimización
        optimizer.step()

        train_loss_accum += loss.item() * X_batch.size(0)
        train_mae_accum += mae.item() * X_batch.size(0)

    train_epoch_loss = train_loss_accum / len(train_loader.dataset)
    train_epoch_mae = train_mae_accum / len(train_loader.dataset)

    # ====== Validación ======
    model.eval()
    val_loss_accum = 0.0
    val_mae_accum = 0.0

    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(DEVICE)
            y_batch = y_batch.to(DEVICE)

            preds = model(X_batch)
            loss = criterion(preds, y_batch)
            mae = mae_torch(y_batch, preds)

            val_loss_accum += loss.item() * X_batch.size(0)
            val_mae_accum += mae.item() * X_batch.size(0)

    val_epoch_loss = val_loss_accum / len(val_loader.dataset)
    val_epoch_mae = val_mae_accum / len(val_loader.dataset)

    history["train_loss"].append(train_epoch_loss)
    history["val_loss"].append(val_epoch_loss)
    history["train_mae"].append(train_epoch_mae)
    history["val_mae"].append(val_epoch_mae)

    if epoch % 5 == 0 or epoch == 1:
        print(
            f"Epoch {epoch:02d}/{EPOCHS} | "
            f"train_loss={train_epoch_loss:.4f} val_loss={val_epoch_loss:.4f} | "
            f"train_mae={train_epoch_mae:.4f} val_mae={val_epoch_mae:.4f}"
        )
Epoch 01/35 | train_loss=0.0577 val_loss=0.0200 | train_mae=0.1812 val_mae=0.1144
Epoch 05/35 | train_loss=0.0137 val_loss=0.0108 | train_mae=0.0913 val_mae=0.0806
Epoch 10/35 | train_loss=0.0118 val_loss=0.0103 | train_mae=0.0854 val_mae=0.0793
Epoch 15/35 | train_loss=0.0116 val_loss=0.0101 | train_mae=0.0845 val_mae=0.0783
Epoch 20/35 | train_loss=0.0111 val_loss=0.0096 | train_mae=0.0824 val_mae=0.0766
Epoch 25/35 | train_loss=0.0105 val_loss=0.0088 | train_mae=0.0801 val_mae=0.0728
Epoch 30/35 | train_loss=0.0094 val_loss=0.0079 | train_mae=0.0763 val_mae=0.0703
Epoch 35/35 | train_loss=0.0092 val_loss=0.0078 | train_mae=0.0754 val_mae=0.0702

7) Curvas de entrenamiento

En regresión no hablamos de accuracy de la misma manera que en clasificación. Por eso monitorizamos:

  • Loss (MSE)
  • MAE (error absoluto medio)
[12]
epochs_axis = np.arange(1, EPOCHS + 1)

fig, ax = plt.subplots(1, 2, figsize=(14, 4))

# Curva de loss
ax[0].plot(epochs_axis, history["train_loss"], label="Train loss", marker="o", markersize=3)
ax[0].plot(epochs_axis, history["val_loss"], label="Val loss", marker="o", markersize=3)
ax[0].set_title("MSE vs Epoch")
ax[0].set_xlabel("Epoch")
ax[0].set_ylabel("MSE")
ax[0].legend()

# Curva de MAE
ax[1].plot(epochs_axis, history["train_mae"], label="Train MAE", marker="o", markersize=3)
ax[1].plot(epochs_axis, history["val_mae"], label="Val MAE", marker="o", markersize=3)
ax[1].set_title("MAE vs Epoch")
ax[1].set_xlabel("Epoch")
ax[1].set_ylabel("MAE")
ax[1].legend()

plt.tight_layout()
plt.show()
Output

8) Evaluación final en test y predicciones

[14]
# Función para obtener predicciones de un DataLoader
def predict_loader(model, loader):
    model.eval()
    preds_list, y_list = [], []

    with torch.no_grad():
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(DEVICE)
            preds = model(X_batch)

            preds_list.append(preds.cpu().numpy())
            y_list.append(y_batch.numpy())

    preds = np.concatenate(preds_list, axis=0)
    y_true = np.concatenate(y_list, axis=0)
    return y_true, preds

# Predicciones escaladas
y_test_scaled, y_pred_scaled = predict_loader(model, test_loader)

# Volvemos a escala original (°C)
y_test_real = scaler.inverse_transform(y_test_scaled)
y_pred_real = scaler.inverse_transform(y_pred_scaled)

# Métricas de regresión
mae = mean_absolute_error(y_test_real, y_pred_real)
rmse = mean_squared_error(y_test_real, y_pred_real)
r2 = r2_score(y_test_real, y_pred_real)

print("Métricas en test (escala original):")
print(f"MAE : {mae:.3f} °C")
print(f"RMSE: {rmse:.3f} °C")
print(f"R²  : {r2:.3f}")
Métricas en test (escala original):
MAE : 1.741 °C
RMSE: 5.009 °C
R²  : 0.653
[15]
# Comparación visual: valores reales vs predicción
plt.figure(figsize=(14, 4))
plt.plot(y_test_real, label="Real", linewidth=2)
plt.plot(y_pred_real, label="Predicción LSTM", linewidth=2, alpha=0.8)
plt.title("Predicción de temperatura mínima diaria (conjunto test)")
plt.xlabel("Índice temporal (test)")
plt.ylabel("Temperatura (°C)")
plt.legend()
plt.show()
Output
[16]
# Gráfico de dispersión real vs predicho
plt.figure(figsize=(6, 6))
plt.scatter(y_test_real, y_pred_real, alpha=0.5)
min_v = min(y_test_real.min(), y_pred_real.min())
max_v = max(y_test_real.max(), y_pred_real.max())
plt.plot([min_v, max_v], [min_v, max_v], "r--", label="Predicción perfecta")
plt.title("Real vs Predicho")
plt.xlabel("Valor real (°C)")
plt.ylabel("Valor predicho (°C)")
plt.legend()
plt.show()
Output

9) Mini-bloque de pruebas rápidas (sanity checks)

Estas comprobaciones ayudan al alumno a verificar que el pipeline está bien construido.

[17]
# Test 1: dimensiones coherentes en el forward
x_dummy = torch.randn(8, WINDOW_SIZE, 1).to(DEVICE)
with torch.no_grad():
    y_dummy = model(x_dummy)
assert y_dummy.shape == (8, 1), f"Forma inesperada: {y_dummy.shape}"
print("✅ Test de forma del modelo superado.")

# Test 2: no hay NaNs en historial de entrenamiento
assert not np.isnan(history["train_loss"]).any(), "NaN en train_loss"
assert not np.isnan(history["val_loss"]).any(), "NaN en val_loss"
print("✅ Test de NaNs en historial superado.")

# Test 3: métrica base razonable (el modelo aprende algo)
assert mae < 3.0, f"MAE demasiado alto: {mae:.3f}"
print("✅ Test de rendimiento mínimo superado (MAE < 3°C).")
✅ Test de forma del modelo superado.
✅ Test de NaNs en historial superado.
✅ Test de rendimiento mínimo superado (MAE < 3°C).

Conclusiones

  • Hemos construido un pipeline completo de LSTM en PyTorch para una serie temporal real y no textual.
  • La LSTM captura dependencias temporales y ofrece predicciones razonables de la temperatura diaria.
  • Se observa una convergencia estable mediante curvas de MSE y MAE en train/validación.
  • El uso de técnicas prácticas (escalado, ventanas deslizantes y gradient clipping) mejora el entrenamiento.

Sugerencias para seguir explorando

  1. Probar distintos window_size (por ejemplo 14, 60, 90 días).
  2. Ajustar hidden_size, num_layers, dropout y learning rate.
  3. Comparar con una RNN vanilla y con una GRU para ver diferencias de estabilidad y rendimiento.
  4. Añadir variables exógenas (humedad, presión, etc.) para un escenario multivariante.
  5. Implementar early stopping y learning-rate scheduler.
  6. Probar una LSTM bidireccional y discutir cuándo tiene sentido en forecasting.