Сообщения HornetQ все еще остаются в очереди после использования с использованием основных API

Я новичок в HornetQ, поэтому, пожалуйста, потерпите меня. Позвольте мне сначала рассказать вам мои требования:

Мне нужно промежуточное программное обеспечение для организации очередей сообщений, которое может передавать сообщения размером около 1 Кб между различными процессами с низкой задержкой и постоянством (т. Е. Оно должно выдерживать сбои системы). У меня будет несколько процессов, записывающих в одну и ту же очередь, и несколько процессов, читающих из одной и той же очереди.

Для этого я выбрал HornetQ, так как он имеет лучший рейтинг для передачи сообщений с постоянством.

В настоящее время я использую Hornetq v2.2.2Final в качестве отдельного сервера.
Я могу успешно создавать долговременные / недолговечные очереди, используя основной API (ClientSession), и успешно отправлять сообщения в очередь (ClientProducer).
Точно так же я могу читать сообщения из очереди, используя основной API (ClientConsumer).

Проблема возникает после этого, когда клиент прочитал сообщение, сообщение все еще остается в очереди, то есть количество сообщений в очереди остается постоянным. Возможно, я ошибаюсь, но у меня сложилось впечатление, что после того, как сообщение израсходовано (чтение + подтверждение), оно удаляется из очереди. Но в моем случае этого не происходит, и одни и те же сообщения читаются снова и снова снова.

Кроме того, я хотел бы сказать, что я пытался использовать недолговечные очереди с недолговечными сообщениями. но проблема остается.

Код для производителя, который я использую:

public class HQProducer implements Runnable {

    private ClientProducer producer;
    private boolean killme;
    private ClientSession session;
    private boolean durableMsg;

    public HQProducer(String host, int port, String address, String queueName,
            boolean deleteQ, boolean durable, boolean durableMsg, int pRate) {
        this.durableMsg = durableMsg;
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            if (queueExists(queueName)) {
                if (deleteQ) {
                    System.out.println("Deleting existing queue :: " + queueName);
                    session.deleteQueue(queueName);
                    System.out.println("Creating queue :: " + queueName);
                    session.createQueue(address, queueName, true);
                }
            } else {
                System.out.println("Creating new  queue :: " + queueName);
                session.createQueue(address, queueName, durable);
            }
            producer = session.createProducer(SimpleString.toSimpleString(address), pRate);

            killme = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killme) {
            try {
                ClientMessage message = session.createMessage(durableMsg);

                message.getBodyBuffer().writeString("Hello world");

                producer.send(message);
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("Producer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killme) {
        this.killme = killme;
    }

    private boolean queueExists(String qname) {
        boolean res = false;
        try {
            //ClientSession.BindingQuery bq = session.bindingQuery(SimpleString.toSimpleString(qname));
            QueueQuery queueQuery = session.queueQuery(SimpleString.toSimpleString(qname));
            if (queueQuery.isExists()) {
                res = true;
            }
        } catch (HornetQException ex) {
            res = false;
        }
        return res;
    }
}

Также код для потребителя:

public class HQConsumer implements Runnable {

    private ClientSession session;
    private ClientConsumer consumer;
    private boolean killMe;

    public HQConsumer(String host, int port, String queueName, boolean browseOnly) {
        try {
            HashMap map = new HashMap();
            map.put("host", host);
            map.put("port", port);

            TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);

            ServerLocator locator = HornetQClient.createServerLocatorWithoutHA(config);

            ClientSessionFactory factory = locator.createSessionFactory();

            session = factory.createSession();

            session.start();

            consumer = session.createConsumer(queueName, "",0,-1,browseOnly);

            killMe = false;
        } catch (Exception ex) {
            Logger.getLogger(HQTestProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    @Override
    public void run() {
        long time = System.currentTimeMillis();
        int cnt = 0;
        long timediff;
        while (!killMe) {
            try {
                ClientMessage msgReceived = consumer.receive();
                msgReceived.acknowledge();
                //System.out.println("message = " + msgReceived.getBodyBuffer().readString());
                cnt++;
                timediff = ((System.currentTimeMillis() - time) / 1000);
                if (timediff >= 1) {
                    System.out.println("ConSumer tps :: " + cnt);
                    cnt = 0;
                    time = System.currentTimeMillis();
                }
            } catch (HornetQException ex) {
                Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
        try {
            session.close();
        } catch (HornetQException ex) {
            Logger.getLogger(HQConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void setKillMe(boolean killMe) {
        this.killMe = killMe;
    }
}

Конфигурация сервера HornetQ::

<configuration xmlns="urn:hornetq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">

   <paging-directory>${data.dir:../data}/paging</paging-directory>

   <bindings-directory>${data.dir:../data}/bindings</bindings-directory>

   <journal-directory>${data.dir:../data}/journal</journal-directory>

   <journal-min-files>10</journal-min-files>

   <large-messages-directory>${data.dir:../data}/large-messages</large-messages-directory>

   <connectors>
      <connector name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </connector>

      <connector name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
      </connector>
   </connectors>

   <acceptors>
      <acceptor name="netty">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.port:5445}"/>
      </acceptor>

      <acceptor name="netty-throughput">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host"  value="${hornetq.remoting.netty.host:localhost}"/>
         <param key="port"  value="${hornetq.remoting.netty.batch.port:5455}"/>
         <param key="batch-delay" value="50"/>
         <param key="direct-deliver" value="false"/>
      </acceptor>
   </acceptors>

   <security-settings>
      <security-setting match="#">
         <permission type="createNonDurableQueue" roles="guest"/>
         <permission type="deleteNonDurableQueue" roles="guest"/>
         <permission type="createDurableQueue" roles="guest"/>
         <permission type="deleteDurableQueue" roles="guest"/>
         <permission type="consume" roles="guest"/>
         <permission type="send" roles="guest"/>
      </security-setting>
   </security-settings>

   <address-settings>
      <!--default for catch all-->
      <address-setting match="#">
         <dead-letter-address>jms.queue.DLQ</dead-letter-address>
         <expiry-address>jms.queue.ExpiryQueue</expiry-address>
         <redelivery-delay>0</redelivery-delay>
         <max-size-bytes>10485760</max-size-bytes>       
         <message-counter-history-day-limit>10</message-counter-history-day-limit>
         <address-full-policy>BLOCK</address-full-policy>
      </address-setting>
   </address-settings>

</configuration>

2 ответа

Решение

С помощью API ядра hornetq вы должны явно подтвердить сообщение. Я не вижу, где это происходит в вашем тесте.

Если вы не проверяете, это причина, по которой ваши сообщения блокируются. Мне нужно увидеть ваш полный пример, чтобы дать вам полный ответ.

Также: Вы должны определить свой createSession с помощью: createSession (true, true, 0)

Основной API имеет возможность пакетного подтверждения. Вы не используете транзакционный сеанс, поэтому вы не будете отправлять подтверждения на сервер, пока не достигнете ackBatchSize, настроенного на вашем serverLocator. При этом любое подтверждение будет отправлено на сервер, как только вы позвоните в ваш адрес службы Подтверждения ().

Опция, которую вы используете в данный момент, эквивалентна JMS DUPS_OK ​​с определенным DUPS_SIZE.

(Пост отредактировал мой первоначальный ответ после некоторой итерации с вами)

Настройка ackbatchsize помог мне решить проблему.. Спасибо за помощь

Другие вопросы по тегам