SharedMemoryQueue.hpp
Go to the documentation of this file.
1 // @formatter:off
2 //
3 // Balau core C++ library
4 //
5 // Copyright (C) 2017 Bora Software (contact@borasoftware.com)
6 //
7 // Licensed under the Boost Software License - Version 1.0 - August 17th, 2003.
8 // See the LICENSE file for the full license text.
9 //
10 
16 
17 #ifndef COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE
18 #define COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE
19 
24 #include <Balau/Interprocess/Impl/SharedMemoryQueueImpl.hpp>
25 #include <Balau/Type/UUID.hpp>
26 #include <Balau/Util/Vectors.hpp>
28 
29 #include <boost/archive/binary_iarchive.hpp>
30 #include <boost/archive/binary_oarchive.hpp>
31 #include <boost/date_time/posix_time/posix_time_types.hpp>
32 #include <boost/interprocess/ipc/message_queue.hpp>
33 #include <boost/interprocess/managed_shared_memory.hpp>
34 #include <boost/iostreams/device/back_inserter.hpp>
35 #include <boost/iostreams/filtering_stream.hpp>
36 #include <boost/iostreams/stream.hpp>
37 
38 #include <deque>
39 
40 // Ignore false positives for constructor field initialisation.
41 #pragma clang diagnostic push
42 #pragma ide diagnostic ignored "cppcoreguidelines-pro-type-member-init"
43 
44 namespace Balau::Interprocess {
45 
79 template <typename T> class SharedMemoryQueue : public Container::BlockingQueue<T> {
80  private: using CharVector = std::vector<char>;
81 
82  public: static const unsigned int headerSize = sizeof(Impl::QueueHeader);
83  public: static const unsigned int minimumChunkSize = 2 * headerSize;
84 
90  public: explicit SharedMemoryQueue(unsigned int capacity, bool throwOnOversize_ = false)
91  : SharedMemoryQueue(capacity, calculateDefaultBufferSize(), "SMQ_" + UUID().asString(), throwOnOversize_) {}
92 
98  public: SharedMemoryQueue(unsigned int capacity, unsigned int bufferSize_, bool throwOnOversize_ = false)
99  : SharedMemoryQueue(capacity, bufferSize_, "SMO_" + UUID().asString(), throwOnOversize_) {}
100 
106  public: SharedMemoryQueue(unsigned int capacity, std::string name_, bool throwOnOversize_ = false)
107  : SharedMemoryQueue(capacity, calculateDefaultBufferSize(), std::move(name_), throwOnOversize_) {}
108 
116  public: SharedMemoryQueue(unsigned int capacity,
117  unsigned int bufferSize_,
118  std::string name_,
119  bool throwOnOversize_ = false)
120  : name(std::move(prepQueue(name_, bufferSize_)))
121  , queue(CreateOnly, name.c_str(), capacity, bufferSize_)
122  , chunkSize(bufferSize_)
123  , queueState(CreateOnly, name + "_queueState")
124  , throwOnOversize(throwOnOversize_) {}
125 
134  unsigned int capacity,
135  std::string name_,
136  bool throwOnOversize_ = false)
139  , capacity
140  , calculateDefaultBufferSize()
141  , true
142  , std::move(name_)
143  , throwOnOversize_
144  ) {}
145 
156  unsigned int capacity,
157  unsigned int bufferSize_,
158  std::string name_,
159  bool throwOnOversize_ = false)
160  : name(std::move(name_))
161  , queue(openOrCreateQueue(capacity, bufferSize_))
162  , chunkSize(bufferSize_)
163  , queueState(OpenOrCreate, name + "_queueState")
164  , throwOnOversize(throwOnOversize_) {}
165 
169  public: explicit SharedMemoryQueue(std::string name_, bool throwOnOversize_ = false)
170  : name(std::move(name_))
171  , queue(openQueue())
172  , chunkSize((unsigned int) queue.get_max_msg_size())
173  , queueState(OpenOnly, name + "_queueState")
174  , throwOnOversize(throwOnOversize_) {}
175 
181  public: void enqueue(T && object) override {
182  enqueue(object, 0);
183  }
184 
192  public: void enqueue(const T & object, unsigned int priority) {
193  const Impl::QueueHeader messageHeader { queueState->sequenceNumber++, 1, 0, 0 };
194  CharVector & marshalBuffer = Impl::SharedMemoryQueueTLS::storage.marshalBuffer;
195  marshalBuffer.clear();
196 
197  marshal(marshalBuffer, object, messageHeader);
198 
199  const unsigned int totalBytes = (unsigned int) marshalBuffer.size() - headerSize;
200  auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
201  marshalHeader->totalBytes = totalBytes;
202 
203  if (marshalBuffer.size() <= chunkSize) {
204  // The message fits in a single buffer.
205  queue.send(marshalBuffer.data(), marshalBuffer.size(), priority);
206  } else {
207  // The object is split across multiple buffers.
208  // This code avoids copying by writing each header into the marshal buffer,
209  // in the discarded area just before the next data to be sent.
210 
211  if (throwOnOversize) {
214  , ::toString(
215  "The serialized message is too large to fit into a single message ("
216  , marshalBuffer.size()
217  , "/"
218  , chunkSize
219  , ")."
220  )
221  );
222  }
223 
224  const unsigned int chunkCount = calculateChunkCount(totalBytes);
225  size_t dataStart = headerSize;
226 
227  for (unsigned int m = 0; m < chunkCount; m++) {
228  size_t chunkStart = dataStart - headerSize;
229  auto & chunkHeader = * (Impl::QueueHeader *) (marshalBuffer.data() + chunkStart);
230  chunkHeader.sequenceNumber = messageHeader.sequenceNumber;
231  chunkHeader.chunkCount = chunkCount;
232  chunkHeader.chunkNumber = m;
233  chunkHeader.totalBytes = totalBytes;
234 
235  // Account for a potentially partial buffer for the last queue buffer.
236  const size_t chunkBytes = m < chunkCount - 1
237  ? chunkSize
238  : marshalBuffer.size() - chunkStart;
239 
240  queue.send(marshalBuffer.data() + chunkStart, chunkBytes, priority);
241 
242  // The start of the data is moved forward the size of the buffer less the header size.
243  dataStart += chunkSize - headerSize;
244  }
245  }
246  }
247 
254  public: T dequeue() override {
255  CharVector & queueBuffer = Impl::SharedMemoryQueueTLS::storage.queueBuffer;
256  unsigned long receivedSize;
257  unsigned int priority;
258 
259  dequeueNextBuffer(queueBuffer, receivedSize, priority, 0);
260 
261  const auto * queueHeader = (const Impl::QueueHeader *) queueBuffer.data();
262 
263  if (queueHeader->chunkCount == 1) {
264  // The entire object fits in a single buffer.
265  return unmarshal(queueBuffer);
266  } else if (queueHeader->chunkNumber < queueHeader->chunkCount) {
267  // The object is split across multiple buffers, potentially interleaved with other buffers.
268  return performMultiChunkDequeue(
269  queueBuffer, receivedSize, priority
270  , [this] (CharVector &,
271  CharVector & queueBuffer,
272  unsigned long & thisReceivedSize,
273  unsigned int & thisPriority,
274  size_t rejected) {
275  dequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected);
276  return true;
277  }
278  );
279  } else {
280  // This is a partially dequeued multi-chunk dequeue which was previously placed on the pending queue.
281  return completeMultiChunkDequeue(
282  queueBuffer, receivedSize, priority
283  , [this] (CharVector &,
284  CharVector & queueBuffer,
285  unsigned long & thisReceivedSize,
286  unsigned int & thisPriority,
287  size_t rejected) {
288  dequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected);
289  return true;
290  }
291  );
292  }
293  }
294 
301  public: T tryDequeue() override {
302  return tryDequeue(std::chrono::milliseconds(0));
303  }
304 
311  public: T tryDequeue(std::chrono::milliseconds waitTime) override {
312  CharVector & queueBuffer = Impl::SharedMemoryQueueTLS::storage.queueBuffer;
313 
314  unsigned long receivedSize;
315  unsigned int priority;
316 
317  if (!tryDequeueNextBuffer(queueBuffer, receivedSize, priority, 0, waitTime)) {
318  return T();
319  }
320 
321  const auto * queueHeader = (const Impl::QueueHeader *) queueBuffer.data();
322 
323  if (queueHeader->chunkCount == 1) {
324  // The entire object fits in a single buffer.
325  return unmarshal(queueBuffer);
326  } else if (queueHeader->chunkNumber < queueHeader->chunkCount) {
327  // The object is split across multiple buffers, potentially interleaved with other buffers.
328  return performMultiChunkDequeue(
329  queueBuffer, receivedSize, priority
330  , [waitTime, this] (CharVector & marshalBuffer,
331  CharVector & queueBuffer,
332  unsigned long & thisReceivedSize,
333  unsigned int & thisPriority,
334  size_t rejected) {
335  if (!tryDequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected, waitTime)) {
336  pendingBuffers.emplace_front(
337  PendingBuffer(std::move(marshalBuffer), marshalBuffer.size(), thisPriority)
338  );
339 
340  marshalBuffer = CharVector();
341  return false;
342  }
343 
344  return true;
345  }
346  );
347  } else {
348  // This is a partially dequeued multi-chunk dequeue which was previously placed on the pending queue.
349  return completeMultiChunkDequeue(
350  queueBuffer, receivedSize, priority
351  , [waitTime, this] (CharVector & marshalBuffer,
352  CharVector & queueBuffer,
353  unsigned long & thisReceivedSize,
354  unsigned int & thisPriority,
355  size_t rejected) {
356  if (!tryDequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected, waitTime)) {
357  pendingBuffers.emplace_front(
358  PendingBuffer(std::move(queueBuffer), queueBuffer.size(), thisPriority)
359  );
360  marshalBuffer = CharVector();
361  return false;
362  }
363 
364  return true;
365  }
366  );
367  }
368  }
369 
370  public: bool full() const override {
371  return queue.get_max_msg() - queue.get_num_msg() == 0;
372  }
373 
374  public: bool empty() const override {
375  return queue.get_num_msg() == 0;
376  }
377 
383  public: std::string getName() const {
384  return name;
385  }
386 
388 
389  //
390  // Shared between the dequeue and tryDequeue methods.
391  // The object is split across multiple buffers, potentially interleaved with other buffers.
392  // The supplied queue buffer is the first received chunk.
393  //
394  private: template <typename DequeueFunctionT>
395  T performMultiChunkDequeue(CharVector & queueBuffer,
396  unsigned long & receivedSize,
397  unsigned int & priority,
398  DequeueFunctionT dequeueFunction) {
399  const auto * queueHeader = (const Impl::QueueHeader *) queueBuffer.data();
400  const unsigned int totalBytes = queueHeader->totalBytes;
401 
402  CharVector & marshalBuffer = Impl::SharedMemoryQueueTLS::storage.marshalBuffer;
403  marshalBuffer.clear();
404  marshalBuffer.resize(headerSize + totalBytes);
405 
406  // Set up the header in case the marshal buffer ends up on the pending queue (tryDequeue only).
407  memcpy(marshalBuffer.data(), queueBuffer.data(), headerSize);
408  auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
409  marshalHeader->chunkNumber = marshalHeader->chunkCount;
410 
411  size_t rejectedPendingBuffers = 0;
412  const unsigned long sequenceNumber = queueHeader->sequenceNumber;
413  unsigned int partsLeft = queueHeader->chunkCount;
414 
415  while (true) {
416  if (queueHeader->sequenceNumber != sequenceNumber) { // Never true on the first iteration.
417  ++rejectedPendingBuffers;
418  pendingBuffers.emplace_back(PendingBuffer(std::move(queueBuffer), receivedSize, priority));
419  queueBuffer = CharVector(chunkSize);
420  } else {
421  // This can occur out of order (if the chunks arrive out of order).
422  const size_t copyStart = headerSize + queueHeader->chunkNumber * (chunkSize - headerSize);
423  const size_t byteCount = queueBuffer.size() - headerSize;
424  memcpy(marshalBuffer.data() + copyStart, queueBuffer.data() + headerSize, byteCount);
425  ++marshalHeader->chunkNumber;
426  --partsLeft;
427  }
428 
429  if (partsLeft == 0) {
430  return unmarshal(marshalBuffer);
431  }
432 
433  if (!dequeueFunction(marshalBuffer, queueBuffer, receivedSize, priority, rejectedPendingBuffers)) {
434  // Only occurs for tryDequeue.
435  return T();
436  }
437 
438  queueHeader = (const Impl::QueueHeader *) queueBuffer.data();
439  }
440  }
441 
442  //
443  // Shared between the dequeue and tryDequeue methods.
444  // The object is split across multiple buffers, potentially interleaved with other buffers.
445  // The supplied marshal buffer is part filled with previously dequeue chunk(s)
446  // and already the correct length.
447  //
448  private:
449  template <typename DequeueFunctionT>
450  T completeMultiChunkDequeue(CharVector & marshalBuffer,
451  unsigned long & receivedSize,
452  unsigned int & priority,
453  DequeueFunctionT dequeueFunction) {
454  auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
455  const unsigned int totalBytes = marshalHeader->totalBytes;
456 
458  receivedSize == totalBytes + headerSize
459  , "Received size of partially dequeued buffer is not the same as the size of the buffer."
460  );
461 
462  size_t rejectedPendingBuffers = 0;
463  const unsigned long sequenceNumber = marshalHeader->sequenceNumber;
464  unsigned int partsLeft = 2 * marshalHeader->chunkCount - marshalHeader->chunkNumber;
465  CharVector queueBuffer {};
466 
467  while (true) {
468  if (!dequeueFunction(marshalBuffer, queueBuffer, receivedSize, priority, rejectedPendingBuffers)) {
469  // Only occurs for tryDequeue.
470  return T();
471  }
472 
473  auto * queueHeader = (const Impl::QueueHeader *) queueBuffer.data();
474 
475  if (sequenceNumber != queueHeader->sequenceNumber) {
476  ++rejectedPendingBuffers;
477  pendingBuffers.emplace_back(PendingBuffer(std::move(queueBuffer), receivedSize, priority));
478  queueBuffer = CharVector(chunkSize);
479  } else {
480  // This can occur out of order (if the chunks arrive out of order).
481  const size_t copyStart = headerSize + (chunkSize - headerSize) * queueHeader->chunkNumber;
482  const size_t byteCount = queueBuffer.size() - headerSize;
483  memcpy(marshalBuffer.data() + copyStart, queueBuffer.data() + headerSize, byteCount);
484  ++marshalHeader->chunkNumber;
485  --partsLeft;
486  }
487 
488  if (partsLeft == 0) {
489  return unmarshal(marshalBuffer);
490  }
491  }
492  }
493 
494  //
495  // Dequeue next buffer form the pending list if the pending list size is greater
496  // than rejectedPendingBuffers, otherwise dequeue from the shared memory queue.
497  //
498  private: void dequeueNextBuffer(CharVector & buffer,
499  unsigned long & receivedSize,
500  unsigned int & priority,
501  size_t rejectedPendingBuffers) {
502  if (rejectedPendingBuffers < pendingBuffers.size()) {
503  auto pendingBuffer = pendingBuffers.front();
504  buffer = std::move(pendingBuffer.buffer);
505  pendingBuffers.pop_front();
506  receivedSize = pendingBuffer.receivedSize;
507  priority = pendingBuffer.priority;
508  } else {
509  buffer.resize(chunkSize);
510  queue.receive(buffer.data(), buffer.size(), receivedSize, priority);
511  buffer.resize(receivedSize);
512  }
513  }
514 
515  //
516  // Dequeue next buffer form the pending list if the pending list size is greater
517  // than rejectedPendingBuffers, otherwise dequeue from the shared memory queue.
518  //
519  private: bool tryDequeueNextBuffer(CharVector & buffer,
520  unsigned long & receivedSize,
521  unsigned int & priority,
522  size_t rejectedPendingBuffers,
523  std::chrono::milliseconds waitTime) {
524  if (rejectedPendingBuffers < pendingBuffers.size()) {
525  auto pendingBuffer = pendingBuffers.front();
526  buffer = std::move(pendingBuffer.buffer);
527  pendingBuffers.pop_front();
528  receivedSize = pendingBuffer.receivedSize;
529  priority = pendingBuffer.priority;
530  return true;
531  } else {
532  boost::posix_time::ptime timeout(boost::posix_time::microsec_clock::universal_time());
533  boost::posix_time::time_duration w = boost::posix_time::milliseconds(waitTime.count());
534  timeout += w;
535 
536  buffer.resize(chunkSize);
537 
538  if (queue.timed_receive(buffer.data(), buffer.size(), receivedSize, priority, timeout)) {
539  buffer.resize(receivedSize);
540  return true;
541  } else {
542  return false;
543  }
544  }
545  }
546 
547  private: using SinkDevice = boost::iostreams::back_insert_device<CharVector>;
548  private: using SinkBuffer = boost::iostreams::stream_buffer<SinkDevice>;
549  private: using SourceDevice = boost::iostreams::basic_array_source<char>;
550  private: using SourceBuffer = boost::iostreams::stream_buffer<SourceDevice>;
551 
552  // Marshal the object to bytes.
553  // Assume single queue buffer (add the header in order to avoid later copying).
554  private: void marshal(CharVector & buffer, const T & object, const Impl::QueueHeader & header) const {
555  buffer.resize(headerSize);
556  const char * headerBytes = (const char *) &header;
557  memcpy(buffer.data(), headerBytes, headerSize);
558  SinkBuffer oStreamBuffer { SinkDevice(buffer) };
559  boost::archive::binary_oarchive archive(oStreamBuffer);
560  archive << BoostSerialization(object);
561  }
562 
563  // Unmarshal the bytes to an object.
564  // Skip the initial header bytes before unmarshalling.
565  private: T unmarshal(const CharVector & buffer) {
566  // Uncomment the following line for debug output.
567  //std::cerr << toString(getpid(), " -\n", Util::PrettyPrint::printHexBytes(buffer.data(), buffer.size(), 90, 2), "\n");
568  SourceBuffer iStreamBuffer(SourceDevice(buffer.data() + headerSize, buffer.size()));
569  boost::archive::binary_iarchive archive(iStreamBuffer);
570  T object;
571  archive >> BoostSerialization(object);
572  return object;
573  }
574 
575  // The number of buffers required for a message of length totalBytes.
576  private: unsigned int calculateChunkCount(unsigned int totalBytes) const {
577  const unsigned int netBufferSize = chunkSize - headerSize;
578  return totalBytes / netBufferSize + (totalBytes % netBufferSize != 0);
579  }
580 
581  private: unsigned int calculateDefaultBufferSize() const {
582  // Try marshalling a default constructed object to get an indication
583  // of the serialised size. Multiply this size by the default buffer
584  // size multiplier to get the buffer size.
585  Impl::QueueHeader header {};
586  T object {};
587  CharVector buffer;
588  marshal(buffer, object, header);
589  return (unsigned int) buffer.size() + minimumChunkSize;
590  }
591 
592  // Allows queue prepping for compilers without guaranteed copy elision.
593  private: std::string & prepQueue(std::string & n, unsigned int bufferSize) {
594  if (bufferSize < minimumChunkSize) {
597  , "The supplied shared memory queue buffer is less than the minimum legal size of "
598  + ::toString(minimumChunkSize) + "."
599  );
600  }
601 
602  boost::interprocess::shared_memory_object::remove(n.c_str());
603 
604  return n;
605  }
606 
607  private: boost::interprocess::message_queue openOrCreateQueue(unsigned int queueSize, unsigned int bufferSize) {
608  if (bufferSize < minimumChunkSize) {
611  , "The supplied shared memory queue buffer is less than the minimum legal size of "
612  + ::toString(minimumChunkSize) + "."
613  );
614  }
615 
616  return boost::interprocess::message_queue(OpenOrCreate, name.c_str(), queueSize, bufferSize);
617  }
618 
619  private: boost::interprocess::message_queue openQueue() {
620  return boost::interprocess::message_queue(OpenOnly, name.c_str());
621  }
622 
623  friend struct SharedMemoryQueueTest;
624 
625  // Shared state across processes using the queue.
626  private: struct QueueState {
627  std::atomic_uint sequenceNumber;
628 
629  explicit QueueState() : sequenceNumber(0) {}
630  };
631 
632  // Pending buffers are used to support single process dequeueing of oversize objects.
633  private: struct PendingBuffer {
634  CharVector buffer;
635  unsigned long receivedSize;
636  unsigned int priority;
637 
638  PendingBuffer(CharVector && buffer_, unsigned long receivedSize_, unsigned int priority_)
639  : buffer(std::move(buffer_))
640  , receivedSize(receivedSize_)
641  , priority(priority_) {}
642  };
643 
644  private: const std::string name;
645  private: boost::interprocess::message_queue queue;
646  private: const unsigned int chunkSize {};
647  private: MSharedMemoryObject<QueueState> queueState;
648  private: std::deque<PendingBuffer> pendingBuffers;
649  private: const bool throwOnOversize;
650 };
651 
652 } // namespace Balau::Interprocess
653 
654 #pragma clang diagnostic pop
655 
656 #endif // COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE
#define BoostSerialization(NAME)
Prettier macro for the Boost serialisation NVP macro.
Definition: SerializationMacros.hpp:25
Base interface for blocking queues.
boost::interprocess::open_or_create_t OpenOrCreateSelector
Type of OpenOrCreate constructor selector.
Definition: SharedMemoryUtils.hpp:36
SharedMemoryQueue(std::string name_, bool throwOnOversize_=false)
Open an existing shared memory queue with the specified name.
Definition: SharedMemoryQueue.hpp:169
const CreateOnlySelector CreateOnly
Used to select an interprocess queue/object constructor that creates only.
SharedMemoryQueue(OpenOrCreateSelector, unsigned int capacity, std::string name_, bool throwOnOversize_=false)
Open or create a shared memory queue of type T and with the specified capacity.
Definition: SharedMemoryQueue.hpp:133
#define ThrowBalauException(ExceptionClass,...)
Throw a Balau style exception, with implicit file and line number, and optional stacktrace.
Definition: BalauException.hpp:45
SharedMemoryQueue(unsigned int capacity, std::string name_, bool throwOnOversize_=false)
Create a shared memory queue of type T and with the specified capacity,.
Definition: SharedMemoryQueue.hpp:106
Balau exceptions for containers.
Utilities for vectors.
STL namespace.
bool empty() const override
Returns true if the queue is empty.
Definition: SharedMemoryQueue.hpp:374
SharedMemoryQueue(OpenOrCreateSelector, unsigned int capacity, unsigned int bufferSize_, std::string name_, bool throwOnOversize_=false)
Open or create a shared memory queue of type T, with the specified capacity and buffer size...
Definition: SharedMemoryQueue.hpp:155
bool full() const override
Returns true if the queue is full.
Definition: SharedMemoryQueue.hpp:370
UUID class, using the Boost uuid implementation.
Interprocess functionality including interprocess containers.
Definition: MSharedMemoryObject.hpp:27
const OpenOrCreateSelector OpenOrCreate
Used to select an interprocess queue/object constructor that opens or creates.
Thrown when an invalid size is supplied or detected.
Definition: ContainerExceptions.hpp:54
T dequeue() override
Dequeue an object.
Definition: SharedMemoryQueue.hpp:254
A blocking, shared memory queue that uses the Boost interprocess library.
Definition: SharedMemoryQueue.hpp:79
T tryDequeue() override
Try to dequeue an object.
Definition: SharedMemoryQueue.hpp:301
Base interface for blocking queues.
Definition: BlockingQueue.hpp:29
void enqueue(T &&object) override
Enqueue an object with a priority of zero.
Definition: SharedMemoryQueue.hpp:181
std::string getName() const
Get the name of the queue.
Definition: SharedMemoryQueue.hpp:383
T tryDequeue(std::chrono::milliseconds waitTime) override
Try to dequeue an object, waiting the specified time before giving up.
Definition: SharedMemoryQueue.hpp:311
void enqueue(const T &object, unsigned int priority)
Enqueue an object with the specified priority.
Definition: SharedMemoryQueue.hpp:192
const OpenOnlySelector OpenOnly
Used to select an interprocess queue/object constructor that opens only.
A shared memory object that uses the Boost interprocess library.
Interprocess shared memory utilities.
UUID class, using the Boost uuid implementation.
Definition: UUID.hpp:33
SharedMemoryQueue(unsigned int capacity, bool throwOnOversize_=false)
Create a shared memory queue of type T and with the specified capacity.
Definition: SharedMemoryQueue.hpp:90
Balau::U8String< AllocatorT > toString(LoggingLevel level)
Print the logging level as a UTF-8 string.
Definition: LoggingLevel.hpp:73
Utilities for printing numeric values in different formats.
static void assertion(bool test, StringFunctionT function)
If the bug test assertion fails, abort after logging the message supplied by the function.
Definition: Assert.hpp:49
SharedMemoryQueue(unsigned int capacity, unsigned int bufferSize_, bool throwOnOversize_=false)
Create a shared memory queue of type T, with the specified capacity, and with the specified buffer si...
Definition: SharedMemoryQueue.hpp:98
SharedMemoryQueue(unsigned int capacity, unsigned int bufferSize_, std::string name_, bool throwOnOversize_=false)
Create a shared memory queue of type T, with the specified capacity and buffer size.
Definition: SharedMemoryQueue.hpp:116