Как работает PubSub в BookSleeve/ Redis?

Интересно, как лучше всего публиковать и подписываться на каналы, используя BookSleeve. В настоящее время я реализую несколько статических методов (см. Ниже), которые позволяют мне публиковать контент на конкретный канал с сохранением недавно созданного канала в private static Dictionary<string, RedisSubscriberConnection> subscribedChannels;,

Является ли это правильным подходом, учитывая, что я хочу публиковать на каналы и подписываться на каналы в одном приложении (примечание: моя обертка является статическим классом). Достаточно ли создать один канал, даже если я хочу опубликовать и подписаться? Очевидно, что я не буду публиковать на том же канале, на котором подписался бы в том же приложении. Но я проверил это, и это сработало:

 RedisClient.SubscribeToChannel("Test").Wait();
 RedisClient.Publish("Test", "Test Message");

и это сработало.

Вот мои вопросы:

1) Будет ли эффективнее настроить выделенный канал публикации и выделенный канал подписки, а не использовать один канал для обоих?

2) В чем разница между "channel" и "PatternSubscription" семантически? Насколько я понимаю, я могу подписаться на несколько "тем" через PatternSubscription() на том же канале, правильно? Но если я хочу, чтобы для каждой "темы" вызывались разные колбэки, мне нужно было бы правильно настроить канал для каждой темы? Это эффективно или вы бы посоветовали против этого?

Здесь фрагменты кода.

Спасибо!!!

    public static Task<long> Publish(string channel, byte[] message)
    {
        return connection.Publish(channel, message);
    }

    public static Task SubscribeToChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        RedisSubscriberConnection channel = connection.GetOpenSubscriberChannel();

        subscribedChannels[subscriptionString] = channel;

        return channel.PatternSubscribe(subscriptionString, OnSubscribedChannelMessage);
    }

    public static Task UnsubscribeFromChannel(string channelName)
    {
        string subscriptionString = ChannelSubscriptionString(channelName);

        if (subscribedChannels.Keys.Contains(subscriptionString))
        {
            RedisSubscriberConnection channel = subscribedChannels[subscriptionString];

            Task  task = channel.PatternUnsubscribe(subscriptionString);

            //remove channel subscription
            channel.Close(true);
            subscribedChannels.Remove(subscriptionString);

            return task;
        }
        else
        {
            return null;
        }
    }

    private static string ChannelSubscriptionString(string channelName)
    {
        return channelName + "*";
    }

1 ответ

Решение

1: в вашем примере только один канал (Test); канал - это просто имя, используемое для определенного паба / суб-обмена. Однако необходимо использовать 2 соединения из-за особенностей работы Redis API. Соединение, у которого есть какие-либо подписки, не может делать ничего, кроме:

  • слушать сообщения
  • управлять своими подписками (subscribe, psubscribe, unsubscribe, punsubscribe)

Однако я не понимаю этого:

private static Dictionary<string, RedisSubscriberConnection>

Вам не нужно больше, чем одно абонентское соединение, если вы не предлагаете что-то свое. Одно соединение подписчика может обрабатывать произвольное количество подписок. Быстрая проверка client list на одном из моих серверов, и у меня есть одно соединение с (на момент написания) 23 002 подписками. Что, вероятно, может быть уменьшено, но: это работает.

2: поддержка шаблонных подписок; так что вместо подписки на /topic/1, /topic/2/ и т. д. вы можете подписаться на /topic/*, Название фактического канала, используемого publish предоставляется получателю как часть подписи обратного вызова.

Любой может работать. Следует отметить, что производительность publish На это влияет общее количество уникальных подписок - но, честно говоря, это все еще тупо быстро (например, 0 мс), даже если у вас есть десятки или тысячи подписанных каналов, использующих subscribe скорее, чем psubscribe,

Но из publish

Сложность по времени: O(N+M), где N - количество клиентов, подписанных на принимающий канал, а M - общее количество подписанных шаблонов (любым клиентом).

Я рекомендую прочитать документацию redis pub / sub.


Изменить для следования по вопросам:

а) Я предполагаю, что мне придется "публиковать" синхронно (используя Result или Wait()), если я хочу гарантировать, что порядок отправки элементов от одного и того же издателя сохраняется при получении элементов, правильно?

это не будет иметь никакого значения вообще; так как вы упоминаете Result / Wait() Я предполагаю, что вы говорите о BookSleeve - в этом случае мультиплексор уже сохраняет порядок команд. Redis сам по себе является однопоточным и всегда будет обрабатывать команды по одному соединению по порядку. Однако: обратные вызовы на подписчике могут выполняться асинхронно и могут передаваться (отдельно) рабочему потоку. В настоящее время я расследую, могу ли я заставить это быть в порядке с RedisSubscriberConnection,

Обновление: начиная с версии 1.3.22 вы можете установить CompletionMode в PreserveOrder - тогда все обратные вызовы будут выполняться последовательно, а не одновременно.

б) после внесения изменений в соответствии с вашими предложениями я получаю отличную производительность при публикации нескольких элементов независимо от размера полезной нагрузки. Однако при отправке 100000 или более элементов одним издателем производительность быстро падает (до 7-8 секунд только для отправки с моего компьютера).

Во-первых, это время звучит очень высоко - я получаю локальное тестирование (для 100000 публикаций, включая ожидание ответа на все из них) 1766 мс (локальный) или 1219 мс (удаленный) (это может показаться нелогичным, но мой "локальный" ответ не идет). t работает с той же версией redis, мой "remote" - 2.6.12 для Centos, мой "local" - 2.6.8-pre2 для Windows).

Я не могу сделать ваш реальный сервер быстрее или ускорить работу сети, но: в случае фрагментации пакетов я добавил (только для вас) SuspendFlush() / ResumeFlush() пара. Это отключает активную очистку (то есть, когда очередь отправки пуста; другие типы очистки все еще происходят); Вы можете найти это помогает:

conn.SuspendFlush();
try {
    // start lots of operations...
} finally {
    conn.ResumeFlush();
}

Обратите внимание, что вы не должны Wait пока не возобновишь, потому что пока не позвонишь ResumeFlush() в буфере отправки могут быть некоторые операции. С учетом всего этого я получаю (за 100000 операций):

local: 1766ms (eager-flush) vs 1554ms (suspend-flush)
remote: 1219ms (eager-flush) vs 796ms (suspend-flush)

Как видите, это помогает больше с удаленными серверами, так как будет пропускать меньше пакетов через сеть.

Я не могу использовать транзакции, потому что позже публикуемые элементы не все доступны сразу. Есть ли способ оптимизировать с учетом этих знаний?

Я думаю, что это решено выше - но обратите внимание, что в последнее время CreateBatch был добавлен тоже. Пакет работает во многом как транзакция - просто: без транзакции. Опять же, это еще один механизм снижения фрагментации пакетов. Я подозреваю, что в вашем конкретном случае приостановка / возобновление (на флэш) - ваш лучший выбор.

Вы рекомендуете иметь один общий RedisConnection и один RedisSubscriberConnection или любую другую конфигурацию, чтобы такая обертка выполняла желаемые функции?

Пока вы не выполняете операции блокировки (blpop, brpop, brpoplpush и т. д.) или помещая в сеть слишком большие BLOB (потенциально задерживая другие операции, пока он очищается), тогда одиночное соединение каждого типа обычно работает довольно хорошо. Но YMMV в зависимости от ваших требований к использованию.

Другие вопросы по тегам