Неограниченная последовательность Java DDS вызывает ошибку нехватки памяти
Я новичок в DDS и пытаюсь написать простую Java-программу на Intellij-IDEA, которая состоит из 3 частей:
- Клиент симулятор, который отправляет данные.
- Мой программный симулятор, который получает данные от клиента, манипулирует ими и отправляет их обратно клиенту.
- Клиент симулятор, который читает манипулированные данные.
Все данные, которые я пытаюсь отправить в моем примере, являются простой строкой.
Я использую RTI Code Gen для автоматической генерации большей части кода.
Без и unboundedSupport
флаг (строка ограничена 255 символами) все работало просто отлично. Однако при применении unboundedSupport
флаг, я получаю следующую ошибку нехватки памяти:
java.lang.OutOfMemoryError: Java heap space
at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error
Я активирую клиентский симулятор, который сначала читает данные.
Это мой файл.idl:
struct JsonMessage {
string msg;
};
Это моя основная программа (строка 10 - инициализация subscriber1
):
public static void main(String... args) {
ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber1.consume();
ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
Topics.CLIENT_TOPIC_OUTPUT_1);
subscriber2.consume();
ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
JsonMessageSubscriber subscriber3 =
new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
subscriber3.consume();
}
Это мой класс ClientResultsConsumer:
public class ClientResultsConsumer implements Consumer {
@Override
public void consume(String msg) {
System.out.println("Client results consumer got " + msg);
}
}
Это мой класс JsonMessageSubscriber (строка 71 subscriber.create_datareader(
):
public class JsonMessageSubscriber implements DataConsumer {
ExecutorService executor = Executors.newSingleThreadExecutor();
public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {
DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
.create_participant(domainId,
DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (participant == null) {
System.err.println("create_participant error\n");
System.exit(-1);
}
// --- Create subscriber --- //
/* To customize subscriber QoS, use
the configuration file USER_QOS_PROFILES.xml */
Subscriber subscriber = participant.create_subscriber(
DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
StatusKind.STATUS_MASK_NONE);
if (subscriber == null) {
System.err.println("create_subscriber error\n");
System.exit(-1);
}
// --- Create topic --- //
/* Register type before creating topic */
String typeName = JsonMessageTypeSupport.get_type_name();
JsonMessageTypeSupport.register_type(participant, typeName);
/* To customize topic QoS, use
the configuration file USER_QOS_PROFILES.xml */
Topic topic = participant.create_topic(
topicName,
typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
null /* listener */, StatusKind.STATUS_MASK_NONE);
if (topic == null) {
System.err.println("create_topic error\n");
System.exit(-1);
}
// --- Create reader --- //
DataReaderListener listener = new JsonMessageListener(consumer);
/* To customize data reader QoS, use
the configuration file USER_QOS_PROFILES.xml */
JsonMessageDataReader reader = (JsonMessageDataReader)
subscriber.create_datareader(
topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
StatusKind.STATUS_MASK_ALL);
if (reader == null) {
System.err.println("create_datareader error\n");
System.exit(-1);
}
}
// -----------------------------------------------------------------------
@Override
public void consume() {
final long scanTimeMillis = 1000;
Runnable task = () -> {
while (true) {
try {
TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
} catch (Exception e) {
System.err.println(e.getMessage());
}
}
};
executor.submit(task);
}
}
К сожалению, я не нашел решения для этого, кроме ограничения размера последовательности, но я понял, что ограничение его достаточно большим числом решит мою проблему, это также потребует много памяти, и я бы предпочел, чтобы это не занимало больше чем минимум, необходимый для каждого сообщения.
Любая помощь будет оценена, спасибо
1 ответ
При использовании -unboundedSupport в вашем файле QoS необходимо установить некоторые пороги памяти. Эти пороговые значения описаны здесь в руководстве пользователя и определяют пороговое значение, при котором память для выборок либо динамически распределяется, либо используется повторно из предварительно выделенного источника. Они должны быть установлены как в DataReader, так и в DataWriter.
Настройки для этих порогов действительно зависят от вашего размера данных и от вашего описания. У меня недостаточно информации, чтобы предоставить вам пример, который имеет смысл в вашем сценарии. По сути, вы не хотите динамически распределять память для каждого семпла. Это может повлиять на производительность в зависимости от скорости передачи данных. Вы хотите выбрать значения, в которых большинство сэмплов используют предварительно выделенную память. Пример, представленный в руководстве пользователя в разделе " Управление памятью на стороне записывающего устройства при работе с большими данными", относится к потоковой передаче видео, которая содержит более крупные менее частые I-кадры и более мелкие более частые P-кадры. Вы можете посмотреть этот раздел и соответствующий раздел DataReader для примера XML-файла.
Мне удалось решить проблему, используя пример здесь
Все, что потребовалось, - это передать автоматически сгенерированный путь к файлу qos конструктору подписчика / издателя, а затем записать эти строки перед инициализацией участника домена (это отличается от примера, приведенного в ссылке выше, приведенный пример у меня не сработал):
DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
DomainParticipantFactory.TheParticipantFactory.get_qos(factoryQos);
factoryQos.profile.url_profile.add(0, qosPolicyPath);
factoryQos.profile.url_profile.setMaximum(1);
DomainParticipantFactory.TheParticipantFactory.set_qos(factoryQos);