Amqp, rabbit mq и socket.io переподключаются к очереди, даже если клиент закрыт

Я кодирую уведомление браузера, используя rabbitMQ и socket.io. Моя конфигурация работает нормально, за исключением одного случая.

Когда я вхожу в свою систему с пользователем, он создает очередь уведомления-UID-IDпользователя (сейчас queueName отправляется oaraeter запроса, я буду реализовывать более изощренный метод, как только я решу проблему)

Если я вхожу в систему с другим пользователем в другом браузере, он создает другую очередь-уведомление-UID-secondduserid.

Если я выйду из системы одного из пользователей, очередь исчезнет (так как она недолговечна).

Проблема заключается в том, что когда я обновляю или загружаю другую страницу в другом сеансе, она воссоздает вторую очередь, даже если имя параметра paramater не отправляется.

server.js

var amqp = require('amqp');
var app = require('express')();
var http = require('http').Server(app);
var io = require('socket.io')(http);

var rabbitMqConnection = null;
var _queue = null;
var _consumerTag = null;


io.use(function (socket, next) {
    var handshakeData = socket.handshake;
    // Here i will implement token verification
    console.log(socket.handshake.query.queueName);
    next();
});


// Gets the connection event form client
io.sockets.on('connection', function (socket) {

    var queueName = socket.handshake.query.queueName;

    console.log("Socket Connected");

    // Connects to rabbiMq
    rabbitMqConnection = amqp.createConnection({host: 'localhost', reconnect: false});

    // Update our stored tag when it changes
    rabbitMqConnection.on('tag.change', function (event) {
        if (_consumerTag === event.oldConsumerTag) {
            _consumerTag = event.consumerTag;
            // Consider unsubscribing from the old tag just in case it lingers
            _queue.unsubscribe(event.oldConsumerTag);
        }
    });

    // Listen for ready event
    rabbitMqConnection.on('ready', function () {
        console.log('Connected to rabbitMQ');

        // Listen to the queue
        rabbitMqConnection.queue(queueName, {
                closeChannelOnUnsubscribe: true,
                durable: false,
                autoClose: true
            },
            function (queue) {
                console.log('Connected to ' + queueName);
                _queue = queue;

                // Bind to the exchange
                queue.bind('users.direct', queueName);

                queue.subscribe({ack: false, prefetchCount: 1}, function (message, headers, deliveryInfo, ack) {
                    console.log("Received a message from route " + deliveryInfo.routingKey);
                    socket.emit('notification', message);
                    //ack.acknowledge();
                }).addCallback(function (res) {
                    // Hold on to the consumer tag so we can unsubscribe later
                    _consumerTag = res.consumerTag;
                });
            });
    });


    // Listen for disconnection
    socket.on('disconnect', function () {
        _queue.unsubscribe(_consumerTag);
        rabbitMqConnection.disconnect();
        console.log("Socket Disconnected");
    });

});

http.listen(8080);

client.js

var io = require('socket.io-client');

$(document).ready(function () {

    var socket = io('http://myserver.it:8080/', {
         query:  { queueName: 'notification-UID-' + UID},
        'sync disconnect on unload': true,
        });

    socket.on('notification', function (data) {
        console.log(data);
    });
})

Любая идея?

1 ответ

Решение

Так что я решил свою проблему, это была проблема с переменной областью, которая путала вещи. Позвольте мне объяснить, что я делаю, может быть, это кому-нибудь пригодится.

По сути, я пытаюсь создать систему уведомлений браузера, это означает, что мое приложение публикует (сторона производителя) для обмена объектом уведомления, который содержит некоторую информацию, такую ​​как тема, ссылка и сообщение.

Обмен является разветвлением (users.notification.fanout), которое имеет два связанных обмена: users.direct (прямой тип) и users.notification.store (тип разветвления).

Когда производитель публикует уведомление, он обращается к users.notification.fanout с ключом маршрутизации "tification-UID-userid"(где userid - это реальный идентификатор пользователя.

Объект уведомления направляется в обоих пользователей: users.direct и users.notification.store. Последний имеет потребителя, который записывает уведомление в БД, если пользователь не зарегистрирован, первый публикует уведомление в браузере.

Так как работает потребитель браузера?

Я использовал классическую комбинацию socket.io, node server и amqplib.

Каждый раз, когда пользователь входит в систему, socket.io создает очередь с именем и ключом маршрутизации Notification-UID-userid и связывает ее с обменом users.direct.

В то же время я добавил https на свой сервер, чтобы немного измениться с первой версии.

Вы можете прочитать комментарии, чтобы узнать, что он делает.

Так что мой server.js

var amqp = require('amqp');
var fs = require('fs');
var app = require('express')();
// Https server, certificates and private key added
var https = require('https').Server({
    key: fs.readFileSync('/home/www/site/privkey.pem'),
    cert: fs.readFileSync('/home/www/site/fullchain.pem')},app);
var io = require('socket.io')(https);

// Used to verify if token is valid
// If not it will discard connection
io.use(function (socket, next) {
    var handshakeData = socket.handshake;
    // Here i will implement token verification
    console.log("Check this token: " + handshakeData.query.token);
    next();
});
// Gets the connection event from client
io.sockets.on('connection', function (socket) {
    // Connection log
    console.log("Socket Connected with ID: " + socket.id);
    // THIS WAS THE PROBLEM
    // Local variables for connections
    // Former i've put these variables outside the connection so at 
    // every client they were "overridden". 
    // RabbitMq Connection (Just for current client)
    var _rabbitMqConnection = null;
    // Queue (just for current client)
    var _queue = null;
    // Consumer tag (just for current client)
    var _consumerTag = null;
    // Queue name and routing key for current user
    var queueName = socket.handshake.query.queueName;
    // Connects to rabbiMq with default data to localhost guest guest
    _rabbitMqConnection = amqp.createConnection();
    // Connection ready
    _rabbitMqConnection.on('ready', function () {
        // Connection log
        console.log('#' + socket.id + ' - Connected to RabbitMQ');
        // Creates the queue (default is transient and autodelete)
        // https://www.npmjs.com/package/amqp#connectionqueuename-options-opencallback
        _rabbitMqConnection.queue(queueName, function (queue) {
            // Connection log
            console.log('#' + socket.id + ' - Connected to ' + queue.name + ' queue');
            // Stores local queue
            _queue = queue;
            // Bind to the exchange (default)
            queue.bind('users.direct', queueName, function () {
                // Binding log
                console.log('#' + socket.id + ' - Binded to users.direct exchange');
                // Consumer definition
                queue.subscribe({ack: false}, function (message, headers, deliveryInfo, messageObject) {
                    // Message log
                    console.log('#' + socket.id + ' - Received a message from route ' + deliveryInfo.routingKey);
                    // Emit the message to the client
                    socket.emit('notification', message);
                }).addCallback(function (res) {
                    // Hold on to the consumer tag so we can unsubscribe later
                    _consumerTag = res.consumerTag;
                    // Consumer tag log
                    console.log('#' + socket.id + ' - Consumer ' + _consumerTag + ' created');
                })
            });

        });
    });
    // Update our stored tag when it changes
    _rabbitMqConnection.on('tag.change', function (event) {
        if (_consumerTag === event.oldConsumerTag) {
            _consumerTag = event.consumerTag;
            // Unsubscribe from the old tag just in case it lingers
            _queue.unsubscribe(event.oldConsumerTag);
        }
    });
    // Listen for disconnection
    socket.on('disconnect', function () {
        _queue.unsubscribe(_consumerTag);
        _rabbitMqConnection.disconnect();
        console.log('#' + socket.id + ' - Socket Disconnected');
    });
});

https.listen(8080);

Тогда мой client.js

var io = require ('socket.io-client');

$(document).ready(function () {

    var socket = io('https://myserver.com:8080/', {
        secure: true, // for ssl connections
        query:  { queueName: 'notification-UID-' + UID, token: JWTToken}, // params sent to server, JWTToken for authentication
        'sync disconnect on unload': true // Every time the client unload, socket disconnects
    });

    socket.on('notification', function (data) {
        // Do what you want with your data
        console.log(data);
    });
})
Другие вопросы по тегам