Tarea 15: Pipelines ETL, DataOps y Orquestación con Prefect¶
Ingeniería de Datos — Universidad Católica del Uruguay
Objetivo: Diseñar e implementar un mini pipeline ETL con Prefect, investigando la documentación oficial para comprender los conceptos fundamentales y explorar funcionalidades avanzadas del orquestador.
Tiempo estimado: 90–120 minutos
Lecturas mínimas (recuerdo)¶
- Google Cloud: Building the data engineering driven organization
- Google Cloud: Building streaming data pipelines
- Google Cloud Docs: MLOps — Continuous delivery and automation pipelines in ML
- Google Developers: ML pipelines (data, training, serving)
- DataOps School: Comprehensive Tutorial on Prefect in DataOps
Parte 1 — Investigación: Conceptos Fundamentales de Prefect (15 min)¶
Antes de escribir código, investiguen la documentación oficial de Prefect y respondan las siguientes preguntas. Deben incluir citas o referencias específicas de la documentación.
1.1 Tasks en Prefect¶
Lean la documentación oficial: Prefect Tasks
Respondan en sus propias palabras:
- ¿Qué es una Task en Prefect? Expliquen con sus palabras qué representa y cuándo usarla.
Respuesta: _____
- ¿Qué significa que las Tasks sean "lazily evaluated"? ¿Cómo afecta esto la ejecución?
Respuesta: _____
- ¿Qué son los Task States? Listen al menos 4 estados posibles y expliquen cuándo ocurre cada uno.
| Estado | ¿Cuándo ocurre? |
|---|---|
| _____ | _____ |
| _____ | _____ |
| _____ | _____ |
| _____ | _____ |
- ¿Qué parámetros importantes tiene el decorador
@task? Investiguen y describan al menos 3:
| Parámetro | ¿Qué hace? | Ejemplo de uso |
|---|---|---|
| _____ | _____ | _____ |
| _____ | _____ | _____ |
| _____ | _____ | _____ |
1.2 Flows en Prefect¶
Lean la documentación oficial: Prefect Flows
Respondan:
- ¿Cuál es la diferencia entre un Flow y una Task? ¿Por qué necesitamos ambos?
Respuesta: _____
- ¿Qué es un "subflow"? ¿Cuándo sería útil usar subflows?
Respuesta: _____
- ¿Cómo maneja Prefect las dependencias entre tasks? Expliquen el concepto de DAG implícito.
Respuesta: _____
1.3 Investigación avanzada: Results y Caching¶
Lean: Prefect Results y Caching
- ¿Qué es el "result persistence"? ¿Por qué es importante en pipelines de datos?
Respuesta: _____
- ¿Cómo funciona el caching en Prefect? ¿Qué parámetro usarían para cachear el resultado de una task?
Respuesta: _____
- ¿Qué es una
cache_key_fn? Den un ejemplo de cuándo la usarían.
Respuesta: _____
Parte 2 — Diseño Conceptual (5 min)¶
Definan en equipo un escenario simple para su pipeline:
Ejemplos: "Clicks de una campaña de marketing", "ventas del kiosco", "logs de una API", "transacciones e-commerce".
2.1 Arquitectura del escenario¶
| Rol | ¿Quién sería en su escenario? |
|---|---|
| Business data owner | _____ |
| Data engineers | _____ |
| Data consumers | _____ |
2.2 Tipo de pipeline¶
- Tipo elegido (batch/streaming): _____
- Justificación: _____
Parte 3 — Implementación del Pipeline Base (20 min)¶
3.1 Setup¶
# Instalación de Prefect
!pip install -q prefect pandas
# Importar librerías
from prefect import flow, task
import pandas as pd
import numpy as np
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')
print("✅ Entorno configurado correctamente")
print(f"📅 Fecha: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
3.2 Implementar Tasks¶
Basándose en lo que investigaron en la Parte 1, implementen las tasks:
# === TASK 1: EXTRACT ===
# TODO: Agregar el decorador correcto basándose en la documentación
# Investiguen: ¿qué parámetros adicionales podrían ser útiles aquí?
@_____
def extract_data():
"""
Extrae datos de la fuente.
"""
np.random.seed(42)
n_rows = 100
data = {
'fecha': pd.date_range(start='2024-01-01', periods=n_rows, freq='D'),
'producto': np.random.choice(['A', 'B', 'C', 'D'], n_rows),
'cantidad': np.random.randint(1, 50, n_rows),
'precio_unitario': np.random.uniform(10, 100, n_rows).round(2),
'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], n_rows)
}
df = pd.DataFrame(data)
print(f"📥 Extraídos {len(df)} registros")
return df
# === TASK 2: TRANSFORM ===
@_____
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Aplica transformaciones a los datos.
"""
df['total'] = df['cantidad'] * df['precio_unitario']
df['ticket_size'] = pd.cut(
df['total'],
bins=[0, 100, 500, float('inf')],
labels=['small', 'medium', 'large']
)
print(f"🔄 Transformados {len(df)} registros")
return df
# === TASK 3: LOAD ===
@_____
def load_data(df: pd.DataFrame, output_path: str = "output.csv"):
"""
Carga los datos al destino final.
"""
df.to_csv(output_path, index=False)
print(f"💾 Guardados {len(df)} registros en {output_path}")
3.3 Implementar Flow¶
# === FLOW: Orquestador del pipeline ===
@_____
def etl_flow():
"""
Flow principal que orquesta las tasks ETL.
"""
df_raw = extract_data()
df_clean = transform_data(df_raw)
load_data(df_clean)
print("\n✅ Pipeline ETL completado exitosamente!")
return df_clean
# === EJECUTAR ===
if __name__ == "__main__":
resultado = etl_flow()
3.4 Preguntas de observación¶
Después de ejecutar el pipeline, respondan:
- ¿Qué información muestra Prefect en los logs? Copien un fragmento relevante y expliquen qué significa.
[Pegar logs aquí]
Explicación: _____
- ¿En qué orden se ejecutaron las tasks? ¿Cómo lo infiere Prefect?
Respuesta: _____
- ¿Qué pasaría si una task falla? Investiguen en la documentación qué estados tendría el flow.
Respuesta: _____
Parte 4 — Investigación: Funcionalidades Avanzadas (15 min)¶
Investiguen las siguientes funcionalidades en la documentación y implementen al menos UNA en su pipeline.
4.1 Retries y manejo de errores¶
Documentación: Task Retries
Investigación requerida:
- ¿Qué parámetros controlan los retries? Describan cada uno:
| Parámetro | Descripción | Valor por defecto |
|---|---|---|
retries |
_____ | _____ |
retry_delay_seconds |
_____ | _____ |
retry_jitter_factor |
_____ | _____ |
- ¿Qué es "exponential backoff"? ¿Cómo lo implementarían?
Respuesta: _____
Implementación:
# TODO: Implementar una task con retries
# Deben usar los parámetros que investigaron
@task(retries=_____, retry_delay_seconds=_____)
def extract_data_with_retry():
"""Task con reintentos automáticos."""
# Simular fallo aleatorio para probar retries
if np.random.random() < 0.5:
raise Exception("Error simulado de conexión")
return extract_data()
4.2 Caching de resultados¶
Documentación: Task Caching
Investigación requerida:
- ¿Qué es
cache_expiration? ¿Cómo se especifica?
Respuesta: _____
-
¿Cuándo es útil cachear una task? Den 2 ejemplos de su escenario.
-
Ejemplo 1: _____
-
Ejemplo 2: _____
-
¿Qué pasa si los inputs de la task cambian? ¿Se usa el cache?
Respuesta: _____
Implementación:
from datetime import timedelta
# TODO: Implementar caching en la task de extracción
@task(cache_expiration=timedelta(_____=_____)) # investigar unidades válidas
def extract_data_cached():
"""Task con caching - no re-ejecuta si ya corrió recientemente."""
print("⏳ Ejecutando extracción (esto no debería aparecer si está cacheado)")
return extract_data()
4.3 Logging personalizado¶
Documentación: Prefect Logging
Investigación requerida:
- ¿Cómo se accede al logger de Prefect dentro de una task?
Respuesta: _____
-
¿Qué niveles de log soporta Prefect? Listen al menos 4.
-
-
-
-
-
¿Cómo configurarían el nivel de log para ver más detalle?
Respuesta: _____
Implementación:
from prefect import get_run_logger
@task
def transform_data_with_logging(df: pd.DataFrame) -> pd.DataFrame:
"""Task con logging estructurado."""
logger = _____ # obtener el logger de Prefect
logger._____(f"Iniciando transformación de {len(df)} registros") # nivel info
df['total'] = df['cantidad'] * df['precio_unitario']
# Log de estadísticas
logger._____(f"Total ventas: ${df['total'].sum():,.2f}") # nivel info
logger._____(f"Detalle por región: {df.groupby('region')['total'].sum().to_dict()}") # nivel debug
return df
4.4 Concurrencia y paralelismo¶
Documentación: Task Runners
Investigación requerida:
- ¿Qué es un Task Runner? ¿Cuál es el default?
Respuesta: _____
- ¿Qué Task Runners ofrece Prefect? Describan al menos 2:
| Task Runner | ¿Cuándo usarlo? |
|---|---|
| _____ | _____ |
| _____ | _____ |
-
¿Cómo ejecutarían tasks en paralelo? Investiguen
.submit()y.map(). -
.submit(): _____ .map(): _____
Implementación (opcional pero recomendada):
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
@task
def process_region(region: str, df: pd.DataFrame) -> dict:
"""Procesa datos de una región específica."""
df_region = df[df['region'] == region]
return {
'region': region,
'total': df_region['total'].sum(),
'count': len(df_region)
}
@flow(task_runner=_____) # usar el task runner para concurrencia
def etl_flow_parallel():
df_raw = extract_data()
df_clean = transform_data(df_raw)
# Procesar cada región en paralelo
regiones = ['Norte', 'Sur', 'Este', 'Oeste']
futures = [process_region._____(r, df_clean) for r in regiones] # método para ejecución async
# Esperar resultados
results = [f._____() for f in futures] # método para obtener resultado
print(f"📊 Resultados por región: {results}")
return results
Parte 5 — Investigación: Deployments y Scheduling (10 min)¶
Documentación: Deployments y Schedules
5.1 Conceptos de Deployment¶
Respondan basándose en la documentación:
- ¿Qué es un Deployment en Prefect? ¿Cuál es la diferencia entre un Flow y un Deployment?
Respuesta: _____
- ¿Qué es un Work Pool? ¿Para qué sirve?
Respuesta: _____
- ¿Qué es un Worker? ¿Cómo se relaciona con el Work Pool?
Respuesta: _____
5.2 Scheduling¶
Investiguen las opciones de scheduling:
- ¿Qué tipos de schedules soporta Prefect? Describan al menos 3:
| Tipo de Schedule | Descripción | Ejemplo |
|---|---|---|
| _____ | _____ | _____ |
| _____ | _____ | _____ |
| _____ | _____ | _____ |
- ¿Cómo expresarían "ejecutar todos los días a las 6 AM" en cron?
Respuesta: _____
- ¿Qué es
RRuleSchedule? ¿Cuándo lo usarían sobre cron?
Respuesta: _____
5.3 Crear un Deployment (conceptual)¶
Basándose en la documentación, escriban el código para crear un deployment de su flow:
# TODO: Completar basándose en la documentación de Deployments
# https://docs.prefect.io/latest/concepts/deployments/
from prefect import flow
# Opción 1: Usando serve() - más simple
if __name__ == "__main__":
etl_flow.serve(
name="_____", # nombre del deployment
cron="_____", # schedule en formato cron
tags=["_____", "_____"], # tags para organización
)
# Opción 2: Usando deploy() - más control
# etl_flow.deploy(
# name="_____",
# work_pool_name="_____",
# cron="_____",
# )
Parte 6 — Extensión DataOps (15 min)¶
Elijan UNA extensión e impleméntenla. Deben incluir comentarios explicando qué hace cada parte basándose en la documentación que investigaron.
Opción A — Validación con logging estructurado¶
from prefect import get_run_logger
@task(retries=_____, retry_delay_seconds=_____) # agregar retries
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
"""
Valida la calidad de los datos.
Usa logging estructurado de Prefect.
"""
logger = get_run_logger()
errors = []
# TODO: Implementar validaciones con logging apropiado
# Usar logger.info(), logger.warning(), logger.error()
logger._____("Iniciando validación de datos")
if len(df) <= 0:
logger._____("DataFrame vacío detectado")
errors.append("DataFrame vacío")
null_counts = df.isnull().sum()
if null_counts.sum() > 0:
logger._____(f"Valores nulos encontrados: {null_counts.to_dict()}")
if errors:
raise ValueError(f"Validación fallida: {errors}")
logger._____("✅ Validación exitosa")
return df
Opción B — Flow parametrizado con caching¶
from datetime import timedelta
@task(cache_expiration=timedelta(minutes=_____)) # cachear por N minutos
def extract_data_param(n_rows: int = 100):
"""Extract con caching - investigar cuándo se invalida el cache."""
# TODO: Implementar
pass
@flow
def etl_flow_parametrized(
min_amount: float = 0.0,
output_path: str = "output.csv",
n_rows: int = 100
):
"""
Flow parametrizado.
Investigar: ¿cómo afectan los parámetros al caching?
"""
# TODO: Implementar usando las tasks con cache
pass
Opción C — Pipeline con concurrencia¶
from prefect.task_runners import ConcurrentTaskRunner
@flow(task_runner=ConcurrentTaskRunner())
def etl_flow_concurrent():
"""
Flow con procesamiento paralelo por región.
Investigar: ¿cuándo es útil vs. secuencial?
"""
# TODO: Implementar procesamiento paralelo
# Usar .submit() para las tasks
pass
Parte 7 — Reflexión y Conexión con DataOps (5 min)¶
7.1 Conceptos de Prefect¶
Basándose en su investigación, expliquen:
- ¿Cómo ayuda Prefect a implementar el principio de "Observabilidad" de DataOps?
Respuesta: _____
- ¿Cómo ayuda el caching a la "Reproducibilidad"?
Respuesta: _____
- ¿Cómo conectan los Deployments con "CI/CD para datos"?
Respuesta: _____
7.2 Comparación con alternativas¶
Investiguen brevemente (pueden usar la web):
-
¿Qué diferencias hay entre Prefect y Apache Airflow? Mencionen al menos 2.
-
Diferencia 1: _____
-
Diferencia 2: _____
-
¿Qué es Dagster? ¿En qué se diferencia de Prefect?
Respuesta: _____
Entregable¶
1. Código Prefect¶
- Pipeline base funcionando (
@flow+@task) - Al menos UNA extensión implementada (A, B o C)
- Comentarios explicando las decisiones basadas en la documentación
2. Documento de investigación¶
Incluir en el notebook o en un .md separado:
- Respuestas a todas las preguntas de investigación (Partes 1, 4, 5)
- Citas/referencias a la documentación oficial
- Reflexiones (Parte 7)
3. Evidencia de ejecución¶
- Screenshots o logs mostrando:
- Ejecución exitosa del flow
- Logs de Prefect con estados de las tasks
- (Opcional) UI de Prefect si la exploraron
Rúbrica¶
| Criterio | Peso | Descripción |
|---|---|---|
| Investigación documentada | 30% | Respuestas completas basadas en la documentación oficial. Se nota que leyeron y entendieron los conceptos. |
| Implementación técnica | 40% | Pipeline funciona correctamente. Extensión implementada usa las funcionalidades investigadas. |
| Conexión DataOps/ML | 20% | Reflexiones muestran comprensión de cómo Prefect habilita principios de DataOps. |
| Calidad del código | 10% | Código limpio, comentado, con explicaciones de las decisiones tomadas. |
Recursos adicionales para investigar¶
- Prefect Documentation Home
- Prefect Concepts Overview
- Prefect Tutorial
- Prefect Guides
- Prefect API Reference
- Prefect GitHub Examples
- Prefect Community Slack
"La mejor forma de aprender una herramienta es leer su documentación oficial. Los tutoriales te dan el 'qué', la documentación te da el 'por qué' y el 'cómo'."