Subida y procesamiento de multiples archivos con NestJS y BullMQ

Subida y procesamiento de multiples archivos con NestJS y BullMQ

La carga de archivos, imágenes en este caso y el procesamiento de las mismas, es crucial para muchas aplicaciones modernas, y un caso común en el que necesitamos subir imágenes, procesarlas y almacenarlas en un servicio como AWS S3, todo ello mientras mantenemos una alta eficiencia y escalabilidad. Este artículo como muhos otros es parte del código o la idea que estoy desarrollando llamada Deeditt, y para ello utilizaremos NestJS para el backend, BullMQ para la cola de trabajos y el módulo cluster de Node.js para aprovechar múltiples núcleos de CPU.

El Problema

Supongamos que estás desarrollando una pequeña red social. Tus usuarios necesitan subir imágenes, pero no se trata solo de guardar un archivo en un disco. Las imágenes pueden venir en varios formatos, tamaños y calidades, y debes:

  • Verificar la extensión, el tamaño y el tipo de archivo.
  • Cambiar el formato a WebP para optimización.
  • Crear una miniatura (thumbnail) y una versión de tamaño completo de cada imagen.
  • Almacenar ambas versiones en AWS S3.

Todo esto debe suceder rápidamente y de manera escalable. El problema se agrava cuando múltiples usuarios intentan subir imágenes al mismo tiempo.

Una posible solución

Antes de empezar a describir las posibles soluciones, es importante tener en cuenta que el uso de los recursos es limitado y no se trata de utilizar tecnolgías específicamente desarrolladas para esto, entonces básicamente se debe de resolver de forma sencilla con las menos herramientas posibles, y algunas formas de soluciones este problema serían:

BullMQ: Manejo de Colas de Trabajos

BullMQ es una biblioteca para manejar colas de trabajos en Node.js. Mueve tareas costosas o de larga duración fuera del ciclo principal del evento, permitiendo que la aplicación se mantenga rápida.

Clustering en Node.js: Aprovechando Todos los Núcleos

Node.js opera en un solo hilo. Utilizar el módulo cluster nos permite crear múltiples instancias del mismo proceso para aprovechar todos los núcleos de la CPU.

Identificadores Únicos y Seguimiento

Generar un ID único para cada tarea de subida y procesamiento permite al cliente rastrear el estado de su operación.

Manejo de Errores y Reintentos

El sistema debe ser robusto y capaz de recuperarse de errores. Aquí es donde entran en juego los eventos y mecanismos de reintento de BullMQ.

Veámos como se vería todo eso en código

Servicio para la carga de archivos

A continuación vamos a explorar cómo se puede establecer un endpoint en NestJS para manejar la subida de múltiples archivos. Utilizaremos el decorador @UploadedFiles() para recibir un arreglo de archivos enviados en una solicitud HTTP POST. Estos archivos se procesarán en paralelo mediante una cola de trabajos gestionada por BullMQ y por cada archivo que se envíe a procesamiento, se generará un UUID que servirá como ID de rastreo.

He omitido a propósito la parte del código donde se hace uso de BullMQ desde app.module.ts, pero en la siguiente sección puedes ver la configuración que es similar y eso te ayudará a entender como se aplica.

// app.controller.ts
import { v4 as uuidv4 } from 'uuid';
import { Controller, Post, UploadedFiles, UseInterceptors } from '@nestjs/common';
import { FileFieldsInterceptor } from '@nestjs/platform-express';
import { Queue } from 'bullmq';

@Controller('upload')
export class AppController {
  private fileQueue: Queue;

  constructor() {
    this.fileQueue = new Queue('fileQueue');
  }

  @Post()
  @UseInterceptors(FileFieldsInterceptor([{ name: 'files', maxCount: 10 }]))
  async uploadFiles(@UploadedFiles() files: Array<Express.Multer.File>): Promise<any> {
    const uuids = [];

    // hasta este punto cada archivo subido quedara en el disco
    const jobs = files.map((file) => {
      const fileUUID = uuidv4();
      uuids.push(fileUUID);

      return this.fileQueue.add('process_file', { 
        path: file.path, 
        uuid: fileUUID 
      });
    });

    await Promise.all(jobs);
    return { status: 'ok', uuids };
  }
}

Al finalizar la solicitud, se retorna una respuesta al cliente, indicando que los archivos se han recibido y están siendo procesados.

Servicio para manejar archivos

Aquí empieza lo divertido, pues este es un proyecto diferente y su meta es recibir eventos que se han generado desde uploadFiles para procesarlos y luego subirlos a un bucket de S3.

Configuración de Clúster de Node.js

Primero, configuramos un clúster de Node.js para aprovechar múltiples núcleos de CPU en el servidor. Este paso es opcional pero recomendado para escalar la aplicación.

import * as cluster from 'cluster';
import * as os from 'os';

const numCPUs = os.cpus().length;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`);
  });
} else {
  bootstrap(); // Inicializa la aplicación NestJS en cada proceso hijo
}
  • cluster.isMaster: Comprueba si el proceso actual es el maestro.
  • cluster.fork(): Crea un nuevo proceso hijo.
  • cluster.on('exit', ...): Escucha cuando un proceso hijo termina.

Inicialización de la Aplicación NestJS

La función bootstrap se encarga de inicializar la aplicación NestJS. Esto ocurre en cada proceso hijo del clúster.

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
  console.log(`Worker ${process.pid} started`);
}

Configuración del Módulo de NestJS y Bull

Aquí configuramos la cola de BullMQ y la inyectamos en nuestro módulo NestJS. También agregamos el procesador de archivos como un proveedor.

import { BullModule } from '@nestjs/bull';
import { Module } from '@nestjs/common';
import { AppService } from './app.service';
import { FileProcessor } from './file.processor';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    BullModule.registerQueue({
      name: 'fileQueue',
    }),
  ],
  providers: [AppService, FileProcessor],  // Aquí se ha agregado FileProcessor
})
export class AppModule {}

Procesador de archivos

Este es el corazón del sistema. El procesador de archivos toma el trabajo de la cola y realiza el procesamiento real de cada archivo: la conversión a formato webp y la subida a S3.

// file.processor.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
import sharp from  'sharp';
import { PutObjectCommand, PutObjectCommandInput, S3Client } from  '@aws-sdk/client-s3';
import fs from  'node:fs';
import path from 'node:path';

// Configuración de AWS S3
// esto esta usando la nueva API de AWS
const s3 = new S3Client({
    region: 'tu_region_de_S3',
    apiVersion: 'la_version_de_tu_bucket',
    credentials: {
      accessKeyId: 'tu_access_key_id',
      secretAccessKey: 'tu_secret_access_key'
  }
});

@Processor('fileQueue')
export class FileProcessor {
  @Process('convertAndUpload')
  async handleConvertAndUpload(job: Job<{ fileName: string; filePath: string }>) {
    const { uuid: fileName, path: filePath } = job.data;
    const webpFileName = `${path.basename(fileName, path.extname(fileName))}.webp`;
    const webpFilePath = path.join(path.dirname(filePath), webpFileName);

    // Convertir la imagen a formato webp usando sharp
    const sourceImage = sharp(webpFilePath).webp({
        effort: 3
      });

        // escalamos la imagen manteniendo sus proporciones
    const scaledImage = await sourceImage.resize(300, 300, {
        fit: 'inside'
      }).toBuffer();

    // Parámetros de S3
        const transformedFile: Partial<PutObjectCommandInput> = {
            Bucket: 'tu_bucket_name',
            Key: webpFileName,
            ContentType: 'image/webp'
            Body: scaledImage
        };

    // Subir el archivo a S3
    return new Promise((resolve, reject) => {
        s3.send(new PutObjectCommand(transformedFile));
        fs.rmSync(webpFilePath, { force:  true });
    });
  }
}
  • @Processor('fileQueue'): Declara que esta clase es un procesador para la cola llamada 'fileQueue'.
  • @Process('convertAndUpload'): Indica que este método manejará los trabajos etiquetados como 'convertAndUpload'.

Dentro de handleConvertAndUpload, realizamos las operaciones de conversión y subida, y manejamos tanto el almacenamiento en S3 como la limpieza de archivos locales.

Conclusión

En este artículo, hemos explorado el desafío de manejar la carga de archivos en un entorno de servidor eficiente y cómo el procesamiento asíncrono emerge como una solución efectiva para este problema. Utilizando una arquitectura basada en NestJS, hemos delineado dos proyectos distintos: uno para recibir el archivo y otro para procesarlo. La comunicación entre estos proyectos se facilita mediante BullMQ, una biblioteca de colas de trabajos robusta que asegura un enlace seguro y efectivo.

Aunque el artículo aborda el uso de clusters en nodejs, también se pueden utilizar workers o una combinación de ambos y esto es algo que quizás escriba en una siguiente publicación.

Happy coding! :D


Photo by Patrycja Chociej on Unsplash

Jack Fiallos

Jack Fiallos

Te gustó este artículo?