Oracle: отправить последнюю вставку / обновление в JMS

У меня есть триггер вставки / обновления для таблицы Oracle. Есть ли способ отправить сведения о затронутой строке (все столбцы) в виде сообщения в JMS? Я могу написать программу на Java, 'loadjava' и вызвать из триггера. Влияет ли этот способ на производительность?

Есть ли какой-нибудь родной способ достижения этого?

2 ответа

Существует действительно родной способ: использовать AQ JMS из PL/SQL, см. https://docs.oracle.com/database/121/ADQUE/jm_exmpl.htm. Короче говоря, вы создаете очередь AQ с типом полезной нагрузки JMS; тогда вы можете отправлять сообщения с PL / SQL из триггера. Внешний Java-клиент может подключаться к базе данных и читать сообщения с помощью JMS.

Я не знаю, насколько вызов в Java повлияет на производительность, но я стараюсь избегать этого. Это была хорошая идея, но она так и не завоевала популярность, поэтому она остается незначительным делом и, по крайней мере, на ранних этапах всегда были проблемы. PL/SQL с другой стороны работает.

Если вам нужно отправить данные в другой продукт очереди сообщений (теги activemq и mq), вы можете прочитать сообщения на Java и переслать их. Это добавляет дополнительный шаг, но это просто.

У loadjava много проблем и они нестабильны, если загружено много классов и много бизнеса, посмотрите вызов Java из Oracle, PLSQL вызывает oracle.aurora.vm.ReadOnlyObjectException

Oracle AQ, как я знаю, не является бесплатной.

Я реализовал ту же потребность после того, как попробовал много возможностей, создав только 1 класс, загруженный в oracle с помощью loadjava, который вызывается триггером как процедура и имеет возможность вызывать внешнюю Java-программу со всеми необходимыми параметрами и регистрировать вывод внешнего процесса в таблица, как показано ниже.

Я закодировал текстовое сообщение в BASE64, потому что я использовал формат JSON, и некоторые специальные символы могут вызывать проблемы в качестве параметров для внешней Java-программы.

я использовал "#*#jms_separator#*#" в качестве разделителя в строке отправленного параметра для анализа содержимого, если мне нужно отправить много параметров во внешнюю программу.

вся продолжительность ShellExecutor.shellExec составляет около 500 мс и работает с 1 года без каких-либо проблем.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Clob;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.concurrent.FutureTask;

import sun.misc.BASE64Encoder;

public class ShellExecutor {

    static {
        System.setProperty("file.encoding", "UTF-8");
    }

    private static final String INSERT_LOGS_SQL = "INSERT INTO JMS_LOG (TEXT_LOG) VALUES (?) ";
    private static final String DEFAULT_CONNECTION = "jdbc:default:connection:";

    public static String SQLshellExec(String command) throws Exception {
        long start = System.currentTimeMillis();
        StringBuffer result = new StringBuffer();
        ShellExecutor worker = new ShellExecutor();
        try {
            worker.shellExec(command, result);
        } finally {
            result.append("exe duration : " + (System.currentTimeMillis() - start + "\n"));
            Connection dbConnection = null;
            PreparedStatement logsStatement = null;
            try {
                dbConnection = DriverManager.getConnection(DEFAULT_CONNECTION);
                logsStatement = dbConnection.prepareStatement(INSERT_LOGS_SQL);
                logsStatement.clearParameters();
                Clob clob = dbConnection.createClob();
                clob.setString(1, result.toString());
                logsStatement.setClob(1, clob);
                logsStatement.executeUpdate();
            } finally {
                if (logsStatement != null) {
                    try {
                        logsStatement.close();
                    } catch (Exception e) {
                    }
                }
            }
        }
        return result.substring(result.length() - 3090);
    }

    public void shellExec(String command, StringBuffer result) throws Exception {
        Process process = null;
        int exit = -10;
        try {
            InputStream stdout = null;
            String[] params = command.split("#*#jms_separator#*#");
            BASE64Encoder benc = new BASE64Encoder();
            for (int i = 0; i < params.length; i++) {
                if (params[i].contains("{") || params[i].contains("}") || params[i].contains("<")
                        || params[i].contains("/>")) {
                    params[i] = benc.encodeBuffer(params[i].getBytes("UTF-8"));
                }
            }
            result.append("Using separator : " + "#*#jms_separator#*#").append("\n")
                    .append("Calling : " + Arrays.toString(params)).append("\n");
            ProcessBuilder pb = new ProcessBuilder(params);
            pb.redirectErrorStream(true);
            process = pb.start();

            stdout = process.getInputStream();

            LogStreamReader lsr = new LogStreamReader(stdout, result);
            FutureTask<String> stdoutFuture = new FutureTask<String>(lsr, null);
            Thread thread = new Thread(stdoutFuture, "LogStreamReader");
            thread.start();

            try {
                exit = process.waitFor();
            } catch (InterruptedException e) {
                try {
                    exit = process.waitFor();
                } catch (Exception e1) {
                }
            }
            stdoutFuture.get();
            result.append("\n").append("exit code :").append(exit).append("\n");
            if (exit != 0) {
                throw new RuntimeException(result.toString());
            }
        } catch (Exception e) {
            result.append("\nException(").append(e.toString()).append("):").append(e.getCause()).append("\n\n");
            e.printStackTrace(System.err);
            throw e;
        } finally {
            if (process != null) {
                process.destroy();
            }
        }
    }
}

class LogStreamReader implements Runnable {

    private BufferedReader reader;
    private StringBuffer result;

    public LogStreamReader(InputStream is, StringBuffer result) {
        this.reader = new BufferedReader(new InputStreamReader(is));
        this.result = result;
    }

    public void run() {
        try {
            String line = null;
            while ((line = reader.readLine()) != null) {
                result.append(line).append("\n");
            }
        } catch (Exception e) {
            result.append("\nException(").append(e.toString()).append("):").append(e.getCause()).append("\n\n");
            e.printStackTrace(System.err);
        } finally {
            try {
                reader.close();
            } catch (IOException e) {
            }
        }
    }
}

Класс внешней Java-программы, упакованный как исполняемый файл со всеми необходимыми библиотеками, простой отправитель JMS:

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.commons.codec.binary.Base64;
import org.json.JSONObject;

import progress.message.jclient.ConnectionFactory;
import progress.message.jimpl.Connection;

public class JMSSender {
    private static SimpleDateFormat sdf = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss.SSS");

    public static void main(String[] args) throws Throwable {
        doSend(args[0]);
    }

    public static void doSend(String text)
            throws Throwable {
        if (Base64.isBase64(text)) {
            text = new String(Base64.decodeBase64(text));
        }
        String content = "\n\nsending message :" + text;
        Connection con = null;
        Session session = null;
        try {
            ConnectionFactory cf = new ConnectionFactory();
            session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination dest =  session.createTopic(destination) ;
            MessageProducer producer = session.createProducer(dest);
            con.start();
            JSONObject json = new JSONObject();
            json.put("content", text);
            json.put("date", sdf.format(new Date()));

            TextMessage tm = session.createTextMessage(json.toString());
            producer.send(tm);
            content += " \n\n" + "sent message :" + json.toString();
        } catch (Throwable e) {
            content += " \n\n" + e.toString() + " \n\n" + Arrays.toString(e.getStackTrace());
            if (e.getCause() != null) {
                content += " \n\nCause : " + e.getCause().toString() + " \n\n"
                        + Arrays.toString(e.getCause().getStackTrace());
            }
            e.printStackTrace(System.err);
            throw e;
        } finally {
            write("steps on sending message : " + content);
            if (session != null) {
                try {
                    session.commit();
                    session.close();
                } catch (Exception e) {
                }
                session = null;
            }
            if (con != null) {
                try {
                    con.stop();
                    con.close();
                } catch (Exception e) {
                }
            }
        }
    }

    private static void write(String log) {
        try {
            if (System.out != null) {
                System.out.println(log);
            }
        } catch (Exception e2) {
        }
    }
}
Другие вопросы по тегам