Невозможно закрыть беседы с сервисным брокером

У меня проблема с закрытием разговоров Service Broker, и я хотел бы концептуально понять, как все должно работать. Обратите внимание, что мой код основан на примере Дэна Гузмана по адресу http://www.dbdelta.com/service-broker-external-activator-example/.

Учитывая следующую конфигурацию SQL Server 2012 Service Broker:

  • инициатор
  • Служба инициатора - внутренне активирована
  • цель
  • Целевой монитор очереди
  • Служба внешнего активатора (приложение служб SSIS, которое связывается со службой REST)

Это мое понимание процесса:

  1. Инициатор отправляет сообщение в Target.
  2. Срабатывает событие уведомления о целевой очереди.
  3. Служба внешнего активатора выполняется в ответ на событие уведомления Целевая очередь; выдает RECEIVE TOP (1) для получения сообщения, отправленного Target.
  4. Служба внешнего активатора заканчивает обработку и выполняет END CONVERSATION @ConversationHandle.
  5. Результатом END CONVERSATION шага 4 является закрытие сообщения для инициатора; Служба Инициатора, активируемая внутри, запускается в ответ на появление сообщения в очереди Инициатора, обрабатывает его и выдает END CONVERSATION @ConversationHandle.

К сожалению, сторона Инициатора беседы закрывается, но Целевая сторона никогда не закрывается; он остается в состоянии DISCONNECTED_INBOUND.

Не закроется ли целевая сторона диалога, когда служба внешнего активатора выдаст сообщение END CONVERSATION @ConversationHandle? Если нет, как вы закрываете Целевую сторону разговора?

        -- Enabling service broker
        USE master
        ALTER DATABASE my_database
        SET ENABLE_BROKER; 

        --Create Message Types for Request and Response messages

        -- For Request
        CREATE MESSAGE TYPE
        [ABCEventRequestMessage]
        VALIDATION=WELL_FORMED_XML; 

        -- For Response
        CREATE MESSAGE TYPE
        [ABCEventResponseMessage]
        VALIDATION=WELL_FORMED_XML; 

        --Create Contract for the Conversation 

        CREATE CONTRACT [ABCEventContract]
        (
            [ABCEventRequestMessage] SENT BY INITIATOR 
            -- must make the reply message 'SENT BY ANY'
            -- so the External Activator service can
            -- send it to the Initiator after talking
            -- to the webservice
            ,[ABCEventResponseMessage] SENT BY ANY
        );

        --Create Queue for the Initiator
        CREATE QUEUE ABCEventInitiatorQueue
        WITH STATUS = ON,
        ACTIVATION (
          PROCEDURE_NAME = dbo.pci_LogABCEventTransferResult,
          MAX_QUEUE_READERS = 1,
          EXECUTE AS SELF
        ); 

        --Create Queue for the Target
        CREATE QUEUE ABCEventTargetQueue; 

        --Create Queue for the External Activator
       CREATE QUEUE ExternalActivatorQueue;

        --Create Service for the Target and the Initiator.

        --Create Service for the Initiator.
        -- NOTE: not specifying a contract on the queue
        -- means that the service can initiate conversations
        -- but it can't be a target for any other services
        CREATE SERVICE [ABCEventInitiatorService]
        ON QUEUE ABCEventInitiatorQueue;

        --Create Service for the Target.
        CREATE SERVICE [ABCEventTargetService] 
        ON QUEUE ABCEventTargetQueue
        ([ABCEventContract]); 

        --Create Service for the External Activator
        CREATE SERVICE ExternalActivatorService
        ON QUEUE ExternalActivatorQueue
        (
          [http://schemas.microsoft.com/SQL/Notifications/PostEventNotification]
        );

      CREATE EVENT NOTIFICATION EventNotificationABCEventTargetQueue
          ON QUEUE ABCEventTargetQueue
          FOR QUEUE_ACTIVATION
          TO SERVICE 'ExternalActivatorService', 'current database';
    });

        IF OBJECT_ID('dbo.pci_InitiateABCEventTransfer', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_InitiateABCEventTransfer as SELECT 1')
        END;

        ALTER PROC dbo.pci_InitiateABCEventTransfer

        @CompleteTriggerXMLOut XML = NULL OUTPUT
        ,@ConversationHandle uniqueidentifier = NULL OUTPUT
        ,@CompleteTriggerXMLIn XML

        ---------------------------------------------
        --called by application to trigger batch process
        --Sample Usage:
        --
        -- EXEC dbo.pci_InitiateABCEventTransfer @@CompleteTriggerXML = 'your_xml_here';
        -- NOTE: when calling this stored procedure from a SQL Server Mgmt
        -- Studio query window, enclose the xml data in single quotes;
        -- if you use double quotes instead, SQL Server thinks
        -- you are specifying a field name in which to save the data
        -- and returns an error like 'Maximum length is 128'
        ---------------------------------------------
        AS
        DECLARE
        --@conv_hand uniqueidentifier
        @message_body varbinary(MAX);

        SET @CompleteTriggerXMLOut = @CompleteTriggerXMLin

        BEGIN TRY

        BEGIN TRAN;

        BEGIN DIALOG CONVERSATION @ConversationHandle
        FROM SERVICE ABCEventInitiatorService
        TO SERVICE 'ABCEventTargetService', 'CURRENT DATABASE'
        ON CONTRACT ABCEventContract
        WITH
        ENCRYPTION = OFF,
        LIFETIME = 6000;

        -- NOTE: because we created our services with a specific
        -- contract, ABCEventContract, that includes specific
        -- message types (ABCEventRequestMessage & ABCEventResponseMessage)
        -- but does not include the DEFAULT message type,
        -- we must include the MESSAGE TYPE clause
        -- of the SEND ON CONVERSATION command; without this clause,
        -- Service Broker will assume the DEFAULT message type
        -- and the SEND will fail, saying, "The message TYPE 'DEFAULT'
        -- is not part of the service contract."

        SEND ON CONVERSATION @ConversationHandle
        MESSAGE TYPE ABCEventRequestMessage
        (@CompleteTriggerXMLOut);

        COMMIT;
        END TRY
        BEGIN CATCH
        THROW;
        END CATCH;

        SELECT @CompleteTriggerXMLOut, @ConversationHandle;

        PRINT 'CompleteTriggerXMLOut = ' + CONVERT(nvarchar(max), @CompleteTriggerXMLOut);

        RETURN @@ERROR; 

        IF OBJECT_ID('dbo.pci_GetABCEventDetails', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_GetABCEventDetails as SELECT 1')
        END;

        ALTER PROC dbo.pci_GetABCEventDetails
        --------------------------------------
        --called by SSIS package at start ---
        --------------------------------------
        AS
        DECLARE
        @conversation_handle uniqueidentifier
        ,@message_type_name sysname
        ,@message_body xml
        ,@parameter1 int;

        BEGIN TRY

        BEGIN TRAN;

        RECEIVE TOP(1)
        @conversation_handle = conversation_handle
        ,@message_type_name = message_type_name
        ,@message_body = message_body
        FROM dbo.ABCEventTargetQueue;

        IF @@ROWCOUNT = 0
        BEGIN
        RAISERROR ('No messages received from dbo.ABCEventTargetQueue', 16, 1);
        RETURN 1;
        END;

        INSERT INTO dbo.ABCEventTransferLog(
        ConversationHandle
        ,MessageTypeName
        ,MessageBody
        )
        VALUES(
        @conversation_handle
        ,@message_type_name
        ,CAST(@message_body AS varbinary(MAX))
        );

        COMMIT;

        SELECT
            CAST(@message_body AS nvarchar(MAX)) AS WhatIsThis
            ,@conversation_handle AS ConversationHandle
            ,@parameter1 AS Parameter1;

        END TRY
        BEGIN CATCH
        THROW;
        END CATCH;

        RETURN @@ERROR;

        IF OBJECT_ID('dbo.pci_CompleteABCEventTransfer', 'P') IS NULL
        BEGIN
          EXEC ('CREATE PROCEDURE dbo.pci_CompleteABCEventTransfer as SELECT 1')
        END;

            ALTER PROC dbo.pci_CompleteABCEventTransfer
          @ConversationHandle uniqueidentifier
                ,@WebserviceResponseStatusCode integer
                ,@WebserviceResponseXML xml
           ------------------------------------------
           -- called by SSIS package at completion
           -- Sample Usage:

           -- normal completion:
           -- EXEC dbo.pci_CompleteABCEventTransfer
           -- @ConversationHandle = '00000000-0000-0000-0000-000000000000';

           -- completed with error:
           -- EXEC dbo.pci_CompleteABCEventTransfer
           -- @ConversationHandle = '00000000-0000-0000-0000-000000000000'
           -- @ErrorMessage = 'an error occurred';
           ------------------------------------------
         AS

         IF @WebserviceResponseStatusCode <> 201
               -- webservice record creation failed;
               -- complete conversation with error
         BEGIN
           END CONVERSATION @ConversationHandle
           WITH ERROR = 1
           DESCRIPTION = 'Something went horribly wrong';
         END;
               -- webservice created record in remote system;
               -- complete conversation normally
               ELSE
         BEGIN
           END CONVERSATION @ConversationHandle;
         END

         RETURN @@ERROR;

  # because of circular references, must create a "dummy"
  # LogABCEventTransferResult stored procedure first,
  # create the ABCEventInitiatorQueue and then replace
  # the dummy stored procedure with the real one

      IF OBJECT_ID('dbo.pci_LogABCEventTransferResult', 'P') IS NULL
      BEGIN
        EXEC ('CREATE PROCEDURE dbo.pci_LogABCEventTransferResult as SELECT 1')
      END;

      ALTER PROC dbo.pci_LogABCEventTransferResult
      ---------------------------------------------
      --initiator queue activated proc to process messages
      ---------------------------------------------
      AS
      DECLARE
      @conversation_handle uniqueidentifier
      ,@message_type_name sysname
      ,@message_body varbinary(MAX);
      WHILE 1 = 1
      BEGIN
      WAITFOR (
        RECEIVE TOP (1)
        @conversation_handle = conversation_handle
        ,@message_type_name = message_type_name
        ,@message_body = message_body
        FROM dbo.ABCEventInitiatorQueue
      ), TIMEOUT 1000;
      IF @@ROWCOUNT = 0
      BEGIN
      --exit when no more messages
      RETURN;
      END;

      --log message
      INSERT INTO dbo.ABCEventTransferLog(
        ConversationHandle
        ,MessageTypeName
        ,MessageBody
      )
      VALUES(
        @conversation_handle
        ,@message_type_name
        ,@message_body
      );
      END CONVERSATION @conversation_handle;
      END;

      --log table
      CREATE TABLE dbo.ABCEventTransferLog(
      ConversationHandle uniqueidentifier NOT NULL
      ,MessageTypeName sysname NOT NULL
      ,MessageBody varbinary(MAX) NULL
      ,LogTime datetime2(3) NOT NULL
      CONSTRAINT DF_ServiceBrokerLog_LogTime
      DEFAULT (SYSDATETIME())
      );
      CREATE CLUSTERED INDEX cdx_ABCEventTransferLog ON dbo.ABCEventTransferLog(LogTime);

1 ответ

Решение

Здесь участвуют два разговора. Одним из них является ваш собственный диалог приложения, а другим - системный диалог, который уведомляет советника. Исходя из того, как вы описываете свой код, вы полагаетесь на EA, чтобы закрыть диалог, но это только закроет системный диалог. Вам также необходимо закрыть диалог приложения, дескриптор, который вы получаете из своей целевой очереди.

Обновить:

Ваш активированный сервис должен выдать RECEIVE ... FROM Target а потом END CONVERSATION на @handle он получил. В вашем объяснении говорится, что это то, что вы делаете (шаг 4), но если вы это сделаете, целевой разговор будет очищен должным образом через 30 минут.

Однако вы замечаете разговоры в состоянии DISCONNECTED_INBOUND. Это диалог, который получил сообщение EndDialog, но приложение не выполнило END CONVERSION для него.

Таким образом, существует разрыв между тем, что вы объясняете, что делает приложение, и наблюдаемым состоянием разговоров. Я предполагаю, что в коде может быть ошибка, и код делает не совсем то, что вы думаете. Попробуйте разместить здесь соответствующие части кода.

Другие вопросы по тегам