Масштабирование Socket.IO для нескольких процессов Node.js с использованием кластера

Разорвать мне голову с этим... кто-нибудь смог масштабировать Socket.IO для нескольких "рабочих" процессов, порожденных модулем кластера Node.js?

Допустим, у меня есть следующие четыре рабочих процесса (псевдо):

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {

  socket.on('join', function(rooms) {
    rooms.forEach(function(room) {
      socket.join(room);
    });
  });

  socket.on('leave', function(rooms) {
    rooms.forEach(function(room) {
      socket.leave(room);
    });
  });

});

// Emit a message every second
function send() {
  io.sockets.in('room').emit('data', 'howdy');
}

setInterval(send, 1000);

И в браузере...

// on the client
socket = io.connect();
socket.emit('join', ['room']);

socket.on('data', function(data){
  console.log(data);
});

Проблема: каждую секунду я получаю четыре сообщения, потому что четыре отдельных рабочих процесса отправляют сообщения.

Как я могу убедиться, что сообщение отправлено только один раз?

4 ответа

Решение

Изменить: В Socket.IO 1.0+ вместо настройки магазина с несколькими клиентами Redis теперь можно использовать более простой модуль адаптера Redis.

var io = require('socket.io')(3000);
var redis = require('socket.io-redis');
io.adapter(redis({ host: 'localhost', port: 6379 }));

Пример, показанный ниже, будет выглядеть примерно так:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);
  var redis = require('socket.io-redis');

  io.adapter(redis({ host: 'localhost', port: 6379 }));
  io.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

Если у вас есть главный узел, который должен публиковаться в других процессах Socket.IO, но сам не принимает соединения с сокетами, используйте socket.io-emitter вместо socket.io-redis.

Если у вас возникли проблемы с масштабированием, запустите приложения Node с DEBUG=*, Socket.IO теперь реализует отладку, которая также выводит отладочные сообщения адаптера Redis. Пример вывода:

socket.io:server initializing namespace / +0ms
socket.io:server creating engine.io instance with opts {"path":"/socket.io"} +2ms
socket.io:server attaching client serving req handler +2ms
socket.io-parser encoding packet {"type":2,"data":["event","payload"],"nsp":"/"} +0ms
socket.io-parser encoded {"type":2,"data":["event","payload"],"nsp":"/"} as 2["event","payload"] +1ms
socket.io-redis ignore same uid +0ms

Если и ваш главный, и дочерний процессы отображают одни и те же сообщения синтаксического анализатора, значит ваше приложение правильно масштабируется.


Там не должно быть проблем с вашей настройкой, если вы излучаете от одного работника. То, что вы делаете - это излучение всех четырех работников, и из-за публикации / подписки Redis сообщения не дублируются, а записываются четыре раза, как вы и просили приложение. Вот простая диаграмма того, что делает Redis:

Client  <--  Worker 1 emit -->  Redis
Client  <--  Worker 2  <----------|
Client  <--  Worker 3  <----------|
Client  <--  Worker 4  <----------|

Как вы можете видеть, когда вы отправляете сообщение от работника, оно публикует его в Redis, и оно будет отражаться от других работников, которые подписались на базу данных Redis. Это также означает, что вы можете использовать несколько серверов сокетов, подключенных к одному и тому же экземпляру, и emit на одном сервере будет запущен на всех подключенных серверах.

В кластере, когда клиент подключается, он подключается к одному из четырех ваших сотрудников, а не ко всем четырем. Это также означает, что все, что вы излучаете от этого работника, будет показано клиенту только один раз. Да, приложение масштабируется, но, как вы делаете, вы излучаете от всех четырех работников, а база данных Redis делает это так, как если бы вы вызывали его четыре раза на одном работнике. Если клиент действительно подключится ко всем четырем вашим экземплярам сокетов, он будет получать шестнадцать сообщений в секунду, а не четыре.

Тип обработки сокетов зависит от типа приложения, которое вы собираетесь иметь. Если вы собираетесь обрабатывать клиентов по отдельности, у вас не должно возникнуть проблем, потому что событие подключения будет срабатывать только для одного работника на одного клиента. Если вам нужен глобальный "сердцебиение", то в вашем главном процессе может быть обработчик сокетов. Поскольку рабочие умирают, когда умирает главный процесс, вы должны компенсировать нагрузку на соединение от основного процесса и позволить дочерним процессам обрабатывать соединения. Вот пример:

var cluster = require('cluster');
var os = require('os');

if (cluster.isMaster) {
  // we create a HTTP server, but we do not use listen
  // that way, we have a socket.io server that doesn't accept connections
  var server = require('http').createServer();
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  setInterval(function() {
    // all workers will receive this in Redis, and emit
    io.sockets.emit('data', 'payload');
  }, 1000);

  for (var i = 0; i < os.cpus().length; i++) {
    cluster.fork();
  }

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  }); 
}

if (cluster.isWorker) {
  var express = require('express');
  var app = express();

  var http = require('http');
  var server = http.createServer(app);
  var io = require('socket.io').listen(server);

  var RedisStore = require('socket.io/lib/stores/redis');
  var redis = require('socket.io/node_modules/redis');

  io.set('store', new RedisStore({
    redisPub: redis.createClient(),
    redisSub: redis.createClient(),
    redisClient: redis.createClient()
  }));

  io.sockets.on('connection', function(socket) {
    socket.emit('data', 'connected to worker: ' + cluster.worker.id);
  });

  app.listen(80);
}

В этом примере пять экземпляров Socket.IO, один из которых является мастером, а четыре - дочерними. Главный сервер никогда не звонит listen() поэтому в этом процессе нет накладных расходов на соединение. Однако, если вы вызовете emit в главном процессе, он будет опубликован в Redis, и четыре рабочих процесса будут выполнять emit на своих клиентах. Это компенсирует нагрузку на соединение для рабочих, и если рабочий умрет, логика вашего основного приложения останется нетронутой в мастере.

Обратите внимание, что в Redis все выбросы, даже в пространстве имен или комнате, будут обрабатываться другими рабочими процессами, как если бы вы инициировали выброс из этого процесса. Другими словами, если у вас есть два экземпляра Socket.IO с одним экземпляром Redis, вызовите emit() на сокете в первом работнике будут отправлять данные своим клиентам, в то время как рабочий два будет делать то же самое, как если бы вы вызвали emit от этого работника.

Позвольте мастеру обрабатывать ваше сердцебиение (пример ниже) или запускать несколько процессов на разных портах внутри и балансировать их нагрузку с помощью nginx (который поддерживает также веб-сокеты от V1.3 и выше).

Кластер с Мастером

// on the server
var express = require('express');
var server = express();
var socket = require('socket.io');
var io = socket.listen(server);
var cluster = require('cluster');
var numCPUs = require('os').cpus().length;

// socket.io
io.set('store', new socket.RedisStore);

// set-up connections...
io.sockets.on('connection', function(socket) {
    socket.on('join', function(rooms) {
        rooms.forEach(function(room) {
            socket.join(room);
        });
    });

    socket.on('leave', function(rooms) {
        rooms.forEach(function(room) {
            socket.leave(room);
        });
    });

});

if (cluster.isMaster) {
    // Fork workers.
    for (var i = 0; i < numCPUs; i++) {
        cluster.fork();
    }

    // Emit a message every second
    function send() {
        console.log('howdy');
        io.sockets.in('room').emit('data', 'howdy');
    }

    setInterval(send, 1000);


    cluster.on('exit', function(worker, code, signal) {
        console.log('worker ' + worker.process.pid + ' died');
    }); 
}

Это на самом деле похоже на Socket.IO, успешно преуспевающий в масштабировании. Можно ожидать, что сообщение от одного сервера отправится во все сокеты в этой комнате, независимо от того, к какому серверу они подключены.

Лучше всего иметь один основной процесс, который отправляет сообщение каждую секунду. Вы можете сделать это, только запустив его, если cluster.isMaster, например.

Межпроцессного взаимодействия недостаточно, чтобы socket.io 1.4.5 работал с кластером. Принудительный режим websocket также является обязательным. См. Рукопожатие WebSocket в Node.JS, Socket.IO и кластеры не работают

Другие вопросы по тегам