HornetQ MessageHandler howto

У меня есть проблемы с пониманием, используя HornetQ. Я пытаюсь понять, что это асинхронная обработка запросов. У меня есть серверный компонент, принимающий входящие соединения TCP. Клиенты будут отправлять запросы в проприетарном протоколе, используя сообщения TLV. Чтобы отделить обработку от вещей io, я ввел очередь сообщений, используя hornetQ. Поэтому, когда приходит новое сообщение, оно просто помещается в очередь INBOX. В эту очередь входит несколько потоков прослушивателей, поэтому обработка является многопоточной. Эти слушатели по очереди выдают ответы и помещают их обратно в очередь OUTBOX. Каждый клиент имеет свою собственную очередь исходящих сообщений, поэтому они не будут влиять друг на друга (например, когда клиент временно отключен). Поэтому для обработки исходящих сообщений я создаю ClientProducer и задаю свойство MessageHandler, чтобы получать уведомления в случае, если сообщение помещается в очередь исходящих сообщений.

Пока мне все ясно. Но теперь проблема в том, что, когда целевой клиент не подключен, я не могу сразу отправить сообщение, но должен пригласить его подключиться снова (однонаправленная связь). Но что делать с сообщением в этом случае? Я не использую обработку транзакций, поэтому не могу выполнить откат, чтобы вернуть сообщение в очередь. Может быть, фрагмент кода может прояснить, что я пытаюсь сделать:

@Override
public void onMessage(ClientMessage p_message)
{
    try
    {
        UID uid = UID.parse(p_message.getStringProperty(MessageQueueServer.MESSAGE_PROPERTY_UID));
        IoSession ioSession = SessionLookupFilter.getSession(uid);

        if ((ioSession != null) && ioSession.isConnected())
        {
            ioSession.write(MessageQueueServer.clientMessageToLTV(p_message));
            // acknowledge to remove message from queue
            p_message.acknowledge();
        }
        else
        {
            // don't acknowledge the message so we will get it again ?!?
            // TODO invite the client to establish a connection via SIP server
            // Set redelivery timeout of message to a serious value so we don't bother the client with subsequent
            // invitations.
        }
    }
    catch (Exception e)
    {
        LogMF.error(log, e, "Error during dispatching of mq-message {0} to client.", new Object[] { p_message });
    }
}

Я не смог получить что-то полезное из руководства пользователя hornetq. Так что, возможно, кто-то имеет опыт работы с этим.

Встречает Себастьяна

0 ответов

Другие вопросы по тегам