Ограничение скорости абонента сервера потоковой передачи NATS и ровно один раз доставки

Я немного играю с трансляцией NATS, и у меня проблема с ограничением абонентской скорости. Когда я устанавливаю максимальное значение в полете на 1, а тайм-аут на 1 секунду, и у меня есть потребитель, который по сути является Thread.sleep(1000), тогда я получаю несколько раз одно и то же событие. Я думал, что, ограничивая полет и используя ручной ack, этого не должно произойти. Как я могу получить точно один раз доставку на очень медленных потребителей?

  case class EventBus[I, O](inputTopic: String, outputTopic: String, connection: Connection, eventProcessor: StatefulEventProcessor[I, O]) {
    // the event bus could be some abstract class while the `Connection` coulbd be injected using DI
    val substritionOptions: SubscriptionOptions = new SubscriptionOptions.Builder()
                                                                         .setManualAcks(true)
                                                                         .setDurableName("foo")
                                                                         .setMaxInFlight(1)
                                                                         .setAckWait(1, TimeUnit.SECONDS)
                                                                         .build()

    if (!inputTopic.isEmpty) {
      connection.subscribe(inputTopic, new MessageHandler() {
        override def onMessage(m: Message) {
          m.ack()
          try {
            val event = eventProcessor.deserialize(m.getData)
            eventProcessor.onEvent(event)
          } catch {
            case any =>
              try {
                val command = new String(m.getData)
                eventProcessor.onCommand(command)
              } catch {
                case any => println(s"de-serialization error: $any")
              }
          } finally {
            println("got event")
          }
        }
      }, substritionOptions)
    }

    if (!outputTopic.isEmpty) {
      eventProcessor.setBus(e => {
        try {
          connection.publish(outputTopic, eventProcessor.serialize(e))
        } catch {
          case ex => println(s"serialization error $ex")
        }
      })
    }
  }


  abstract class StatefulEventProcessor[I, O] {
    private var bus: Option[O => Unit] = None
    def onEvent(event: I): Unit
    def onCommand(command: String): Unit

    def serialize(o: O): Array[Byte] =
      SerializationUtils.serialize(o.asInstanceOf[java.io.Serializable])

    def deserialize(in: Array[Byte]): I =
      SerializationUtils.deserialize[I](in)

    def setBus(push: O => Unit): Unit = {
      if (bus.isDefined) {
        throw new IllegalStateException("bus already set")
      } else {
        bus = Some(push)
      }
    }

    def push(event: O) =
      bus.get.apply(event)
  }


  EventBus("out-1", "out-2", sc, new StatefulEventProcessor[String, String] {
    override def onEvent(event: String): Unit = {
      Thread.sleep(1000)
      push("!!!" + event)
    }

    override def onCommand(command: String): Unit = {}
  })

  (0 until 100).foreach(i => sc.publish("out-1", SerializationUtils.serialize(s"test-$i")))

1 ответ

Решение

Во-первых, в случае потоковой трансляции NATS нет точной (повторной) гарантии доставки. MaxInflight дает вам гарантию того, что сервер не будет отправлять новые сообщения подписчику, пока число неподтвержденных сообщений не станет меньше этого числа. Таким образом, в случае MaxInflight(1) вы просите сервер отправить следующее новое сообщение только после получения подтверждения от ранее доставленного сообщения. Однако это не блокирует повторную доставку неподтвержденных сообщений.

Сервер не имеет никакой гарантии или не знает, что сообщение фактически получено подписчиком. Это то, для чего предназначен ACK, чтобы сервер знал, что подписчик правильно обработал сообщение. Если сервер не соблюдает повторную доставку (даже при достижении MaxInflight), то "потерянное" сообщение навсегда остановит вашу подписку. Имейте в виду, что потоковый сервер и клиенты NATS напрямую не связаны друг с другом TCP-соединением (оба они подключены к серверу NATS, или gnatsd).

Другие вопросы по тегам