Использование Amazon MQ в качестве посредника для Spring Websockets + STOMP

Можно ли использовать Amazon MQ в качестве внешнего брокера для Spring + Websockets + STOMP? Я пытаюсь без удачи. Мой конфиг выглядит следующим образом:

@Configuration
@EnableWebSocketMessageBroker
@AllArgsConstructor
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Override
  public void configureMessageBroker(MessageBrokerRegistry config) {
    config.enableStompBrokerRelay("/topic")
        .setRelayHost("my.amazon.stomp.endpoint").setRelayPort(61614)
        .setSystemLogin("xxxxxxxxx").setSystemPasscode("xxxxxxxxx");
    config.setApplicationDestinationPrefixes("/app");
  }

  @Override
  public void registerStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/ws-myapp").setAllowedOrigins("*").withSockJS();
  }
}

Но при запуске приложения я получаю это из журналов (DEBUG):

2018-06-13 16:16:42.290 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a] REGISTERED
2018-06-13 16:16:42.291 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a] CONNECT: my.amazon.stomp.endpoint:61614
2018-06-13 16:16:42.398  INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : CONNECTED: [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] ACTIVE
2018-06-13 16:16:42.399 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection opened in session=_system_
2018-06-13 16:16:42.404 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] WRITE: 94B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 43 4f 4e 4e 45 43 54 0a 61 63 63 65 70 74 2d 76 |CONNECT.accept-v|
|00000010| 65 72 73 69 6f 6e 3a 31 2e 31 2c 31 2e 32 0a 6c |ersion:1.1,1.2.l|
|00000020| 6f 67 69 6e 3a 70 61 70 65 72 6c 65 73 73 0a 70 |ogin:xxxxxxxxx.p|
|00000030| 61 73 73 63 6f 64 65 3a 4d 38 7c 42 61 6e 41 47 |asscode:xxxxxxxx|
|00000040| 4c 2d 45 61 0a 68 65 61 72 74 2d 62 65 61 74 3a |xxxx.heart-beat:|
|00000050| 31 30 30 30 30 2c 31 30 30 30 30 0a 0a 00       |10000,10000...  |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.405 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] FLUSH
2018-06-13 16:16:42.406 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] USER_EVENT: reactor.io.net.impl.netty.NettyChannelHandlerBridge$ChannelInputSubscriberEvent@37c47287
2018-06-13 16:16:42.512 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 15 03 03 00 02 02 0a                            |.......         |
+--------+-------------------------------------------------+----------------+
2018-06-13 16:16:42.513 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.514 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 - R:my.amazon.stomp.endpoint:61614] READ COMPLETE
2018-06-13 16:16:42.623 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] INACTIVE
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : TCP connection to broker closed in session _system_
2018-06-13 16:16:42.624 DEBUG [] 17743 --- [eactor-tcp-io-5] o.s.m.s.s.StompBrokerRelayMessageHandler : Cleaning up connection state for session _system_
2018-06-13 16:16:42.624  INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : CLOSED: [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614]
2018-06-13 16:16:42.625  INFO [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : Failed to connect to reactor.io.net.impl.netty.tcp.NettyTcpClient$ReconnectingChannelListener$3@1ad2dfa9. Attempting reconnect in 5000ms.
2018-06-13 16:16:42.625 DEBUG [] 17743 --- [eactor-tcp-io-5] r.io.net.impl.netty.tcp.NettyTcpClient   : [id: 0xcd26491a, L:/my.local.ip:50392 ! R:my.amazon.stomp.endpoint:61614] UNREGISTERED

И это не соединяет. Есть идеи? Он отлично работает при подключении к локальному экземпляру ActiveMQ, разница в том, что локально он использует tcp://host против Amazon, который предоставляет stomp+ssl://host uri.

В любом случае, я не включаю протокол. Кажется, что он подключается, но не получает никакого ответа на сообщение CONNECT. Я устанавливаю пользователя и пароль, который использую для подключения к консоли администратора. Я могу подключиться через javascript к конечной точке wss://, но мне нужно установить его в качестве внешнего посредника для приложения Spring Boot.

1 ответ

Наконец, проблема была с включением протокола SSL. Вот как я это сделал:

1) Я создал собственный StompTcpFactory, чтобы создать tcp-клиент с включенным SSL:

/**
 * A TCP Client Factory to enable SSL connection
 */
public class StompTcpFactory implements NetStreams.TcpClientFactory<Message<byte[]>, Message<byte[]>> {

  private final Environment environment;
  private final EventLoopGroup eventLoopGroup;
  private final boolean ssl;
  private final List<String> addresses;

  public StompTcpFactory(List<String> addresses, boolean ssl) {
    this.addresses = addresses;
    this.ssl = ssl;
    this.environment = new Environment(
        () -> new ReactorConfiguration(Collections.emptyList(), "sync", new Properties()));
    this.eventLoopGroup = Reactor2TcpClient.initEventLoopGroup();
  }

  @Override
  public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(
      Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> tcpClientSpec) {
    Supplier<InetSocketAddress> supplier = new InetSocketAddressSupplier(addresses);
    return tcpClientSpec.env(environment).options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup))
        .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())).ssl(ssl ? new SslOptions() : null)
        .connect(supplier);
  }

}

Обратите внимание на вызов.ssl, чтобы установить флаг для использования ssl. Также я получаю список адресов для настройки конечных точек и логическое значение, позволяющее включать и выключать ssl (для dev) . InetSocketAddressSupplier был создан для переключения между двумя адресами, которые Amazon дает вам при работе с настройкой HA:

/**
 * Address supplier for failover connection
 */
@Slf4j
public class InetSocketAddressSupplier implements Supplier<InetSocketAddress> {

  private static int counter = 0;

  private final List<String> addresses;

  InetSocketAddressSupplier(List<String> addresses) {
    Assert.notNull(addresses, "addresses list cannot be null");
    Assert.isTrue(!addresses.isEmpty() && (addresses.size() == 2), "Addresses list must be of size 2");
    this.addresses = addresses;
  }

  @Override
  public InetSocketAddress get() {
    int serverIndex = counter % 2;
    counter++;
    String[] info = addresses.get(serverIndex).split(":");
    log.debug("Returning server {} {}:{} for connection", serverIndex, info[0], info[1]);
    return new InetSocketAddress(info[0], Integer.valueOf(info[1]));
  }

}

Если вам не нужна настройка HA, вы можете просто создать встроенного поставщика, как показано в документах Spring.

2) Затем вы можете настроить конечную точку веб-сокета для использования этой фабрики для создания клиента, а также установить имя пользователя / пароль для системных и клиентских подключений:

@Slf4j
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration extends AbstractWebSocketMessageBrokerConfigurer {

  @Qualifier("websocket")
  private MessageBrokerProperties config;
  private InetSocketAddressSupplier supplier;

  public WebSocketConfiguration(MessageBrokerProperties config) {
    this.config = config;
    this.supplier = new InetSocketAddressSupplier(config.getAddresses());
  }

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    InetSocketAddress address = supplier.get();
    registry.enableStompBrokerRelay("/topic").setRelayHost(address.getHostName()).setRelayPort(address.getPort())
        .setSystemLogin(config.getSystemLogin()).setSystemPasscode(config.getSystemPasscode())
        .setClientLogin(config.getClientLogin()).setClientPasscode(config.getClientPasscode())
        .setTcpClient(createTcpClient());
    registry.setApplicationDestinationPrefixes("/app");
  }

  private TcpOperations<byte[]> createTcpClient() {
    return new Reactor2TcpClient<>(new StompTcpFactory(config.getAddresses(), config.isUseSSL()));
  }
}

Надеюсь, поможет...

Другие вопросы по тегам