СЛУШАТЬ / УВЕДОМИТЬ pgconnection идет вниз Java?

Я использую PostgreSQL DB и применяю его LISTEN/NOTIFY функциональность. Таким образом, мой слушатель находится на моей AS (сервер приложений), и у меня есть триггеры, сконфигурированные на моей БД, чтобы при выполнении операций CRUD над таблицей NOTIFY Запрос отправлен на AS.

СЛУШАТЕЛЬ класс в Java:

        @Singleton
        @Startup
    NotificationListenerInterface.class)
        public class NotificationListener extends Thread implements NotificationListenerInterface {

            @Resource(mappedName="java:/RESOURCES") 
            private DataSource ds;

            @PersistenceContext(unitName = "one")
            EntityManager em;

            Logger logger = Logger.getLogger(NotificationListener.class);

            private Connection Conn;
            private PGConnection pgConnection = null;
            private NotifyRequest notifyRequest = null;

            @PostConstruct
            public void notificationListener() throws Throwable {

                System.out.println("Notification****************");
                try
                {


                    Class.forName("com.impossibl.postgres.jdbc.PGDriver");
                    String url = "jdbc:pgsql://192.xx.xx.126:5432/postgres";


                    Conn = DriverManager.getConnection(url,"postgres","password");
                    this.pgConnection = (PGConnection) Conn;

                    System.out.println("PG CONNECTON: "+ pgConnection);
                    Statement listenStatement = Conn.createStatement();
                    listenStatement.execute("LISTEN notify_channel");
                    listenStatement.close();

                    pgConnection.addNotificationListener(new PGNotificationListener() {

                        @Override
                        public void notification(int processId, String channelName, String payload){

                            System.out.println("*********INSIDE NOTIFICATION*************");

                            System.out.println("Payload: " + jsonPayload);

}

Так как мой AS работает, я настроил, что при запуске вызывается класс слушателя (@Startup annotation) и начать слушать на канале.

Теперь это работает нормально, если, скажем, для тестирования я редактирую свою таблицу в БД вручную, уведомление генерируется и LISTENER получает его.

Однако когда я программно отправляю запрос UPDATE на таблицу, UPADTE выполняется успешно, но LISTENER ничего не получает.

Я чувствую, что мое соединение LISTENER обрывается, когда я отправляю запрос (он также устанавливает соединение для редактирования сущностей), но я не уверен. Я читал о постоянных и объединенных соединениях, но не смог решить, как это сделать.

Я использую jar pgjdbc ( http://impossibl.github.io/pgjdbc-ng/) для асинхронных уведомлений, так как соединение jdbc требует опроса.

РЕДАКТИРОВАТЬ:

Когда я пробую вышеупомянутый слушатель с опросом, используя стандартный jdbc jar (не pgjdbc), я получаю уведомления.

я делаю PGNotification notif[] = con.getNotifications()и я получаю уведомления, однако, делая это асинхронно, как показано ниже, я не получаю уведомления.

    pgConnection.addNotificationListener(new PGNotificationListener() {

         @Override
         public void notification(int processId, String channelName, String payload){

            System.out.println("*********INSIDE NOTIFICATION*************");
         }

РЕШИТЬ:

Мой слушатель выходил из области видимости после завершения выполнения функции, поскольку у моего слушателя была область действия функции. Так что держал его в переменной-члене моего класса запускаемых компонентов, и тогда он работал.

1 ответ

Решение

Слушатели уведомлений внутренне поддерживаются этой библиотекой как слабые ссылки, а это означает, что вам нужно хранить жесткую ссылку извне, чтобы они не собирались мусором. Проверьте строки класса BasicContext 642 - 655:

public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {

    name = nullToEmpty(name);
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);

    synchronized (notificationListeners) {
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
    }

}

Если GC поднимает ваш слушатель, вызовы "get" для слабой ссылки вернут ноль и не сработают, как видно из строк 690 - 710

  @Override
  public synchronized void reportNotification(int processId, String channelName, String payload) {

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
    while (iter.hasNext()) {

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();

      NotificationListener listener = entry.getValue().get();
      if (listener == null) {

        iter.remove();
      }
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {

        listener.notification(processId, channelName, payload);
      }

    }

}

Чтобы это исправить, добавьте слушателей уведомлений как таковые:

/// Do not let this reference go out of scope!
PGNotificationListener listener = new PGNotificationListener() {

@Override
public void notification(int processId, String channelName, String payload) {
    // interesting code
};
pgConnection.addNotificationListener(listener);

Довольно странный вариант использования слабых ссылок на мой взгляд...

Другие вопросы по тегам