Siddharth Trikha Siddharth Trikha - 5 months ago 42
Java Question

LISTEN/NOTIFY pgconnection goes down java?

I am using PostgreSQL DB and applying it's

LISTEN/NOTIFY
functionality. So my listener is at my AS (Application Server) and I have triggers configured on my DB such that when CRUD operations are performed on a table a
NOTIFY
request is sent on AS.

LISTENER class in java:

@Singleton
@Startup
NotificationListenerInterface.class)
public class NotificationListener extends Thread implements NotificationListenerInterface {

@Resource(mappedName="java:/RESOURCES")
private DataSource ds;

@PersistenceContext(unitName = "one")
EntityManager em;

Logger logger = Logger.getLogger(NotificationListener.class);

private Connection Conn;
private PGConnection pgConnection = null;
private NotifyRequest notifyRequest = null;

@PostConstruct
public void notificationListener() throws Throwable {

System.out.println("Notification****************");
try
{


Class.forName("com.impossibl.postgres.jdbc.PGDriver");
String url = "jdbc:pgsql://192.xx.xx.126:5432/postgres";


Conn = DriverManager.getConnection(url,"postgres","password");
this.pgConnection = (PGConnection) Conn;

System.out.println("PG CONNECTON: "+ pgConnection);
Statement listenStatement = Conn.createStatement();
listenStatement.execute("LISTEN notify_channel");
listenStatement.close();

pgConnection.addNotificationListener(new PGNotificationListener() {

@Override
public void notification(int processId, String channelName, String payload){

System.out.println("*********INSIDE NOTIFICATION*************");

System.out.println("Payload: " + jsonPayload);

}


So as my AS is up, I have configured that at startup the listener class is called (
@Startup annotation
) and it's start listening on the channel.

Now this works fine if like say for testing I edit my table in DB manually, the notification is generated and the LISTENER receives it.

However, when I programmatically send a UPDATE request on the table, the UPADTE is performed successfully but LISTENER is not receiving anything.

I feel my connection of the LISTENER goes down when I send a request (it also makes a connection to edit entities), but I am not sure. I read about permanent connections and pooled connections, but not able to decide how to pursue that.

I am using pgjdbc (http://impossibl.github.io/pgjdbc-ng/) jar for async notifications as jdbc connection requires polling.

EDIT:

When I try the above listener with polling by using the standard jdbc jar (not pgjdbc), I get the notifications.

I do
PGNotification notif[] = con.getNotifications()

and I get notifications, however doing it asynchronously like below I don't get notifications.

pgConnection.addNotificationListener(new PGNotificationListener() {

@Override
public void notification(int processId, String channelName, String payload){

System.out.println("*********INSIDE NOTIFICATION*************");
}


SOLVED:

My listener was going out of scope (as suggested by @Luke) once the function was executed (my listener had the function scope). So kept it into a member variable of my startup bean class.

Answer

The notification listeners are internally maintained by that library as weak references meaning that you have to hold a hard reference externally so they won't be garbage collected. Check out the BasicContext class lines 642 - 655:

public void addNotificationListener(String name, String channelNameFilter, NotificationListener listener) {

    name = nullToEmpty(name);
    channelNameFilter = channelNameFilter != null ? channelNameFilter : ".*";

    Pattern channelNameFilterPattern = Pattern.compile(channelNameFilter);

    NotificationKey key = new NotificationKey(name, channelNameFilterPattern);

    synchronized (notificationListeners) {
      notificationListeners.put(key, new WeakReference<NotificationListener>(listener));
    }

}

If the GC picks up your listener, calls to "get" on the weak reference will return null and will not fire as seen from lines 690 - 710

  @Override
  public synchronized void reportNotification(int processId, String channelName, String payload) {

    Iterator<Map.Entry<NotificationKey, WeakReference<NotificationListener>>> iter = notificationListeners.entrySet().iterator();
    while (iter.hasNext()) {

      Map.Entry<NotificationKey, WeakReference<NotificationListener>> entry = iter.next();

      NotificationListener listener = entry.getValue().get();
      if (listener == null) {

        iter.remove();
      }
      else if (entry.getKey().channelNameFilter.matcher(channelName).matches()) {

        listener.notification(processId, channelName, payload);
      }

    }

}

To fix this, add your notification listeners as such:

/// Do not let this reference go out of scope!
PGNotificationListener listener = new PGNotificationListener() {

@Override
public void notification(int processId, String channelName, String payload) {
    // interesting code
};
pgConnection.addNotificationListener(listener);

Quite an odd use-case for weak references in my opinion...

Comments