Comunicando microservicios en Nodejs usando RabbitMQ y amqplib

Comunicando microservicios en Nodejs usando RabbitMQ y amqplib

La mensajería de colas o Message Queue es un tipo de software que permite la comunicación asíncrona entre varias aplicaciones, básicamente un producer como se le conoce, publica un mensaje y no precisamente necesita de una respuesta, lo que permite que continue con su ejecución. En esta interacción intervienen 2 actores, uno de ellos es el producer que es el que emite el mensaje y del otro lado está el consumer o consumers (porque puede haber más de uno), que son aplicaciones que están a la escucha de una señal desde el gestor de colas y generalmente toman el mensaje emitido por los producers y lo procesan de una manera predefinida.

Pues bueno, en este artículo explicaré como crear producers y consumers en nodejs, y como conectarlos a través de RabbitMQ (Message Queue).

RabbitMQ es un software de encolado de mensajes llamado broker de mensajería o gestor de colas. Dicho de forma simple, es un software donde se pueden definir colas, las aplicaciones se pueden conectar a dichas colas y transferir/leer mensajes en ellas, sus características principales son:

  • Garantía de entrega
  • Enrutamiento
  • Clusterización
  • Federación
  • Alta disponibilidad
  • Tolerancia a fallos

Inicializando la aplicación

Como cualquier otra aplicación de nodejs, vamos a empezar generando nuestro package.json con el siguiente comando:

yarn init -y

Debemos instalar la dependencia AMQP la cual será utilizada para conectarnos con RabbitMQ.

yarn add amqplib

Instalando RabbitMQ

Dependiendo del sistema operativo que uses, hay varias formas de instalar RabbitMQ, yo creo que la más sencilla para este ejemplo es utilizando Docker, para que puedas darte una idea, este es el docker-compose.yml que use para este artículo:


version: '3'

services:
    rabbitmq:
        image: 'rabbitmq:3.7-management-alpine'
        container_name: rabbitmq
        restart: always
        ports:
            # The standard AMQP protocol port
            - '5672:5672'
            # HTTP management UI
            - '15672:15672'
        environment:
            AMQP_URL: 'amqp://rabbitmq?connection_attempts=5&retry_delay=5'
            RABBITMQ_DEFAULT_USER: rabbitmquser
            RABBITMQ_DEFAULT_PASS: rabbitmqpassword
        networks:
            - local

networks:
  local:
    driver: bridge

Implementando el Producer

Recuerda que poducer le llamamos al servicio que da origen al mensaje y ahora lo primero será crear un archivo llamado producer.js con el siguiente código:


const amqp = require('amqplib/callback_api');

const options = {
    clientProperties:
    {
        connection_name: 'producer-service'
    }
};

amqp.connect('amqp://rabbitmquser:rabbitmqpassword@localhost', options, (error, connection) => {
    if (error) {
        throw error;
    }

    connection.createChannel((connErr, channel) => {
        if (connErr) {
            throw connErr;
        }

        channel.assertQueue('test_queue', {
            durable: true
        });

        channel.sendToQueue('test_queue', Buffer.from('Hola'), {
            persistent: true
        });
    });

    setTimeout(function() {
        connection.close();
        process.exit(0);
    }, 500);
});

Ahora explico, lo primero que se necesita es importar la librería de amqplib y una vez importada estamos listos para establecer la conexión al servidor de RabbitMQ utilizando el método amqp.connect(), éste método recibe 3 parámetros, los cuales son la cadena de conexión, opciones de configuración de la conexión y un callback donde se retornara la conexión creada o un error si es que algo falló durante la conexión.

Quizás hayas visto ejemplos anteriores donde la cadena de conexión simplemente se especifica como localhost, pero ya que nosotros hemos definido usuario y password para nuestro RabbitMQ, entonces es necesario especificarlos a la hora de establecer la conexión.

Otra cosa importante, notarán que yo he utilizado clientProperties y connection_name y esto es bastante útil cuando tienes más de un servicio conectado al servidor, pues puedes especificar básicamente el nombre de tu servicio.

Ahora si todo ha marchado bien y no se ha retornado ningún error, procedemos a crear un canal para enviar los mensajes y básicamente los mensajes se quedarán ahí hasta que puedan ser entragados a un consumer.

El método channel.assertQueue() se utiliza para verificar que el canal al cual deseo conectarme existe y de no ser así, se creará uno en ese momento con el nombre especificado. En este caso, el parámetro durable, significa que el servidor aún después de un reinicio persistirá el canal creado, garantizando que cualquier mensaje con el flag persistent permanecerá en el disco.

A continuación se ejecutará channel.sendToQueue() con el nombre de la cola a la cual quiero enviar el mensaje en formato de matriz de bytes (Buffer.from()), la conexión se cerrará al cabo de medio segundo y ojo, dependiendo de la versión de nodejs que estés utilizando pueda que utilices Buffer solamente, pero a partir de la versión 10 de nodejs ya es necesario utilizar el método Buffer.from() para pasar de un string a un Buffer de datos.

Y para finalizar el produces, solamente necesitamos poner en marcha nuestro producer.js con el comando:

node producer.js

Y listo, ya tenemos un mensajes en nuestro servidor de RabbitMQ, y lo sé porque al acceder al panel de administración vía web, observarás algo como la siguiente captura de pantalla.

Implementando el Consumer

Bastante similar al producer, ahora necesitaremos crear el consumer y en este caso consumer estará a la escucha de cualquier mensaje que haya sido puesto en el canal especificado, para ello he creado el archivo consumer.js con el siguiente código:


const amqp = require('amqplib/callback_api');

const options = {
    clientProperties:
    {
        connection_name: 'producer-service'
    }
};

amqp.connect('amqp://rabbitmquser:rabbitmqpassword@localhost', options, (error, connection) => {
    if (error) {
        throw err;
    }

    connection.createChannel((connErr, channel) => {
        if (connErr) {
            throw connErr;
        }

        channel.assertQueue('test_queue', { durable: true });

        channel.prefetch(1);

        channel.consume('test_queue', (msg) => {
            console.log(msg.content.toString());

            setTimeout(() => {
                channel.ack(msg);
                connection.close();
                process.exit(0);
            }, 500);
        });
    });
});

Notarás que hay un par de cosas similares, como la forma en que se establece la conexión y el como se verifica o se crea un canal, hasta ahí todo bien.

El método channel.prefetch() determina el número de mensaje que queremos recibir del servidor, es importante tener en cuenta la capacidad de procesamiento del consumer.

Ahora utilizaremos el método channel.consume(), le pasaremos el nombre de la cola a la cual queremos escuchar y esto retorna un callback con el mensaje que ha sido enviado al servidor y nuevamente, si todo ha salido bien, deberían poder ver en su consola el mensaje que se envío desde el producer.

Cabe mencionar que los mensajes encolados siempre deben de ser cadenas (strings) y en el caso de querer enviar un objeto de json, será necesario hacer un JSON.stringify() y luego desde el consumer deberás ejecutar JSON.parse() para transformar el string en un objeto nuevamente.

Antes de finalizar con este artículo, quiero mencionar algunas cosas a considerar antes de empezar a utilizar RabbitMQ y son las siguientes:

Manten tus colas limpias, recuerda que los mensajes son almacenados en la memoria RAM y RabbitMQ para liberar la memoria hace un flush de vez en cuando para pasar esos mensajes al disco y específicamente esta tarea requiere de tiempo y procesamiento, por lo que tener muchos mensajes puede afectar el desempeño de tu servidor, esto va estrechamente relacionado con la cantidad de mensajes que tus consumer pueden tomar.

Limita tus colas a un máximo número de mensajes o especificando un tiempo de vida para los mensajes.

Las colas son procesos que corren en hilos separados y según la documentación, una sola cola puede manejar 50,000 mensajes, así es que utiliza las colas inteligentemente.

Elimina colas que ya no estés utilizando.

Los mensajes no reconocidos o "Unacknowledged Messages" viven permanentemente en la memoria y mientras más mensajes con este flag tengas, mayor consumo de memoria tendrás por lo que hay que cuidar la cantidad de mensajes en este estado ya que pueden provocar un volcado de memoria.

Los valores de prefetch se utilizan para determinar cuantos mensajes pueden ser procesados por el consumer al mismo tiempo, es una buena practica manejar un número grande, pero no tanto como para mantener al consumer ocupado por un largo período de tiempo, porque la idea es siempre tener consumers listos para recibir mensajes.

Evita abrir y cerrar conexiones en la medida de lo posible, ya que el proceso es complejo y más si se involucra TLS. De acuerdo a la documentación, los canales pueden abrirse o cerrarse con más frecuencia si es necesario, pero si es algo que puedes evitar, hazlo.

Y bien, espero esto te de una idea de como usar RabbitMQ con nodejs, yo lo recomiendo apliamente antes de saltar a Kafka por ejemplo, si han quedado dudas, recuerda que puedes dejar tus comentarios.

Happy coding!


Photo by Mathijs Delva on Unsplash

Jack Fiallos

Jack Fiallos

Te gustó este artículo?