MessageListener не работает?

У меня работает приложение hornetQ. Я пытаюсь перенести это на Артемиду.

Я использую jmsTemplate для обмена сообщениями. jmsTemplate.sendAndReceive() работает как для сервера, так и для клиента. Я создаю потребителя с настраиваемым messageListener для сервера и клиента. Он работает в hornetQ, но отправленные им сообщения никогда не поступают в messageListener.

мне нужно что-то изменить в session.createConsumer(tq).setMessageListener(новый MyMessageListener)?

я получил это в журнале сервера:

org.apache.activemq.artemis.core.server  : AMQ221003: Deploying queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a
org.apache.activemq.artemis.core.server  : AMQ222165: No Dead Letter Address configured for queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a in AddressSettings
org.apache.activemq.artemis.core.server  : AMQ222166: No Expiry Address configured for queue jms.queue.e746ebf4-de2d-4257-84b0-975d94b5536a in AddressSettings

Через 1-2 минуты теряет связь и запрашивает новую.

Сервер:

application.properties

spring.artemis.mode=embedded
spring.artemis.embedded.enabled=true
spring.artemis.embedded.queues=connection

Настройщик

@Component
public class ArtemisCustomizer {

  @Bean
  public ArtemisConfigurationCustomizer artemisConfigurationCustomizer() {

    return new ArtemisConfigurationCustomizer() {
      @Override
      public void customize(Configuration configuration) {
        Map<String, Object> transportProperties = new HashMap<String, Object>();
        transportProperties.put(TransportConstants.HOST_PROP_NAME, "0.0.0.0");
        transportProperties.put(TransportConstants.PORT_PROP_NAME, port);

//        transportProperties.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
//        transportProperties.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, keystorePath);
//        transportProperties.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, keystorePassword);
//        transportProperties.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, truststorePath);
//        transportProperties.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, truststorePassword);

        Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
        acceptors.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), transportProperties));
      }
    };

  }

}

потребитель

@Component
public class ConnectionConsumer {

  private Map<Long, String> queueMap = new HashMap<>();

  @JmsListener(destination = "connection")
  public void process(Message message) {
    log.info("JMS message received: {}", message);

    try {
      Client client = agentConnected(message.getBody(AgentConnection.class)); // here it logs the client connected and get the info
      if (client != null) {
        jmsTemplate.send(message.getJMSReplyTo(), new MessageCreator() {

          @Override
          public Message createMessage(Session session) throws JMSException {
            TemporaryQueue tq = session.createTemporaryQueue();
            session.createConsumer(tq).setMessageListener(new ClientMessageListener(client.getId(), receivingMessageService));
            return session.createObjectMessage(new AgentUseQueue(tq.getQueueName()));
          }
        });
      }

    } catch (Exception e) {
      log.error("Error processing JMS message:", e);
    }

  }

MessageListener

public class ClientMessageListener implements MessageListener {

  private final long clientId;
  private final ReceivingMessages agentService;

  @Override
  public void onMessage(Message message) {
    try {
      Object o = message.getBody(Object.class);
      log.info("Received from clientId {} the message: {}", clientId, o);
      .....
      if (o instanceof HeartbeatMessage) {
        service.heartbeatReceived(clientId, ((HeartbeatMessage) o).getInstances());
      }
      .....
    } catch (Exception e) {
      log.error("Could not interpret JMS message: ", e);
    }
  }

}

клиент

application.properties

spring.artemis.mode=native

Настройщик

@Component
public class ArtemisCustomizer {

  @Bean
  public ConnectionFactory jmsConnectionFactory() {

    Map<String, Object> transportProperties = new HashMap<String, Object>();
    transportProperties.put(TransportConstants.HOST_PROP_NAME, ip);
    transportProperties.put(TransportConstants.PORT_PROP_NAME, port);

//    log.info("SSL enabled: {}", true);
//    transportProperties.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
//    transportProperties.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, keystorePath);
//    transportProperties.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, keystorePassword);

    return new ActiveMQConnectionFactory(true, new TransportConfiguration(NettyConnectorFactory.class.getName(), transportProperties));
  }
}

соединение

@Component
public class ConnectionJms {

  @Autowired
  private JmsTemplate jmsTemplate;

  @Autowired
  private Properties properties;

  @Autowired
  private Outbound outbound;
  ...

  @PostConstruct
  public void configure() {
    jmsTemplate.setReceiveTimeout(60000);
  }

  @Scheduled(fixedDelay = 30000, initialDelay = 10000)
  public void connect() {
    if (properties.getJmsQueueSender() == null || properties.getLastMessageReceived() == null || (System.currentTimeMillis() - properties.getLastMessageReceived()) > 120000) {

      try {
        Message message = this.jmsTemplate.sendAndReceive(properties.getAgentConnectionQueue(), (session) -> {
          JMSContext ctx = jmsTemplate.getConnectionFactory().createContext();
          TemporaryQueue tq = ctx.createTemporaryQueue();
          properties.setJmsQueueReceiver(tq.getQueueName());
          ctx.createConsumer(tq).setMessageListener(new MyMessageListener(inbound, properties));
          return session.createObjectMessage(new AgentConnection(properties.getJmsQueueReceiver(), properties.getExternalIpAddress(), properties.getInternalIpAddress()));
        });

        if (message != null) {
          AgentUseQueue auq = message.getBody(AgentUseQueue.class);
          log.info("Connected. Use this queue to communicate now: {}", auq);
          properties.setJmsQueueSender(auq.getQueueName());
          properties.setLastMessageReceived(System.currentTimeMillis());

          outbound.heartbeat(...);
        } else {
          log.info("Did not receive any response. Trying again in 15 seconds.");
        }
      } catch (Exception e) {
        log.error("Error sending agent connection message:", e);
      }
    } else {
      outbound.heartbeat(...);
    }
  }

}

Outbound

@Service
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class Outbound {

  @Autowired
  private Properties properties;

  @Autowired
  private JmsTemplate jmsTemplate;

... methods ...

  public void heartbeat(RunningInstanceState[] states) {
    log.info("Sending heartbeat. States: {}", new Object[] {states});
    send(new HeartbeatMessage(states));
  }

  public boolean isConnected() {
    return properties.getJmsQueueSender() != null;
  }

  private void send(Serializable message) {
    if (isConnected()) {
      try {
        jmsTemplate.send(properties.getJmsQueueSender(), session -> session.createObjectMessage(message));
      } catch (Exception je) {
        log.warn("Error on JMS, reseting queues: {}", je.getMessage());
        properties.resetJms();
      }
    }
  }

}

РЕДАКТИРОВАНИЕ

Я создаю небольшие проекты Maven по ссылке http://s000.tinyupload.com/index.php?file_id=07555336945447914472

Он работает с использованием HornetQ. Конфигурация artemis прокомментирована с pom.xml, application.properties и настройщиками

1 ответ

Решением было изменить TemporaryQueue на Queue.

// TemporaryQueue tq = session.createTemporaryQueue(); // only works for HornetQ
String clientQueue = UUID.randomUUID().toString();
Queue q = session.createQueue(clientQueue);
Другие вопросы по тегам