Потребитель, вызывающий утечку памяти на Wildfly AS

У меня есть producerconsumer настройка с использованием Wildfly Что касается JMSпроизводитель использует newFixedThreadPool(126) Каждую 1 минуту каждый поток извлекает данные из REST сервис и толкает его на HornetQна Wildfly КАК.

Затем на стороне потребителя у меня есть класс Consumer, который потребляет сообщения в HornetQ и простой Parser класс, для вставки БД, я создаю экземпляр объекта типа Parser в моем onMessage() а затем передать ему сообщение, сообщение находится в JSON и мой класс парсера проходит по нему, получая значения и вставляя их в мою БД.

Потребитель:

public void Consume(Consumer asyncReceiver) throws Throwable {

    try {
        /** Get the initial context */
        final Properties props = new Properties();
        /** If debugging in IDE the properties are acceded this way */
        if(debug){
            InputStream f = getClass().getClassLoader().getResourceAsStream("consumer.properties");
            props.load(f);
        }
        /** If running the .jar artifact the properties are acceded this way*/
        else{
            File jarPath = new File(getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
            String propertiesPath = jarPath.getParentFile().getAbsolutePath();
            props.load(new FileInputStream(propertiesPath + File.separator + "consumer.properties"));
        }

        /** These few lines should be removed and setup in the properties file*/
        props.put(Context.INITIAL_CONTEXT_FACTORY, props.getProperty("INITIAL_CONTEXT_FACTORY"));
        props.put(Context.PROVIDER_URL, props.getProperty("PROVIDER_URL"));
        props.put(Context.SECURITY_PRINCIPAL, props.getProperty("DEFAULT_USERNAME"));
        props.put(Context.SECURITY_CREDENTIALS, props.getProperty("DEFAULT_PASSWORD"));
        context = new InitialContext(props);

        /** Lookup the queue object */
        Queue queue = (Queue) context.lookup(props.getProperty("DEFAULT_DESTINATION"));

        /** Lookup the queue connection factory */
        ConnectionFactory connFactory = (ConnectionFactory) context.lookup(props.getProperty("DEFAULT_CONNECTION_FACTORY"));

         /** Create a queue connection */
        connection = connFactory.createConnection(props.getProperty("DEFAULT_USERNAME"), props.getProperty("DEFAULT_PASSWORD"));

         /** Create a queue session */
         queueSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

         /** Create a queue consumer */
         msgConsumer = queueSession.createConsumer(queue);

         /** Set an asynchronous message listener */
        msgConsumer.setMessageListener(asyncReceiver);

         /** Set an asynchronous exception listener on the connection */
        connection.setExceptionListener(asyncReceiver);

        /** Start connection */
        connection.start();

         /** Wait for messages */
        System.out.println("waiting for messages");
        for (int i = 0; i < 47483647; i++) {
            Thread.sleep(1000);
            System.out.print(".");
        }
        System.out.println();

    } catch (Exception e) {
        log.severe(e.getMessage());
        throw e;
    }finally {
        if (context != null) {
            context.close();
        }
        if (queueSession != null)
        { queueSession.close();
        }
        if(msgConsumer != null){
            msgConsumer.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}

@Override
public void onMessage(Message message) {
    TextMessage msg = (TextMessage) message;
    try {
        Parser parser = new Parser();
        parser.parseApplication(msg.getText());

    } catch (Exception e) {
        e.printStackTrace();
    }
}

Parser:

 public void parseApplication(String NRData) throws Exception {

        DBConnection db = DBConnection.createApplication();
        db.getConnection();
        JsonFactory factory = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper(factory);
        JsonNode rootNode = mapper.readTree(NRData);

        Iterator<Map.Entry<String, JsonNode>> fieldsIterator = rootNode.fields();
        while (fieldsIterator.hasNext()) {

            Map.Entry<String, JsonNode> field = fieldsIterator.next();
            String envName = field.getKey();
            JsonNode appValue = field.getValue();
            JSONArray jsonArray = new JSONArray(appValue.toString());
            String appName = jsonArray.getString(0);
            String appID = jsonArray.getString(1);
            JSONObject json = jsonArray.getJSONObject(2);

            JSONObject metricsData = json.getJSONObject("metric_data");
            JSONArray metrics = metricsData.getJSONArray("metrics");
            JSONObject array1 = metrics.getJSONObject(0);
            JSONArray timeslices = array1.getJSONArray("timeslices");

            for (int i = 0; i < timeslices.length(); i++) {
                JSONObject array2 = timeslices.getJSONObject(i);
                JSONObject values = array2.getJSONObject("values");
//                Instant from = array2.getString("from");
                Instant from = TimestampUtils.parseTimestamp(array2.get("from").toString(), null);
                Instant to = TimestampUtils.parseTimestamp(array2.get("to").toString(), null);
                Iterator<String> nameItr = values.keys();
                while (nameItr.hasNext()) {
                    String name = nameItr.next();
                    System.out.println(
                            "\nEnv name: " + envName +
                                    "\nApp name: " + appName +
                                    "\nApp ID: " + appID +
                                    "\nRequest per minute: " + values.getDouble(name) +
                                    "\nFrom: " + from + " To: " + to);

                    ThroughputEntry TP = new ThroughputEntry();
                    TP.setThroughput(values.getDouble(name));
                    TP.setEnvironment(envName);
                    TP.setName(appName);
                    TP.setRetrieved(from);
                    TP.setPeriodEnd(to);

                    db.addHistory(TP);
                }
            }
        }
    }

DBconnection:

public class DBConnection {

private final String table;


/**
 * Set the table name for applications
 */
public static DBConnection createApplication() {
    return new DBConnection("APPLICATIONDATA");
}

public DBConnection(String table) {
    this.table = String.format("NRDATA.%s", table);
}

public Connection getConnection() throws IllegalAccessException,
        InstantiationException, ClassNotFoundException, SQLException {

    try {
        Class.forName("COM.ibm.db2os390.sqlj.jdbc.DB2SQLJDriver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }

    System.out.println("Connecting to database...");
    Connection connection = DriverManager.getConnection(DB2url, user, password);
    System.out.println("From DAO, connection obtained ");
    return connection;
}


public boolean addHistory(ThroughputEntry entry) throws Exception {

    try (Connection connection = getConnection()) {
        Statement statement = connection.createStatement();

        return 0 < statement
                .executeUpdate(String
                        .format("INSERT INTO " + table
                                        + "(ID, RETRIEVED, PERIOD_END, ENVIRONMENT, APPNAME, THROUGHPUT)"
                                        + "VALUES ('%s', '%s', '%s', '%s', '%s', %s)",
                                entry.getUUID(), entry.getRetrieved(),
                                entry.getPeriodEnd(), entry.getEnvironment(),
                                entry.getName(), entry.getThroughput()));


    }
}
}

Так что моя проблема в том, что я получаю утечку памяти на Wildfly AS, и я думаю, что проблема может быть с моим потребителем.

Итак, несколько вопросов:

я должен буферизовать сообщения, полученные в моем onMessage() метод на потребителя, прежде чем вставить их в БД?

если я получаю слишком много сообщений, может ли это быть причиной утечки? отправляет ли потребитель какие-либо сообщения? Ack к Wildfly КАК?

У меня потребитель работает бесконечно с циклом, может быть, это неправильно, может, он должен спать или ждать.

Я уже 2 дня пытаюсь решить эту проблему, любая помощь будет очень признательна.

1 ответ

Решение

Вы должны закрыть все, что нужно закрыть. Я смотрю на первые несколько строк и уже вижу два потока, которые не закрываются.

Просмотрите свой код, все, что реализует AutoCloseable должен быть правильно закрыт. Использовать try-with-resources сделать это.

IDE уже могут дать вам указатели на возможные утечки ресурсов, например, в предупреждениях о настройке Eclipse в Java Compiler>Errors/Warnings,


Изменить: вы отредактировали свой вопрос, и теперь очевидна утечка. в parseApplication метод у вас есть заявление db.getConnection(), Метод создает соединение, которое вы никогда не используете и никогда не закрываете.

Другие вопросы по тегам