🔗 Tarea 4: EDA Multi-fuentes y Joins - Fill in the Blanks¶
UT1: Análisis Exploratorio de Datos | Práctica Guiada
🎯 Objetivos Básicos¶
- Aprender a integrar datos de múltiples fuentes
- Dominar los diferentes tipos de joins con pandas
- Realizar análisis agregados con groupby
- Crear reportes consolidados de datos integrados
📋 Lo que necesitas saber ANTES de empezar¶
- Conceptos básicos de pandas (read_csv, DataFrames)
- Idea general de bases de datos relacionales
- Conceptos básicos de joins (inner, left, right, outer)
🔧 Paso 1: Setup Inicial¶
📋 CONTEXTO DE NEGOCIO (CRISP-DM: Business Understanding)
🔗 Referencias oficiales:
🚕 Caso de negocio:
- Problema: La comisión de taxis de NYC necesita análisis en tiempo real de 3+ millones de viajes mensuales
- Objetivo: Integrar datos oficiales completos para entender patrones metropolitanos reales
- Escala: ~3M viajes, 265 zonas, múltiples boroughs, datos en tiempo real
- Variables: Viajes oficiales NYC (enero 2023), zonas geográficas completas, eventos calendario
- Valor para el negocio: Decisiones basadas en datos reales a escala metropolitana
# Importar librerías que vamos a usar
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sqlite3
from pathlib import Path
# Configurar visualizaciones
plt.style.use('default')
sns.set_palette('husl')
plt.rcParams['figure.figsize'] = (10, 6)
print("✅ Setup completo para análisis multi-fuentes!")
🚕 Paso 2: Carga de Datos desde Múltiples Fuentes¶
# === CARGAR DATOS DE MÚLTIPLES FUENTES ===
# 1. Cargar datos de viajes desde Parquet (Dataset oficial completo NYC)
print("Cargando datos oficiales de NYC Taxi (dataset completo)...")
trips_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
# Cargar dataset oficial (~3M registros de enero 2023)
trips = pd._______(trips_url) # función para leer archivos .parquet (más eficiente que CSV)
print(f" Viajes cargados: {trips.shape[0]:,} filas, {trips.shape[1]} columnas")
print(f" Columnas: {list(trips.columns)}")
print(f" Período: {trips['tpep_pickup_datetime'].min()} a {trips['tpep_pickup_datetime'].max()}")
print(f" Tamaño en memoria: {trips.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
# 2. Cargar datos de zonas desde CSV (Dataset oficial completo)
print("\nCargando datos oficiales de zonas NYC...")
zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
zones = pd._______(zones_url) # función estándar para archivos CSV
print(f" Zonas cargadas: {zones.shape[0]} filas, {zones.shape[1]} columnas")
print(f" Columnas: {list(zones.columns)}")
print(f" Boroughs únicos: {zones['Borough'].unique()}")
# 3. Cargar calendario de eventos desde JSON
print("\nCargando datos de calendario de eventos...")
calendar_url = "https://juanfkurucz.com/ucu-id/ut1/data/calendar.json"
calendar = pd._______(calendar_url) # función para archivos JSON
calendar['date'] = pd._______(calendar['date']).dt.date # convertir strings a fechas, luego extraer solo la fecha
print(f" Eventos calendario: {calendar.shape[0]} filas")
print(f" Columnas: {list(calendar.columns)}")
# 4. Mostrar primeras filas de cada dataset
print("\nVISTA PREVIA DE DATOS:")
print("\n--- TRIPS ---")
print(trips._____()) # método para mostrar primeras filas de un DataFrame
print("\n--- ZONES ---")
print(zones._____()) # mismo método para ver estructura de datos
print("\n--- CALENDAR ---")
print(calendar._____()) # revisar formato de los eventos
💡 PISTAS:
- 📊 ¿Qué función lee CSV? Documentación
- 📅 ¿Qué parámetro de read_csv convierte automáticamente fechas?
- 🗂️ ¿Qué función lee JSON? Documentación
- 📆 ¿Qué función convierte strings a fechas? Documentación
- 👁️ ¿Qué método muestra las primeras filas?
🧹 Paso 3: Normalización de Datos¶
# === NORMALIZAR Y PREPARAR DATOS PARA JOINS ===
# 1. Estandarizar nombres de columnas
print("Normalizando nombres de columnas...")
trips.columns = trips.columns.str._______() # convertir todas las columnas a minúsculas
zones.columns = zones.columns.str._______() # misma transformación para consistencia
print(f" Trips columnas: {list(trips.columns)}")
print(f" Zones columnas: {list(zones.columns)}")
# 2. Crear columna de fecha para el join con calendario
trips['pickup_date'] = trips['_______'].dt.date # extraer solo la fecha (sin hora) de la columna datetime
print(f" Columna pickup_date creada")
print(f" Rango de fechas: {trips['pickup_date'].min()} a {trips['pickup_date'].max()}")
# 3. Verificar tipos de datos para joins
print("\nVERIFICACIÓN DE TIPOS PARA JOINS:")
print(f" trips['pulocationid'] tipo: {trips['pulocationid'].dtype}")
print(f" zones['locationid'] tipo: {zones['locationid'].dtype}")
print(f" trips['pickup_date'] tipo: {type(trips['pickup_date'].iloc[0])}")
print(f" calendar['date'] tipo: {type(calendar['date'].iloc[0])}")
# 4. Optimización para datasets grandes (~3M registros)
print("\nOPTIMIZACIÓN PARA DATASETS GRANDES:")
initial_memory = trips.memory_usage(deep=True).sum() / 1024**2
print(f" Memoria inicial: {initial_memory:.1f} MB")
# Optimizar tipos de datos para 3+ millones de registros
print(" Optimizando tipos de datos para 3M+ registros...")
# Limpiar valores nulos antes de convertir tipos
print(" Limpiando valores nulos antes de optimización...")
trips['passenger_count'] = trips['passenger_count']._______() # método para rellenar valores nulos con un valor específico
trips = trips.dropna(subset=['pulocationid', 'dolocationid']) # eliminar filas críticas sin ubicación (necesarias para joins)
# Convertir tipos después de limpiar
trips['pulocationid'] = trips['pulocationid'].astype('int16')
trips['dolocationid'] = trips['dolocationid'].astype('int16')
trips['passenger_count'] = trips['passenger_count'].astype('int8')
zones['locationid'] = zones['locationid'].astype('int16')
print(f" Registros después de limpieza: {len(trips):,}")
optimized_memory = trips.memory_usage(deep=True).sum() / 1024**2
savings = ((initial_memory - optimized_memory) / initial_memory * 100)
print(f" Memoria optimizada: {optimized_memory:.1f} MB")
print(f" Ahorro de memoria: {savings:.1f}%")
# 5. Revisar datos faltantes antes de joins
print("\nDATOS FALTANTES ANTES DE JOINS:")
print("Trips (top 5 columnas con más nulos):")
trips_nulls = trips._______().sum().sort_values(ascending=False).head() # método para detectar valores nulos, sumar y ordenar
print(trips_nulls)
print("\nZones:")
zones_nulls = zones._______().sum() # revisar si hay valores faltantes en lookup table
print(zones_nulls)
print("\nCalendar:")
calendar_nulls = calendar._______().sum() # verificar integridad del calendario de eventos
print(calendar_nulls)
# Análisis de calidad de datos
print("\nANÁLISIS DE CALIDAD:")
total_trips = len(trips)
print(f" Total de viajes: {total_trips:,}")
print(f" Viajes sin pickup location: {trips['pulocationid'].isna().sum():,}")
print(f" Viajes sin dropoff location: {trips['dolocationid'].isna().sum():,}")
print(f" Viajes sin passenger_count: {trips['passenger_count'].isna().sum():,}")
# Estrategias de limpieza recomendadas
print("\nESTRATEGIAS DE LIMPIEZA:")
print(" Ubicaciones nulas: Eliminar (crítico para joins)")
print(" Passenger_count nulos: Rellenar con valor típico (1)")
print(" Tarifas nulas: Revisar caso por caso")
💡 PISTAS:
- 🔤 ¿Qué método convierte strings a minúsculas? Documentación
- 📅 ¿Qué columna tiene las fechas de pickup? (Revisa la salida anterior:
tpep_pickup_datetime
) - 🔍 ¿Qué método detecta valores nulos? Documentación
- 🧹 ¿Qué método rellena valores nulos? Documentación
- 📊 Para passenger_count nulos, ¿qué valor típico usarías? (Pista: la mayoría de taxis llevan 1 pasajero)
- 🎯 ¿Qué función lee archivos Parquet? Documentación
🔗 Paso 4: Join Principal - Trips con Zones¶
# === PRIMER JOIN: TRIPS + ZONES ===
# 1. Hacer join de trips con zones para obtener información geográfica
print("Realizando join: trips + zones...")
trips_with_zones = trips._______(zones, # método principal para unir DataFrames
left_on='_______', # columna de trips que contiene ID de zona de pickup
right_on='_______', # columna de zones que contiene ID correspondiente
how='_______') # tipo de join que mantiene todos los trips
print(f" Registros antes del join: {len(trips)}")
print(f" Registros después del join: {len(trips_with_zones)}")
print(f" Nuevas columnas añadidas: {[col for col in trips_with_zones.columns if col not in trips.columns]}")
# 2. Verificar el resultado del join
print("\nVERIFICACIÓN DEL JOIN:")
print("Conteo por Borough:")
print(trips_with_zones['borough'].value_counts())
# 3. Verificar si hay valores nulos después del join
null_after_join = trips_with_zones['borough']._______().sum() # contar nulos en columna borough
print(f"\nViajes sin borough asignado: {null_after_join}")
if null_after_join > 0:
print(" Algunos viajes no encontraron su zona correspondiente")
print(" LocationIDs problemáticos:")
problematic_ids = trips_with_zones[trips_with_zones['borough']._______()]['PULocationID'].unique() # filtrar filas con nulos
print(f" {problematic_ids}")
# 4. Mostrar muestra del resultado
print("\nMUESTRA DEL DATASET INTEGRADO:")
print(trips_with_zones[['PULocationID', 'borough', 'zone', 'trip_distance', 'total_amount']].head())
💡 PISTAS:
- 🔗 ¿Qué método hace joins en pandas? Documentación
- 🗝️ ¿Qué columna de trips contiene el ID de zona? (Revisa las columnas arriba)
- 🗝️ ¿Qué columna de zones contiene el ID correspondiente?
- 🔍 ¿Qué tipo de join mantiene todos los registros de la tabla izquierda? ('left', 'right', 'inner', 'outer')
- ❓ ¿Qué método detecta valores nulos?
📅 Paso 5: Segundo Join - Agregar Datos de Calendario¶
# === SEGUNDO JOIN: TRIPS_ZONES + CALENDAR ===
# 1. Hacer join con datos de calendario
print("Realizando join: trips_zones + calendar...")
trips_complete = trips_with_zones._______(calendar, # mismo método de join que antes
left_on='_______', # columna de fecha que creamos en trips
right_on='_______', # columna de fecha en calendar
how='_______') # tipo que mantiene todos los trips aunque no haya evento especial
print(f" Registros antes del join: {len(trips_with_zones)}")
print(f" Registros después del join: {len(trips_complete)}")
# 2. Crear flag de evento especial
trips_complete['is_special_day'] = trips_complete['special']._______('False') # método para rellenar nulos con valor por defecto
print("\nDISTRIBUCIÓN DE DÍAS ESPECIALES:")
print(trips_complete['is_special_day'].value_counts())
print("\nEjemplos de eventos especiales:")
special_days = trips_complete[trips_complete['is_special_day'] == True]
if len(special_days) > 0:
print(special_days[['pickup_date', 'special', 'borough']].drop_duplicates())
else:
print(" No hay eventos especiales en este período")
# 3. Mostrar dataset final integrado
print("\nDATASET FINAL INTEGRADO:")
print(f" Total registros: {len(trips_complete)}")
print(f" Total columnas: {len(trips_complete.columns)}")
print(f" Columnas principales: {['borough', 'zone', 'is_special_day', 'trip_distance', 'total_amount']}")
# 4. Verificar integridad de los datos finales
print("\nVERIFICACIÓN FINAL:")
print("Datos faltantes por columna clave:")
key_columns = ['borough', 'zone', 'trip_distance', 'total_amount', 'is_special_day']
for col in key_columns:
missing = trips_complete[col]._______().sum() # verificar nulos en cada columna clave final
print(f" {col}: {missing} nulos")
💡 PISTAS:
- 🔗 Usa el mismo método de join que antes
- 📅 ¿Qué columna tiene la fecha en trips_with_zones?
- 📅 ¿Qué columna tiene la fecha en calendar?
- 🔍 ¿Qué tipo de join quieres? (mantener todos los viajes aunque no haya evento especial)
- 🏷️ ¿Qué método rellena valores nulos? Documentación
📈 Paso 6: Análisis por Borough¶
# === ANÁLISIS AGREGADO POR BOROUGH ===
# 1. Análisis básico por borough (con dataset grande)
print("Análisis por Borough (procesando datos grandes)...")
borough_analysis = trips_complete._______(by='_______').agg({ # método para agrupar datos, por qué columna geográfica?
'PULocationID': '_______', # función para contar número de registros/viajes
'trip_distance': ['_______', 'std', 'median'], # función para promedio + desviación + mediana
'total_amount': ['_______', 'std', 'median'], # mismas estadísticas para tarifas
'fare_amount': '_______', # solo promedio de tarifa base
'tip_amount': ['mean', 'median'], # estadísticas de propinas
'passenger_count': '_______' # función para promedio de pasajeros
}).round(2)
# Aplanar columnas multi-nivel
borough_analysis.columns = ['num_trips', 'avg_distance', 'std_distance', 'median_distance',
'avg_total', 'std_total', 'median_total', 'avg_fare',
'avg_tip', 'median_tip', 'avg_passengers']
# Ordenar por número de viajes
borough_analysis = borough_analysis._______(by='num_trips', ascending=False) # método para ordenar DataFrame por una columna específica
print("\nANÁLISIS COMPLETO POR BOROUGH:")
print(borough_analysis)
# 2. Calcular métricas adicionales empresariales
borough_analysis['revenue_per_km'] = (borough_analysis['avg_total'] /
borough_analysis['avg_distance']).round(2)
borough_analysis['tip_rate'] = (borough_analysis['avg_tip'] /
borough_analysis['avg_fare'] * 100).round(1)
borough_analysis['market_share'] = (borough_analysis['num_trips'] /
borough_analysis['num_trips'].sum() * 100).round(1)
print("\nANÁLISIS CON MÉTRICAS EMPRESARIALES:")
print(borough_analysis[['num_trips', 'market_share', 'revenue_per_km', 'tip_rate']])
# 3. Encontrar insights
print("\nINSIGHTS PRINCIPALES:")
print(f" Borough con más viajes: {borough_analysis.index[0]}")
print(f" Borough con viajes más largos: {borough_analysis['avg_distance'].idxmax()}")
print(f" Borough con tarifas más altas: {borough_analysis['avg_total'].idxmax()}")
print(f" Mejor revenue por km: {borough_analysis['revenue_per_km'].idxmax()}")
💡 PISTAS:
- 👥 ¿Qué método agrupa datos? Documentación
- 🏙️ ¿Por qué columna quieres agrupar?
- 🔢 ¿Qué función cuenta registros? ('count', 'sum', 'mean')
- 📊 ¿Qué función calcula promedios? ('count', 'sum', 'mean')
- 📈 ¿Qué método ordena DataFrames? Documentación
📅 Paso 7: Análisis por Borough y Día Especial¶
# === ANÁLISIS COMPARATIVO: DÍAS NORMALES VS ESPECIALES ===
# 1. Análisis por borough y tipo de día
print("📅 Análisis: Borough + Día Especial...")
borough_day_analysis = trips_complete._______(by=['_______', '_______']).agg({ # agrupar por DOS columnas: geografía y tipo de día
'PULocationID': '_______', # función para contar viajes
'trip_distance': '_______', # función para promedio de distancia
'total_amount': '_______' # función para promedio de tarifa
}).round(2)
borough_day_analysis.columns = ['num_trips', 'avg_distance', 'avg_total']
print("\n📊 ANÁLISIS BOROUGH + DÍA ESPECIAL:")
print(borough_day_analysis)
# 2. Comparar días normales vs especiales
print("\n🔍 COMPARACIÓN DÍAS NORMALES VS ESPECIALES:")
# Pivotear para comparar fácilmente
comparison = trips_complete._______(by='is_special_day').agg({ # agrupar solo por tipo de día para comparación general
'trip_distance': 'mean', # promedio de distancia por tipo de día
'total_amount': 'mean', # promedio de tarifa por tipo de día
'PULocationID': 'count' # conteo de viajes por tipo de día
}).round(2)
# Renombrar índices según los valores únicos encontrados
unique_day_types = comparison.index.tolist()
if len(unique_day_types) == 2:
comparison.index = ['Día Normal', 'Día Especial']
elif len(unique_day_types) == 1:
if unique_day_types[0] in ['False', False]:
comparison.index = ['Día Normal']
else:
comparison.index = ['Día Especial']
comparison.columns = ['Avg Distance', 'Avg Amount', 'Num Trips']
print(comparison)
# 3. Calcular diferencias porcentuales
if len(comparison) > 1:
# Hay tanto días normales como especiales
if 'Día Normal' in comparison.index and 'Día Especial' in comparison.index:
normal_day = comparison.loc['Día Normal']
special_day = comparison.loc['Día Especial']
print("\nIMPACTO DE DÍAS ESPECIALES:")
distance_change = ((special_day['Avg Distance'] - normal_day['Avg Distance']) / normal_day['Avg Distance'] * 100)
amount_change = ((special_day['Avg Amount'] - normal_day['Avg Amount']) / normal_day['Avg Amount'] * 100)
print(f" Cambio en distancia promedio: {distance_change:+.1f}%")
print(f" Cambio en tarifa promedio: {amount_change:+.1f}%")
else:
print("\nINFORMACIÓN DE DÍAS:")
for idx, row in comparison.iterrows():
print(f" {idx}: {row['Num Trips']:,} viajes, ${row['Avg Amount']:.2f} promedio")
else:
print(f"\nSOLO HAY {comparison.index[0]}:")
print(f" Viajes: {comparison.iloc[0]['Num Trips']:,}")
print(f" Distancia promedio: {comparison.iloc[0]['Avg Distance']:.2f} millas")
print(f" Tarifa promedio: ${comparison.iloc[0]['Avg Amount']:.2f}")
print(" No hay datos de días especiales para comparar en este período")
💡 PISTAS:
- 👥 groupby() puede agrupar por múltiples columnas usando una lista
- 🏙️ ¿Qué columnas quieres usar para agrupar? (borough e is_special_day)
- 🔢 Usa las mismas funciones de agregación que antes
- 📊 ¿Qué método agrupa solo por una columna para comparar?
⚡ Paso 8: Técnicas para Datasets Grandes¶
# === TÉCNICAS PARA TRABAJAR CON DATASETS GRANDES ===
# 1. Sampling estratégico para visualizaciones
print("⚡ Aplicando técnicas para datasets grandes...")
# Si el dataset es muy grande, usar muestra para visualizaciones
if len(trips_complete) > 50000:
print(f" 📊 Dataset grande detectado: {len(trips_complete):,} registros")
print(" 🎯 Creando muestra estratificada para visualizaciones...")
# Muestra proporcional por borough
sample_size = min(10000, len(trips_complete) // 10)
trips_sample = trips_complete._______(n=sample_size, random_state=42) # método para tomar muestra aleatoria de n registros
print(f" ✅ Muestra creada: {len(trips_sample):,} registros ({len(trips_sample)/len(trips_complete)*100:.1f}%)")
else:
trips_sample = trips_complete
print(" ℹ️ Dataset pequeño, usando datos completos para visualización")
# 2. Análisis de performance de joins
print("\n📈 ANÁLISIS DE PERFORMANCE:")
join_stats = {
'total_trips': len(trips),
'matched_zones': (trips_complete['borough'].notna()).sum(),
'match_rate': (trips_complete['borough'].notna().sum() / len(trips) * 100),
'unique_zones_used': trips_complete['zone'].nunique(),
'total_zones_available': len(zones),
'zone_coverage': (trips_complete['zone'].nunique() / len(zones) * 100)
}
for key, value in join_stats.items():
if 'rate' in key or 'coverage' in key:
print(f" {key}: {value:.1f}%")
else:
print(f" {key}: {value:,}")
# 3. Análisis temporal avanzado (solo si hay suficientes datos)
if len(trips_complete) > 1000:
print("\n📅 ANÁLISIS TEMPORAL AVANZADO:")
# Análisis por hora del día
trips_complete['pickup_hour'] = trips_complete['tpep_pickup_datetime'].dt.hour # extraer hora de la fecha/hora
hourly_analysis = trips_complete._______(by='pickup_hour').agg({ # agrupar por hora del día
'PULocationID': 'count', # contar viajes por hora
'total_amount': 'mean', # tarifa promedio por hora
'trip_distance': 'mean' # distancia promedio por hora
}).round(2)
hourly_analysis.columns = ['trips_count', 'avg_amount', 'avg_distance']
print(" ⏰ Horas pico por número de viajes:")
peak_hours = hourly_analysis._______(by='trips_count', ascending=False).head(3) # ordenar por más viajes, tomar top 3
for hour, stats in peak_hours.iterrows():
print(f" {hour:02d}:00 - {stats['trips_count']:,} viajes")
📊 Paso 9: Análisis de Correlaciones (Opcional)¶
# === ANÁLISIS DE CORRELACIONES NUMÉRICAS ===
# Calcular correlaciones entre variables numéricas
print("Calculando correlaciones entre variables numéricas...")
numeric_cols = ['trip_distance', 'total_amount', 'fare_amount', 'tip_amount']
corr_matrix = trips_complete[numeric_cols]._______() # método para calcular matriz de correlación
print("\nMatriz de Correlación:")
print(corr_matrix.round(3))
print("\nCorrelaciones más fuertes:")
corr_pairs = []
for i in range(len(corr_matrix.columns)):
for j in range(i+1, len(corr_matrix.columns)):
corr_pairs.append((corr_matrix.columns[i], corr_matrix.columns[j], corr_matrix.iloc[i, j]))
corr_pairs.sort(key=lambda x: abs(x[2]), reverse=True)
for var1, var2, corr in corr_pairs[:3]:
print(f" {var1} vs {var2}: {corr:.3f}")
print("\nINTERPRETACIÓN DE CORRELACIONES:")
print(" > 0.7: Correlación fuerte positiva")
print(" 0.3-0.7: Correlación moderada positiva")
print(" -0.3-0.3: Correlación débil")
print(" < -0.7: Correlación fuerte negativa")
PISTAS:
- ¿Qué método calcula correlaciones? Documentación
- Las correlaciones van de -1 a 1, donde 1 es correlación perfecta positiva
Paso 10: Preguntas de Reflexión sobre Joins¶
Responde estas preguntas después de completar el código:
-
¿Qué diferencia hay entre un LEFT JOIN y un INNER JOIN? PISTA: Guía visual de joins
-
¿Por qué usamos LEFT JOIN en lugar de INNER JOIN para trips+zones? PISTA: ¿Qué pasaría si algunos viajes no tienen zona asignada?
-
¿Qué problemas pueden surgir al hacer joins con datos de fechas? PISTA: Tipos de datos, formatos, zonas horarias
-
¿Cuál es la ventaja de integrar múltiples fuentes de datos? PISTA: Análisis más rico, contexto completo, insights cruzados
-
¿Qué insights de negocio obtuviste del análisis integrado? PISTA: Patrones por zona, impacto de eventos especiales, oportunidades
Felicitaciones! Has Completado la Tarea¶
Lo que has aprendido:¶
- Cargar datos de múltiples fuentes (CSV, JSON, SQLite)
- Realizar diferentes tipos de joins con pandas
- Normalizar y preparar datos para integración
- Crear análisis agregados con groupby
- Visualizar resultados de datos integrados
Próximos pasos:¶
Practica con más casos de joins:
- E-commerce Orders - Orders + Products + Customers
- Hospital Data - Patients + Treatments + Doctors
- Library System - Books + Authors + Loans
- Sales Data - Sales + Products + Customers
Domina técnicas avanzadas de integración:
- Concat y Append - Unir DataFrames verticalmente
- Join vs Merge - Diferentes métodos de unión
- Validation en Merge - Verificar integridad de joins
- MultiIndex - Índices jerárquicos
Conecta con más fuentes de datos:
- APIs REST - Consumir servicios web
- PostgreSQL - Bases de datos relacionales
- Cloud Storage - S3, Azure, GCP
- Excel avanzado - Múltiples hojas y rangos
BONUS: Introducción a Prefect¶
¿Qué es Prefect y por qué usarlo?¶
El Problema: Tu análisis funciona... pero ¿qué pasa si:
- La URL de datos falla por 5 segundos?
- Quieres ejecutarlo todos los días automáticamente?
- Necesitas ver qué paso falló exactamente?
La Solución - Prefect:
- @task: Si algo falla, lo reintenta automáticamente
- @flow: Conecta pasos de forma inteligente
- Logging: Ve exactamente qué está pasando
- Simple: Solo añades 2 decoradores a tu código
Paso BONUS 1: Setup Básico¶
# === SETUP PREFECT ===
# Instalar Prefect (si no está instalado)
# !pip install prefect
import prefect
from prefect import task, flow, get_run_logger
import pandas as pd
print("Prefect instalado y configurado")
print(f" Versión: {prefect.__version__}")
Paso BONUS 2: Convertir Funciones a Tasks¶
# === TASKS SIMPLES PARA APRENDER PREFECT ===
@task(name="Cargar Datos", retries=2, retry_delay_seconds=3)
def cargar_datos(url: str, tipo: str) -> pd.DataFrame:
"""Task simple para cargar cualquier tipo de datos"""
logger = get_run_logger()
logger.info(f"Cargando {tipo} desde: {url}")
# Cargar según el tipo
if tipo == "trips":
data = pd._______(url) # función para Parquet
elif tipo == "zones":
data = pd._______(url) # función para CSV
else: # calendar
data = pd._______(url) # función para JSON
data['date'] = pd._______(data['date']).dt.date # convertir a fechas
logger.info(f"{tipo} cargado: {data.shape[0]} filas")
return data
@task(name="Hacer Join Simple")
def hacer_join_simple(trips: pd.DataFrame, zones: pd.DataFrame) -> pd.DataFrame:
"""Task para hacer join básico de trips + zones"""
logger = get_run_logger()
logger.info("Haciendo join simple...")
# Normalizar columnas
trips.columns = trips.columns.str._______() # convertir a minúsculas
zones.columns = zones.columns.str._______() # misma transformación
# Join básico
resultado = trips._______(zones, # método para unir DataFrames
left_on='_______', # columna de pickup location en trips
right_on='_______', # columna de location en zones
how='_______') # tipo de join que mantiene todos los trips
logger.info(f"Join completado: {len(resultado)} registros")
return resultado
@task(name="Análisis Rápido")
def analisis_rapido(data: pd.DataFrame) -> dict:
"""Task para análisis básico"""
logger = get_run_logger()
logger.info("Haciendo análisis básico...")
# Stats simples
stats = {
'total_registros': len(data),
'boroughs': data['borough']._______().head(3).to_dict(), # método para contar valores
'distancia_promedio': round(data['trip_distance']._______(), 2), # método para promedio
'tarifa_promedio': round(data['total_amount']._______(), 2) # método para promedio
}
logger.info(f"Análisis completado: {stats['total_registros']} registros")
return stats
PISTAS:
- ¿Qué función lee Parquet? ¿CSV? ¿JSON?
- ¿Qué función convierte strings a fechas?
- ¿Qué método convierte columnas a minúsculas?
- ¿Qué método hace joins? ¿Qué columnas usar? ¿Qué tipo de join?
- ¿Qué método cuenta valores únicos? ¿Qué método calcula promedios?
Paso BONUS 3: Crear un Flow Simple¶
# === FLOW PRINCIPAL (EL PIPELINE COMPLETO) ===
@flow(name="Pipeline Simple NYC Taxi")
def pipeline_taxi_simple():
"""
Flow simple que conecta todos los tasks
"""
logger = get_run_logger()
logger.info("Iniciando pipeline simple...")
# URLs de datos
trips_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet"
zones_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"
# PASO 1: Cargar datos (con retry automático si falla)
logger.info("Paso 1: Cargando datos...")
trips = cargar_datos(trips_url, "_______") # tipo de datos trips
zones = cargar_datos(zones_url, "_______") # tipo de datos zones
# PASO 2: Hacer join
logger.info("Paso 2: Haciendo join...")
data_unida = hacer_join_simple(trips, zones)
# PASO 3: Análisis básico
logger.info("Paso 3: Analizando...")
resultados = analisis_rapido(data_unida)
# PASO 4: Mostrar resultados
logger.info("Pipeline completado!")
logger.info(f"Resultados: {resultados}")
return resultados
PISTAS:
- ¿Qué tipo de datos son los trips? ¿Y las zones?
- Usa los nombres exactos que definiste en la función
cargar_datos
Paso BONUS 4: Ejecutar el Pipeline¶
# === EJECUTAR EL PIPELINE ===
if __name__ == "__main__":
print("Ejecutando pipeline simple...")
# Ejecutar el flow
resultado = _______() # nombre de la función del flow
print("\nRESULTADOS FINALES:")
print(f" Total registros: {resultado['total_registros']:,}")
print(f" Distancia promedio: {resultado['distancia_promedio']} millas")
print(f" Tarifa promedio: ${resultado['tarifa_promedio']}")
print("\nTop 3 Boroughs:")
for borough, count in resultado['_______'].items(): # clave del diccionario que contiene boroughs
print(f" {borough}: {count:,} viajes")
PISTAS:
- ¿Cómo se llama la función del flow que creaste?
- ¿Qué clave del diccionario contiene los datos de boroughs?
¿Qué acabas de aprender con Prefect?¶
Conceptos Básicos:
- @task: Convierte cualquier función en algo robusto que puede reintentar automáticamente
- @flow: Conecta tasks de forma inteligente
- Logging: Ve en tiempo real qué está pasando
- Retry: Si algo falla (red, servidor), Prefect lo intenta de nuevo
Diferencia Clave:
# ANTES - Código normal:
def cargar_datos(url):
return pd.read_csv(url) # Si falla, todo se rompe
# DESPUÉS - Con Prefect:
@task(retries=2)
def cargar_datos(url):
return pd.read_csv(url) # Si falla, lo intenta 2 veces más
Próximos Pasos con Prefect:
- Scheduling: Ejecutar automáticamente cada día
- Alertas: Notificarte si algo falla
- UI Web: Ver el estado de todos tus pipelines
- Paralelización: Ejecutar múltiples tasks a la vez
Felicitaciones por completar el BONUS!¶
Responde estas preguntas:
-
¿Qué ventaja tiene usar @task en lugar de una función normal? PISTA: ¿Qué pasa si la carga de datos falla temporalmente?
-
¿Para qué sirve el @flow decorator? PISTA: ¿Cómo conecta y organiza los tasks?
-
¿En qué casos reales usarías esto? PISTA: Reportes diarios, análisis automáticos, pipelines de ML
De aquí a Profesional:¶
- Data Analyst → Ya sabes hacer análisis complejos
- Data Engineer → Con Prefect, puedes automatizar todo
- MLOps → Estos pipelines son la base para ML en producción
Ya no eres solo alguien que hace análisis, eres alguien que construye sistemas!