Сбой ActiveMQ производителя и потребителя с общим каталогом не происходит
У нас работают два экземпляра ActiveMQ(версия 5.10.0), и я использую общее хранилище для достижения высокой доступности. Тем не менее, я не могу увидеть, как происходит отказоустойчивость для производителя и потребителя (ей).
ActiveMQ broker-1 работает на IP1, а broker-2 на IP2. В файле activemq.xml конфигурации я изменил адаптер постоянства, чтобы использовать общий каталог, который присутствует на IP1.
<persistenceAdapter>
<kahaDB directory="\\IP1\shared-directory\for activemq\data"/>
</persistenceAdapter>
Как на стороне производителя, так и на стороне потребителя я использую следующие конфигурации JNDI для получения соединений и сеансов сборки и т. Д.
jndi.properties
java.naming.factory.initial = ..........ActiveMQInitialContextFactory
java.naming.provider.url = failover:(tcp://IP1:61616,tcp://IP2:61616)?randomize=false
connectionFactoryNames = myConnectionFactory
queue.requestQ = my.RequestQ
Интересная часть:
Когда я начинаю эту брокерскую пару, я вижу, что один из брокеров становится мастером. Когда я запускаю продюсера, который помещает сообщение в Q (скажем, продюсер поместил 100 сообщений в Q). Пока мой продюсер еще работает; Я завершаю работу главного посредника, следовательно, ведомый посредник получает блокировку файла и становится мастером. Когда я открываю веб-консоль, я вижу, что в Q все еще есть 100 сообщений. Даже если производитель работает, он больше не помещает никаких сообщений в этот Q.
Аналогично и для потребителей. Потребитель выбирал сообщения из Q, в этом Q было сказано, что 100 сообщений не было израсходовано, когда мастер вышел из строя, теперь мастер вышел из строя, ведомый становится главным, я вижу, что 100 сообщений все еще не используются, но потребитель не выбирает ни одного сообщения из Q.
Я долго ждал их перехода на другой ресурс (>10 минут). Может ли кто-нибудь подсказать, какую конфигурацию мне не хватает?
Я копирую производителя и потребителя как есть (я скопировал это из ActiveMQ в книге действий с небольшими изменениями).
Режиссер
public class Producer {
private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
private transient MessageProducer producer;
private static int count = 10;
private static int total;
private static int id = 1000000;
private String jobs[] = new String[] { "suspend", "delete" };
public Producer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
Producer producer = new Producer();
while (total < 1000) {
for (int i = 0; i < count; i++) {
producer.sendMessage();
}
total += count;
System.out.println("Sent '" + count + "' of '" + total
+ "' job messages");
try {
Thread.sleep(1000);
} catch (InterruptedException x) {
}
}
producer.close();
}
public void sendMessage() throws JMSException {
int idx = 0;
while (true) {
idx = (int) Math.round(jobs.length * Math.random());
if (idx < jobs.length) {
break;
}
}
String job = jobs[idx];
Destination destination = session.createQueue("JOBS." + job);
Message message = session.createObjectMessage(id++);
System.out.println("Sending: id: "
+ ((ObjectMessage) message).getObject() + " on queue: "
+ destination);
producer.send(destination, message);
}
}
потребитель
public class Consumer {
private static String brokerURL = "failover:(tcp://IP1:3389,tcp://IP2:3389)";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
private String jobs[] = new String[] { "suspend", "delete" };
public Consumer() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public static void main(String[] args) throws JMSException {
Consumer consumer = new Consumer();
for (String job : consumer.jobs) {
Destination destination = consumer.getSession().createQueue(
"JOBS." + job);
MessageConsumer messageConsumer = consumer.getSession()
.createConsumer(destination);
messageConsumer.setMessageListener(new Listener(job));
}
}
public Session getSession() {
return session;
}
}
Еще одна вещь: меня больше интересует отказоустойчивость потребителей, чем производитель. Еще одно наблюдение: потребитель останавливается и внезапно приходит в командную строку.
Спасибо. -JE