Skip to content

Worker

El Data Inventory Worker es responsable de procesar eventos de inventario desde una cola, transformarlos en un formato adecuado para el análisis de datos, y publicarlos en un tema para su consumo por servicios de datos. El worker se ejecuta como un trabajo cron, verificando periódicamente nuevos mensajes en la cola.

Procesa eventos de inventario desde la cola SQS, los valida y transforma, y los publica en el tema SNS para análisis de datos.

interface EventMessage<T, D> {
type: T;
data: D;
context: {
country: string;
warehouse: string;
};
}
type TransactionEventMessage = EventMessage<
'InputTransaction' | 'OutputTransaction',
{
/* Entry */
entryId: string;
stock: number;
/* Product */
productId: string;
productEan: string;
productSku: string;
expirationDate?: string;
lot?: string;
/* Location */
locationId: string;
locationName: string;
locationType: LocationType;
locationZone?: Zone;
/* Container */
containerId?: string;
containerName?: string;
/* Transaction */
transactionId: string;
description: string;
transactionQuantity: number;
timestamp: string;
}
>;
  1. Inicialización:

    • El worker se inicializa con un QueueService y un InventoryProcessor
    • El trabajo cron se configura para ejecutar el worker a intervalos regulares
  2. Recepción de Mensajes:

    • El worker llama al QueueService para recibir mensajes de la cola SQS
    • Se pueden recibir hasta 10 mensajes en una sola llamada
  3. Procesamiento de Mensajes:

    • Para cada mensaje recibido:
      • El worker pasa el mensaje al InventoryProcessor para su procesamiento
      • El procesador verifica si el almacén está habilitado para procesamiento
      • El procesador verifica si el tipo de evento es procesable
      • El procesador valida el formato del evento
      • El procesador transforma el evento al formato esperado por los servicios de datos
      • El procesador publica el evento transformado en el tema SNS
  4. Eliminación de Mensajes:

    • Si el procesamiento es exitoso, el worker elimina el mensaje de la cola
    • Si se cumplen ciertas condiciones de error (por ejemplo, almacén no habilitado, tipo de evento no procesable, formato de evento inválido), el worker aún elimina el mensaje para evitar que sea procesado nuevamente
    • Para otros errores, el mensaje permanece en la cola y será reintentado
  5. Registro y Monitoreo:

    • El worker registra el tiempo de procesamiento y el número de eventos procesados
    • Los errores se registran con el contexto apropiado para depuración
  • QueueService (Puerto): Interfaz para operaciones de cola

    • AwsQueueService (Adaptador): Implementación usando AWS SQS
    • LogQueueService (Decorador): Agrega registro al servicio de cola
  • InventoryProcessor (Puerto): Interfaz para procesar eventos de inventario

    • InventoryProcessorImpl (Adaptador): Implementación del procesador
  • DataInventoryEventDispatcher (Puerto): Interfaz para publicar eventos

    • SNSDataInventoryEventDispatcher (Adaptador): Implementación usando AWS SNS
  • Solo se procesan los almacenes habilitados
  • Solo se procesan ciertos tipos de eventos
  • Los eventos deben conformarse a un formato específico
  • Los eventos se transforman a un formato estandarizado para análisis de datos
  • WarehouseNotEnabledError: El almacén en el evento no está habilitado para procesamiento
  • EventTypeUnprocessableError: El tipo de evento no es compatible para procesamiento
  • InvalidEventFormatError: El formato del evento es inválido
  • UnexpectedError: Ocurrió un error inesperado durante el procesamiento
  • GettingQueueMessageError: Ocurrió un error al recibir mensajes de la cola
  • DeleteQueueMessageError: Ocurrió un error al eliminar un mensaje de la cola

Estos errores se manejan apropiadamente para asegurar que el worker continúe funcionando y no se quede atascado procesando los mismos mensajes repetidamente.