Mykhaylo Adamovych Mykhaylo Adamovych - 6 months ago 32
Java Question

Disposable MBean

How could I implement disposable MBean, one that doesn't prevent resource it is monitoring from being garbage collected?

Let say I wrote dummy statistic MBean but the class it is monitoring is not singleton in the system. I would like MBean to be automatically unregistered once resource is no longer in use.

Any ideas how to achieve that?
Any existing solutions?

Thanks.

Answer

Usage

registerWeakMBean("com.company:type=connection,name=" +
getClass().getSimpleName(), connectinMBean, ConnectinStatMBean.class);

Implementatin

package util;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.ref.WeakReference;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.log4j.Logger;

import sun.management.Agent;

public class JmxUtils {
    private static Logger log = Logger.getLogger(JmxUtils.class);
    private static Map<Object, String> weakMBeans = new ConcurrentHashMap<Object, String>();
    static {
        verifyJmxAgentStarted();
    }

    private static final int getAvailablePort() throws IOException {
        ServerSocket s = new ServerSocket(0);
        int result = s.getLocalPort();
        s.close();
        return result;
    }

    /**
     * @param objName
     *            domain:type=value[,name=value]
     * @param implementation
     * @param mbeanInterface
     * @see ObjectName
     * @see StandardMBean
     */
    public static final <I> ObjectInstance registerMBean(String objName, I implementation, Class<I> mbeanInterface) {
        int counter = 0;
        String uniqueSuffix = "";
        final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        while (true) {
            try {
                final ObjectName name = new ObjectName(objName + uniqueSuffix);
                final StandardMBean mbean = new StandardMBean(implementation, mbeanInterface);
                return mbs.registerMBean(mbean, name);
            } catch (final InstanceAlreadyExistsException e) {
                uniqueSuffix = "" + ++counter;
            } catch (final Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * Weak MBean will not prevent resource it is monitoring from been garbage collected. MBean will be automatically unregistered.
     * 
     * @param objName
     *            domain:type=value[,name=value]
     * @param implementation
     * @param mbeanInterface
     * @see ObjectName
     * @see StandardMBean
     * @see WeakReference
     */
    public static final <I> ObjectInstance registerWeakMBean(String objName, I implementation, Class<I> mbeanInterface) {
        I proxy = DisposableWeakReference.newWeakReferenceProxy(new DisposableWeakReference<I>(implementation) {
            @Override
            public void dispose(Object disposable) {
                unregisterMBean(weakMBeans.remove(disposable));
            }
        }, mbeanInterface);
        ObjectInstance instance = registerMBean(objName, proxy, mbeanInterface);
        weakMBeans.put(proxy, instance.getObjectName().getCanonicalName());
        return instance;
    }

    public static <T> T newJmxClient(Class<T> clazz, String objectName, String serviceUrl) {
        return createJmxClient(clazz, objectName, serviceUrl, null, null);
    }

    public static <T> T newJmxClient(Class<T> clazz, String objectName, String serviceUrl, final String user, final String pass) {
        try {
            JMXServiceURL jmxServiceUrl = new JMXServiceURL(serviceUrl);
            Map<String, ?> env = user == null ? null : new HashMap<String, Object>() {{
                put(JMXConnector.CREDENTIALS, new String[] {user, pass});
            }};
            JMXConnector jmxc = JMXConnectorFactory.connect(jmxServiceUrl, env);
            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
            ObjectName mbeanName = new ObjectName(objectName);
            return JMX.newMBeanProxy(mbsc, mbeanName, clazz, true);
        } catch (IOException | MalformedObjectNameException e) {
            throw new RuntimeException("Can not create client for remote JMX " + serviceUrl, e);
        }
    }

    /**
     * @param objName
     * @see ObjectName
     */
    public static final void unregisterMBean(String objName) {
        try {
            final MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
            final ObjectName name = new ObjectName(objName);
            mbs.unregisterMBean(name);
        } catch (final Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static final void verifyJmxAgentStarted() {
        try {
            String port = System.getProperty("com.sun.management.jmxremote.port");
            if (port == null) {
                port = String.valueOf(getAvailablePort());
                System.setProperty("com.sun.management.jmxremote.port", port);
                System.setProperty("com.sun.management.jmxremote.ssl", "false");
                System.setProperty("com.sun.management.jmxremote.authenticate", "false");

                Agent.startAgent();
            }
            log.info(InetAddress.getLocalHost().getCanonicalHostName() + ":" + port);
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

package util;

import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.WeakHashMap;

/**
 * Disposable weak reference calls you back when referent has been disposed. You can also create proxy to the referent to emulate direct access.
 * 
 * <pre>
 * public class Example {
 *     public interface I {
 *         // interface referent is implementing to create a proxy
 *     }
 * 
 *     public static final class T implements I {
 *         public String toString() {
 *             return "blah";
 *         }
 *     }
 * 
 *     private WeakReference&ltT&gt wr;
 *     private I wrp;
 *     private List&ltObject&gt list = new LinkedList&ltObject&gt();
 * 
 *     private void testWeakRef() {
 *         T o = new T();
 *         wr = new DisposableWeakReference&ltT&gt(o) {
 *             public void dispose(Object disposable) {
 *                 list.remove(disposable);
 *             }
 *         };
 *         list.add(wr);
 *         wrp = DisposableWeakReference.newWeakReferenceProxy(new DisposableWeakReference&ltI&gt(o) {
 *             public void dispose(Object disposable) {
 *                 list.remove(disposable);
 *                 Example.this.wrp = null;
 *             }
 *         }, I.class);
 *         list.add(wrp);
 *     }
 * 
 *     public static void main(final String[] args) throws Exception {
 *         Example exmple = new Example();
 *         exmple.testWeakRef(); // try to replace with exact implementation
 * 
 *         System.out.println("exmple.wr.get() " + exmple.wr.get()); // blah
 *         System.out.println("exmple.wrp " + exmple.wrp); // blah
 *         System.out.println("exmple.list.contains(exmple.wr) " + exmple.list.contains(exmple.wr)); // true
 *         System.out.println("exmple.list.contains(exmple.wrp) " + exmple.list.contains(exmple.wrp)); // true
 *         System.gc();
 *         Thread.sleep(10);
 *         System.out.println("exmple.wr.get() " + exmple.wr.get()); // null
 *         System.out.println("exmple.wrp " + exmple.wrp); // null or exception
 *         System.out.println("exmple.list.contains(exmple.wr) " + exmple.list.contains(exmple.wr)); // false
 *         System.out.println("exmple.list.contains(exmple.wrp) " + exmple.list.contains(exmple.wrp)); // false
 *     }
 * }
 * 
 * <pre>
 * 
 * @param <T> weak reference referent type
 * @author Mykhaylo Adamovych
 */
@SuppressWarnings({ "rawtypes" })
public abstract class DisposableWeakReference<T> extends WeakReference<T> {
    public static class DisposedException extends RuntimeException {
        private static final long serialVersionUID = -1176608195614694732L;

        public DisposedException() {
            super();
        }

        public DisposedException(String message) {
            super(message);
        }

        public DisposedException(String message, Throwable cause) {
            super(message, cause);
        }

        public DisposedException(Throwable cause) {
            super(cause);
        }
    }

    private static class ReferenceProxy<T> implements InvocationHandler {
        private final DisposableWeakReference<T> reference;

        public ReferenceProxy(DisposableWeakReference<T> reference) {
            this.reference = reference;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("equals".equals(method.getName()))
                return proxy == args[0];
            else if ("hashCode".equals(method.getName()))
                return hashCode();
            T referent = reference.get();
            if (referent == null)
                throw new DisposedException("Referent has been disposed.");
            return method.invoke(referent, args);
        }
    }

    private static class WeakReferenceDisposerThread extends Thread {
        WeakReferenceDisposerThread() {
            super("Weak Reference Disposer");
        }

        @Override
        public void run() {
            while (true)
                try {
                    DisposableWeakReference<?> reference = (DisposableWeakReference<?>) queue.remove();
                    Object disposable = reference.proxy;
                    if (disposable == null)
                        disposable = reference;
                    reference.dispose(disposable);
                } catch (Throwable e) {
                    // ignore any exception while disposing
                }
        }
    }

    private static final ReferenceQueue queue = new ReferenceQueue();

    static {
        Thread disposer = new WeakReferenceDisposerThread();
        disposer.setPriority(Thread.MAX_PRIORITY - 2);
        disposer.setDaemon(true);
        disposer.start();
    }

    /**
     * You can use referent directly without {@link #get()}. Runtime exception will rise in case referent has been disposed by GC. You can use
     * {@link #dispose(Object)} to deal with proxy also.
     * 
     * @param reference
     *            disposable weak reference
     * @param clazz
     *            referent interface class
     * @param <T>
     *            referent type
     * @param <I>
     *            referent interface to create a proxy
     * @return referent proxy using weak reference
     */
    public static <I> I newWeakReferenceProxy(DisposableWeakReference<I> reference, Class<I> clazz) {
        I proxy = ReflectUtils.<I>newProxyInstance(new ReferenceProxy<I>(reference), clazz);
        reference.proxy = proxy;
        return proxy;
    }

    private Object proxy;

    public DisposableWeakReference(T referent) {
        super(referent, queue);
    }

    /**
     * Remove this weak reference wrapper from whatever when referent has been garbage collected.
     * 
     * @param disposable
     *            either this reference instance or proxy instance created by {@link #newWeakReferenceProxy(DisposableWeakReference, Class)}
     * @see WeakHashMap
     */
    public abstract void dispose(Object disposable);
}
Comments