17 #ifndef COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE 18 #define COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE 24 #include <Balau/Interprocess/Impl/SharedMemoryQueueImpl.hpp> 29 #include <boost/archive/binary_iarchive.hpp> 30 #include <boost/archive/binary_oarchive.hpp> 31 #include <boost/date_time/posix_time/posix_time_types.hpp> 32 #include <boost/interprocess/ipc/message_queue.hpp> 33 #include <boost/interprocess/managed_shared_memory.hpp> 34 #include <boost/iostreams/device/back_inserter.hpp> 35 #include <boost/iostreams/filtering_stream.hpp> 36 #include <boost/iostreams/stream.hpp> 41 #pragma clang diagnostic push 42 #pragma ide diagnostic ignored "cppcoreguidelines-pro-type-member-init" 80 private:
using CharVector = std::vector<char>;
82 public:
static const unsigned int headerSize =
sizeof(Impl::QueueHeader);
83 public:
static const unsigned int minimumChunkSize = 2 * headerSize;
91 :
SharedMemoryQueue(capacity, calculateDefaultBufferSize(),
"SMQ_" +
UUID().asString(), throwOnOversize_) {}
98 public:
SharedMemoryQueue(
unsigned int capacity,
unsigned int bufferSize_,
bool throwOnOversize_ =
false)
106 public:
SharedMemoryQueue(
unsigned int capacity, std::string name_,
bool throwOnOversize_ =
false)
107 :
SharedMemoryQueue(capacity, calculateDefaultBufferSize(),
std::move(name_), throwOnOversize_) {}
117 unsigned int bufferSize_,
119 bool throwOnOversize_ =
false)
120 : name(
std::move(prepQueue(name_, bufferSize_)))
121 , queue(
CreateOnly, name.c_str(), capacity, bufferSize_)
122 , chunkSize(bufferSize_)
123 , queueState(
CreateOnly, name +
"_queueState")
124 , throwOnOversize(throwOnOversize_) {}
134 unsigned int capacity,
136 bool throwOnOversize_ =
false)
140 , calculateDefaultBufferSize()
156 unsigned int capacity,
157 unsigned int bufferSize_,
159 bool throwOnOversize_ =
false)
160 : name(
std::move(name_))
161 , queue(openOrCreateQueue(capacity, bufferSize_))
162 , chunkSize(bufferSize_)
164 , throwOnOversize(throwOnOversize_) {}
170 : name(
std::move(name_))
172 , chunkSize((unsigned int) queue.get_max_msg_size())
173 , queueState(
OpenOnly, name +
"_queueState")
174 , throwOnOversize(throwOnOversize_) {}
192 public:
void enqueue(
const T &
object,
unsigned int priority) {
193 const Impl::QueueHeader messageHeader { queueState->sequenceNumber++, 1, 0, 0 };
194 CharVector & marshalBuffer = Impl::SharedMemoryQueueTLS::storage.marshalBuffer;
195 marshalBuffer.clear();
197 marshal(marshalBuffer,
object, messageHeader);
199 const unsigned int totalBytes = (
unsigned int) marshalBuffer.size() - headerSize;
200 auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
201 marshalHeader->totalBytes = totalBytes;
203 if (marshalBuffer.size() <= chunkSize) {
205 queue.send(marshalBuffer.data(), marshalBuffer.size(), priority);
211 if (throwOnOversize) {
215 "The serialized message is too large to fit into a single message (" 216 , marshalBuffer.size()
224 const unsigned int chunkCount = calculateChunkCount(totalBytes);
225 size_t dataStart = headerSize;
227 for (
unsigned int m = 0; m < chunkCount; m++) {
228 size_t chunkStart = dataStart - headerSize;
229 auto & chunkHeader = * (Impl::QueueHeader *) (marshalBuffer.data() + chunkStart);
230 chunkHeader.sequenceNumber = messageHeader.sequenceNumber;
231 chunkHeader.chunkCount = chunkCount;
232 chunkHeader.chunkNumber = m;
233 chunkHeader.totalBytes = totalBytes;
236 const size_t chunkBytes = m < chunkCount - 1
238 : marshalBuffer.size() - chunkStart;
240 queue.send(marshalBuffer.data() + chunkStart, chunkBytes, priority);
243 dataStart += chunkSize - headerSize;
255 CharVector & queueBuffer = Impl::SharedMemoryQueueTLS::storage.queueBuffer;
256 unsigned long receivedSize;
257 unsigned int priority;
259 dequeueNextBuffer(queueBuffer, receivedSize, priority, 0);
261 const auto * queueHeader = (
const Impl::QueueHeader *) queueBuffer.data();
263 if (queueHeader->chunkCount == 1) {
265 return unmarshal(queueBuffer);
266 }
else if (queueHeader->chunkNumber < queueHeader->chunkCount) {
268 return performMultiChunkDequeue(
269 queueBuffer, receivedSize, priority
270 , [
this] (CharVector &,
271 CharVector & queueBuffer,
272 unsigned long & thisReceivedSize,
273 unsigned int & thisPriority,
275 dequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected);
281 return completeMultiChunkDequeue(
282 queueBuffer, receivedSize, priority
283 , [
this] (CharVector &,
284 CharVector & queueBuffer,
285 unsigned long & thisReceivedSize,
286 unsigned int & thisPriority,
288 dequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected);
302 return tryDequeue(std::chrono::milliseconds(0));
311 public: T
tryDequeue(std::chrono::milliseconds waitTime)
override {
312 CharVector & queueBuffer = Impl::SharedMemoryQueueTLS::storage.queueBuffer;
314 unsigned long receivedSize;
315 unsigned int priority;
317 if (!tryDequeueNextBuffer(queueBuffer, receivedSize, priority, 0, waitTime)) {
321 const auto * queueHeader = (
const Impl::QueueHeader *) queueBuffer.data();
323 if (queueHeader->chunkCount == 1) {
325 return unmarshal(queueBuffer);
326 }
else if (queueHeader->chunkNumber < queueHeader->chunkCount) {
328 return performMultiChunkDequeue(
329 queueBuffer, receivedSize, priority
330 , [waitTime,
this] (CharVector & marshalBuffer,
331 CharVector & queueBuffer,
332 unsigned long & thisReceivedSize,
333 unsigned int & thisPriority,
335 if (!tryDequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected, waitTime)) {
336 pendingBuffers.emplace_front(
337 PendingBuffer(std::move(marshalBuffer), marshalBuffer.size(), thisPriority)
340 marshalBuffer = CharVector();
349 return completeMultiChunkDequeue(
350 queueBuffer, receivedSize, priority
351 , [waitTime,
this] (CharVector & marshalBuffer,
352 CharVector & queueBuffer,
353 unsigned long & thisReceivedSize,
354 unsigned int & thisPriority,
356 if (!tryDequeueNextBuffer(queueBuffer, thisReceivedSize, thisPriority, rejected, waitTime)) {
357 pendingBuffers.emplace_front(
358 PendingBuffer(std::move(queueBuffer), queueBuffer.size(), thisPriority)
360 marshalBuffer = CharVector();
370 public:
bool full()
const override {
371 return queue.get_max_msg() - queue.get_num_msg() == 0;
374 public:
bool empty()
const override {
375 return queue.get_num_msg() == 0;
394 private:
template <
typename DequeueFunctionT>
395 T performMultiChunkDequeue(CharVector & queueBuffer,
396 unsigned long & receivedSize,
397 unsigned int & priority,
398 DequeueFunctionT dequeueFunction) {
399 const auto * queueHeader = (
const Impl::QueueHeader *) queueBuffer.data();
400 const unsigned int totalBytes = queueHeader->totalBytes;
402 CharVector & marshalBuffer = Impl::SharedMemoryQueueTLS::storage.marshalBuffer;
403 marshalBuffer.clear();
404 marshalBuffer.resize(headerSize + totalBytes);
407 memcpy(marshalBuffer.data(), queueBuffer.data(), headerSize);
408 auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
409 marshalHeader->chunkNumber = marshalHeader->chunkCount;
411 size_t rejectedPendingBuffers = 0;
412 const unsigned long sequenceNumber = queueHeader->sequenceNumber;
413 unsigned int partsLeft = queueHeader->chunkCount;
416 if (queueHeader->sequenceNumber != sequenceNumber) {
417 ++rejectedPendingBuffers;
418 pendingBuffers.emplace_back(PendingBuffer(std::move(queueBuffer), receivedSize, priority));
419 queueBuffer = CharVector(chunkSize);
422 const size_t copyStart = headerSize + queueHeader->chunkNumber * (chunkSize - headerSize);
423 const size_t byteCount = queueBuffer.size() - headerSize;
424 memcpy(marshalBuffer.data() + copyStart, queueBuffer.data() + headerSize, byteCount);
425 ++marshalHeader->chunkNumber;
429 if (partsLeft == 0) {
430 return unmarshal(marshalBuffer);
433 if (!dequeueFunction(marshalBuffer, queueBuffer, receivedSize, priority, rejectedPendingBuffers)) {
438 queueHeader = (
const Impl::QueueHeader *) queueBuffer.data();
449 template <
typename DequeueFunctionT>
450 T completeMultiChunkDequeue(CharVector & marshalBuffer,
451 unsigned long & receivedSize,
452 unsigned int & priority,
453 DequeueFunctionT dequeueFunction) {
454 auto * marshalHeader = (Impl::QueueHeader *) marshalBuffer.data();
455 const unsigned int totalBytes = marshalHeader->totalBytes;
458 receivedSize == totalBytes + headerSize
459 ,
"Received size of partially dequeued buffer is not the same as the size of the buffer." 462 size_t rejectedPendingBuffers = 0;
463 const unsigned long sequenceNumber = marshalHeader->sequenceNumber;
464 unsigned int partsLeft = 2 * marshalHeader->chunkCount - marshalHeader->chunkNumber;
465 CharVector queueBuffer {};
468 if (!dequeueFunction(marshalBuffer, queueBuffer, receivedSize, priority, rejectedPendingBuffers)) {
473 auto * queueHeader = (
const Impl::QueueHeader *) queueBuffer.data();
475 if (sequenceNumber != queueHeader->sequenceNumber) {
476 ++rejectedPendingBuffers;
477 pendingBuffers.emplace_back(PendingBuffer(std::move(queueBuffer), receivedSize, priority));
478 queueBuffer = CharVector(chunkSize);
481 const size_t copyStart = headerSize + (chunkSize - headerSize) * queueHeader->chunkNumber;
482 const size_t byteCount = queueBuffer.size() - headerSize;
483 memcpy(marshalBuffer.data() + copyStart, queueBuffer.data() + headerSize, byteCount);
484 ++marshalHeader->chunkNumber;
488 if (partsLeft == 0) {
489 return unmarshal(marshalBuffer);
498 private:
void dequeueNextBuffer(CharVector & buffer,
499 unsigned long & receivedSize,
500 unsigned int & priority,
501 size_t rejectedPendingBuffers) {
502 if (rejectedPendingBuffers < pendingBuffers.size()) {
503 auto pendingBuffer = pendingBuffers.front();
504 buffer = std::move(pendingBuffer.buffer);
505 pendingBuffers.pop_front();
506 receivedSize = pendingBuffer.receivedSize;
507 priority = pendingBuffer.priority;
509 buffer.resize(chunkSize);
510 queue.receive(buffer.data(), buffer.size(), receivedSize, priority);
511 buffer.resize(receivedSize);
519 private:
bool tryDequeueNextBuffer(CharVector & buffer,
520 unsigned long & receivedSize,
521 unsigned int & priority,
522 size_t rejectedPendingBuffers,
523 std::chrono::milliseconds waitTime) {
524 if (rejectedPendingBuffers < pendingBuffers.size()) {
525 auto pendingBuffer = pendingBuffers.front();
526 buffer = std::move(pendingBuffer.buffer);
527 pendingBuffers.pop_front();
528 receivedSize = pendingBuffer.receivedSize;
529 priority = pendingBuffer.priority;
532 boost::posix_time::ptime timeout(boost::posix_time::microsec_clock::universal_time());
533 boost::posix_time::time_duration w = boost::posix_time::milliseconds(waitTime.count());
536 buffer.resize(chunkSize);
538 if (queue.timed_receive(buffer.data(), buffer.size(), receivedSize, priority, timeout)) {
539 buffer.resize(receivedSize);
547 private:
using SinkDevice = boost::iostreams::back_insert_device<CharVector>;
548 private:
using SinkBuffer = boost::iostreams::stream_buffer<SinkDevice>;
549 private:
using SourceDevice = boost::iostreams::basic_array_source<char>;
550 private:
using SourceBuffer = boost::iostreams::stream_buffer<SourceDevice>;
554 private:
void marshal(CharVector & buffer,
const T &
object,
const Impl::QueueHeader & header)
const {
555 buffer.resize(headerSize);
556 const char * headerBytes = (
const char *) &header;
557 memcpy(buffer.data(), headerBytes, headerSize);
558 SinkBuffer oStreamBuffer { SinkDevice(buffer) };
559 boost::archive::binary_oarchive archive(oStreamBuffer);
565 private: T unmarshal(
const CharVector & buffer) {
568 SourceBuffer iStreamBuffer(SourceDevice(buffer.data() + headerSize, buffer.size()));
569 boost::archive::binary_iarchive archive(iStreamBuffer);
576 private:
unsigned int calculateChunkCount(
unsigned int totalBytes)
const {
577 const unsigned int netBufferSize = chunkSize - headerSize;
578 return totalBytes / netBufferSize + (totalBytes % netBufferSize != 0);
581 private:
unsigned int calculateDefaultBufferSize()
const {
585 Impl::QueueHeader header {};
588 marshal(buffer,
object, header);
589 return (
unsigned int) buffer.size() + minimumChunkSize;
593 private: std::string & prepQueue(std::string & n,
unsigned int bufferSize) {
594 if (bufferSize < minimumChunkSize) {
597 ,
"The supplied shared memory queue buffer is less than the minimum legal size of " 598 + ::
toString(minimumChunkSize) +
"." 602 boost::interprocess::shared_memory_object::remove(n.c_str());
607 private: boost::interprocess::message_queue openOrCreateQueue(
unsigned int queueSize,
unsigned int bufferSize) {
608 if (bufferSize < minimumChunkSize) {
611 ,
"The supplied shared memory queue buffer is less than the minimum legal size of " 612 + ::
toString(minimumChunkSize) +
"." 616 return boost::interprocess::message_queue(
OpenOrCreate, name.c_str(), queueSize, bufferSize);
619 private: boost::interprocess::message_queue openQueue() {
620 return boost::interprocess::message_queue(
OpenOnly, name.c_str());
623 friend struct SharedMemoryQueueTest;
626 private:
struct QueueState {
627 std::atomic_uint sequenceNumber;
629 explicit QueueState() : sequenceNumber(0) {}
633 private:
struct PendingBuffer {
635 unsigned long receivedSize;
636 unsigned int priority;
638 PendingBuffer(CharVector && buffer_,
unsigned long receivedSize_,
unsigned int priority_)
639 : buffer(std::move(buffer_))
640 , receivedSize(receivedSize_)
641 , priority(priority_) {}
644 private:
const std::string name;
645 private: boost::interprocess::message_queue queue;
646 private:
const unsigned int chunkSize {};
648 private: std::deque<PendingBuffer> pendingBuffers;
649 private:
const bool throwOnOversize;
654 #pragma clang diagnostic pop 656 #endif // COM_BORA_SOFTWARE__BALAU_INTERPROCESS__SHARED_MEMORY_QUEUE #define BoostSerialization(NAME)
Prettier macro for the Boost serialisation NVP macro.
Definition: SerializationMacros.hpp:25
Base interface for blocking queues.
boost::interprocess::open_or_create_t OpenOrCreateSelector
Type of OpenOrCreate constructor selector.
Definition: SharedMemoryUtils.hpp:36
SharedMemoryQueue(std::string name_, bool throwOnOversize_=false)
Open an existing shared memory queue with the specified name.
Definition: SharedMemoryQueue.hpp:169
const CreateOnlySelector CreateOnly
Used to select an interprocess queue/object constructor that creates only.
SharedMemoryQueue(OpenOrCreateSelector, unsigned int capacity, std::string name_, bool throwOnOversize_=false)
Open or create a shared memory queue of type T and with the specified capacity.
Definition: SharedMemoryQueue.hpp:133
#define ThrowBalauException(ExceptionClass,...)
Throw a Balau style exception, with implicit file and line number, and optional stacktrace.
Definition: BalauException.hpp:45
SharedMemoryQueue(unsigned int capacity, std::string name_, bool throwOnOversize_=false)
Create a shared memory queue of type T and with the specified capacity,.
Definition: SharedMemoryQueue.hpp:106
Balau exceptions for containers.
bool empty() const override
Returns true if the queue is empty.
Definition: SharedMemoryQueue.hpp:374
SharedMemoryQueue(OpenOrCreateSelector, unsigned int capacity, unsigned int bufferSize_, std::string name_, bool throwOnOversize_=false)
Open or create a shared memory queue of type T, with the specified capacity and buffer size...
Definition: SharedMemoryQueue.hpp:155
bool full() const override
Returns true if the queue is full.
Definition: SharedMemoryQueue.hpp:370
UUID class, using the Boost uuid implementation.
Interprocess functionality including interprocess containers.
Definition: MSharedMemoryObject.hpp:27
const OpenOrCreateSelector OpenOrCreate
Used to select an interprocess queue/object constructor that opens or creates.
Thrown when an invalid size is supplied or detected.
Definition: ContainerExceptions.hpp:54
T dequeue() override
Dequeue an object.
Definition: SharedMemoryQueue.hpp:254
A blocking, shared memory queue that uses the Boost interprocess library.
Definition: SharedMemoryQueue.hpp:79
T tryDequeue() override
Try to dequeue an object.
Definition: SharedMemoryQueue.hpp:301
Base interface for blocking queues.
Definition: BlockingQueue.hpp:29
void enqueue(T &&object) override
Enqueue an object with a priority of zero.
Definition: SharedMemoryQueue.hpp:181
std::string getName() const
Get the name of the queue.
Definition: SharedMemoryQueue.hpp:383
T tryDequeue(std::chrono::milliseconds waitTime) override
Try to dequeue an object, waiting the specified time before giving up.
Definition: SharedMemoryQueue.hpp:311
void enqueue(const T &object, unsigned int priority)
Enqueue an object with the specified priority.
Definition: SharedMemoryQueue.hpp:192
const OpenOnlySelector OpenOnly
Used to select an interprocess queue/object constructor that opens only.
A shared memory object that uses the Boost interprocess library.
Interprocess shared memory utilities.
UUID class, using the Boost uuid implementation.
Definition: UUID.hpp:33
SharedMemoryQueue(unsigned int capacity, bool throwOnOversize_=false)
Create a shared memory queue of type T and with the specified capacity.
Definition: SharedMemoryQueue.hpp:90
Balau::U8String< AllocatorT > toString(LoggingLevel level)
Print the logging level as a UTF-8 string.
Definition: LoggingLevel.hpp:73
Utilities for printing numeric values in different formats.
static void assertion(bool test, StringFunctionT function)
If the bug test assertion fails, abort after logging the message supplied by the function.
Definition: Assert.hpp:49
SharedMemoryQueue(unsigned int capacity, unsigned int bufferSize_, bool throwOnOversize_=false)
Create a shared memory queue of type T, with the specified capacity, and with the specified buffer si...
Definition: SharedMemoryQueue.hpp:98
SharedMemoryQueue(unsigned int capacity, unsigned int bufferSize_, std::string name_, bool throwOnOversize_=false)
Create a shared memory queue of type T, with the specified capacity and buffer size.
Definition: SharedMemoryQueue.hpp:116