Sistema de Control de Jobs en Tiempo Real con Channels y Background Services en .NET
Introducción
En el desarrollo moderno de aplicaciones, es común necesitar ejecutar procesos en segundo plano que se comuniquen con nuestra API de forma eficiente y segura. Tradicionalmente, esto se resolvía con implementaciones complejas usando locks, colas manuales o infraestructura externa como RabbitMQ. Sin embargo, .NET ofrece una solución simple pero elegante: System.Threading.Channels.
En este artículo, exploraremos cómo construir un sistema de control de jobs en tiempo real utilizando:
- 🔧 Channels para comunicación thread-safe entre componentes
- 🔄 Background Services para tareas recurrentes
- 🚀 Minimal APIs para endpoints modernos y limpios
- ⚡ TaskCompletionSource para comunicación bidireccional
Al finalizar, tendrás un proyecto funcional que puedes adaptar para casos de uso reales como procesamiento de emails, análisis de imágenes, generación de reportes, y más.
Nota: El código fuente siempre lo encontrarás en mi github -> DevToPosts/ApiBackgroundChannels at main · isaacOjeda/DevToPosts
¿Qué son los Channels?
Los Channels en .NET son estructuras de datos thread-safe diseñadas para escenarios productor-consumidor. Piensa en ellos como una “tubería” donde un lado escribe datos y el otro los lee, sin preocuparte por locks o sincronización manual.
¿Por qué usarlos?
- ✅ Thread-safe por diseño
- ✅ Alta performance con bajo overhead
- ✅ Backpressure integrado (control de flujo)
- ✅ Ideal para comunicación entre hilos/tareas
- ✅ Alternativa simple a colas externas (RabbitMQ, Redis) para escenarios internos
- ✅ Optimizado para async/await (usa
ValueTask
internamente)
Arquitectura del Proyecto
Este proyecto demuestra cómo controlar un Background Job desde una API usando Channels para comunicación bidireccional:
┌──────────────┐ ┌─────────┐ ┌──────────────────┐
│ API Request │ ──────> │ Channel │ ──────> │ Background Job │
│ (Productor) │ │ (Cola) │ │ (Consumidor) │
└──────────────┘ └─────────┘ └──────────────────┘
↑ │
└────── TaskCompletionSource ─────────────────┘
(Respuesta)
Paso 1: Definir el Modelo de Comunicación
Primero, necesitamos estructuras para enviar comandos y recibir respuestas:
public enum CommandType { Start, Stop, GetStatus }
public class JobCommand
{
public CommandType Type { get; set; }
public TaskCompletionSource<JobStatus>? ResponseTask { get; set; }
}
public class JobStatus
{
public bool IsRunning { get; set; }
public int ExecutionCount { get; set; }
public DateTime? LastExecutionTime { get; set; }
public string Message { get; set; } = string.Empty;
}
💡 Clave: TaskCompletionSource
nos permite crear una Task que completaremos manualmente cuando tengamos la respuesta, haciendo posible la comunicación bidireccional.
Paso 2: Crear el Background Service (Consumidor)
public class JobProcessor : BackgroundService
{
private readonly Channel<JobCommand> _channel;
private bool _isJobRunning = false;
private int _executionCount = 0;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("JobProcessor iniciado. Esperando comandos...");
// ✅ Patrón recomendado por Microsoft: WaitToReadAsync + TryRead
// Más eficiente que ReadAllAsync para alta concurrencia
while (await _channel.Reader.WaitToReadAsync(stoppingToken))
{
while (_channel.Reader.TryRead(out var command))
{
try
{
await ProcessCommandAsync(command, stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error procesando comando");
// ✅ Notificar errores al productor
command.ResponseTask?.TrySetException(ex);
}
}
}
}
private async Task ProcessCommandAsync(JobCommand command, CancellationToken token)
{
switch (command.Type)
{
case CommandType.Start:
_isJobRunning = true;
_ = Task.Run(async () => await RunRecurringJobAsync(token));
// Enviar respuesta al productor
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = true,
Message = "Job iniciado"
});
break;
case CommandType.Stop:
_isJobRunning = false;
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = false,
Message = "Job detenido"
});
break;
case CommandType.GetStatus:
command.ResponseTask?.SetResult(new JobStatus
{
IsRunning = _isJobRunning,
ExecutionCount = _executionCount
});
break;
}
}
}
💡 Explicación de las mejoras:
-
WaitToReadAsync()
+TryRead()
: Patrón recomendado por Microsoft para mejor performance -
TrySetException()
: Propaga errores al productor de forma segura - Bucle anidado: Procesa múltiples comandos en batch cuando están disponibles
Paso 3: Configurar el Channel y el Servicio
En Program.cs
:
// ✅ Bounded Channel con opciones optimizadas (recomendado por Microsoft)
builder.Services.AddSingleton(Channel.CreateBounded<JobCommand>(
new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait, // Backpressure automático
SingleWriter = false, // Múltiples endpoints pueden escribir
SingleReader = true // Solo un BackgroundService consume
}));
// Registrar el Background Service
builder.Services.AddHostedService<JobProcessor>();
💡 ¿Bounded vs Unbounded?
Característica | Unbounded | Bounded |
---|---|---|
Capacidad | Ilimitada | Limitada (configurable) |
Memoria | Puede crecer sin control | Controlada |
Backpressure | No | Sí (automático) |
Uso recomendado | Productores lentos | Productores rápidos |
Performance | Writes síncronos | Writes pueden ser async |
Modos de Bounded Channel (FullMode
):
-
Wait
(recomendado): Espera hasta que haya espacio (backpressure) -
DropWrite
: Descarta el nuevo elemento -
DropOldest
: Descarta el elemento más antiguo -
DropNewest
: Descarta el elemento más reciente
Opciones de optimización:
-
SingleWriter
:true
= mejor performance si solo un productor escribe -
SingleReader
:true
= mejor performance si solo un consumidor lee -
AllowSynchronousContinuations
:false
(default) para evitar bloqueos
Paso 4: Crear los Endpoints (Productores)
public static IEndpointRouteBuilder MapJobEndpoints(this IEndpointRouteBuilder app)
{
var jobGroup = app.MapGroup("api/job");
jobGroup.MapPost("start", async (Channel<JobCommand> channel) =>
{
// Crear TaskCompletionSource para esperar la respuesta
var tcs = new TaskCompletionSource<JobStatus>();
var command = new JobCommand
{
Type = CommandType.Start,
ResponseTask = tcs
};
// ✅ WriteAsync maneja backpressure automáticamente
await channel.Writer.WriteAsync(command);
// Esperar respuesta del consumidor
var status = await tcs.Task;
return Results.Ok(status);
});
// Endpoints similares para stop y status...
return app;
}
💡 Flujo:
- API recibe request → Crea
TaskCompletionSource
- Escribe comando en el Channel con
WriteAsync()
(maneja backpressure) - Espera que el consumidor complete la Task
- Retorna la respuesta al cliente
¿Por qué este patrón?
Sin Channels:
// ❌ Locks manuales, propenso a errores
private static readonly object _lock = new();
private static Queue<Command> _queue = new();
public void AddCommand(Command cmd)
{
lock(_lock) { _queue.Enqueue(cmd); }
}
Con Channels:
// ✅ Thread-safe automático, limpio, con backpressure
await channel.Writer.WriteAsync(command);
Casos de Uso Reales con Channels
1. Cola de Emails/Notificaciones
// Microsoft recomienda Bounded para prevenir OutOfMemory
var emailChannel = Channel.CreateBounded<EmailMessage>(
new BoundedChannelOptions(1000)
{
FullMode = BoundedChannelFullMode.Wait
});
// API recibe requests → Encola en Channel → Background envía emails en lotes
2. Procesamiento de Imágenes
Channel<ImageProcessingJob> imageChannel;
// Upload de imágenes → Channel → Worker redimensiona/optimiza en background
3. Logs Centralizados
// DropOldest para logs: si está lleno, descarta los más antiguos
var logChannel = Channel.CreateBounded<LogEntry>(
new BoundedChannelOptions(5000)
{
FullMode = BoundedChannelFullMode.DropOldest
});
4. Rate Limiting / Throttling
var boundedChannel = Channel.CreateBounded<Request>(100);
// Limita a 100 requests concurrentes, el resto espera (backpressure)
5. Event Sourcing Interno
Channel<DomainEvent> eventChannel;
// Eventos de dominio → Channel → Múltiples handlers procesan en paralelo
6. Pipeline de Datos (ejemplo oficial de Microsoft)
// Patrón de processing pipeline con múltiples stages
Channel<RawData> inputChannel;
Channel<ProcessedData> outputChannel;
// Stage 1: Raw → Validated → Stage 2: Validated → Enriched
Ventajas vs Alternativas
Escenario | Channel | Queue Externo | BlockingCollection |
---|---|---|---|
Performance | ⚡ Muy alta | 🐢 Red overhead | ✅ Alta |
Configuración | ✅ Cero | ❌ Infraestructura | ✅ Mínima |
Backpressure | ✅ Integrado | ⚠️ Manual | ⚠️ Manual |
Async/Await | ✅ Nativo (ValueTask) | ⚠️ Depende | ❌ Bloquea threads |
Escalabilidad | 🏠 Single-app | 🌍 Multi-app | 🏠 Single-app |
Memoria | ✅ Bounded options | ⚠️ Depende | ⚠️ Manual |
Usa Channels cuando:
- ✅ Comunicación dentro de la misma aplicación
- ✅ Necesitas alta performance y bajo latency
- ✅ Quieres simplicidad sin infraestructura externa
- ✅ Trabajas con async/await
- ✅ Necesitas backpressure automático
Usa Queue externo (RabbitMQ/Azure Service Bus) cuando:
- ❌ Necesitas comunicación entre múltiples aplicaciones/servicios
- ❌ Requieres persistencia de mensajes
- ❌ Necesitas escalabilidad horizontal
- ❌ Requieres garantías de entrega (at-least-once, exactly-once)
⚡ Mejores Prácticas de Microsoft Learn
1. *Consumer Pattern *
// ✅ WaitToReadAsync + TryRead (más eficiente)
while (await reader.WaitToReadAsync(cancellationToken))
{
while (reader.TryRead(out var item))
{
// Procesar item
}
}
// ❌ ReadAllAsync (menos eficiente para alta carga)
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
// Procesar item
}
2. Producer Pattern
// ✅ WriteAsync para backpressure automático
await writer.WriteAsync(item, cancellationToken);
// ⚠️ TryWrite solo para unbounded o cuando no quieres esperar
if (!writer.TryWrite(item))
{
// Channel lleno, manejar alternativa
}
3. Signal Completion
// ✅ Siempre señalar cuando terminas de escribir (esto cierra el canal)
writer.Complete();
// O con error
writer.Complete(exception);
4. Manejo de Errores
try
{
await ProcessCommand(command);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing");
command.ResponseTask?.TrySetException(ex); // ✅ Propagar al productor
}
Conclusión
Los Channels representan una evolución significativa en cómo manejamos la comunicación asíncrona en .NET. Lo que tradicionalmente requería código complejo con locks, semáforos y manejo manual de concurrencia, ahora se puede lograr con una API limpia, segura y de alto rendimiento.
¿Por qué usar Channels?
A lo largo de este tutorial, hemos visto cómo Channels ofrece ventajas significativas:
- Simplicidad: No necesitas infraestructura externa para empezar
-
Performance: Diseñados desde cero para async/await con
ValueTask
- Seguridad: Thread-safe por diseño, sin preocupaciones por race conditions
- Control: Backpressure automático previene sobrecarga del sistema
- Flexibilidad: Configuración granular según tus necesidades específicas
Cuándo SÍ usar Channels
✅ Comunicación intra-proceso: Coordinación entre componentes de la misma aplicación
✅ Alta frecuencia: Miles de mensajes por segundo con mínimo overhead
✅ Backpressure crítico: Necesitas controlar la velocidad de producción/consumo
✅ Simplicidad operacional: Quieres evitar dependencias de infraestructura externa
✅ Desarrollo rápido: Prototipado y desarrollo local sin complicaciones
Cuándo NO usar Channels
❌ Comunicación inter-proceso: Si necesitas comunicar múltiples aplicaciones/servicios
❌ Persistencia requerida: Si los mensajes deben sobrevivir reinicios
❌ Distribución geográfica: Múltiples datacenters o regiones
❌ Garantías de entrega avanzadas: Exactly-once, dead letter queues, retries configurables
❌ Monitoreo centralizado: Necesitas observabilidad empresarial de mensajería
Impacto en tu arquitectura
Este patrón es especialmente valioso cuando:
- Estás construyendo aplicaciones monolíticas modernas que necesitan procesamiento asíncrono
- Quieres reducir costos de infraestructura eliminando dependencias de message brokers
- Necesitas optimizar performance con procesamiento en memoria
- Buscas simplicidad operacional sin sacrificar escalabilidad vertical
Key Takeaways
Los 5 principios esenciales:
- ✅ Usa Bounded Channels con
FullMode.Wait
para prevenir OutOfMemory - ✅ Patrón WaitToReadAsync + TryRead para máximo throughput
- ✅ Configura SingleWriter/SingleReader cuando sea posible para mejor performance
- ✅ Siempre llama Complete() para señalizar fin de producción
- ✅ Propaga errores con TrySetException() para debugging efectivo
Evolución del patrón
Este proyecto es una base sólida que puedes extender según tus necesidades:
- Múltiples consumidores: Escala horizontalmente agregando más BackgroundServices
- Priorización: Implementa múltiples channels con diferentes prioridades
- Monitoring: Integra métricas de performance y observabilidad
- Resiliencia: Agrega retry policies y circuit breakers
Los Channels de .NET demuestran que no siempre necesitas herramientas complejas para resolver problemas complejos. A veces, la solución más elegante es la que viene incorporada en tu framework.
Próximos Pasos
¿Listo para llevar este conocimiento al siguiente nivel? Aquí tienes algunas ideas para expandir este proyecto:
1. Implementar Múltiples Consumidores
// Escalar procesamiento con múltiples workers
builder.Services.AddHostedService<JobProcessor>(); // Worker 1
builder.Services.AddHostedService<JobProcessor>(); // Worker 2
builder.Services.AddHostedService<JobProcessor>(); // Worker 3
Aprenderás: Paralelización, distribución de carga, sincronización entre workers
2. Agregar Sistema de Prioridades
public enum JobPriority { Low, Normal, High, Critical }
// Crear channels separados por prioridad
var highPriorityChannel = Channel.CreateBounded<JobCommand>(50);
var normalPriorityChannel = Channel.CreateBounded<JobCommand>(100);
var lowPriorityChannel = Channel.CreateBounded<JobCommand>(200);
Aprenderás: Gestión de prioridades, routing inteligente, SLA por prioridad
3. Integrar Observabilidad
// Métricas con System.Diagnostics.Metrics
var meter = new Meter("BackgroundJobs");
var jobsProcessed = meter.CreateCounter<long>("jobs_processed");
var queueDepth = meter.CreateObservableGauge("queue_depth",
() => channel.Reader.Count);
Aprenderás: OpenTelemetry, métricas personalizadas, dashboards con Grafana/Prometheus
4. Implementar Persistencia
// Guardar estado en caso de restart
public class PersistentJobProcessor : BackgroundService
{
private readonly IJobStateRepository _repository;
protected override async Task ExecuteAsync(CancellationToken token)
{
// Recuperar jobs pendientes al iniciar
await _repository.RestorePendingJobsAsync(token);
// ... continuar procesamiento normal
}
}
Aprenderás: State management, recovery strategies, durabilidad
5. Agregar Pipeline de Procesamiento
// Pipeline multi-etapa
var rawChannel = Channel.CreateBounded<RawData>(100);
var validatedChannel = Channel.CreateBounded<ValidatedData>(100);
var enrichedChannel = Channel.CreateBounded<EnrichedData>(100);
// Stage 1: Validación
builder.Services.AddHostedService<ValidationProcessor>();
// Stage 2: Enriquecimiento
builder.Services.AddHostedService<EnrichmentProcessor>();
// Stage 3: Persistencia
builder.Services.AddHostedService<PersistenceProcessor>();
Aprenderás: Pipeline pattern, ETL processes, data transformation
6. Implementar Rate Limiting Avanzado
// Rate limiter con ventanas deslizantes
public class RateLimitedJobProcessor : BackgroundService
{
private readonly RateLimiter _rateLimiter;
protected override async Task ExecuteAsync(CancellationToken token)
{
while (await _channel.Reader.WaitToReadAsync(token))
{
using var lease = await _rateLimiter.AcquireAsync(1, token);
if (lease.IsAcquired)
{
// Procesar job
}
}
}
}
Aprenderás: Rate limiting patterns, token bucket, leaky bucket
7. Crear Dashboard de Monitoreo
// SignalR para updates en tiempo real
builder.Services.AddSignalR();
// Notificar estado a clientes conectados
await _hubContext.Clients.All.SendAsync("JobStatusUpdate", new {
QueueDepth = channel.Reader.Count,
ProcessingRate = jobsPerSecond,
ActiveJobs = activeJobCount
});
Aprenderás: Real-time updates, SignalR, live dashboards
8. Añadir Resiliencia
// Polly para retry policies
var retryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));
await retryPolicy.ExecuteAsync(async () =>
{
await ProcessJobAsync(command);
});
Aprenderás: Retry patterns, circuit breakers, fallback strategies
9. Implementar Health Checks
// Health check para el channel
builder.Services.AddHealthChecks()
.AddCheck<ChannelHealthCheck>("channel_health")
.AddCheck<JobProcessorHealthCheck>("job_processor_health");
public class ChannelHealthCheck : IHealthCheck
{
public Task<HealthCheckResult> CheckHealthAsync()
{
var queueDepth = _channel.Reader.Count;
return queueDepth < 1000
? HealthCheckResult.Healthy()
: HealthCheckResult.Degraded("Queue depth high");
}
}
Aprenderás: Health monitoring, readiness/liveness probes, Kubernetes integration
10. Migrar a Arquitectura Distribuida
// Cuando crezcas más allá de un solo servidor
// Considera migrar a:
// - Azure Service Bus para messaging distribuido
// - Azure Queue Storage para simplicidad y bajo costo
// - RabbitMQ para control total
// - Redis Streams para alta performance
// Mantén la misma interfaz, cambia la implementación
public interface IJobQueue
{
Task EnqueueAsync(JobCommand command);
Task<JobCommand> DequeueAsync(CancellationToken token);
}
// Implementación con Channels (actual)
public class InMemoryJobQueue : IJobQueue { }
// Implementación con Azure Service Bus (futuro)
public class ServiceBusJobQueue : IJobQueue { }
Aprenderás: Estrategias de migración, abstracciones, arquitectura evolutiva
Recursos para Continuar Aprendiendo
Documentación Oficial:
- System.Threading.Channels API Reference
- Background tasks with hosted services in ASP.NET Core | Microsoft Learn
- Channels Library Guide
- Background Services in ASP.NET Core
Artículos Avanzados: