2 *****************************************************************************
3 * @author This file is part of libff, developed by Clearmatics Ltd
4 * (originally developed by SCIPR Lab) and contributors
6 * @copyright MIT license (see LICENSE file)
7 *****************************************************************************/
9 #ifndef __LIBFF_COMMON_CONCURRENT_FIFO_TCC__
10 #define __LIBFF_COMMON_CONCURRENT_FIFO_TCC__
12 #include "libff/common/concurrent_fifo.hpp"
21 // concurrent_fifo_spsc
// Constructor for a queue holding up to `capacity` elements of type T. The
// visible initializers allocate the element storage and start every
// producer/consumer index and counter at zero.
// NOTE(review): the template header, the first member initializer and the
// constructor body are not visible in this excerpt.
24 concurrent_fifo_spsc<T>::concurrent_fifo_spsc(size_t capacity)
// Raw storage obtained from malloc: no T constructor is run by this
// initializer.
// NOTE(review): the malloc result is not checked for nullptr here and
// `capacity * sizeof(T)` is not checked for overflow - confirm that callers
// only pass sane capacities.
26 , _buffer((T *)malloc(capacity * sizeof(T)))
27 , _producer_next_idx(0)
28 , _producer_num_produced(0)
29 , _consumer_next_idx(0)
30 , _consumer_num_consumed(0)
// Destructor. The body is not visible in this excerpt.
// NOTE(review): _buffer is obtained with malloc in the constructor; confirm
// that it is released with free() here.
34 template<typename T> concurrent_fifo_spsc<T>::~concurrent_fifo_spsc()
// Accessor for the queue capacity. The body is not visible in this excerpt;
// presumably it returns _capacity - TODO confirm.
39 template<typename T> size_t concurrent_fifo_spsc<T>::capacity() const
44 template<typename T> T *concurrent_fifo_spsc<T>::try_enqueue_begin()
46 const size_t consumer_num_consumed =
47 _consumer_num_consumed.load(std::memory_order_relaxed);
48 const size_t producer_num_produced =
49 _producer_num_produced.load(std::memory_order_relaxed);
50 const size_t num_empty_slots =
51 consumer_num_consumed + _capacity - producer_num_produced;
52 // TODO: handle overflow
53 assert(num_empty_slots <= _capacity);
55 if (num_empty_slots > 0) {
56 return &_buffer[_producer_next_idx];
62 template<typename T> T *concurrent_fifo_spsc<T>::enqueue_begin_wait()
64 T *v = try_enqueue_begin();
66 std::this_thread::yield();
67 v = try_enqueue_begin();
72 template<typename T> void concurrent_fifo_spsc<T>::enqueue_end()
74 // Caller is expected to only call this if try_enqueue_begin succeeded. No
75 // need to check consumer state.
77 _producer_next_idx = (_producer_next_idx + 1) % _capacity;
78 const size_t producer_num_produced =
79 _producer_num_produced.load(std::memory_order_relaxed);
80 _producer_num_produced.store(
81 producer_num_produced + 1, std::memory_order_release);
84 template<typename T> const T *concurrent_fifo_spsc<T>::try_dequeue_begin()
86 const size_t producer_num_produced =
87 _producer_num_produced.load(std::memory_order_relaxed);
88 const size_t consumer_num_consumed =
89 _consumer_num_consumed.load(std::memory_order_relaxed);
90 const size_t num_available = producer_num_produced - consumer_num_consumed;
91 assert(num_available <= _capacity);
92 // TODO: handle overflow
94 if (num_available > 0) {
95 return &_buffer[_consumer_next_idx];
101 template<typename T> const T *concurrent_fifo_spsc<T>::dequeue_begin_wait()
103 const T *v = try_dequeue_begin();
105 std::this_thread::yield();
106 v = try_dequeue_begin();
111 template<typename T> void concurrent_fifo_spsc<T>::dequeue_end()
113 // Caller must only call if try_dequeue_begin succeeded. No need to check
116 _consumer_next_idx = (_consumer_next_idx + 1) % _capacity;
117 const size_t consumer_num_consumed =
118 _consumer_num_consumed.load(std::memory_order_relaxed);
119 _consumer_num_consumed.store(
120 consumer_num_consumed + 1, std::memory_order_release);
123 // concurrent_buffer_fifo_spsc
// Constructor: builds the underlying queue with `num_buffers` entries and
// allocates `num_buffers` raw buffers of `num_entries_per_buffer` elements
// each.
// NOTE(review): the template header and the braces of the body are not
// visible in this excerpt.
126 concurrent_buffer_fifo_spsc<T>::concurrent_buffer_fifo_spsc(
127 size_t num_buffers, size_t num_entries_per_buffer)
128 : _queue(num_buffers)
129 , _entries_per_buffer(num_entries_per_buffer)
// Table of buffer pointers; its entries are filled in by the loop below.
// NOTE(review): neither this malloc nor the per-buffer malloc below is
// checked for nullptr, and the size multiplications are not checked for
// overflow.
130 , _buffers((T **)malloc(num_buffers * sizeof(T *)))
131 , _next_buffer_idx(0)
// Allocate raw storage for each buffer; malloc runs no T constructors.
133 for (size_t i = 0; i < num_buffers; ++i) {
134 _buffers[i] = (T *)malloc(num_entries_per_buffer * sizeof(T));
// Destructor.
// NOTE(review): the template header, the loop body and any statements after
// the loop are not visible in this excerpt. Going by the comment below, they
// are expected to free each buffer and then the pointer table - confirm
// against the full file.
139 concurrent_buffer_fifo_spsc<T>::~concurrent_buffer_fifo_spsc()
// The buffer count is recovered from the queue rather than stored separately;
// assumes _queue.capacity() equals the num_buffers given to the constructor -
// TODO confirm.
141 const size_t num_buffers = _queue.capacity();
142 // Free all buffers, then the list of pointers.
143 for (size_t i = 0; i < num_buffers; ++i) {
// Producer side, non-blocking. Asks the underlying queue for a slot and
// passes it to enqueue_next_buffer(), returning the buffer that call yields.
// NOTE(review): the guard around the return below and the failure path are
// not visible in this excerpt; presumably a null slot results in a nullptr
// return - confirm against the full file.
149 template<typename T> T *concurrent_buffer_fifo_spsc<T>::try_enqueue_begin()
// Slot in the underlying queue that will hold the buffer's address.
151 T **const buffer_ptr = _queue.try_enqueue_begin();
153 return enqueue_next_buffer(buffer_ptr);
158 template<typename T> T *concurrent_buffer_fifo_spsc<T>::enqueue_begin_wait()
160 return enqueue_next_buffer(_queue.enqueue_begin_wait());
163 template<typename T> void concurrent_buffer_fifo_spsc<T>::enqueue_end()
165 // The buffer location has already been written into the queue, ready for
166 // dequeuing. Nothing to do except call the underlying queue.
167 _queue.enqueue_end();
// Consumer side, non-blocking. Starts a dequeue by asking the underlying
// queue for its next entry, which is a pointer to a stored buffer pointer.
// NOTE(review): the template header and the rest of the body are not visible
// in this excerpt; presumably the stored buffer pointer is returned when an
// entry is available and nullptr otherwise - confirm against the full file.
171 const T *concurrent_buffer_fifo_spsc<T>::try_dequeue_begin()
173 T *const *const buffer_ptr = _queue.try_dequeue_begin();
// Consumer side, blocking. Starts a dequeue using the underlying queue's
// waiting call, which yields a pointer to a stored buffer pointer.
// NOTE(review): the template header and the return statement are not visible
// in this excerpt; presumably the stored buffer pointer is returned - confirm
// against the full file.
181 const T *concurrent_buffer_fifo_spsc<T>::dequeue_begin_wait()
183 T *const *const buffer_ptr = _queue.dequeue_begin_wait();
187 template<typename T> void concurrent_buffer_fifo_spsc<T>::dequeue_end()
189 _queue.dequeue_end();
// Helper for the enqueue paths: picks the buffer at the current round-robin
// index, advances that index, and records the buffer's address in the queue
// slot `buffer_ptr`.
// NOTE(review): the template header and the return statement are not visible
// in this excerpt; presumably `buffer` is returned - confirm against the full
// file. `buffer_ptr` is dereferenced unconditionally in the visible code.
193 T *concurrent_buffer_fifo_spsc<T>::enqueue_next_buffer(T **const buffer_ptr)
195 T *const buffer = _buffers[_next_buffer_idx];
// Advance to the following buffer, wrapping with the queue capacity.
196 _next_buffer_idx = (_next_buffer_idx + 1) % _queue.capacity();
// Write the buffer's address into the queue slot.
198 *buffer_ptr = buffer;
204 #endif // __LIBFF_COMMON_CONCURRENT_FIFO_TCC__