Неограниченная последовательность Java DDS вызывает ошибку нехватки памяти

Я новичок в DDS и пытаюсь написать простую Java-программу на Intellij-IDEA, которая состоит из 3 частей:

  1. Клиент симулятор, который отправляет данные.
  2. Мой программный симулятор, который получает данные от клиента, манипулирует ими и отправляет их обратно клиенту.
  3. Клиент симулятор, который читает манипулированные данные.

Все данные, которые я пытаюсь отправить в моем примере, являются простой строкой.

Я использую RTI Code Gen для автоматической генерации большей части кода.

Без и unboundedSupport флаг (строка ограничена 255 символами) все работало просто отлично. Однако при применении unboundedSupport флаг, я получаю следующую ошибку нехватки памяти:

java.lang.OutOfMemoryError: Java heap space
    at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
    at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
    at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
    at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
    at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
    at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
    at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error

Я активирую клиентский симулятор, который сначала читает данные.

Это мой файл.idl:

struct JsonMessage {
    string msg;
};

Это моя основная программа (строка 10 - инициализация subscriber1):

public static void main(String... args) {
    ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
    JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber1.consume();
    ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
    JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
                                                                               Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber2.consume();
    ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
    JsonMessageSubscriber subscriber3 =
        new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
    subscriber3.consume();
  }

Это мой класс ClientResultsConsumer:

public class ClientResultsConsumer implements Consumer {

  @Override
  public void consume(String msg) {
    System.out.println("Client results consumer got " + msg);
  }
}

Это мой класс JsonMessageSubscriber (строка 71 subscriber.create_datareader():

public class JsonMessageSubscriber implements DataConsumer {

  ExecutorService executor = Executors.newSingleThreadExecutor();

  public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {

    DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
        .create_participant(domainId,
                            DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                            null /* listener */,
                            StatusKind.STATUS_MASK_NONE);
    if (participant == null) {
      System.err.println("create_participant error\n");
      System.exit(-1);
    }

    // --- Create subscriber --- //

            /* To customize subscriber QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Subscriber subscriber = participant.create_subscriber(
        DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
        StatusKind.STATUS_MASK_NONE);
    if (subscriber == null) {
      System.err.println("create_subscriber error\n");
      System.exit(-1);
    }

    // --- Create topic --- //

    /* Register type before creating topic */
    String typeName = JsonMessageTypeSupport.get_type_name();
    JsonMessageTypeSupport.register_type(participant, typeName);

            /* To customize topic QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    Topic topic = participant.create_topic(
        topicName,
        typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
        null /* listener */, StatusKind.STATUS_MASK_NONE);
    if (topic == null) {
      System.err.println("create_topic error\n");
      System.exit(-1);
    }

    // --- Create reader --- //

    DataReaderListener listener = new JsonMessageListener(consumer);

            /* To customize data reader QoS, use
            the configuration file USER_QOS_PROFILES.xml */

    JsonMessageDataReader reader = (JsonMessageDataReader)
        subscriber.create_datareader(
            topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
            StatusKind.STATUS_MASK_ALL);
    if (reader == null) {
      System.err.println("create_datareader error\n");
      System.exit(-1);
    }
  }

  // -----------------------------------------------------------------------

  @Override
  public void consume() {
    final long scanTimeMillis = 1000;
    Runnable task = () -> {
      while (true) {
        try {
          TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
        } catch (Exception e) {
          System.err.println(e.getMessage());
        }
      }
    };
    executor.submit(task);
  }
}

К сожалению, я не нашел решения для этого, кроме ограничения размера последовательности, но я понял, что ограничение его достаточно большим числом решит мою проблему, это также потребует много памяти, и я бы предпочел, чтобы это не занимало больше чем минимум, необходимый для каждого сообщения.

Любая помощь будет оценена, спасибо

1 ответ

При использовании -unboundedSupport в вашем файле QoS необходимо установить некоторые пороги памяти. Эти пороговые значения описаны здесь в руководстве пользователя и определяют пороговое значение, при котором память для выборок либо динамически распределяется, либо используется повторно из предварительно выделенного источника. Они должны быть установлены как в DataReader, так и в DataWriter.

Настройки для этих порогов действительно зависят от вашего размера данных и от вашего описания. У меня недостаточно информации, чтобы предоставить вам пример, который имеет смысл в вашем сценарии. По сути, вы не хотите динамически распределять память для каждого семпла. Это может повлиять на производительность в зависимости от скорости передачи данных. Вы хотите выбрать значения, в которых большинство сэмплов используют предварительно выделенную память. Пример, представленный в руководстве пользователя в разделе " Управление памятью на стороне записывающего устройства при работе с большими данными", относится к потоковой передаче видео, которая содержит более крупные менее частые I-кадры и более мелкие более частые P-кадры. Вы можете посмотреть этот раздел и соответствующий раздел DataReader для примера XML-файла.

Мне удалось решить проблему, используя пример здесь

Все, что потребовалось, - это передать автоматически сгенерированный путь к файлу qos конструктору подписчика / издателя, а затем записать эти строки перед инициализацией участника домена (это отличается от примера, приведенного в ссылке выше, приведенный пример у меня не сработал):

DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);
Другие вопросы по тегам