Пропущенные события ZMQ распространяются в jeromq scala

Я новичок в ZeroMQ и, кажется, теряю сообщения в цикле в моем begin() метод.

Мне интересно, если я пропускаю кусок, где я не в очереди сообщений или что-то?

Когда я вызываю событие на моем издателе, которое отправляет два сообщения моему подписчику с небольшим промежутком между ними, я, кажется, не получаю второе сообщение, которое передается. Что мне не хватает?

class ZMQSubscriber[T <: Transaction, B <: Block](
  socket: InetSocketAddress,
  hashTxListener: Option[HashDigest => Future[Unit]],
  hashBlockListener: Option[HashDigest => Future[Unit]],
  rawTxListener: Option[Transaction => Future[Unit]],
  rawBlockListener: Option[Block => Future[Unit]]) {
  private val logger = BitcoinSLogger.logger

  def begin()(implicit ec: ExecutionContext) = {
    val context = ZMQ.context(1)

    //  First, connect our subscriber socket
    val subscriber = context.socket(ZMQ.SUB)
    val uri = socket.getHostString + ":" + socket.getPort

    //subscribe to the appropriate feed
    hashTxListener.map { _ =>
      subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the transaction hashes from zmq")
    }

    rawTxListener.map { _ =>
      subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw transactions from zmq")
    }

    hashBlockListener.map { _ =>
      subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the hashblock stream from zmq")
    }

    rawBlockListener.map { _ =>
      subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw block")
    }

    subscriber.connect(uri)
    subscriber.setRcvHWM(0)
    logger.info("Connection to zmq client successful")

    while (true) {
      val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
      val body = subscriber.recv(ZMQ.DONTWAIT)
      Future(processMsg(notificationTypeStr, body))
    }
  }

  private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = Future {

    val notification = ZMQNotification.fromString(topic)
    val res: Option[Future[Unit]] = notification.flatMap {
      case HashTx =>
        hashTxListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawTx =>
        rawTxListener.map { f =>
          val tx = Future(Transaction.fromBytes(body))
          tx.flatMap(f(_))
        }
      case HashBlock =>
        hashBlockListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawBlock =>
        rawBlockListener.map { f =>
          val block = Future(Block.fromBytes(body))
          block.flatMap(f(_))
        }
    }
  }
}

2 ответа

Что мне не хватает?

Как работают блокирующие v / s неблокирующие методы работы:

Хитрость заключается в (не) режиме блокировки соответствующего вызова .recv() метод.

Второй звонок в subscriber.recv( ZMQ.DONTWAIT ) -метод, таким образом, возвращается немедленно, поэтому ваша вторая часть, (body) может и не будет по закону ничего не содержать, даже если в вашем обещании говорилось, что пара сообщений действительно была отправлена ​​со стороны издателя (пара .send() вызовы методов - можно также возразить, есть вероятность того, что отправитель фактически отправлял только одно сообщение, состоящее из нескольких частей - MCVE-код не является специфическим для этой части).

Итак, как только вы перевели свой код из неблокирующего режима (в O/P) в принципиально блокирующий режим (который блокировал / синхронизировал дальнейший поток кода с внешним событием прибытия любого правдоподобно отформатированного сообщение, не возвращающееся ранее), в:

val zmsg = ZMsg.recvMsg(subscriber) // which BLOCKS-till-a-1st-zmsg-arrived

как дальнейшая обработка .pop() детали просто выгружают компоненты (см. замечание о фактическом ZMsg структура из нескольких частей, фактически отправленная опубликованной стороной, представленная выше)


Безопасность следующая:
безлимитный alloc-s v / s обязательная блокировка / удаление сообщений?

код удивил меня по нескольким пунктам. Помимо довольно "позднего" звонка .connect() -метод, по сравнению со всеми предыдущими подробными настройками сокета-архетипа (которые обычно организуются "после" запроса на установку соединения). Хотя это может работать нормально, как и предполагалось, но оно предоставляет еще более узкое (меньшее) временное окно для .Context() -экземпляр для настройки и (повторного) согласования всех соответствующих деталей подключения, чтобы стать RTO.

Одна конкретная линия привлекла мое внимание: subscriber.setRcvHWM( 0 ) это версия, зависящая от архетипа. Однако нулевое значение приводит к тому, что приложение становится уязвимым, и я бы не советовал делать это в любом приложении промышленного уровня.

Так что это, кажется, было решено с помощью ZMsg.recvMsg() в whileпетля вместо

  val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
  val body = subscriber.recv(ZMQ.DONTWAIT)

Я не уверен, почему это работает, но это работает. Так вот, что мой begin метод выглядит как сейчас

    while (run) {
      val zmsg = ZMsg.recvMsg(subscriber)
      val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
      val body = zmsg.pop().getData
      Future(processMsg(notificationTypeStr, body))
    }
    Future.successful(Unit)
  }
Другие вопросы по тегам