java.lang.ArrayIndexOutOfBoundsException: 256 с версией jeromq 0.3.6

Я использую Jeromq в многопоточной среде, как показано ниже. Ниже мой код, в котором конструктор SocketManager сначала подключается ко всем доступным розеткам, и я помещаю их в liveSocketsByDatacenter карта в connectToZMQSockets метод. После этого я запускаю фоновый поток в том же конструкторе, который запускается каждые 30 секунд и вызывает updateLiveSockets метод пинговать все те сокеты, которые уже были в liveSocketsByDatacenter сопоставить и обновить liveSocketsByDatacenter карта с тем, были ли эти розетки живы или нет.

А также getNextSocket() Метод вызывается несколькими потоками чтения одновременно, чтобы получить следующий доступный сокет, и затем мы используем этот сокет для отправки данных на него. Итак, мой вопрос: правильно ли мы используем Jeromq в многопоточной среде? Потому что мы только что увидели исключение в нашей производственной среде с этой трассировкой стека, когда пытались отправить данные в этот активный сокет, поэтому я не уверен, является ли это ошибкой или чем-то еще?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)

Ниже мой код:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private static final SocketManager instance = new SocketManager();
    }

    public static SocketManager getInstance() {
        return Holder.instance;
    }

    private SocketManager() {
      connectToZMQSockets();
      scheduler.scheduleAtFixedRate(this::updateLiveSockets, 30, 30, TimeUnit.SECONDS);
    }

    // during startup, making a connection and populate once
    private void connectToZMQSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;
      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> addedColoSockets = connect(entry.getValue(), ZMQ.PUSH);
        liveSocketsByDatacenter.put(entry.getKey(), addedColoSockets);
      }
    }

    private List<SocketHolder> connect(List<String> addresses, int socketType) {
        List<SocketHolder> socketList = new ArrayList<>();
        for (String address : addresses) {
          try {
            Socket client = ctx.createSocket(socketType);
            // Set random identity to make tracing easier
            String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
            client.setIdentity(identity.getBytes(ZMQ.CHARSET));
            client.setTCPKeepAlive(1);
            client.setSendTimeOut(7);
            client.setLinger(0);
            client.connect(address);

            SocketHolder zmq = new SocketHolder(client, ctx, address, true);
            socketList.add(zmq);
          } catch (Exception ex) {
            // log error
          }
        }
        return socketList;
    }

    // this method will be called by multiple threads concurrently to get the next live socket
    // is there any concurrency or thread safety issue or race condition here?
    public Optional<SocketHolder> getNextSocket() {
      for (Datacenters dc : Datacenters.getOrderedDatacenters()) {
        Optional<SocketHolder> liveSocket = getLiveSocket(liveSocketsByDatacenter.get(dc));
        if (liveSocket.isPresent()) {
          return liveSocket;
        }
      }
      return Optional.absent();
    }

    private Optional<SocketHolder> getLiveSocket(final List<SocketHolder> listOfEndPoints) {
      if (!CollectionUtils.isEmpty(listOfEndPoints)) {
        // The list of live sockets
        List<SocketHolder> liveOnly = new ArrayList<>(listOfEndPoints.size());
        for (SocketHolder obj : listOfEndPoints) {
          if (obj.isLive()) {
            liveOnly.add(obj);
          }
        }
        if (!liveOnly.isEmpty()) {
          // The list is not empty so we shuffle it an return the first element
          return Optional.of(liveOnly.get(random.nextInt(liveOnly.size()))); // just pick one
        }
      }
      return Optional.absent();
    }

    // runs every 30 seconds to ping all the socket to make sure whether they are alive or not
    private void updateLiveSockets() {
      Map<Datacenters, List<String>> socketsByDatacenter = Utils.SERVERS;

      for (Map.Entry<Datacenters, List<String>> entry : socketsByDatacenter.entrySet()) {
        List<SocketHolder> liveSockets = liveSocketsByDatacenter.get(entry.getKey());
        List<SocketHolder> liveUpdatedSockets = new ArrayList<>();
        for (SocketHolder liveSocket : liveSockets) { // LINE A
          Socket socket = liveSocket.getSocket();
          String endpoint = liveSocket.getEndpoint();
          Map<byte[], byte[]> holder = populateMap();
          Message message = new Message(holder, Partition.COMMAND);

          // pinging to see whether a socket is live or not
          boolean status = SendToSocket.getInstance().execute(message.getAdd(), holder, socket);
          boolean isLive = (status) ? true : false;

          SocketHolder zmq = new SocketHolder(socket, liveSocket.getContext(), endpoint, isLive);
          liveUpdatedSockets.add(zmq);
        }
        liveSocketsByDatacenter.put(entry.getKey(), Collections.unmodifiableList(liveUpdatedSockets));
      }
    }
}

А вот как я использую getNextSocket() метод SocketManager Класс одновременно из нескольких потоков чтения:

// this method will be called from multiple threads
public boolean sendAsync(final long addr, final byte[] reco) {
  Optional<SocketHolder> liveSockets = SocketManager.getInstance().getNextSocket();
  return sendAsync(addr, reco, liveSockets.get().getSocket(), false);
}

public boolean sendAsync(final long addr, final byte[] reco, final Socket socket,
    final boolean messageA) {
  ZMsg msg = new ZMsg();
  msg.add(reco);
  boolean sent = msg.send(socket);
  msg.destroy();
  retryHolder.put(addr, reco);
  return sent;
}

  public boolean send(final long address, final byte[] encodedRecords, final Socket socket) {
    boolean sent = sendAsync(address, encodedRecords, socket, true);
    // if the record was sent successfully, then only sleep for timeout period
    if (sent) {
      try {
        TimeUnit.MILLISECONDS.sleep(500);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    }
    // ...
    return sent;
  } 

Я не думаю, что это правильно, я верю. Похоже на то getNextSocket() может вернуть 0MQ socket в thread A, Одновременно поток таймера может получить доступ к тому же 0MQ socket пинговать это. В этом случае thread A и поток таймера мутируют одинаково 0MQ socket, что приведет к проблемам. Итак, что является лучшим и эффективным способом решения этой проблемы?

Примечание: SocketHolder является неизменным классом

Обновить:

Я только что заметил, что такая же проблема произошла на моей другой коробке с тем же ArrayIndexOutOfBoundsException но на этот раз его номер строки в 71 "YQueue" файл. Единственная постоянная вещь - 256 всегда. Так что должно быть что-то, связанное с 256 наверняка, и я не могу понять, что это за 256 здесь?

java.lang.ArrayIndexOutOfBoundsException: 256
    at zmq.YQueue.backPos(YQueue.java:71)
    at zmq.YPipe.write(YPipe.java:51)
    at zmq.Pipe.write(Pipe.java:232)
    at zmq.LB.send(LB.java:83)
    at zmq.Push.xsend(Push.java:48)
    at zmq.SocketBase.send(SocketBase.java:590)
    at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
    at org.zeromq.ZFrame.send(ZFrame.java:131)
    at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
    at org.zeromq.ZMsg.send(ZMsg.java:191)
    at org.zeromq.ZMsg.send(ZMsg.java:163)

1 ответ

Факт № 0: ZeroMQ не является потокобезопасным - по определению

В то время как документация ZeroMQ и превосходная книга Питера ХИНТЖЕНСА "Код подключен. Том 1" не забывайте напоминать об этом факте везде, где это возможно, время от времени появляется идея о возврате или даже совместном использовании экземпляра сокета ZeroMQ среди потоков. Конечно, методы экземпляров классов могут доставлять это почти "скрытно" внутри их внутренних методов и атрибутов, но надлежащие усилия по разработке должны предотвращать любые такие побочные эффекты без исключений, без оправдания.

Совместное использование, если оно обоснованно подтверждается количественными фактами, может быть способом для общего случая zmq.Context(), но кристально чистый дизайн распределенной системы может жить по действительно мультиагентной схеме, где каждый агент работает по-своему Context() -инжиниринг, точно настроенный на соответствующее сочетание конфигурации и предпочтений производительности.

Итак, что является лучшим и эффективным способом решения этой проблемы?

Никогда не делитесь сокетом ZeroMQ. Никогда, действительно. Даже если новейшие разработки начали обещать в ближайшем будущем некоторые изменения в этом направлении. Плохая привычка - загрязнять любой высокопроизводительный проект распределенной системы с низкой задержкой с помощью совместного использования. "Ничего не делиться" - лучший принцип дизайна для этого домена.


Да, я вижу, что мы не должны делиться сокетами между потоками, но в моем коде
Как вы думаете, это лучший способ решить эту проблему?

Да, лучший и эффективный способ решить эту проблему - никогда не использовать общий сокет ZeroMQ.

Это означает, что никогда не следует возвращать какой-либо объект, атрибуты которого являются сокетами ZeroMQ (которые вы активно строите и массово возвращаете из .connect(){...} класс-метод. В вашем случае все методы класса, кажется, сохранены private, что может решить проблему с разрешением "другим потокам" касаться экземпляров сокетов частного класса, но этот принцип должен быть подтвержден также на всех уровнях атрибута, чтобы быть эффективным. Наконец, это "слияние" становится коротким и нарушается
public static SocketManager getInstance(),
который случайным образом предлагает любому внешнему запрашивающему получить прямой доступ к общим частным экземплярам сокетов ZeroMQ.

Если какая-то документация явно предупреждает почти в каждой главе не делиться вещами, лучше не делиться вещами.

Итак, измените методы так, чтобы SocketManager получает больше функциональных возможностей, так как это методы класса, которые будут выполнять встроенные функциональные возможности, необходимые для явного предотвращения того, чтобы любой поток внешнего мира касался экземпляра, не являющегося совместным доступом, как описано в публикациях ZeroMQ.

Затем идет инвентаризация ресурсов: ваш код, кажется, каждые 30 секунд перепроверяет состояние дел во всех интересующих DataCenters. Это фактически создает новые объекты List два раза в минуту. Хотя вы можете спекулятивно позволить сборщику мусора Java убирать весь мусор, на который больше не ссылаются нигде, это не очень хорошая идея для объектов, связанных с ZeroMQ, встроенных в List-s из ваших предыдущих повторных проверок. На ZeroMQ-объекты все еще ссылаются изнутри Zcontext() - ZeroMQ Context() -core-factory инстанцированные потоки ввода / вывода, которые также можно рассматривать как менеджер ресурсов инвентаризации сокетов ZeroMQ. Итак, все new созданные экземпляры сокетов получают не только внешний дескриптор java со стороны, но также с внутренней ручкой, изнутри (Z)Context(), Все идет нормально. Но нигде в коде не виден какой-либо метод, который бы выводил из эксплуатации все сокеты ZeroMQ в объектах-экземплярах, которые были деассоциированы с java стороны, но все же остаются ссылками от (Z)Context() -боковая сторона. Явное снятие с эксплуатации выделенных ресурсов - это справедливая практика на стороне проектирования, тем более что ресурсы ограничены или иным образом ограничены. Способ сделать это может отличаться для { "дешевых" | "дорогие" } затраты на обслуживание такой обработки управления ресурсами (экземпляры сокетов ZeroMQ чрезвычайно дороги в обращении, как некоторые легковесные "расходуемые / одноразовые" ... но это уже другая история).

Итак, добавьте также набор правильных методов повторного использования ресурсов / ресурсов, которые бы получили общую сумму new -созданные сокеты возвращаются под вашу ответственность контроля (ваш код отвечает за количество обработчиков сокетов внутри (Z)Context() -domain-of-resources-control может быть создан и должен оставаться управляемым - сознательно или нет).

Кто-то может возразить, что могут быть некоторые "обещания" от автоматического обнаружения и (потенциально хорошо отложенной) сборки мусора, но тем не менее ваш код отвечает за правильное управление ресурсами, и даже парни из LMAX никогда не получат такую ​​смелую производительность, если будут полагаться на "обещания" от стандартного gc. Ваша проблема намного хуже, чем с самой высокой производительностью LMAX. Ваш код (пока опубликованный) ничего не делает для .close() а также .term() связанные с ZeroMQ ресурсы вообще. Это совершенно невозможная практика внутри экосистемы с неконтролируемым (распределенным спросом) потреблением. Вы должны защитить свою лодку от перегрузки сверх предела, который, как вы знаете, он может безопасно обрабатывать и динамически разгружать каждую коробку, у которой нет получателя на "противоположном берегу".

Это ответственность Капитана (вашего разработчика кода) .

Не говорить явно, моряк, отвечающий за управление запасами на самом низком уровне (ZeroMQ Context() -пол), что некоторые ящики должны быть выгружены, проблема по-прежнему ваша. Стандарт gc -chain-of-command не будет делать это "автоматически", какими бы "обещаниями" они ни казались, это не так. Так что будьте недвусмысленны в отношении управления ресурсами ZeroMQ, оцените коды возврата при заказе этих шагов и соответствующим образом обработайте все исключения, возникающие при выполнении этих операций управления ресурсами под явным контролем вашего кода.

Более низкая (если не самая низкая достижимая) степень использования ресурсов - и более высокая (если не самая высокая достижимая) производительность - это бонус от правильного выполнения этой работы. Ребята из LMAX - хороший пример того, как сделать это замечательно далеко за рамки стандартных Java-"обещаний", так что можно учиться у лучших.


Заявленные подписи вызовов по сравнению с использованными, похоже, не совпадают:
хотя я могу ошибаться в этом вопросе, так как большинство моих усилий по разработке не связаны с полиморфными вызовами Java -интерфейсов, похоже, что в сигнатуре, опубликованной как:

private List<SocketHolder> connect( Datacenters  dc,                     // 1-st
                                    List<String> addresses,              // 2-nd
                                    int          socketType              // 3-rd
                                    ) {
        ... /* implementation */
}

а также
фактический вызов метода,
называется внутри connectToZMQSockets() метод просто:

        List<SocketHolder> addedColoSockets = connect( entry.getValue(), // 1-st
                                                       ZMQ.PUSH          // 2-nd
                                                       );
Другие вопросы по тегам