Eugene Eugene - 5 months ago 34
Java Question

LMAX Disruptor as a blocking queue?

Is there any way i can have both in one structure -


  1. Semantics of BlockingQueue, ie - non blocking peek, blocking poll and blocking put. Multiple providers one consumer.

  2. RingBuffer, which effectively works as an object pool, so instead of putting new object in ring buffer, i want to reuse existing object there, copying the state. So basically the functionality LMAX disruptor has out of the box.



Is there something which works like that already?
I guess i can try and use Disruptor for that, i already can use it as a blocking queue with blocking put(if the ring buffer is "full") if i understand correctly. It already has the "reusable objects" semantics i need. So the only problem is how to create a client which would be able to PULL objects(instead of using callbacks), so as i'm not really familiar with internal Disruptor structure - can it be done? With all those sequencers, creating a new EventProcessor or something like that?

And no, the obvious solution of having a blocking queue on a client side and getting from it is not an ideal solution, as it breaks the whole point of using the disruptor object pool - you'll need to have a new pool now, or just create a new objects in the callback before putting in that blocking queue etc, and i don't want to have any garbage created at all.

So is there a way to achieve it with Disruptor, or any other performance oriented/garbage free java library?

Answer

For curious, i haven't been able to get a "blocking pull" semantics from the Disruptor itself, but of course it's trivial to add "blocking" functionality to the non-blocking pull. "Peek" functionality by itself is possible but not efficient(you need to copy the item again and again on each peek) and can be replaced by just caching the results of "poll".

So, the minimal raw solution, implemented only the methods i need:

public class DisruptorMPSCQueue<T extends ICopyable<T>> {

    private final RingBuffer<T> ringBuffer;
    private final EventPoller<T> eventPoller;
    private T tempPolledEvent;

    private EventPoller.Handler<T> pollerHandler = new EventPoller.Handler<T>() {
        @Override
        public boolean onEvent(final T event, final long sequence, final boolean endOfBatch) throws Exception {
            tempPolledEvent.copyFrom(event);
            return false;
        }
    };

    public DisruptorMPSCQueue(EventFactory<T> typeConstructor, int size) {
        ringBuffer = RingBuffer.createMultiProducer(typeConstructor, size);
        eventPoller = ringBuffer.newPoller();
        ringBuffer.addGatingSequences(eventPoller.getSequence());
    }

    /**
     * Blocking, can be called from any thread, the event will be copied to the ringBuffer
     */
    public void put(final T event) {
        long sequence = ringBuffer.next(); // blocked by ringBuffer's gatingSequence
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Not blocking, can be called from any thread, the event will be copied to the ringBuffer
     *
     * @throws IllegalStateException if the element cannot be added at this time due to capacity restrictions
     */
    public void offer(final T event) {
        long sequence;
        try {
            sequence = ringBuffer.tryNext();
        } catch (InsufficientCapacityException e) {
            throw new IllegalStateException(e); // to mimic blockingQueue
        }
        ringBuffer.get(sequence).copyFrom(event);
        ringBuffer.publish(sequence);
    }

    /**
     * Retrieve top of the queue(removes from the queue). NOT thread-safe, can be called from one thread only.
     *
     * @param destination top of the queue will be copied to destination
     * @return destination object or null if the queue is empty
     */
    public T poll(final T destination) {
        try {
            tempPolledEvent = destination;  // yea, the poller usage is a bit dumb
            EventPoller.PollState poll = eventPoller.poll(pollerHandler);
            if (poll == EventPoller.PollState.PROCESSING) {
                return tempPolledEvent;
            } else {
                return null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}