Saltar a contenido

🔗 Tarea 4: EDA Multi-fuentes y Joins - Fill in the Blanks

UT1: Análisis Exploratorio de Datos | Práctica Guiada

🎯 Objetivos Básicos

  • Aprender a integrar datos de múltiples fuentes
  • Dominar los diferentes tipos de joins con pandas
  • Realizar análisis agregados con groupby
  • Crear reportes consolidados de datos integrados

📋 Lo que necesitas saber ANTES de empezar

  • Conceptos básicos de pandas (read_csv, DataFrames)
  • Idea general de bases de datos relacionales
  • Conceptos básicos de joins (inner, left, right, outer)

🔧 Paso 1: Setup Inicial

📋 CONTEXTO DE NEGOCIO (CRISP-DM: Business Understanding)

🔗 Referencias oficiales:

🚕 Caso de negocio:

  • Problema: La comisión de taxis de NYC necesita análisis en tiempo real de 3+ millones de viajes mensuales
  • Objetivo: Integrar datos oficiales completos para entender patrones metropolitanos reales
  • Escala: ~3M viajes, 265 zonas, múltiples boroughs, datos en tiempo real
  • Variables: Viajes oficiales NYC (enero 2023), zonas geográficas completas, eventos calendario
  • Valor para el negocio: Decisiones basadas en datos reales a escala metropolitana
# Importar librerías que vamos a usar
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sqlite3
from pathlib import Path

# Configurar visualizaciones
plt.style.use('default')
sns.set_palette('husl')
plt.rcParams['figure.figsize'] = (10, 6)

print("✅ Setup completo para análisis multi-fuentes!")

🚕 Paso 2: Carga de Datos desde Múltiples Fuentes

# === CARGAR DATOS DE MÚLTIPLES FUENTES ===

# 1. Cargar datos de viajes desde Parquet (Dataset oficial completo NYC)
print("Cargando datos oficiales de NYC Taxi (dataset completo)...")
trips_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"

# Cargar dataset oficial (~3M registros de enero 2023)
trips = pd._______(trips_url)  # función para leer archivos .parquet (más eficiente que CSV)

print(f"   Viajes cargados: {trips.shape[0]:,} filas, {trips.shape[1]} columnas")
print(f"   Columnas: {list(trips.columns)}")
print(f"   Período: {trips['tpep_pickup_datetime'].min()} a {trips['tpep_pickup_datetime'].max()}")
print(f"   Tamaño en memoria: {trips.memory_usage(deep=True).sum() / 1024**2:.1f} MB")

# 2. Cargar datos de zonas desde CSV (Dataset oficial completo)
print("\nCargando datos oficiales de zonas NYC...")
zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
zones = pd._______(zones_url)  # función estándar para archivos CSV

print(f"   Zonas cargadas: {zones.shape[0]} filas, {zones.shape[1]} columnas")
print(f"   Columnas: {list(zones.columns)}")
print(f"   Boroughs únicos: {zones['Borough'].unique()}")

# 3. Cargar calendario de eventos desde JSON 
print("\nCargando datos de calendario de eventos...")
calendar_url = "https://juanfkurucz.com/ucu-id/ut1/data/calendar.json"
calendar = pd._______(calendar_url)  # función para archivos JSON
calendar['date'] = pd._______(calendar['date']).dt.date  # convertir strings a fechas, luego extraer solo la fecha

print(f"   Eventos calendario: {calendar.shape[0]} filas")
print(f"   Columnas: {list(calendar.columns)}")

# 4. Mostrar primeras filas de cada dataset
print("\nVISTA PREVIA DE DATOS:")
print("\n--- TRIPS ---")
print(trips._____())  # método para mostrar primeras filas de un DataFrame
print("\n--- ZONES ---")
print(zones._____())  # mismo método para ver estructura de datos
print("\n--- CALENDAR ---")
print(calendar._____())  # revisar formato de los eventos

💡 PISTAS:

  • 📊 ¿Qué función lee CSV? Documentación
  • 📅 ¿Qué parámetro de read_csv convierte automáticamente fechas?
  • 🗂️ ¿Qué función lee JSON? Documentación
  • 📆 ¿Qué función convierte strings a fechas? Documentación
  • 👁️ ¿Qué método muestra las primeras filas?

🧹 Paso 3: Normalización de Datos

# === NORMALIZAR Y PREPARAR DATOS PARA JOINS ===

# 1. Estandarizar nombres de columnas
print("Normalizando nombres de columnas...")
trips.columns = trips.columns.str._______()  # convertir todas las columnas a minúsculas
zones.columns = zones.columns.str._______()  # misma transformación para consistencia

print(f"   Trips columnas: {list(trips.columns)}")
print(f"   Zones columnas: {list(zones.columns)}")

# 2. Crear columna de fecha para el join con calendario
trips['pickup_date'] = trips['_______'].dt.date  # extraer solo la fecha (sin hora) de la columna datetime

print(f"   Columna pickup_date creada")
print(f"   Rango de fechas: {trips['pickup_date'].min()} a {trips['pickup_date'].max()}")

# 3. Verificar tipos de datos para joins
print("\nVERIFICACIÓN DE TIPOS PARA JOINS:")
print(f"   trips['pulocationid'] tipo: {trips['pulocationid'].dtype}")
print(f"   zones['locationid'] tipo: {zones['locationid'].dtype}")
print(f"   trips['pickup_date'] tipo: {type(trips['pickup_date'].iloc[0])}")
print(f"   calendar['date'] tipo: {type(calendar['date'].iloc[0])}")

# 4. Optimización para datasets grandes (~3M registros)
print("\nOPTIMIZACIÓN PARA DATASETS GRANDES:")
initial_memory = trips.memory_usage(deep=True).sum() / 1024**2
print(f"   Memoria inicial: {initial_memory:.1f} MB")

# Optimizar tipos de datos para 3+ millones de registros
print("   Optimizando tipos de datos para 3M+ registros...")

# Limpiar valores nulos antes de convertir tipos
print("   Limpiando valores nulos antes de optimización...")
trips['passenger_count'] = trips['passenger_count']._______()  # método para rellenar valores nulos con un valor específico
trips = trips.dropna(subset=['pulocationid', 'dolocationid'])  # eliminar filas críticas sin ubicación (necesarias para joins)

# Convertir tipos después de limpiar
trips['pulocationid'] = trips['pulocationid'].astype('int16')
trips['dolocationid'] = trips['dolocationid'].astype('int16') 
trips['passenger_count'] = trips['passenger_count'].astype('int8')
zones['locationid'] = zones['locationid'].astype('int16')

print(f"   Registros después de limpieza: {len(trips):,}")

optimized_memory = trips.memory_usage(deep=True).sum() / 1024**2
savings = ((initial_memory - optimized_memory) / initial_memory * 100)

print(f"   Memoria optimizada: {optimized_memory:.1f} MB")
print(f"   Ahorro de memoria: {savings:.1f}%")

# 5. Revisar datos faltantes antes de joins
print("\nDATOS FALTANTES ANTES DE JOINS:")
print("Trips (top 5 columnas con más nulos):")
trips_nulls = trips._______().sum().sort_values(ascending=False).head()  # método para detectar valores nulos, sumar y ordenar
print(trips_nulls)

print("\nZones:")
zones_nulls = zones._______().sum()  # revisar si hay valores faltantes en lookup table
print(zones_nulls)

print("\nCalendar:")
calendar_nulls = calendar._______().sum()  # verificar integridad del calendario de eventos
print(calendar_nulls)

# Análisis de calidad de datos
print("\nANÁLISIS DE CALIDAD:")
total_trips = len(trips)
print(f"   Total de viajes: {total_trips:,}")
print(f"   Viajes sin pickup location: {trips['pulocationid'].isna().sum():,}")
print(f"   Viajes sin dropoff location: {trips['dolocationid'].isna().sum():,}")
print(f"   Viajes sin passenger_count: {trips['passenger_count'].isna().sum():,}")

# Estrategias de limpieza recomendadas
print("\nESTRATEGIAS DE LIMPIEZA:")
print("   Ubicaciones nulas: Eliminar (crítico para joins)")
print("   Passenger_count nulos: Rellenar con valor típico (1)")
print("   Tarifas nulas: Revisar caso por caso")

💡 PISTAS:

  • 🔤 ¿Qué método convierte strings a minúsculas? Documentación
  • 📅 ¿Qué columna tiene las fechas de pickup? (Revisa la salida anterior: tpep_pickup_datetime)
  • 🔍 ¿Qué método detecta valores nulos? Documentación
  • 🧹 ¿Qué método rellena valores nulos? Documentación
  • 📊 Para passenger_count nulos, ¿qué valor típico usarías? (Pista: la mayoría de taxis llevan 1 pasajero)
  • 🎯 ¿Qué función lee archivos Parquet? Documentación

🔗 Paso 4: Join Principal - Trips con Zones

# === PRIMER JOIN: TRIPS + ZONES ===

# 1. Hacer join de trips con zones para obtener información geográfica
print("Realizando join: trips + zones...")
trips_with_zones = trips._______(zones,   # método principal para unir DataFrames
                                left_on='_______',   # columna de trips que contiene ID de zona de pickup
                                right_on='_______',  # columna de zones que contiene ID correspondiente
                                how='_______')       # tipo de join que mantiene todos los trips

print(f"   Registros antes del join: {len(trips)}")
print(f"   Registros después del join: {len(trips_with_zones)}")
print(f"   Nuevas columnas añadidas: {[col for col in trips_with_zones.columns if col not in trips.columns]}")

# 2. Verificar el resultado del join
print("\nVERIFICACIÓN DEL JOIN:")
print("Conteo por Borough:")
print(trips_with_zones['borough'].value_counts())

# 3. Verificar si hay valores nulos después del join
null_after_join = trips_with_zones['borough']._______().sum()  # contar nulos en columna borough
print(f"\nViajes sin borough asignado: {null_after_join}")

if null_after_join > 0:
    print("   Algunos viajes no encontraron su zona correspondiente")
    print("   LocationIDs problemáticos:")
    problematic_ids = trips_with_zones[trips_with_zones['borough']._______()]['PULocationID'].unique()  # filtrar filas con nulos
    print(f"   {problematic_ids}")

# 4. Mostrar muestra del resultado
print("\nMUESTRA DEL DATASET INTEGRADO:")
print(trips_with_zones[['PULocationID', 'borough', 'zone', 'trip_distance', 'total_amount']].head())

💡 PISTAS:

  • 🔗 ¿Qué método hace joins en pandas? Documentación
  • 🗝️ ¿Qué columna de trips contiene el ID de zona? (Revisa las columnas arriba)
  • 🗝️ ¿Qué columna de zones contiene el ID correspondiente?
  • 🔍 ¿Qué tipo de join mantiene todos los registros de la tabla izquierda? ('left', 'right', 'inner', 'outer')
  • ❓ ¿Qué método detecta valores nulos?

📅 Paso 5: Segundo Join - Agregar Datos de Calendario

# === SEGUNDO JOIN: TRIPS_ZONES + CALENDAR ===

# 1. Hacer join con datos de calendario
print("Realizando join: trips_zones + calendar...")
trips_complete = trips_with_zones._______(calendar,   # mismo método de join que antes
                                         left_on='_______',   # columna de fecha que creamos en trips
                                         right_on='_______',  # columna de fecha en calendar
                                         how='_______')       # tipo que mantiene todos los trips aunque no haya evento especial

print(f"   Registros antes del join: {len(trips_with_zones)}")
print(f"   Registros después del join: {len(trips_complete)}")

# 2. Crear flag de evento especial
trips_complete['is_special_day'] = trips_complete['special']._______('False')  # método para rellenar nulos con valor por defecto

print("\nDISTRIBUCIÓN DE DÍAS ESPECIALES:")
print(trips_complete['is_special_day'].value_counts())
print("\nEjemplos de eventos especiales:")
special_days = trips_complete[trips_complete['is_special_day'] == True]
if len(special_days) > 0:
    print(special_days[['pickup_date', 'special', 'borough']].drop_duplicates())
else:
    print("   No hay eventos especiales en este período")

# 3. Mostrar dataset final integrado
print("\nDATASET FINAL INTEGRADO:")
print(f"   Total registros: {len(trips_complete)}")
print(f"   Total columnas: {len(trips_complete.columns)}")
print(f"   Columnas principales: {['borough', 'zone', 'is_special_day', 'trip_distance', 'total_amount']}")

# 4. Verificar integridad de los datos finales
print("\nVERIFICACIÓN FINAL:")
print("Datos faltantes por columna clave:")
key_columns = ['borough', 'zone', 'trip_distance', 'total_amount', 'is_special_day']
for col in key_columns:
    missing = trips_complete[col]._______().sum()  # verificar nulos en cada columna clave final
    print(f"   {col}: {missing} nulos")

💡 PISTAS:

  • 🔗 Usa el mismo método de join que antes
  • 📅 ¿Qué columna tiene la fecha en trips_with_zones?
  • 📅 ¿Qué columna tiene la fecha en calendar?
  • 🔍 ¿Qué tipo de join quieres? (mantener todos los viajes aunque no haya evento especial)
  • 🏷️ ¿Qué método rellena valores nulos? Documentación

📈 Paso 6: Análisis por Borough

# === ANÁLISIS AGREGADO POR BOROUGH ===

# 1. Análisis básico por borough (con dataset grande)
print("Análisis por Borough (procesando datos grandes)...")
borough_analysis = trips_complete._______(by='_______').agg({   # método para agrupar datos, por qué columna geográfica?
    'PULocationID': '_______',  # función para contar número de registros/viajes
    'trip_distance': ['_______', 'std', 'median'],  # función para promedio + desviación + mediana
    'total_amount': ['_______', 'std', 'median'],   # mismas estadísticas para tarifas
    'fare_amount': '_______',     # solo promedio de tarifa base
    'tip_amount': ['mean', 'median'],  # estadísticas de propinas
    'passenger_count': '_______'  # función para promedio de pasajeros
}).round(2)

# Aplanar columnas multi-nivel
borough_analysis.columns = ['num_trips', 'avg_distance', 'std_distance', 'median_distance',
                           'avg_total', 'std_total', 'median_total', 'avg_fare', 
                           'avg_tip', 'median_tip', 'avg_passengers']

# Ordenar por número de viajes
borough_analysis = borough_analysis._______(by='num_trips', ascending=False)  # método para ordenar DataFrame por una columna específica

print("\nANÁLISIS COMPLETO POR BOROUGH:")
print(borough_analysis)

# 2. Calcular métricas adicionales empresariales
borough_analysis['revenue_per_km'] = (borough_analysis['avg_total'] / 
                                     borough_analysis['avg_distance']).round(2)
borough_analysis['tip_rate'] = (borough_analysis['avg_tip'] / 
                               borough_analysis['avg_fare'] * 100).round(1)
borough_analysis['market_share'] = (borough_analysis['num_trips'] / 
                                  borough_analysis['num_trips'].sum() * 100).round(1)

print("\nANÁLISIS CON MÉTRICAS EMPRESARIALES:")
print(borough_analysis[['num_trips', 'market_share', 'revenue_per_km', 'tip_rate']])

# 3. Encontrar insights
print("\nINSIGHTS PRINCIPALES:")
print(f"   Borough con más viajes: {borough_analysis.index[0]}")
print(f"   Borough con viajes más largos: {borough_analysis['avg_distance'].idxmax()}")
print(f"   Borough con tarifas más altas: {borough_analysis['avg_total'].idxmax()}")
print(f"   Mejor revenue por km: {borough_analysis['revenue_per_km'].idxmax()}")

💡 PISTAS:

  • 👥 ¿Qué método agrupa datos? Documentación
  • 🏙️ ¿Por qué columna quieres agrupar?
  • 🔢 ¿Qué función cuenta registros? ('count', 'sum', 'mean')
  • 📊 ¿Qué función calcula promedios? ('count', 'sum', 'mean')
  • 📈 ¿Qué método ordena DataFrames? Documentación

📅 Paso 7: Análisis por Borough y Día Especial

# === ANÁLISIS COMPARATIVO: DÍAS NORMALES VS ESPECIALES ===

# 1. Análisis por borough y tipo de día
print("📅 Análisis: Borough + Día Especial...")
borough_day_analysis = trips_complete._______(by=['_______', '_______']).agg({  # agrupar por DOS columnas: geografía y tipo de día
    'PULocationID': '_______',  # función para contar viajes
    'trip_distance': '_______',  # función para promedio de distancia
    'total_amount': '_______'    # función para promedio de tarifa
}).round(2)

borough_day_analysis.columns = ['num_trips', 'avg_distance', 'avg_total']

print("\n📊 ANÁLISIS BOROUGH + DÍA ESPECIAL:")
print(borough_day_analysis)

# 2. Comparar días normales vs especiales
print("\n🔍 COMPARACIÓN DÍAS NORMALES VS ESPECIALES:")

# Pivotear para comparar fácilmente
comparison = trips_complete._______(by='is_special_day').agg({  # agrupar solo por tipo de día para comparación general
    'trip_distance': 'mean',    # promedio de distancia por tipo de día
    'total_amount': 'mean',     # promedio de tarifa por tipo de día
    'PULocationID': 'count'     # conteo de viajes por tipo de día
}).round(2)

# Renombrar índices según los valores únicos encontrados
unique_day_types = comparison.index.tolist()
if len(unique_day_types) == 2:
    comparison.index = ['Día Normal', 'Día Especial']
elif len(unique_day_types) == 1:
    if unique_day_types[0] in ['False', False]:
        comparison.index = ['Día Normal']
    else:
        comparison.index = ['Día Especial']

comparison.columns = ['Avg Distance', 'Avg Amount', 'Num Trips']

print(comparison)

# 3. Calcular diferencias porcentuales
if len(comparison) > 1:
    # Hay tanto días normales como especiales
    if 'Día Normal' in comparison.index and 'Día Especial' in comparison.index:
        normal_day = comparison.loc['Día Normal']
        special_day = comparison.loc['Día Especial']

        print("\nIMPACTO DE DÍAS ESPECIALES:")
        distance_change = ((special_day['Avg Distance'] - normal_day['Avg Distance']) / normal_day['Avg Distance'] * 100)
        amount_change = ((special_day['Avg Amount'] - normal_day['Avg Amount']) / normal_day['Avg Amount'] * 100)

        print(f"   Cambio en distancia promedio: {distance_change:+.1f}%")
        print(f"   Cambio en tarifa promedio: {amount_change:+.1f}%")
    else:
        print("\nINFORMACIÓN DE DÍAS:")
        for idx, row in comparison.iterrows():
            print(f"   {idx}: {row['Num Trips']:,} viajes, ${row['Avg Amount']:.2f} promedio")
else:
    print(f"\nSOLO HAY {comparison.index[0]}:")
    print(f"   Viajes: {comparison.iloc[0]['Num Trips']:,}")
    print(f"   Distancia promedio: {comparison.iloc[0]['Avg Distance']:.2f} millas")
    print(f"   Tarifa promedio: ${comparison.iloc[0]['Avg Amount']:.2f}")
    print("   No hay datos de días especiales para comparar en este período")

💡 PISTAS:

  • 👥 groupby() puede agrupar por múltiples columnas usando una lista
  • 🏙️ ¿Qué columnas quieres usar para agrupar? (borough e is_special_day)
  • 🔢 Usa las mismas funciones de agregación que antes
  • 📊 ¿Qué método agrupa solo por una columna para comparar?

Paso 8: Técnicas para Datasets Grandes

# === TÉCNICAS PARA TRABAJAR CON DATASETS GRANDES ===

# 1. Sampling estratégico para visualizaciones
print("⚡ Aplicando técnicas para datasets grandes...")

# Si el dataset es muy grande, usar muestra para visualizaciones
if len(trips_complete) > 50000:
    print(f"   📊 Dataset grande detectado: {len(trips_complete):,} registros")
    print("   🎯 Creando muestra estratificada para visualizaciones...")

    # Muestra proporcional por borough
    sample_size = min(10000, len(trips_complete) // 10)
    trips_sample = trips_complete._______(n=sample_size, random_state=42)  # método para tomar muestra aleatoria de n registros

    print(f"   ✅ Muestra creada: {len(trips_sample):,} registros ({len(trips_sample)/len(trips_complete)*100:.1f}%)")
else:
    trips_sample = trips_complete
    print("   ℹ️ Dataset pequeño, usando datos completos para visualización")

# 2. Análisis de performance de joins
print("\n📈 ANÁLISIS DE PERFORMANCE:")
join_stats = {
    'total_trips': len(trips),
    'matched_zones': (trips_complete['borough'].notna()).sum(),
    'match_rate': (trips_complete['borough'].notna().sum() / len(trips) * 100),
    'unique_zones_used': trips_complete['zone'].nunique(),
    'total_zones_available': len(zones),
    'zone_coverage': (trips_complete['zone'].nunique() / len(zones) * 100)
}

for key, value in join_stats.items():
    if 'rate' in key or 'coverage' in key:
        print(f"   {key}: {value:.1f}%")
    else:
        print(f"   {key}: {value:,}")

# 3. Análisis temporal avanzado (solo si hay suficientes datos)
if len(trips_complete) > 1000:
    print("\n📅 ANÁLISIS TEMPORAL AVANZADO:")

    # Análisis por hora del día
    trips_complete['pickup_hour'] = trips_complete['tpep_pickup_datetime'].dt.hour  # extraer hora de la fecha/hora
    hourly_analysis = trips_complete._______(by='pickup_hour').agg({  # agrupar por hora del día
        'PULocationID': 'count',     # contar viajes por hora
        'total_amount': 'mean',      # tarifa promedio por hora
        'trip_distance': 'mean'      # distancia promedio por hora
    }).round(2)

    hourly_analysis.columns = ['trips_count', 'avg_amount', 'avg_distance']

    print("   ⏰ Horas pico por número de viajes:")
    peak_hours = hourly_analysis._______(by='trips_count', ascending=False).head(3)  # ordenar por más viajes, tomar top 3
    for hour, stats in peak_hours.iterrows():
        print(f"      {hour:02d}:00 - {stats['trips_count']:,} viajes")

📊 Paso 9: Análisis de Correlaciones (Opcional)

# === ANÁLISIS DE CORRELACIONES NUMÉRICAS ===

# Calcular correlaciones entre variables numéricas
print("Calculando correlaciones entre variables numéricas...")
numeric_cols = ['trip_distance', 'total_amount', 'fare_amount', 'tip_amount']
corr_matrix = trips_complete[numeric_cols]._______()  # método para calcular matriz de correlación

print("\nMatriz de Correlación:")
print(corr_matrix.round(3))

print("\nCorrelaciones más fuertes:")
corr_pairs = []
for i in range(len(corr_matrix.columns)):
    for j in range(i+1, len(corr_matrix.columns)):
        corr_pairs.append((corr_matrix.columns[i], corr_matrix.columns[j], corr_matrix.iloc[i, j]))

corr_pairs.sort(key=lambda x: abs(x[2]), reverse=True)
for var1, var2, corr in corr_pairs[:3]:
    print(f"   {var1} vs {var2}: {corr:.3f}")

print("\nINTERPRETACIÓN DE CORRELACIONES:")
print("   > 0.7: Correlación fuerte positiva")
print("   0.3-0.7: Correlación moderada positiva") 
print("   -0.3-0.3: Correlación débil")
print("   < -0.7: Correlación fuerte negativa")

PISTAS:

  • ¿Qué método calcula correlaciones? Documentación
  • Las correlaciones van de -1 a 1, donde 1 es correlación perfecta positiva

Paso 10: Preguntas de Reflexión sobre Joins

Responde estas preguntas después de completar el código:

  1. ¿Qué diferencia hay entre un LEFT JOIN y un INNER JOIN? PISTA: Guía visual de joins

  2. ¿Por qué usamos LEFT JOIN en lugar de INNER JOIN para trips+zones? PISTA: ¿Qué pasaría si algunos viajes no tienen zona asignada?

  3. ¿Qué problemas pueden surgir al hacer joins con datos de fechas? PISTA: Tipos de datos, formatos, zonas horarias

  4. ¿Cuál es la ventaja de integrar múltiples fuentes de datos? PISTA: Análisis más rico, contexto completo, insights cruzados

  5. ¿Qué insights de negocio obtuviste del análisis integrado? PISTA: Patrones por zona, impacto de eventos especiales, oportunidades


Felicitaciones! Has Completado la Tarea

Lo que has aprendido:

  • Cargar datos de múltiples fuentes (CSV, JSON, SQLite)
  • Realizar diferentes tipos de joins con pandas
  • Normalizar y preparar datos para integración
  • Crear análisis agregados con groupby
  • Visualizar resultados de datos integrados

Próximos pasos:

Practica con más casos de joins:

Domina técnicas avanzadas de integración:

Conecta con más fuentes de datos:


BONUS: Introducción a Prefect

¿Qué es Prefect y por qué usarlo?

El Problema: Tu análisis funciona... pero ¿qué pasa si:

  • La URL de datos falla por 5 segundos?
  • Quieres ejecutarlo todos los días automáticamente?
  • Necesitas ver qué paso falló exactamente?

La Solución - Prefect:

  • @task: Si algo falla, lo reintenta automáticamente
  • @flow: Conecta pasos de forma inteligente
  • Logging: Ve exactamente qué está pasando
  • Simple: Solo añades 2 decoradores a tu código

Paso BONUS 1: Setup Básico

# === SETUP PREFECT ===

# Instalar Prefect (si no está instalado)
# !pip install prefect

import prefect
from prefect import task, flow, get_run_logger
import pandas as pd

print("Prefect instalado y configurado")
print(f"   Versión: {prefect.__version__}")

Paso BONUS 2: Convertir Funciones a Tasks

# === TASKS SIMPLES PARA APRENDER PREFECT ===

@task(name="Cargar Datos", retries=2, retry_delay_seconds=3)
def cargar_datos(url: str, tipo: str) -> pd.DataFrame:
    """Task simple para cargar cualquier tipo de datos"""
    logger = get_run_logger()
    logger.info(f"Cargando {tipo} desde: {url}")

    # Cargar según el tipo
    if tipo == "trips":
        data = pd._______(url)  # función para Parquet
    elif tipo == "zones":
        data = pd._______(url)  # función para CSV
    else:  # calendar
        data = pd._______(url)  # función para JSON
        data['date'] = pd._______(data['date']).dt.date  # convertir a fechas

    logger.info(f"{tipo} cargado: {data.shape[0]} filas")
    return data

@task(name="Hacer Join Simple")
def hacer_join_simple(trips: pd.DataFrame, zones: pd.DataFrame) -> pd.DataFrame:
    """Task para hacer join básico de trips + zones"""
    logger = get_run_logger()
    logger.info("Haciendo join simple...")

    # Normalizar columnas
    trips.columns = trips.columns.str._______()  # convertir a minúsculas
    zones.columns = zones.columns.str._______()  # misma transformación

    # Join básico
    resultado = trips._______(zones,   # método para unir DataFrames
                             left_on='_______',   # columna de pickup location en trips
                             right_on='_______',  # columna de location en zones
                             how='_______')       # tipo de join que mantiene todos los trips

    logger.info(f"Join completado: {len(resultado)} registros")
    return resultado

@task(name="Análisis Rápido")
def analisis_rapido(data: pd.DataFrame) -> dict:
    """Task para análisis básico"""
    logger = get_run_logger()
    logger.info("Haciendo análisis básico...")

    # Stats simples
    stats = {
        'total_registros': len(data),
        'boroughs': data['borough']._______().head(3).to_dict(),  # método para contar valores
        'distancia_promedio': round(data['trip_distance']._______(), 2),  # método para promedio
        'tarifa_promedio': round(data['total_amount']._______(), 2)  # método para promedio
    }

    logger.info(f"Análisis completado: {stats['total_registros']} registros")
    return stats

PISTAS:

  • ¿Qué función lee Parquet? ¿CSV? ¿JSON?
  • ¿Qué función convierte strings a fechas?
  • ¿Qué método convierte columnas a minúsculas?
  • ¿Qué método hace joins? ¿Qué columnas usar? ¿Qué tipo de join?
  • ¿Qué método cuenta valores únicos? ¿Qué método calcula promedios?

Paso BONUS 3: Crear un Flow Simple

# === FLOW PRINCIPAL (EL PIPELINE COMPLETO) ===

@flow(name="Pipeline Simple NYC Taxi")
def pipeline_taxi_simple():
    """
    Flow simple que conecta todos los tasks
    """
    logger = get_run_logger()
    logger.info("Iniciando pipeline simple...")

    # URLs de datos
    trips_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
    zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"

    # PASO 1: Cargar datos (con retry automático si falla)
    logger.info("Paso 1: Cargando datos...")
    trips = cargar_datos(trips_url, "_______")  # tipo de datos trips
    zones = cargar_datos(zones_url, "_______")  # tipo de datos zones

    # PASO 2: Hacer join
    logger.info("Paso 2: Haciendo join...")
    data_unida = hacer_join_simple(trips, zones)

    # PASO 3: Análisis básico
    logger.info("Paso 3: Analizando...")
    resultados = analisis_rapido(data_unida)

    # PASO 4: Mostrar resultados
    logger.info("Pipeline completado!")
    logger.info(f"Resultados: {resultados}")

    return resultados

PISTAS:

  • ¿Qué tipo de datos son los trips? ¿Y las zones?
  • Usa los nombres exactos que definiste en la función cargar_datos

Paso BONUS 4: Ejecutar el Pipeline

# === EJECUTAR EL PIPELINE ===

if __name__ == "__main__":
    print("Ejecutando pipeline simple...")

    # Ejecutar el flow
    resultado = _______()  # nombre de la función del flow

    print("\nRESULTADOS FINALES:")
    print(f"   Total registros: {resultado['total_registros']:,}")
    print(f"   Distancia promedio: {resultado['distancia_promedio']} millas")
    print(f"   Tarifa promedio: ${resultado['tarifa_promedio']}")
    print("\nTop 3 Boroughs:")
    for borough, count in resultado['_______'].items():  # clave del diccionario que contiene boroughs
        print(f"   {borough}: {count:,} viajes")

PISTAS:

  • ¿Cómo se llama la función del flow que creaste?
  • ¿Qué clave del diccionario contiene los datos de boroughs?

¿Qué acabas de aprender con Prefect?

Conceptos Básicos:

  • @task: Convierte cualquier función en algo robusto que puede reintentar automáticamente
  • @flow: Conecta tasks de forma inteligente
  • Logging: Ve en tiempo real qué está pasando
  • Retry: Si algo falla (red, servidor), Prefect lo intenta de nuevo

Diferencia Clave:

# ANTES - Código normal:
def cargar_datos(url):
    return pd.read_csv(url)  # Si falla, todo se rompe

# DESPUÉS - Con Prefect:
@task(retries=2)
def cargar_datos(url):
    return pd.read_csv(url)  # Si falla, lo intenta 2 veces más

Próximos Pasos con Prefect:

  • Scheduling: Ejecutar automáticamente cada día
  • Alertas: Notificarte si algo falla
  • UI Web: Ver el estado de todos tus pipelines
  • Paralelización: Ejecutar múltiples tasks a la vez

Felicitaciones por completar el BONUS!

Responde estas preguntas:

  1. ¿Qué ventaja tiene usar @task en lugar de una función normal? PISTA: ¿Qué pasa si la carga de datos falla temporalmente?

  2. ¿Para qué sirve el @flow decorator? PISTA: ¿Cómo conecta y organiza los tasks?

  3. ¿En qué casos reales usarías esto? PISTA: Reportes diarios, análisis automáticos, pipelines de ML

De aquí a Profesional:

  • Data Analyst → Ya sabes hacer análisis complejos
  • Data Engineer → Con Prefect, puedes automatizar todo
  • MLOps → Estos pipelines son la base para ML en producción

Ya no eres solo alguien que hace análisis, eres alguien que construye sistemas!