Clearmatics Libff  0.1
C++ library for Finite Fields and Elliptic Curves
concurrent_fifo.tcc
Go to the documentation of this file.
1 /** @file
2  *****************************************************************************
3  * @author This file is part of libff, developed by Clearmatics Ltd
4  * (originally developed by SCIPR Lab) and contributors
5  * (see AUTHORS).
6  * @copyright MIT license (see LICENSE file)
7  *****************************************************************************/
8 
9 #ifndef __LIBFF_COMMON_CONCURRENT_FIFO_TCC__
10 #define __LIBFF_COMMON_CONCURRENT_FIFO_TCC__
11 
12 #include "libff/common/concurrent_fifo.hpp"
13 
14 #include <assert.h>
15 #include <stdlib.h>
16 #include <thread>
17 
18 namespace libff
19 {
20 
21 // concurrent_fifo_spsc
22 
23 template<typename T>
24 concurrent_fifo_spsc<T>::concurrent_fifo_spsc(size_t capacity)
25  : _capacity(capacity)
26  , _buffer((T *)malloc(capacity * sizeof(T)))
27  , _producer_next_idx(0)
28  , _producer_num_produced(0)
29  , _consumer_next_idx(0)
30  , _consumer_num_consumed(0)
31 {
32 }
33 
34 template<typename T> concurrent_fifo_spsc<T>::~concurrent_fifo_spsc()
35 {
36  free(_buffer);
37 }
38 
39 template<typename T> size_t concurrent_fifo_spsc<T>::capacity() const
40 {
41  return _capacity;
42 }
43 
44 template<typename T> T *concurrent_fifo_spsc<T>::try_enqueue_begin()
45 {
46  const size_t consumer_num_consumed =
47  _consumer_num_consumed.load(std::memory_order_relaxed);
48  const size_t producer_num_produced =
49  _producer_num_produced.load(std::memory_order_relaxed);
50  const size_t num_empty_slots =
51  consumer_num_consumed + _capacity - producer_num_produced;
52  // TODO: handle overflow
53  assert(num_empty_slots <= _capacity);
54 
55  if (num_empty_slots > 0) {
56  return &_buffer[_producer_next_idx];
57  }
58 
59  return nullptr;
60 }
61 
62 template<typename T> T *concurrent_fifo_spsc<T>::enqueue_begin_wait()
63 {
64  T *v = try_enqueue_begin();
65  while (!v) {
66  std::this_thread::yield();
67  v = try_enqueue_begin();
68  }
69  return v;
70 }
71 
72 template<typename T> void concurrent_fifo_spsc<T>::enqueue_end()
73 {
74  // Caller is expected to only call this if try_enqueue_begin succeeded. No
75  // need to check consumer state.
76 
77  _producer_next_idx = (_producer_next_idx + 1) % _capacity;
78  const size_t producer_num_produced =
79  _producer_num_produced.load(std::memory_order_relaxed);
80  _producer_num_produced.store(
81  producer_num_produced + 1, std::memory_order_release);
82 }
83 
84 template<typename T> const T *concurrent_fifo_spsc<T>::try_dequeue_begin()
85 {
86  const size_t producer_num_produced =
87  _producer_num_produced.load(std::memory_order_relaxed);
88  const size_t consumer_num_consumed =
89  _consumer_num_consumed.load(std::memory_order_relaxed);
90  const size_t num_available = producer_num_produced - consumer_num_consumed;
91  assert(num_available <= _capacity);
92  // TODO: handle overflow
93 
94  if (num_available > 0) {
95  return &_buffer[_consumer_next_idx];
96  }
97 
98  return nullptr;
99 }
100 
101 template<typename T> const T *concurrent_fifo_spsc<T>::dequeue_begin_wait()
102 {
103  const T *v = try_dequeue_begin();
104  while (!v) {
105  std::this_thread::yield();
106  v = try_dequeue_begin();
107  }
108  return v;
109 }
110 
111 template<typename T> void concurrent_fifo_spsc<T>::dequeue_end()
112 {
113  // Caller must only call if try_dequeue_begin succeeded. No need to check
114  // producer state.
115 
116  _consumer_next_idx = (_consumer_next_idx + 1) % _capacity;
117  const size_t consumer_num_consumed =
118  _consumer_num_consumed.load(std::memory_order_relaxed);
119  _consumer_num_consumed.store(
120  consumer_num_consumed + 1, std::memory_order_release);
121 }
122 
123 // concurrent_buffer_fifo_spsc
124 
125 template<typename T>
126 concurrent_buffer_fifo_spsc<T>::concurrent_buffer_fifo_spsc(
127  size_t num_buffers, size_t num_entries_per_buffer)
128  : _queue(num_buffers)
129  , _entries_per_buffer(num_entries_per_buffer)
130  , _buffers((T **)malloc(num_buffers * sizeof(T *)))
131  , _next_buffer_idx(0)
132 {
133  for (size_t i = 0; i < num_buffers; ++i) {
134  _buffers[i] = (T *)malloc(num_entries_per_buffer * sizeof(T));
135  }
136 }
137 
138 template<typename T>
139 concurrent_buffer_fifo_spsc<T>::~concurrent_buffer_fifo_spsc()
140 {
141  const size_t num_buffers = _queue.capacity();
142  // Free all buffers, then the list of pointers.
143  for (size_t i = 0; i < num_buffers; ++i) {
144  free(_buffers[i]);
145  }
146  free(_buffers);
147 }
148 
149 template<typename T> T *concurrent_buffer_fifo_spsc<T>::try_enqueue_begin()
150 {
151  T **const buffer_ptr = _queue.try_enqueue_begin();
152  if (buffer_ptr) {
153  return enqueue_next_buffer(buffer_ptr);
154  }
155  return nullptr;
156 }
157 
158 template<typename T> T *concurrent_buffer_fifo_spsc<T>::enqueue_begin_wait()
159 {
160  return enqueue_next_buffer(_queue.enqueue_begin_wait());
161 }
162 
163 template<typename T> void concurrent_buffer_fifo_spsc<T>::enqueue_end()
164 {
165  // The buffer location has already been written into the queue, ready for
166  // dequeuing. Nothing to do except call the underlying queue.
167  _queue.enqueue_end();
168 }
169 
170 template<typename T>
171 const T *concurrent_buffer_fifo_spsc<T>::try_dequeue_begin()
172 {
173  T *const *const buffer_ptr = _queue.try_dequeue_begin();
174  if (buffer_ptr) {
175  return *buffer_ptr;
176  }
177  return nullptr;
178 }
179 
180 template<typename T>
181 const T *concurrent_buffer_fifo_spsc<T>::dequeue_begin_wait()
182 {
183  T *const *const buffer_ptr = _queue.dequeue_begin_wait();
184  return *buffer_ptr;
185 }
186 
187 template<typename T> void concurrent_buffer_fifo_spsc<T>::dequeue_end()
188 {
189  _queue.dequeue_end();
190 }
191 
192 template<typename T>
193 T *concurrent_buffer_fifo_spsc<T>::enqueue_next_buffer(T **const buffer_ptr)
194 {
195  T *const buffer = _buffers[_next_buffer_idx];
196  _next_buffer_idx = (_next_buffer_idx + 1) % _queue.capacity();
197 
198  *buffer_ptr = buffer;
199  return buffer;
200 }
201 
202 } // namespace libff
203 
204 #endif // __LIBFF_COMMON_CONCURRENT_FIFO_TCC__