Настройте SignalR и служебную шину внутри службы Azure Service Fabric

Я портирую существующую Cloud Service WorkerRole на Service Fabric в качестве службы без сохранения состояния. Исходная облачная служба использует SignalR и служебную шину (в качестве объединительной платы SignalR) для отправки уведомлений любому прослушивающему клиенту. Существует класс Startup, который выполняет некоторые настройки:

class Startup
{
    public void Configuration(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[key]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<MyHub>();
    }
}

В методе OnStart() для WorkerRole я запускаю OWIN с помощью:

var endpoint = RoleEnvironment.CurrentRoleInstance.InstanceEndpoints["HttpEndpoint"];
var baseUri = $"{endpoint.Protocol}://{endpoint.IPEndpoint}";
var app = WebApp.Start<Startup>(new StartOptions(url: baseUri));

Как это (т. Е. Подключение к объединительной плате служебной шины SignalR) выполняется для службы без сохранения состояния в Service Fabric?

2 ответа

С помощью https://github.com/marcinbudny/SignalRSelfHostScaleOut (который является примером масштабирования с использованием Redis), я думаю, что это вылизано.

В ServiceManifest.xml я добавил следующую конечную точку:

<Endpoint Protocol="http" Name="ServiceEndpoint" Type="Input" Port="8322" />

Я также добавил класс запуска:

public static class Startup
{
    public static void ConfigureApp(IAppBuilder app)
    {
        String connectionString = "Endpoint=sb://[name].servicebus.windows.net/;SharedSecretIssuer=owner;SharedSecretValue=[value]";
        GlobalHost.DependencyResolver.UseServiceBus(connectionString, "InSys");
        app.MapSignalR();
        Notifications.Hub = GlobalHost.ConnectionManager.GetHubContext<InSysMainHub>();
    }
}

Также был добавлен класс OwinCommunicationListener:

public class OwinCommunicationListener : ICommunicationListener
{
    private readonly ServiceEventSource eventSource;
    private readonly Action<IAppBuilder> startup;
    private readonly ServiceContext serviceContext;
    private readonly string endpointName;
    private readonly string appRoot;

    private IDisposable webApp;
    private string publishAddress;
    private string listeningAddress;

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName)
        : this(startup, serviceContext, eventSource, endpointName, null)
    {
    }

    public OwinCommunicationListener(Action<IAppBuilder> startup, ServiceContext serviceContext, ServiceEventSource eventSource, string endpointName, string appRoot)
    {
        if (startup == null)
        {
            throw new ArgumentNullException(nameof(startup));
        }

        if (serviceContext == null)
        {
            throw new ArgumentNullException(nameof(serviceContext));
        }

        if (endpointName == null)
        {
            throw new ArgumentNullException(nameof(endpointName));
        }

        if (eventSource == null)
        {
            throw new ArgumentNullException(nameof(eventSource));
        }

        this.startup = startup;
        this.serviceContext = serviceContext;
        this.endpointName = endpointName;
        this.eventSource = eventSource;
        this.appRoot = appRoot;
    }


    public Task<string> OpenAsync(CancellationToken cancellationToken)
    {
        var serviceEndpoint = this.serviceContext.CodePackageActivationContext.GetEndpoint(this.endpointName);
        var protocol = serviceEndpoint.Protocol;
        int port = serviceEndpoint.Port;

        if (this.serviceContext is StatefulServiceContext)
        {
            StatefulServiceContext statefulServiceContext = (StatefulServiceContext) serviceContext;

            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}{3}/{4}/{5}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/',
                statefulServiceContext.PartitionId,
                statefulServiceContext.ReplicaId,
                Guid.NewGuid());
        }
        else if (serviceContext is StatelessServiceContext)
        {
            listeningAddress = string.Format(
                CultureInfo.InvariantCulture,
                "{0}://+:{1}/{2}",
                protocol,
                port,
                string.IsNullOrWhiteSpace(appRoot)
                    ? string.Empty
                    : appRoot.TrimEnd('/') + '/');
        }
        else
        {
            throw new InvalidOperationException();
        }

        publishAddress = listeningAddress.Replace("+", FabricRuntime.GetNodeContext().IPAddressOrFQDN);

        try
        {
            eventSource.Message("Starting web server on " + listeningAddress);
            webApp = WebApp.Start(listeningAddress, appBuilder => startup.Invoke(appBuilder));
            eventSource.Message("Listening on " + this.publishAddress);
            return Task.FromResult(this.publishAddress);
        }
        catch (Exception ex)
        {
            eventSource.Message("Web server failed to open endpoint {0}. {1}", this.endpointName, ex.ToString());
            StopWebServer();
            throw;
        }
    }

    public Task CloseAsync(CancellationToken cancellationToken)
    {
        this.eventSource.Message("Closing web server on endpoint {0}", this.endpointName);

        this.StopWebServer();

        return Task.FromResult(true);
    }

    public void Abort()
    {
        this.eventSource.Message("Aborting web server on endpoint {0}", this.endpointName);

        this.StopWebServer();
    }

    private void StopWebServer()
    {
        if (this.webApp != null)
        {
            try
            {
                this.webApp.Dispose();
            }
            catch (ObjectDisposedException)
            {
                // no-op
            }
        }
    }
}

И, наконец, я изменил метод CreateServiceInstanceListeners в своем коде службы без сохранения состояния:

protected override IEnumerable<ServiceInstanceListener> CreateServiceInstanceListeners()
    {
        return new[]
        {
            new ServiceInstanceListener(serviceContext => new OwinCommunicationListener(Startup.ConfigureApp, serviceContext, ServiceEventSource.Current, "ServiceEndpoint"))
        };
    }

Создание службы без сохранения состояния с помощью прослушивателя Owin. Затем при запуске настройте для signalR и объединительной платы (служебная шина или sql). В идеале проблема, с которой вы столкнетесь, заключается в согласовании (дрожание рук между клиентом Signalr и сервером). На этом этапе попробуйте настроить запрос на перекрестный источник, пример кода для постоянного подключения будет выглядеть ниже.

Также обратите внимание на строку appBuilder.UseAesDataProtectorProvider("Ваш ключ"), так как это важно. Результатом этого является то, что вы не получите HTTP 400 для подключения большую часть времени. Это связано с тем, что SignalR выполнит как минимум 2 запроса при рукопожатии, и они обычно будут работать на двух разных машинах.

Спасибо Марсину Будни за объяснение.

var config = new HttpConfiguration();

// Configure your origins as required.

var cors = new EnableCorsAttribute("*", "*", "*");

config.EnableCors(cors);

FormatterConfig.ConfigureFormatters(config.Formatters);

RouteConfig.RegisterRoutes(config.Routes);

appBuilder.UseWebApi(config);

GlobalHost.DependencyResolver.UseServiceBus("yourconnection string comes here", "signalrbackplaneserver");
appBuilder.UseAesDataProtectorProvider("some password");
appBuilder.Map("/echo", map =>
{
                map.UseCors(CorsOptions.AllowAll).RunSignalR<MyEndPoint>();
});
Другие вопросы по тегам