public abstract class RingBufferConsumerControl extends Object
This code is substantially based on the ingenious work done by Martin Thompson on what he calls "Mechanical Sympathy." It leans heavily on the source code from version 3.0.0.beta2 of the LMAX-exchange Disruptor but has been completely refactored in order to invert separate the control mechanism from what is being controlled and to simplify the API.
For more information on the LMAX Disruptor, see:
http://lmax-exchange.github.com/disruptor/
Employ Martin Thompson's "mechanical sympathy" to the concurrency control mechanism for a ring buffer. It directly borrows from their code base and as of the time this class was written, it has about 20-30% more throughput than their own ring buffer as measured by their own OneToOne benchmark test and about 15x faster as measured on their ThreeToOne test (on my machine, 64bit Linux, 64bit Java 6 Jvm).
This class is incredibly temperamental and must strictly be used the way it was intended. Misuse can easily lead to lockups, missed sequences, etc.
In general, the RingBufferControl
is the logic that controls the entries in a
ring buffer, but not the ring buffer itself. These classes make no assumptions about
where the data is stored (other than in physical memory), or what type it is.
The RingBufferControl
is completely analogous to a traditional "condition
variable." Just like a Condition Variable is the synchronization mechanism that gates
concurrent access to some 'condition', but says nothing about what the 'condition'
actually is, the RingBufferControl
gates concurrent access to the publishing
and consuming of data in a ring buffer.
The 'consumer side' control and the 'publishing side' control are broken into two separate classes. This class represents control of the consumer side of the ring buffer. while RingBufferControl
additionally contains the publish side control.
These two base primitives can only be used with one consuming thread and one publishing thread, however, they form the building blocks for several other configurations (see
RingBufferControlMulticaster
and RingBufferControlMultiplexor
)
Modifier and Type | Class and Description |
---|---|
static interface |
RingBufferConsumerControl.ConsumerWaitStrategy
This interface can be implemented to provide various custom wait strategies for the consumer side of the control.
|
Modifier and Type | Field and Description |
---|---|
static long |
ACQUIRE_STOP_REQUEST
This value can be returned from
availableTo() or tryAvailableTo() to inform the consumer that the published has called
RingBufferControl.publishStop() |
protected int |
bufferSize |
protected int |
indexMask |
protected static long |
INITIAL_CURSOR_VALUE
Set to -1 as sequence starting point
|
protected Iterator |
iter |
long |
p1
Making this public avoids it being optimized away
|
long |
p2
Making this public avoids it being optimized away
|
long |
p3
Making this public avoids it being optimized away
|
long |
p4
Making this public avoids it being optimized away
|
long |
p5
Making this public avoids it being optimized away
|
long |
p6
Making this public avoids it being optimized away
|
protected com.lmax.disruptor.Sequence |
publishCursor |
static RingBufferConsumerControl.ConsumerWaitStrategy |
spin
Using this strategy provides 'spin' approach to waiting for data to be available for the consumer.
|
protected com.lmax.disruptor.util.PaddedLong |
stop |
protected boolean |
stopIsCommon |
protected com.lmax.disruptor.Sequence |
tail |
static long |
UNAVAILABLE
This value can be returned from
tryAvailableTo() to indicate that there are no pending published values available. |
protected RingBufferConsumerControl.ConsumerWaitStrategy |
waitStrategy |
static RingBufferConsumerControl.ConsumerWaitStrategy |
yield
Using this strategy provides 'yield' approach to waiting for data to be available for the consumer.
|
Modifier | Constructor and Description |
---|---|
protected |
RingBufferConsumerControl(int sizePowerOfTwo,
RingBufferConsumerControl.ConsumerWaitStrategy waitStrategy,
com.lmax.disruptor.Sequence cursor) |
protected |
RingBufferConsumerControl(int sizePowerOfTwo,
RingBufferConsumerControl.ConsumerWaitStrategy waitStrategy,
com.lmax.disruptor.Sequence cursor,
com.lmax.disruptor.util.PaddedLong commonStop) |
Modifier and Type | Method and Description |
---|---|
long |
availableTo()
availableTo() is a consumer side call that will block using the wait strategy until a value (or several) have been published. |
protected long |
availableTo(long requestedSequence) |
protected void |
clear() |
<T> Iterable<T> |
consumeAsIterable(T[] values) |
<T> Iterator<T> |
consumeAsIterator(T[] values) |
protected void |
doNotifyProcessed(long sequence) |
protected com.lmax.disruptor.Sequence |
getTail() |
int |
index(long sequence)
This method will convert the sequence to an index of a ring buffer.
|
boolean |
isShutdown()
Once the publisher calls
RingBufferControl.publishStop() and the consumer acquires it this method will return true . |
void |
notifyProcessed()
This method must be called by the consumer once the consumer is finished with the currently published results.
|
long |
sumPaddingToPreventOptimisation() |
long |
tryAvailableTo()
This method allows the consumer side to poll for publishing events.
|
protected long |
tryAvailableTo(long requestedSequence) |
protected static final long INITIAL_CURSOR_VALUE
public static final long ACQUIRE_STOP_REQUEST
availableTo()
or tryAvailableTo()
to inform the consumer that the published has called
RingBufferControl.publishStop()
public static final long UNAVAILABLE
tryAvailableTo()
to indicate that there are no pending published values available.public static final RingBufferConsumerControl.ConsumerWaitStrategy spin
public static final RingBufferConsumerControl.ConsumerWaitStrategy yield
protected final int bufferSize
protected final int indexMask
protected final RingBufferConsumerControl.ConsumerWaitStrategy waitStrategy
protected final com.lmax.disruptor.Sequence publishCursor
protected final com.lmax.disruptor.Sequence tail
protected final com.lmax.disruptor.util.PaddedLong stop
protected boolean stopIsCommon
public volatile long p1
public volatile long p2
public volatile long p3
public volatile long p4
public volatile long p5
public volatile long p6
protected Iterator iter
protected RingBufferConsumerControl(int sizePowerOfTwo, RingBufferConsumerControl.ConsumerWaitStrategy waitStrategy, com.lmax.disruptor.Sequence cursor) throws IllegalArgumentException
IllegalArgumentException
protected RingBufferConsumerControl(int sizePowerOfTwo, RingBufferConsumerControl.ConsumerWaitStrategy waitStrategy, com.lmax.disruptor.Sequence cursor, com.lmax.disruptor.util.PaddedLong commonStop) throws IllegalArgumentException
IllegalArgumentException
public long availableTo()
availableTo()
is a consumer side call that will block using the wait strategy until a value (or several) have been published. It will return the sequence that represents the
position the publisher has published to.
This method can return ACQUIRE_STOP_REQUEST
which means the publisher has called RingBufferControl.publishStop()
. Once the consumer receives this value the
RingBufferControl
has been reset.
public long tryAvailableTo()
This method allows the consumer side to poll for publishing events. It will return what availableTo()
will return but can also return UNAVAILABLE
which
means there's no currently published value available
public void notifyProcessed()
public int index(long sequence)
public <T> Iterator<T> consumeAsIterator(T[] values)
public <T> Iterable<T> consumeAsIterable(T[] values)
public boolean isShutdown()
RingBufferControl.publishStop()
and the consumer acquires it this method will return true
. It will also return true
up until the first sequence
is retrieved by a consumer. It will return false
at all other times.public long sumPaddingToPreventOptimisation()
protected com.lmax.disruptor.Sequence getTail()
protected void clear()
protected long tryAvailableTo(long requestedSequence)
protected long availableTo(long requestedSequence)
protected final void doNotifyProcessed(long sequence)
Copyright © 2018. All rights reserved.