public class RingBufferControlMultiplexor extends Object
This is a helper class for managing a set of RingBufferControls for use in a "multiple-publisher to single-consumer" thread configuration. Performance comparisons against the LMAX Exchange Disruptor in a multi-publisher, single-consumer configuration, using the Disruptor's own 3-to-1 test, seem to indicate about a 15x performance increase.
Keep in mind, however, that this is a substantially different way of implementing multi-publisher to single-consumer than the Disruptor's, and it can therefore behave substantially differently. The most obvious difference is message ordering: this implementation is substantially more likely to deliver messages from different publishers out of order.
Modifier and Type | Field and Description |
---|---|
static int | SPIN_TRIES: The number of spin tries before availableTo() backs down to using Thread.yield(). |
Constructor and Description |
---|
RingBufferControlMultiplexor(int numOfPublishers, int sizePowerOfTwo): Instantiate a RingBufferControlMultiplexor with a given number of publishers and a given buffer size for each RingBufferControl. |
RingBufferControlMultiplexor(int sizePowerOfTwo, Object... controlledData): Instantiate a RingBufferControlMultiplexor with a given set of actual buffers being controlled and a given buffer size for each RingBufferControl. |
Modifier and Type | Method and Description |
---|---|
long | availableTo(): A consumer side call that will block until a value (or several) has been published to at least one of the underlying RingBufferControls. |
RingBufferControl | get(int index): This will retrieve the RingBufferControl that corresponds to the index given. |
Object | getControlled(int index): This will retrieve the corresponding resource that's being controlled by the RingBufferControl selected by that index. |
Object | getCurrentControlled(): Given that availableTo() or tryAvailableTo() has returned a valid sequence, this method will return the current managed resource that corresponds to the RingBufferControl that has consumer data pending on it. |
int | getCurrentIndex(): Given that availableTo() or tryAvailableTo() has returned a valid sequence, this method will return the current RingBufferControl index that the sequence is for. |
int | index(long sequence): This method will convert the sequence to an index of a ring buffer. |
void | notifyProcessed(): This method must be called by the consumer once the consumer is finished with the currently published results. |
long | tryAvailableTo(): This method allows the consumer side to poll for publishing events on ANY of the managed RingBufferControls. |
public static final int SPIN_TRIES
The number of spin tries before availableTo() backs down to using Thread.yield().
public RingBufferControlMultiplexor(int numOfPublishers, int sizePowerOfTwo) throws IllegalArgumentException
Instantiate a RingBufferControlMultiplexor with a given number of publishers and a given buffer size for each RingBufferControl.
numOfPublishers - is the number of publishers.
sizePowerOfTwo - is the size of each RingBufferControl; it must be a power of 2 or an IllegalArgumentException is thrown.
IllegalArgumentException - if the sizePowerOfTwo parameter isn't a power of 2.
public RingBufferControlMultiplexor(int sizePowerOfTwo, Object... controlledData) throws IllegalArgumentException
Instantiate a RingBufferControlMultiplexor with a given set of actual buffers being controlled and a given buffer size for each RingBufferControl. The RingBufferControlMultiplexor can help a consumer manage the resources that each RingBufferControl controls. After the consumer calls availableTo() or tryAvailableTo(), the specific resource that corresponds to the RingBufferControl that indicated available processing can be selected by a subsequent call to getCurrentControlled().
sizePowerOfTwo - is the size of each RingBufferControl; it must be a power of 2 or an IllegalArgumentException is thrown.
IllegalArgumentException - if the sizePowerOfTwo parameter isn't a power of 2.
public RingBufferControl get(int index)
This will retrieve the RingBufferControl that corresponds to the index given. This is the way a particular publisher should retrieve its corresponding RingBufferControl.
public Object getControlled(int index)
This will retrieve the corresponding resource that's being controlled by the RingBufferControl selected by that index.
public long availableTo()
availableTo() is a consumer side call that will block until a value (or several) has been published to at least one of the underlying RingBufferControls. It will return the sequence that represents the position the publisher has published to.
Upon returning a valid sequence, the internal state will be set so that the consumer can find out which RingBufferControl the sequence is for, and which resource it controls, using getCurrentIndex() and getCurrentControlled(). These should only be called from the consumer side once a normal sequence has been returned.
This method can return RingBufferConsumerControl.ACQUIRE_STOP_REQUEST, which means RingBufferControl.publishStop() has been called on ALL of the RingBufferControls being managed by this instance. Once the consumer receives this value, all of the managed RingBufferControls have been reset.
If this method returns a valid sequence, it MUST NOT be called again prior to calling notifyProcessed().
Note, the wait strategy uses a spin that quickly (after SPIN_TRIES iterations) backs down to using Thread.yield().
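The spin-then-yield wait described above can be sketched roughly as follows. This is only an illustration of the general technique, not the library's code: the class SpinThenYieldSketch, the method waitForPublish, the use of an AtomicLong as a published cursor, and the value given to SPIN_TRIES are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicLong;

final class SpinThenYieldSketch {
    // Hypothetical value; this documentation does not state the real constant's value.
    static final int SPIN_TRIES = 100;

    /**
     * Waits until the cursor moves past lastSeen, busy-spinning for the first
     * SPIN_TRIES checks and calling Thread.yield() on every check after that.
     */
    static long waitForPublish(AtomicLong cursor, long lastSeen) {
        int spins = 0;
        long current;
        while ((current = cursor.get()) <= lastSeen) {
            if (spins < SPIN_TRIES) {
                spins++;          // spin phase: re-check immediately
            } else {
                Thread.yield();   // back-off phase: give other threads a chance to run
            }
        }
        return current;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicLong cursor = new AtomicLong(-1);          // nothing published yet
        Thread publisher = new Thread(() -> cursor.set(0));
        publisher.start();
        long seen = waitForPublish(cursor, -1);          // blocks until the publisher sets 0
        publisher.join();
        System.out.println("consumer observed sequence " + seen);
    }
}
```

A short spin keeps latency low when a publish is imminent, while yielding afterwards avoids monopolizing a core during longer idle periods.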
public long tryAvailableTo()
This method allows the consumer side to poll for publishing events on ANY of the managed RingBufferControls. It will return the same thing that availableTo() will return, but can also return RingBufferConsumerControl.UNAVAILABLE, which means there's no currently published value available on any of the underlying RingBufferControls.
If this method returns a valid sequence, it MUST NOT be called again prior to calling notifyProcessed().
public int getCurrentIndex()
Given that availableTo() or tryAvailableTo() has returned a valid sequence, this method will return the current RingBufferControl index that the sequence is for.
This method's return value is only valid in the thread that called availableTo() or tryAvailableTo().
public Object getCurrentControlled()
Given that availableTo() or tryAvailableTo() has returned a valid sequence, this method will return the current managed resource that corresponds to the RingBufferControl that has consumer data pending on it. If the RingBufferControlMultiplexor hasn't been instantiated to manage the underlying controlled resources, then this method will return null.
This method's return value is only valid in the thread that called availableTo() or tryAvailableTo().
public void notifyProcessed()
This method must be called by the consumer once the consumer is finished with the currently published results. It applies to the value most recently returned from availableTo() or tryAvailableTo(); however, that value MUST NOT BE RingBufferConsumerControl.UNAVAILABLE or RingBufferConsumerControl.ACQUIRE_STOP_REQUEST.
This method MUST be called prior to a subsequent call to either availableTo() or tryAvailableTo().
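The consumer-side calling order (poll, look up the current resource, then notify) can be illustrated with a toy, single-threaded stand-in. Everything in this sketch is hypothetical: ToyMultiplexor, its publish method, its lowest-index-first scan, and the UNAVAILABLE value of -1 are assumptions made so the example runs on its own, and none of it reflects how the real RingBufferControlMultiplexor is implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class ConsumerProtocolSketch {
    // Hypothetical sentinel standing in for RingBufferConsumerControl.UNAVAILABLE.
    static final long UNAVAILABLE = -1L;

    /** Toy stand-in: one queue of published sequences per publisher. */
    static final class ToyMultiplexor {
        private final List<ArrayDeque<Long>> published = new ArrayList<>();
        private final Object[] controlled;
        private int currentIndex = -1;
        private boolean awaitingNotify = false;

        ToyMultiplexor(Object... controlledData) {
            controlled = controlledData;
            for (int i = 0; i < controlledData.length; i++) {
                published.add(new ArrayDeque<>());
            }
        }

        /** Simulates publisher number index publishing one sequence. */
        void publish(int index, long sequence) {
            published.get(index).add(sequence);
        }

        long tryAvailableTo() {
            if (awaitingNotify) {
                throw new IllegalStateException("notifyProcessed() must be called first");
            }
            // Assumed scan order for this toy: lowest publisher index first.
            for (int i = 0; i < published.size(); i++) {
                Long seq = published.get(i).peek();
                if (seq != null) {
                    currentIndex = i;
                    awaitingNotify = true;
                    return seq;
                }
            }
            return UNAVAILABLE;
        }

        int getCurrentIndex() { return currentIndex; }

        Object getCurrentControlled() { return controlled[currentIndex]; }

        void notifyProcessed() {
            if (!awaitingNotify) {
                throw new IllegalStateException("no valid sequence is pending");
            }
            published.get(currentIndex).remove();
            awaitingNotify = false;
        }
    }

    /** Consumer loop: poll, use the current resource, then notify. */
    static List<String> drain(ToyMultiplexor mux) {
        List<String> log = new ArrayList<>();
        long seq;
        while ((seq = mux.tryAvailableTo()) != UNAVAILABLE) {
            log.add(mux.getCurrentControlled() + "@" + seq);
            mux.notifyProcessed();   // required before the next poll
        }
        return log;
    }

    public static void main(String[] args) {
        ToyMultiplexor mux = new ToyMultiplexor("bufferA", "bufferB");
        mux.publish(1, 0L);   // publisher 1 publishes first
        mux.publish(0, 0L);   // publisher 0 publishes second
        System.out.println(drain(mux));   // prints [bufferA@0, bufferB@0]
    }
}
```

In this toy, publisher 1 publishes before publisher 0 yet is consumed after it, which is the kind of cross-publisher reordering the class description warns about.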
public int index(long sequence)
This method will convert the sequence to an index of a ring buffer.
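Because each buffer size must be a power of 2, a sequence can be mapped to a buffer index with a bit mask instead of a modulo. The sketch below shows that common technique together with a power-of-2 check; it is an assumption about how such a conversion is typically done, not a statement of how index(long) is implemented here, and the class name SequenceIndexSketch is hypothetical.

```java
final class SequenceIndexSketch {
    private final int mask;

    SequenceIndexSketch(int sizePowerOfTwo) {
        // A positive int is a power of 2 exactly when it has a single bit set.
        if (sizePowerOfTwo <= 0 || Integer.bitCount(sizePowerOfTwo) != 1) {
            throw new IllegalArgumentException("size must be a power of 2: " + sizePowerOfTwo);
        }
        this.mask = sizePowerOfTwo - 1;
    }

    /** Maps an ever-increasing sequence onto a slot in [0, size). */
    int index(long sequence) {
        return (int) (sequence & mask);
    }

    public static void main(String[] args) {
        SequenceIndexSketch ring = new SequenceIndexSketch(8);
        System.out.println(ring.index(0));    // prints 0
        System.out.println(ring.index(7));    // prints 7
        System.out.println(ring.index(8));    // prints 0 (wraps around)
        System.out.println(ring.index(13));   // prints 5
    }
}
```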
Copyright © 2018. All rights reserved.