Saltar a contenido

Tarea 15: Pipelines ETL, DataOps y Orquestación con Prefect

Ingeniería de Datos — Universidad Católica del Uruguay

Objetivo: Diseñar e implementar un mini pipeline ETL con Prefect, investigando la documentación oficial para comprender los conceptos fundamentales y explorar funcionalidades avanzadas del orquestador.

Tiempo estimado: 90–120 minutos


Lecturas mínimas (recuerdo)

  • Google Cloud: Building the data engineering driven organization
  • Google Cloud: Building streaming data pipelines
  • Google Cloud Docs: MLOps — Continuous delivery and automation pipelines in ML
  • Google Developers: ML pipelines (data, training, serving)
  • DataOps School: Comprehensive Tutorial on Prefect in DataOps

Parte 1 — Investigación: Conceptos Fundamentales de Prefect (15 min)

Antes de escribir código, investiguen la documentación oficial de Prefect y respondan las siguientes preguntas. Deben incluir citas o referencias específicas de la documentación.

1.1 Tasks en Prefect

Lean la documentación oficial: Prefect Tasks

Respondan en sus propias palabras:

  1. ¿Qué es una Task en Prefect? Expliquen con sus palabras qué representa y cuándo usarla.

Respuesta: _____

  1. ¿Qué significa que las Tasks sean "lazily evaluated"? ¿Cómo afecta esto la ejecución?

Respuesta: _____

  1. ¿Qué son los Task States? Listen al menos 4 estados posibles y expliquen cuándo ocurre cada uno.
Estado ¿Cuándo ocurre?
_____ _____
_____ _____
_____ _____
_____ _____
  1. ¿Qué parámetros importantes tiene el decorador @task? Investiguen y describan al menos 3:
Parámetro ¿Qué hace? Ejemplo de uso
_____ _____ _____
_____ _____ _____
_____ _____ _____

1.2 Flows en Prefect

Lean la documentación oficial: Prefect Flows

Respondan:

  1. ¿Cuál es la diferencia entre un Flow y una Task? ¿Por qué necesitamos ambos?

Respuesta: _____

  1. ¿Qué es un "subflow"? ¿Cuándo sería útil usar subflows?

Respuesta: _____

  1. ¿Cómo maneja Prefect las dependencias entre tasks? Expliquen el concepto de DAG implícito.

Respuesta: _____

1.3 Investigación avanzada: Results y Caching

Lean: Prefect Results y Caching

  1. ¿Qué es el "result persistence"? ¿Por qué es importante en pipelines de datos?

Respuesta: _____

  1. ¿Cómo funciona el caching en Prefect? ¿Qué parámetro usarían para cachear el resultado de una task?

Respuesta: _____

  1. ¿Qué es una cache_key_fn? Den un ejemplo de cuándo la usarían.

Respuesta: _____


Parte 2 — Diseño Conceptual (5 min)

Definan en equipo un escenario simple para su pipeline:

Ejemplos: "Clicks de una campaña de marketing", "ventas del kiosco", "logs de una API", "transacciones e-commerce".

2.1 Arquitectura del escenario

Rol ¿Quién sería en su escenario?
Business data owner _____
Data engineers _____
Data consumers _____

2.2 Tipo de pipeline

  • Tipo elegido (batch/streaming): _____
  • Justificación: _____

Parte 3 — Implementación del Pipeline Base (20 min)

3.1 Setup

# Instalación de Prefect
!pip install -q prefect pandas

# Importar librerías
from prefect import flow, task
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

print("✅ Entorno configurado correctamente")
print(f"📅 Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M')}")

3.2 Implementar Tasks

Basándose en lo que investigaron en la Parte 1, implementen las tasks:

# === TASK 1: EXTRACT ===
# TODO: Agregar el decorador correcto basándose en la documentación
# Investiguen: ¿qué parámetros adicionales podrían ser útiles aquí?
@_____
def extract_data():
    """
    Extrae datos de la fuente.
    """
    np.random.seed(42)
    n_rows = 100

    data = {
        'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
        'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
        'cantidad': np.random.randint(1, 50, n_rows),
        'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
        'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
    }

    df = pd.DataFrame(data)
    print(f"📥 Extraídos {len(df)} registros")
    return df


# === TASK 2: TRANSFORM ===
@_____
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Aplica transformaciones a los datos.
    """
    df['total'] = df['cantidad'] * df['precio_unitario']

    df['ticket_size'] = pd.cut(
        df['total'],
        bins=[0, 100, 500, float('inf')],
        labels=['small', 'medium', 'large']
    )

    print(f"🔄 Transformados {len(df)} registros")
    return df


# === TASK 3: LOAD ===
@_____
def load_data(df: pd.DataFrame, output_path: str = "output.csv"):
    """
    Carga los datos al destino final.
    """
    df.to_csv(output_path, index=False)
    print(f"💾 Guardados {len(df)} registros en {output_path}")

3.3 Implementar Flow

# === FLOW: Orquestador del pipeline ===
@_____
def etl_flow():
    """
    Flow principal que orquesta las tasks ETL.
    """
    df_raw = extract_data()
    df_clean = transform_data(df_raw)
    load_data(df_clean)

    print("\n✅ Pipeline ETL completado exitosamente!")
    return df_clean


# === EJECUTAR ===
if __name__ == "__main__":
    resultado = etl_flow()

3.4 Preguntas de observación

Después de ejecutar el pipeline, respondan:

  1. ¿Qué información muestra Prefect en los logs? Copien un fragmento relevante y expliquen qué significa.
[Pegar logs aquí]

Explicación: _____

  1. ¿En qué orden se ejecutaron las tasks? ¿Cómo lo infiere Prefect?

Respuesta: _____

  1. ¿Qué pasaría si una task falla? Investiguen en la documentación qué estados tendría el flow.

Respuesta: _____


Parte 4 — Investigación: Funcionalidades Avanzadas (15 min)

Investiguen las siguientes funcionalidades en la documentación y implementen al menos UNA en su pipeline.

4.1 Retries y manejo de errores

Documentación: Task Retries

Investigación requerida:

  1. ¿Qué parámetros controlan los retries? Describan cada uno:
Parámetro Descripción Valor por defecto
retries _____ _____
retry_delay_seconds _____ _____
retry_jitter_factor _____ _____
  1. ¿Qué es "exponential backoff"? ¿Cómo lo implementarían?

Respuesta: _____

Implementación:

# TODO: Implementar una task con retries
# Deben usar los parámetros que investigaron
@task(retries=_____, retry_delay_seconds=_____)
def extract_data_with_retry():
    """Task con reintentos automáticos."""
    # Simular fallo aleatorio para probar retries
    if np.random.random() < 0.5:
        raise Exception("Error simulado de conexión")
    return extract_data()

4.2 Caching de resultados

Documentación: Task Caching

Investigación requerida:

  1. ¿Qué es cache_expiration? ¿Cómo se especifica?

Respuesta: _____

  1. ¿Cuándo es útil cachear una task? Den 2 ejemplos de su escenario.

  2. Ejemplo 1: _____

  3. Ejemplo 2: _____

  4. ¿Qué pasa si los inputs de la task cambian? ¿Se usa el cache?

Respuesta: _____

Implementación:

from datetime import timedelta

# TODO: Implementar caching en la task de extracción
@task(cache_expiration=timedelta(_____=_____))  # investigar unidades válidas
def extract_data_cached():
    """Task con caching - no re-ejecuta si ya corrió recientemente."""
    print("⏳ Ejecutando extracción (esto no debería aparecer si está cacheado)")
    return extract_data()

4.3 Logging personalizado

Documentación: Prefect Logging

Investigación requerida:

  1. ¿Cómo se accede al logger de Prefect dentro de una task?

Respuesta: _____

  1. ¿Qué niveles de log soporta Prefect? Listen al menos 4.





  2. ¿Cómo configurarían el nivel de log para ver más detalle?

Respuesta: _____

Implementación:

from prefect import get_run_logger

@task
def transform_data_with_logging(df: pd.DataFrame) -> pd.DataFrame:
    """Task con logging estructurado."""
    logger = _____  # obtener el logger de Prefect

    logger._____(f"Iniciando transformación de {len(df)} registros")  # nivel info

    df['total'] = df['cantidad'] * df['precio_unitario']

    # Log de estadísticas
    logger._____(f"Total ventas: ${df['total'].sum():,.2f}")  # nivel info
    logger._____(f"Detalle por región: {df.groupby('region')['total'].sum().to_dict()}")  # nivel debug

    return df

4.4 Concurrencia y paralelismo

Documentación: Task Runners

Investigación requerida:

  1. ¿Qué es un Task Runner? ¿Cuál es el default?

Respuesta: _____

  1. ¿Qué Task Runners ofrece Prefect? Describan al menos 2:
Task Runner ¿Cuándo usarlo?
_____ _____
_____ _____
  1. ¿Cómo ejecutarían tasks en paralelo? Investiguen .submit() y .map().

  2. .submit(): _____

  3. .map(): _____

Implementación (opcional pero recomendada):

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def process_region(region: str, df: pd.DataFrame) -> dict:
    """Procesa datos de una región específica."""
    df_region = df[df['region'] == region]
    return {
        'region': region,
        'total': df_region['total'].sum(),
        'count': len(df_region)
    }

@flow(task_runner=_____)  # usar el task runner para concurrencia
def etl_flow_parallel():
    df_raw = extract_data()
    df_clean = transform_data(df_raw)

    # Procesar cada región en paralelo
    regiones = ['Norte', 'Sur', 'Este', 'Oeste']
    futures = [process_region._____(r, df_clean) for r in regiones]  # método para ejecución async

    # Esperar resultados
    results = [f._____() for f in futures]  # método para obtener resultado

    print(f"📊 Resultados por región: {results}")
    return results

Parte 5 — Investigación: Deployments y Scheduling (10 min)

Documentación: Deployments y Schedules

5.1 Conceptos de Deployment

Respondan basándose en la documentación:

  1. ¿Qué es un Deployment en Prefect? ¿Cuál es la diferencia entre un Flow y un Deployment?

Respuesta: _____

  1. ¿Qué es un Work Pool? ¿Para qué sirve?

Respuesta: _____

  1. ¿Qué es un Worker? ¿Cómo se relaciona con el Work Pool?

Respuesta: _____

5.2 Scheduling

Investiguen las opciones de scheduling:

  1. ¿Qué tipos de schedules soporta Prefect? Describan al menos 3:
Tipo de Schedule Descripción Ejemplo
_____ _____ _____
_____ _____ _____
_____ _____ _____
  1. ¿Cómo expresarían "ejecutar todos los días a las 6 AM" en cron?

Respuesta: _____

  1. ¿Qué es RRuleSchedule? ¿Cuándo lo usarían sobre cron?

Respuesta: _____

5.3 Crear un Deployment (conceptual)

Basándose en la documentación, escriban el código para crear un deployment de su flow:

# TODO: Completar basándose en la documentación de Deployments
# https://docs.prefect.io/latest/concepts/deployments/

from prefect import flow

# Opción 1: Usando serve() - más simple
if __name__ == "__main__":
    etl_flow.serve(
        name="_____",  # nombre del deployment
        cron="_____",  # schedule en formato cron
        tags=["_____", "_____"],  # tags para organización
    )

# Opción 2: Usando deploy() - más control
# etl_flow.deploy(
#     name="_____",
#     work_pool_name="_____",
#     cron="_____",
# )

Parte 6 — Extensión DataOps (15 min)

Elijan UNA extensión e impleméntenla. Deben incluir comentarios explicando qué hace cada parte basándose en la documentación que investigaron.

Opción A — Validación con logging estructurado

from prefect import get_run_logger

@task(retries=_____, retry_delay_seconds=_____)  # agregar retries
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Valida la calidad de los datos.
    Usa logging estructurado de Prefect.
    """
    logger = get_run_logger()
    errors = []

    # TODO: Implementar validaciones con logging apropiado
    # Usar logger.info(), logger.warning(), logger.error()

    logger._____("Iniciando validación de datos")

    if len(df) <= 0:
        logger._____("DataFrame vacío detectado")
        errors.append("DataFrame vacío")

    null_counts = df.isnull().sum()
    if null_counts.sum() > 0:
        logger._____(f"Valores nulos encontrados: {null_counts.to_dict()}")

    if errors:
        raise ValueError(f"Validación fallida: {errors}")

    logger._____("✅ Validación exitosa")
    return df

Opción B — Flow parametrizado con caching

from datetime import timedelta

@task(cache_expiration=timedelta(minutes=_____))  # cachear por N minutos
def extract_data_param(n_rows: int = 100):
    """Extract con caching - investigar cuándo se invalida el cache."""
    # TODO: Implementar
    pass

@flow
def etl_flow_parametrized(
    min_amount: float = 0.0,
    output_path: str = "output.csv",
    n_rows: int = 100
):
    """
    Flow parametrizado.
    Investigar: ¿cómo afectan los parámetros al caching?
    """
    # TODO: Implementar usando las tasks con cache
    pass

Opción C — Pipeline con concurrencia

from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner())
def etl_flow_concurrent():
    """
    Flow con procesamiento paralelo por región.
    Investigar: ¿cuándo es útil vs. secuencial?
    """
    # TODO: Implementar procesamiento paralelo
    # Usar .submit() para las tasks
    pass

Parte 7 — Reflexión y Conexión con DataOps (5 min)

7.1 Conceptos de Prefect

Basándose en su investigación, expliquen:

  1. ¿Cómo ayuda Prefect a implementar el principio de "Observabilidad" de DataOps?

Respuesta: _____

  1. ¿Cómo ayuda el caching a la "Reproducibilidad"?

Respuesta: _____

  1. ¿Cómo conectan los Deployments con "CI/CD para datos"?

Respuesta: _____

7.2 Comparación con alternativas

Investiguen brevemente (pueden usar la web):

  1. ¿Qué diferencias hay entre Prefect y Apache Airflow? Mencionen al menos 2.

  2. Diferencia 1: _____

  3. Diferencia 2: _____

  4. ¿Qué es Dagster? ¿En qué se diferencia de Prefect?

Respuesta: _____


Entregable

1. Código Prefect

  • Pipeline base funcionando (@flow + @task)
  • Al menos UNA extensión implementada (A, B o C)
  • Comentarios explicando las decisiones basadas en la documentación

2. Documento de investigación

Incluir en el notebook o en un .md separado:

  • Respuestas a todas las preguntas de investigación (Partes 1, 4, 5)
  • Citas/referencias a la documentación oficial
  • Reflexiones (Parte 7)

3. Evidencia de ejecución

  • Screenshots o logs mostrando:
  • Ejecución exitosa del flow
  • Logs de Prefect con estados de las tasks
  • (Opcional) UI de Prefect si la exploraron

Rúbrica

Criterio Peso Descripción
Investigación documentada 30% Respuestas completas basadas en la documentación oficial. Se nota que leyeron y entendieron los conceptos.
Implementación técnica 40% Pipeline funciona correctamente. Extensión implementada usa las funcionalidades investigadas.
Conexión DataOps/ML 20% Reflexiones muestran comprensión de cómo Prefect habilita principios de DataOps.
Calidad del código 10% Código limpio, comentado, con explicaciones de las decisiones tomadas.

Recursos adicionales para investigar


"La mejor forma de aprender una herramienta es leer su documentación oficial. Los tutoriales te dan el 'qué', la documentación te da el 'por qué' y el 'cómo'."