Class SingleConsumerQueue<E>
- Type Parameters:
E
- the type of elements held in this collection
- All Implemented Interfaces:
Serializable
,Iterable<E>
,Collection<E>
,Queue<E>
null
elements.
A SingleConsumerQueue
is an appropriate choice when many producer threads will share
access to a common collection and a single consumer thread drains it. This collection is useful
in scenarios such as implementing flat combining, actors, or lock amortization.
This implementation employs combination to transfer elements between threads that are producing concurrently. This approach avoids contention on the queue by combining colliding operations that have identical semantics. When a pair of producers collide, the task of performing the combined set of operations is delegated to one of the threads and the other thread optionally waits for its operation to be completed. This decision of whether to wait for completion is determined by constructing either a linearizable or optimistic queue.
Iterators are weakly consistent, returning elements reflecting the state of the queue at
some point at or since the creation of the iterator. They do not throw ConcurrentModificationException
, and may proceed concurrently with other operations.
Elements contained in the queue since the creation of the iterator will be returned exactly once.
Beware that it is the responsibility of the caller to ensure that a consumer has exclusive read access to the queue. This implementation does not include fail-fast behavior to guard against incorrect consumer usage.
Beware that, unlike in most collections, the size
method is NOT a
constant-time operation. Because of the asynchronous nature of these queues, determining the
current number of elements requires a traversal of the elements, and so may report inaccurate
results if this collection is modified during traversal.
Warning: This class is scheduled for removal in version 3.0.0.
- See Also:
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescription(package private) static final class
(package private) static class
(package private) static final class
A proxy that is serialized instead of the queue. -
Field Summary
FieldsModifier and TypeFieldDescription(package private) final AtomicReference<SingleConsumerQueue.Node<E>>[]
(package private) static final int
The number of slots in the elimination array.(package private) static final int
The mask value for indexing into the arena.(package private) final Function<E,
SingleConsumerQueue.Node<E>> (package private) static final int
The number of CPUs(package private) static final Function<?,
?> The factory for creating an optimistic node.(package private) static final long
The offset to the thread-specific probe field.(package private) static final long
(package private) static final int
The number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an operation.Fields inherited from class com.github.benmanes.caffeine.SCQHeader.HeadAndTailRef
tail, TAIL_OFFSET
Fields inherited from class com.github.benmanes.caffeine.SCQHeader.PadHeadAndTail
p20, p21, p22, p23, p24, p25, p26, p27, p30, p31, p32, p33, p34, p35, p36
Fields inherited from class com.github.benmanes.caffeine.SCQHeader.HeadRef
head
-
Constructor Summary
ConstructorsModifierConstructorDescriptionprivate
SingleConsumerQueue
(Function<E, SingleConsumerQueue.Node<E>> factory) -
Method Summary
Modifier and TypeMethodDescriptionboolean
boolean
addAll
(Collection<? extends E> c) (package private) void
append
(SingleConsumerQueue.Node<E> first, SingleConsumerQueue.Node<E> last) Adds the linked list of nodes to the queue.(package private) static int
ceilingPowerOfTwo
(int x) boolean
(package private) static <E> SingleConsumerQueue.Node<E>
findLast
(SingleConsumerQueue.Node<E> node) Returns the last node in the linked list.(package private) static int
index()
Returns the arena index for the current thread.boolean
isEmpty()
iterator()
static <E> SingleConsumerQueue<E>
Creates a queue with a linearizable backoff strategy.boolean
static <E> SingleConsumerQueue<E>
Creates a queue with an optimistic backoff strategy.peek()
poll()
private void
readObject
(ObjectInputStream stream) int
size()
(package private) SingleConsumerQueue.Node<E>
transferOrCombine
(SingleConsumerQueue.Node<E> first, SingleConsumerQueue.Node<E> last) Attempts to receive a linked list from a waiting producer or transfer the specified linked list to an arriving producer.(package private) Object
Methods inherited from class com.github.benmanes.caffeine.SCQHeader.HeadAndTailRef
casTail, lazySetTail
Methods inherited from class java.util.AbstractQueue
clear, element, remove
Methods inherited from class java.util.AbstractCollection
containsAll, remove, removeAll, retainAll, toArray, toArray, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface java.util.Collection
clear, containsAll, equals, hashCode, parallelStream, remove, removeAll, removeIf, retainAll, spliterator, stream, toArray, toArray, toArray
-
Field Details
-
NCPU
static final int NCPUThe number of CPUs -
ARENA_LENGTH
static final int ARENA_LENGTHThe number of slots in the elimination array. -
ARENA_MASK
static final int ARENA_MASKThe mask value for indexing into the arena. -
OPTIMISIC
The factory for creating an optimistic node. -
SPINS
static final int SPINSThe number of times to spin (doing nothing except polling a memory location) before giving up while waiting to eliminate an operation. Should be zero on uniprocessors. On multiprocessors, this value should be large enough so that two threads exchanging items as fast as possible block only when one of them is stalled (due to GC or preemption), but not much longer, to avoid wasting CPU resources. Seen differently, this value is a little over half the number of cycles of an average context switch time on most systems. The value here is approximately the average of those across a range of tested systems. -
PROBE
static final long PROBEThe offset to the thread-specific probe field. -
arena
-
factory
-
serialVersionUID
static final long serialVersionUID- See Also:
-
-
Constructor Details
-
SingleConsumerQueue
-
-
Method Details
-
ceilingPowerOfTwo
static int ceilingPowerOfTwo(int x) -
optimistic
Creates a queue with an optimistic backoff strategy. A thread completes its operation without waiting after it successfully hands off the additional element(s) to another producing thread for batch insertion. This optimistic behavior may result in additions not appearing in FIFO order due to the backoff strategy trying to compensate for queue contention.- Type Parameters:
E
- the type of elements held in this collection- Returns:
- a new queue where producers complete their operation immediately if combined with another producing thread's
-
linearizable
Creates a queue with a linearizable backoff strategy. A thread waits for a completion signal if it successfully hands off the additional element(s) to another producing thread for batch insertion.- Type Parameters:
E
- the type of elements held in this collection- Returns:
- a new queue where producers wait for a completion signal after combining its addition with another producing thread's
-
isEmpty
public boolean isEmpty()- Specified by:
isEmpty
in interfaceCollection<E>
- Overrides:
isEmpty
in classAbstractCollection<E>
-
size
public int size()- Specified by:
size
in interfaceCollection<E>
- Specified by:
size
in classAbstractCollection<E>
-
contains
- Specified by:
contains
in interfaceCollection<E>
- Overrides:
contains
in classAbstractCollection<E>
-
peek
-
offer
-
poll
-
add
- Specified by:
add
in interfaceCollection<E>
- Specified by:
add
in interfaceQueue<E>
- Overrides:
add
in classAbstractQueue<E>
-
addAll
- Specified by:
addAll
in interfaceCollection<E>
- Overrides:
addAll
in classAbstractQueue<E>
-
append
Adds the linked list of nodes to the queue. -
transferOrCombine
@Nullable SingleConsumerQueue.Node<E> transferOrCombine(@Nonnull SingleConsumerQueue.Node<E> first, SingleConsumerQueue.Node<E> last) Attempts to receive a linked list from a waiting producer or transfer the specified linked list to an arriving producer.- Parameters:
first
- the first node in the linked list to try to transferlast
- the last node in the linked list to try to transfer- Returns:
- either
null
if the element was transferred, the first node if neither a transfer nor receive were successful, or the received last element from a producer
-
index
static int index()Returns the arena index for the current thread. -
findLast
Returns the last node in the linked list. -
iterator
- Specified by:
iterator
in interfaceCollection<E>
- Specified by:
iterator
in interfaceIterable<E>
- Specified by:
iterator
in classAbstractCollection<E>
-
writeReplace
Object writeReplace() -
readObject
- Throws:
InvalidObjectException
-