Guía definitiva de asyncio en Python: patrones, errores y rendimiento
Esta guía concentra lo esencial para dominar asyncio: cómo pensar en corutinas y tareas, patrones prácticos (gestión de concurrencia, timeouts, cancelación), errores comunes y consejos de rendimiento. Incluye ejemplos reales y un mini-proyecto descargador concurrente con aiohttp.
¿Por qué asyncio?
asyncio permite I/O concurrente en un solo hilo usando un bucle de eventos. Es ideal para redes, scrapers, clientes HTTP, WebSockets o servidores que manejan muchas conexiones I/O-bound sin hilos pesados.
Conceptos clave
- Corutina: función async def que devuelve un objeto coroutine; necesita await o crear una tarea para ejecutarse.
- Tarea (Task): contenedor que programa la ejecución de una coroutine en el loop (asyncio.create_task).
- Event loop: ejecuta tareas, callbacks y gestiona I/O. Con Python 3.7+ se suele usar asyncio.run() para arrancar el loop.
Bases: ejemplo mínimo
import asyncio
async def work(n):
await asyncio.sleep(1)
return n * 2
async def main():
tasks = [asyncio.create_task(work(i)) for i in range(5)]
results = await asyncio.gather(*tasks)
print(results)
if __name__ == '__main__':
asyncio.run(main())
Por qué usar create_task: si llamas una coroutine sin crear tarea, no se ejecutará concurrentemente. create_task la programa inmediatamente en el loop.
Gather vs wait
- asyncio.gather: espera todas las corutinas y puede devolver excepciones (o return_exceptions=True).
- asyncio.wait: devuelve dos sets (done, pending), útil para continuar cuando algunas terminan primero.
Cancelación y timeouts
La cancelación en asyncio es cooperativa. Cuando cancelas una Task, la coroutine recibe una CancelledError en el punto donde esté esperando.
task = asyncio.create_task(some_coro())
# en otro lugar
task.cancel()
try:
await task
except asyncio.CancelledError:
print('Tarea cancelada')
Timeout práctico con asyncio.wait_for:
try:
await asyncio.wait_for(some_coro(), timeout=5)
except asyncio.TimeoutError:
print('Timeout')
Evita atrapar CancelledError de forma amplia (except Exception:) ya que puede impedir la cancelación limpia y provocar fugas.
Control de concurrencia: semáforos y colas
Cuando accedes a recursos limitados (sockets, API rate-limits), usa semáforos o pools.
sem = asyncio.Semaphore(10)
async def bounded_fetch(url):
async with sem:
return await fetch(url)
Asyncio.Queue es ideal para pipelines productor-consumidor:
queue = asyncio.Queue()
async def producer():
for i in range(100):
await queue.put(i)
await queue.put(None) # sentinel
async def consumer():
while True:
item = await queue.get()
if item is None:
break
# procesar item
queue.task_done()
Bloqueo y run_in_executor
Si llamas funciones bloqueantes (por ejemplo, operaciones CPU-bound o librerías sin soporte async), usa run_in_executor para no bloquear el loop:
import functools
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(None, functools.partial(blocking_io, arg))
Para operaciones CPU-bound considera procesos (con ProcessPoolExecutor) o mover a Cython/Numba/compartir carga con workers.
Ejemplo práctico: descargador concurrente con límite
Mini-proyecto: descarga de URLs con aiohttp, límite de concurrencia y cancelación/timeout.
Estructura de carpetas:
async-downloader/
├─ downloader/
│ ├─ __init__.py
│ ├─ main.py
│ └─ fetcher.py
├─ tests/
│ └─ test_fetcher.py
└─ requirements.txt
requirements.txt
aiohttp>=3.8
pytest
downloader/fetcher.py
import asyncio
import aiohttp
class Fetcher:
def __init__(self, concurrency=10, timeout=10):
self.sem = asyncio.Semaphore(concurrency)
self.timeout = timeout
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.session.close()
async def fetch(self, url):
async with self.sem:
try:
return await asyncio.wait_for(self._get(url), timeout=self.timeout)
except asyncio.TimeoutError:
return {'url': url, 'error': 'timeout'}
except Exception as e:
return {'url': url, 'error': str(e)}
async def _get(self, url):
async with self.session.get(url) as resp:
text = await resp.text()
return {'url': url, 'status': resp.status, 'body_len': len(text)}
downloader/main.py
import asyncio
from fetcher import Fetcher
async def main(urls):
async with Fetcher(concurrency=20, timeout=8) as f:
tasks = [asyncio.create_task(f.fetch(u)) for u in urls]
results = await asyncio.gather(*tasks)
for r in results:
print(r)
if __name__ == '__main__':
import sys
urls = sys.argv[1:]
asyncio.run(main(urls))
Por qué esta estructura:
- Fetcher como context manager: asegura cierre de la sesión aiohttp y evita fugas de sockets.
- Semaphore para limitar concurrencia y no abrumar la red/servicio.
- wait_for para timeouts por URL y manejo de excepciones por URL para resiliencia.
Depuración y herramientas
- Activar debug:
PYTHONASYNCIODEBUG=1oloop.set_debug(True). - Usa
asyncio.all_tasks()para inspección, y tracemalloc para fugas de memoria. - Monitoriza callbacks lentos con
loop.slow_callback_duration. - Herramientas: aiomonitor, pytest-asyncio, pyinstrument para perf.
Errores comunes y cómo evitarlos
- No awaitear una coroutine: genera warnings y corutinas sin ejecutar. Siempre usa await o create_task.
- Atrapando CancelledError por accidente: especifica excepciones o re-lanza CancelledError.
- Crear demasiadas tareas sin límite: causa memoria alta o saturación. Usa semáforos o pools.
- Usar blocking I/O en coroutines: provoca que el loop se congele. Mover a run_in_executor o usar librerías async.
- No cerrar ClientSession: lleva a ResourceWarning y sockets en TIME_WAIT. Usa context managers.
Rendimiento: consejos rápidos
- Reusa recursos (p. ej. ClientSession en HTTP).
- Agrupa awaits en gather para reducir overhead de scheduling cuando tenga sentido.
- Evita await en bucles hot por cada elemento; procesa en batches.
- Mide: perfiles con pyinstrument, y comprueba latencias de callbacks lentos con PYTHONASYNCIODEBUG.
¿Cuándo usar asyncio vs alternativas?
- asyncio ideal para I/O-bound con muchas conexiones y latencias externas.
- Para CPU-bound, usa multiprocessing o offload al hardware.
- Trio/Curio ofrecen modelos alternativos con distinta ergonomía; AnyIO permite interoperabilidad entre librerías.
Buenas prácticas finales
- Usa asyncio.run() para arrancar programas y evita manipular eventos globalmente.
- Define límites explícitos de concurrencia y timeouts por operación.
- Evita mezclar demasiado threads y asyncio sin coordinación (usa run_in_executor o loop.call_soon_threadsafe).
- Incluye tests async con pytest-asyncio y casos de cancelación/timeouts.
Consejo avanzado: si necesitas alta confiabilidad y aislamiento de fallos para tareas concurrentes, combina procesos (ProcessPoolExecutor) con asyncio para aislar fugas de memoria y evitar que una librería bloqueante rompa todo el loop. Advertencia: revisa siempre que todas las corutinas creadas sean awaited o canceladas; las tareas huérfanas son la causa más frecuente de comportamientos inesperados y consumo de recursos.
Siguiente paso: adapta el descargador de ejemplo para usar backoff exponencial y métricas (Prometheus) para observar latencias y errores en producción.
¿Quieres comentar?
Inicia sesión con Telegram para participar en la conversación