Сбой ActiveMQ производителя и потребителя с общим каталогом не происходит

У нас работают два экземпляра ActiveMQ(версия 5.10.0), и я использую общее хранилище для достижения высокой доступности. Тем не менее, я не могу увидеть, как происходит отказоустойчивость для производителя и потребителя (ей).

ActiveMQ broker-1 работает на IP1, а broker-2 на IP2. В файле activemq.xml конфигурации я изменил адаптер постоянства, чтобы использовать общий каталог, который присутствует на IP1.

<persistenceAdapter>
  <kahaDB directory="\\IP1\shared-directory\for activemq\data"/>
</persistenceAdapter>

Как на стороне производителя, так и на стороне потребителя я использую следующие конфигурации JNDI для получения соединений и сеансов сборки и т. Д.

jndi.properties

java.naming.factory.initial = ..........ActiveMQInitialContextFactory
java.naming.provider.url = failover:(tcp://IP1:61616,tcp://IP2:61616)?randomize=false
connectionFactoryNames = myConnectionFactory
queue.requestQ = my.RequestQ

Интересная часть:

Когда я начинаю эту брокерскую пару, я вижу, что один из брокеров становится мастером. Когда я запускаю продюсера, который помещает сообщение в Q (скажем, продюсер поместил 100 сообщений в Q). Пока мой продюсер еще работает; Я завершаю работу главного посредника, следовательно, ведомый посредник получает блокировку файла и становится мастером. Когда я открываю веб-консоль, я вижу, что в Q все еще есть 100 сообщений. Даже если производитель работает, он больше не помещает никаких сообщений в этот Q.

Аналогично и для потребителей. Потребитель выбирал сообщения из Q, в этом Q было сказано, что 100 сообщений не было израсходовано, когда мастер вышел из строя, теперь мастер вышел из строя, ведомый становится главным, я вижу, что 100 сообщений все еще не используются, но потребитель не выбирает ни одного сообщения из Q.

Я долго ждал их перехода на другой ресурс (>10 минут). Может ли кто-нибудь подсказать, какую конфигурацию мне не хватает?

Я копирую производителя и потребителя как есть (я скопировал это из ActiveMQ в книге действий с небольшими изменениями).

Режиссер

public class Producer {

    private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
    private static transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Session session;
    private transient MessageProducer producer;

    private static int count = 10;
    private static int total;
    private static int id = 1000000;
    private String jobs[] = new String[] { "suspend", "delete" };

    public Producer() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    public static void main(String[] args) throws JMSException {
        Producer producer = new Producer();
        while (total < 1000) {
            for (int i = 0; i < count; i++) {
                producer.sendMessage();
            }
            total += count;
            System.out.println("Sent '" + count + "' of '" + total
                    + "' job messages");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException x) {
            }
        }
        producer.close();

    }

    public void sendMessage() throws JMSException {
        int idx = 0;
        while (true) {
            idx = (int) Math.round(jobs.length * Math.random());
            if (idx < jobs.length) {
                break;
            }
        }
        String job = jobs[idx];
        Destination destination = session.createQueue("JOBS." + job);
        Message message = session.createObjectMessage(id++);
        System.out.println("Sending: id: "
                + ((ObjectMessage) message).getObject() + " on queue: "
                + destination);
        producer.send(destination, message);
    }
}

потребитель

public class Consumer {

    private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
    private static transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Session session;

    private String jobs[] = new String[] { "suspend", "delete" };

    public Consumer() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    public static void main(String[] args) throws JMSException {
        Consumer consumer = new Consumer();
        for (String job : consumer.jobs) {
            Destination destination = consumer.getSession().createQueue(
                    "JOBS." + job);
            MessageConsumer messageConsumer = consumer.getSession()
                    .createConsumer(destination);
            messageConsumer.setMessageListener(new Listener(job));
        }
    }

    public Session getSession() {
        return session;
    }

}

Еще одна вещь: меня больше интересует отказоустойчивость потребителей, чем производитель. Еще одно наблюдение: потребитель останавливается и внезапно приходит в командную строку.

Спасибо. -JE

0 ответов

Другие вопросы по тегам