MassTransit: контракты сообщений, полиморфизм и динамические прокси-объекты

TL;DR По контрактной подписке, как я могу получить необработанный контент сообщения или исходный опубликованный объект, а не динамический прокси?

Я неудачно пытаюсь создать модульное приложение на основе MassTransit.

Моя идея состоит в том, чтобы подключить сервер Websocket к очереди, он считывает события из сокета и вставляет его в очередь как "запрос на подключение", а также читает события из очереди и отправляет их в сокеты как "события подключения". Оба имеют контракт, который позволяет серверу WS знать, к какому соединению относится событие, а остальная часть системы - откуда:

public interface IConnectionRequest
{
    String ConnectionId { get; set; }
}

public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}

Там есть объект, который поддерживает сеанс и другие данные. Этот объект принимает запросы (например, запросы действий или запросы подписки) и отправляет события в результате запросов или просто потому, что состояние изменилось. Я хочу создать объекты, которые прослушивают определенное событие или набор событий, и выполняют действия с состоянием, поэтому я создал этот контракт:

public interface IConnectionRequestHandler<T> : Consumes<T>.Selected
    where T : class, IConnectionRequest
{
}

Например, я хочу создать обработчик, который создает фактический сеанс на сервере и отвечает на соединение, уведомляя, когда сеанс готов. Я создаю объект, который представляет запрос, другой для события и сам обработчик.

public class CreateSessionRequest : IConnectionRequest
{
    public String ConnectionId { get; set; }
}

public class CreatedSessionEvent : IConnectionEvent
{
    public String ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

public class CreateSessionEventHandler : IConnectionRequestHandler<CreateSessionRequest>
{
    IServiceBus _bus;

    public CreateSessionEventHandler(IServiceBus bus)
    {
        _bus = bus;
    }

    public bool Accept(CreateSessionRequest message)
    {
        return true;
    }

    public void Consume(CreateSessionRequest message)
    {
        // do stuff, create the session
        var evt = new CreatedSessionEvent() { SessionId =Guid.NewGuid(), ConnectionId = message.ConnectionId };
        _bus.Publish(evt, evt.GetType());
    }
}

Теперь для целей тестирования я создаю этот код, который имитирует сценарий. По сути, он создает коммуникационную шину и подписывает обработчик запросов:

var bus = ServiceBusFactory.New(sbc =>
{
    sbc.ReceiveFrom("loopback://localhost/queue");
});

bus.SubscribeInstance<CreateSessionEventHandler>(new CreateSessionEventHandler(bus));

Затем, имитируя сервер Websocket, я записываю часть, которая читает из WS, и отправляет ее в очередь:

IConnectionRequest e = new CreateSessionRequest() { ConnectionId = "myId" };
bus.Publish(e, e.GetType());

А теперь часть, которая должна прослушивать события из очереди и направлять их на соответствующее соединение:

bus.SubscribeHandler<IConnectionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().Name,  
                                                                evt.ConnectionId));

Но эта последняя часть не работает, как ожидалось. Объект, который я получаю в подписке, не является моим исходным событием, это динамический прокси DynamicImpl.IConnectionEventпоэтому я не могу сериализовать этот объект в JSON, так как он будет содержать только члены IConnectionEvent,

Если я указываю тип в подписке, он работает:

bus.SubscribeHandler<CreatedSessionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().FullName,  
                                                                evt.ConnectionId));

Но это означает, что для каждого нового события я должен коснуться сервера веб-сокетов, чтобы зарегистрировать этот новый тип.

Есть ли способ избежать этого?

1 ответ

Решение

TL;DR - MT поддерживает только данные по контракту для большинства сериализаций. Для этого есть множество причин, но если вы хотите использовать не-прокси-тип, вам нужно использовать двоичный сериализатор.

Так что сериализация XML и JSON (на самом деле XML просто использует сериализатор JSON под капотом, как ни странно, быстрее) генерирует прокси для всех запросов, если это возможно. Подписанный тип - это контракт, который вы используете и ожидаете, что запросите объект для получения более подробной информации, которой нет в контракте, что приведет к огромной сложности. Мы предлагаем вам избегать этого.

Таким образом, это означает, что вам нужно будет обращаться к серверу веб-сокетов для каждого типа сообщений, которые он должен был потреблять. Если единственное, что вы делаете, это пересылаете сообщение, есть задняя панель SingalR, которую кто-то сделал ( https://github.com/mdevilliers/SignalR.RabbitMq я думаю, что она одна), которая могла бы быть лучше, чем потребители MT.

Другие вопросы по тегам