A blocking, shared memory queue that uses the Boost Interprocess library.
This class provides a shared memory backed blocking queue and implements the BlockingQueue API. The class encapsulates calls to the Boost Interprocess library, which create, use, and delete a shared memory queue, and has additional sequencing and chunking management logic. The sequencing and chunking logic manages the enqueueing and dequeueing of oversize objects, which require multiple shared memory queue send and receive operations that may be out of order and/or interleaved with other enqueued buffers.
The implementation uses the Boost Serialization library for marshalling and unmarshalling of objects. In order to use the queue, the object type T must provide Boost serialize or save/load methods.
#include <Balau/Interprocess/SharedMemoryQueue.hpp>
The queue is used in the same way as any other BlockingQueue implementation.
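The queue can be instantiated in three ways: by creating a new queue, by opening a queue if it already exists or creating it otherwise, or by opening an existing queue.
The simplest constructor used to create a queue is as follows.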
    // Create a shared memory queue for objects of type T and
    // with a capacity of 100.
    SharedMemoryQueue<T> queue(100);
Such a queue is only useful if the application will share the queue by forking or if the automatically generated name prefix is obtained by calling getName on the resulting queue instance.
In order to create a queue with a known name prefix, use the constructor that takes a string argument in addition to the capacity.
    // Get the queue's predefined name prefix from somewhere.
    const std::string name = getQueueName();

    // Create a shared memory queue with the predefined name prefix.
    SharedMemoryQueue<T> queue(100, name);
SharedMemoryQueue has a number of other optional parameters. These are outlined below.
Parameter | Type | Default | Description |
---|---|---|---|
capacity | unsigned int | No default | The number of items that the queue can hold. |
buffer size | unsigned int | Marshal size of T() plus header | The size in bytes of each item in the queue. This size includes the header size. |
name | std::string | UUID | The name of the queue. |
throw on oversize | bool | false | Throw an exception if an attempt to enqueue an oversize object is made. |
If the queue is to be used with multiple dequeueing processes, the buffer size of the queue must be large enough to fit all serialised objects plus the queue header size of 16 bytes. For POD objects, which do not have any fields that allocate memory, the default buffer size calculated by the queue from a default constructed object is sufficient. For object types that do have fields that allocate memory, the default buffer size calculated by the queue is not sufficient and thus the buffer size must be supplied manually. Otherwise, the queue will be defective.
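The sizing rule above can be expressed as simple arithmetic. The following is a minimal sketch, not part of the Balau API: the helper names `bufferSizeFor` and `chunksNeeded` are hypothetical, and the chunk count assumes that every chunk carries its own 16 byte header.

```cpp
#include <cstddef>

// Queue header size, as stated in the text above.
constexpr std::size_t QueueHeaderSize = 16;

// Buffer size required so that a serialised object of at most
// maxSerialisedSize bytes fits into a single queue item.
// (Hypothetical helper, for illustration only.)
constexpr std::size_t bufferSizeFor(std::size_t maxSerialisedSize) {
    return maxSerialisedSize + QueueHeaderSize;
}

// Number of queue items that a serialised object would occupy for a
// given buffer size. Assumption: each chunk has its own header, and
// bufferSize is strictly greater than QueueHeaderSize.
constexpr std::size_t chunksNeeded(std::size_t serialisedSize, std::size_t bufferSize) {
    const std::size_t payloadPerChunk = bufferSize - QueueHeaderSize;
    return serialisedSize == 0
        ? 1
        : (serialisedSize + payloadPerChunk - 1) / payloadPerChunk;
}
```

If `chunksNeeded` returns more than 1 for any object that may be enqueued, the queue falls under the single process dequeueing restriction described in this section.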
Note that manual specification of the buffer size is not required if dequeueing will occur in a single process and with synchronised access. In this case, an insufficient buffer size will result in oversize serialisations being split into chunks, which are then sent over the shared memory queue in turn. The single process calling dequeue/tryDequeue with synchronised access will then join these chunks together before deserialising.
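The splitting and joining described above might look roughly like the following sketch. It is not the Balau implementation: the `Chunk` layout, its field names, and the functions `splitIntoChunks` and `joinChunks` are assumptions made for illustration. Only the 16 byte header size is taken from this page.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

constexpr std::size_t HeaderSize = 16; // header size stated in the text

// Hypothetical chunk layout: four 32 bit header fields plus the payload.
struct Chunk {
    std::uint32_t sequence; // identifies the object the chunk belongs to
    std::uint32_t index;    // position of this chunk within the object
    std::uint32_t count;    // total number of chunks in the object
    std::uint32_t length;   // number of payload bytes in this chunk
    std::vector<std::uint8_t> payload;
};

// Split serialised bytes into chunks that each fit into one queue item.
// Precondition: bufferSize is strictly greater than HeaderSize.
std::vector<Chunk> splitIntoChunks(const std::vector<std::uint8_t> & bytes,
                                   std::uint32_t sequence,
                                   std::size_t bufferSize) {
    const std::size_t room = bufferSize - HeaderSize;
    const std::size_t count = bytes.empty() ? 1 : (bytes.size() + room - 1) / room;
    std::vector<Chunk> chunks;

    for (std::size_t i = 0; i < count; ++i) {
        const std::size_t begin = i * room;
        const std::size_t end = std::min(begin + room, bytes.size());
        chunks.push_back(Chunk {
            sequence,
            static_cast<std::uint32_t>(i),
            static_cast<std::uint32_t>(count),
            static_cast<std::uint32_t>(end - begin),
            std::vector<std::uint8_t>(
                bytes.begin() + static_cast<std::ptrdiff_t>(begin),
                bytes.begin() + static_cast<std::ptrdiff_t>(end)
            )
        });
    }

    return chunks;
}

// Join the chunks of one object, tolerating out of order arrival.
std::vector<std::uint8_t> joinChunks(std::vector<Chunk> chunks) {
    std::sort(chunks.begin(), chunks.end(),
              [] (const Chunk & a, const Chunk & b) { return a.index < b.index; });
    std::vector<std::uint8_t> bytes;

    for (const Chunk & chunk : chunks) {
        bytes.insert(bytes.end(), chunk.payload.begin(), chunk.payload.end());
    }

    return bytes;
}
```

With a buffer size of 48 bytes, for example, each chunk in this sketch carries up to 32 payload bytes, so a 100 byte serialisation is split into four chunks.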
An equivalent pair of constructors is available that opens the shared memory objects of a queue if they already exist and creates them otherwise. In order to use these constructors, the OpenOrCreateSelector object must be passed as the first argument. Otherwise, these constructors are identical to their counterparts which only create the shared memory objects.
    // Get the queue's predefined name prefix from somewhere.
    const std::string name = getQueueName();

    // Open or create a shared memory queue with the predefined name prefix.
    SharedMemoryQueue<T> queue(OpenOrCreate, 100, name);
There is a single constructor for instantiating a SharedMemoryQueue as a user of an existing queue. This constructor takes a std::string containing the name of the queue to open.
    // Get the queue's predefined name prefix from somewhere.
    const std::string name = getQueueName();

    // Open a shared memory queue with the predefined name prefix.
    SharedMemoryQueue<T> queue(name);
Once the queue has been created or opened, it can be used in the same way as any other BlockingQueue implementation. See the BlockingQueue API documentation for information on the blocking queue interface.
This queue implementation has the following concurrency guarantees.
The queue can be used for concurrent enqueues and concurrent dequeues across processes/threads if the maximum enqueued serialised object size + queue header size is guaranteed to be smaller than the shared memory queue buffer size.
If the above guarantee cannot be met (due, for example, to a non-deterministic serialised object size), the queue can be used for concurrent enqueues across processes/threads, but only for synchronised dequeues in a single process. This is because once one process has dequeued part of an object, no other process can continue the dequeueing of that object.
If this limitation is breached, the set of applications using the shared memory queue will be defective.
The dequeueing calls in such a scenario must also be protected by a mutex if multiple threads of the dequeueing application are concurrently dequeueing. No such mutex protection is required if oversize objects are not being enqueued.
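The reason for the single process and mutex requirements can be seen in a small sketch of chunk reassembly. This is an illustration under stated assumptions, not Balau's code: the `Piece` record and the `Reassembler` class are hypothetical. The partially received objects live in the memory of the dequeueing process, so another process cannot complete them, and the mutex keeps concurrent threads from corrupting that state.

```cpp
#include <cstdint>
#include <map>
#include <mutex>
#include <vector>

// Hypothetical chunk record; the field names are illustrative only.
struct Piece {
    std::uint32_t sequence; // identifies the object the piece belongs to
    std::uint32_t index;    // position of the piece within the object
    std::uint32_t count;    // total number of pieces in the object
    std::vector<std::uint8_t> bytes;
};

// Collects pieces that may arrive out of order and interleaved with
// pieces of other objects. The partial state is process local.
class Reassembler {
    // Returns true and fills 'whole' once the last missing piece of an
    // object has been accepted; returns false otherwise.
    public: bool accept(const Piece & piece, std::vector<std::uint8_t> & whole) {
        std::lock_guard<std::mutex> lock(partialsMutex);
        auto & received = partials[piece.sequence];
        received[piece.index] = piece.bytes;

        if (received.size() < piece.count) {
            return false;
        }

        whole.clear();

        // The inner map is ordered by piece index, so iteration
        // concatenates the pieces in the correct order.
        for (const auto & entry : received) {
            whole.insert(whole.end(), entry.second.begin(), entry.second.end());
        }

        partials.erase(piece.sequence);
        return true;
    }

    private: std::mutex partialsMutex;
    private: std::map<std::uint32_t, std::map<std::uint32_t, std::vector<std::uint8_t>>> partials;
};
```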
In order to catch oversize message errors in a system that is not designed for oversize message dequeueing, all constructors of the SharedMemoryQueue accept an additional boolean argument. Setting this argument to true will cause an exception to be thrown if an attempt is made to enqueue an oversize message. This check can be switched on in order to catch such errors early, during the development and testing phases.
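Conceptually, the check amounts to something like the sketch below. The function name `fitsInSingleItem` and the use of `std::length_error` are assumptions for illustration; this page does not state which exception type the queue throws.

```cpp
#include <cstddef>
#include <stdexcept>

constexpr std::size_t HeaderBytes = 16; // header size stated in the text

// Returns true if the serialised object fits into a single queue item.
// If it does not fit, either throws (when throwOnOversize is true) or
// returns false, meaning that the object would need to be chunked.
// Hypothetical helper; the exception type is an assumption.
inline bool fitsInSingleItem(std::size_t serialisedSize,
                             std::size_t bufferSize,
                             bool throwOnOversize) {
    const bool fits = serialisedSize + HeaderBytes <= bufferSize;

    if (!fits && throwOnOversize) {
        throw std::length_error("attempt to enqueue an oversize object");
    }

    return fits;
}
```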
There are two ways to utilise a shared memory queue in multiple processes: by forking the process that created the queue, or by communicating the queue's name to unrelated processes.
Forking is a simple way to construct and use the shared memory queue across processes, but it is only supported by Unix-like operating systems. In order to construct and use a shared memory queue in a parent process and a set of forked child processes, construct the queue in the parent and fork as normal. The Balau Fork class provides a convenient API for forking. The shared memory queue will be ready for use in the child processes without any further action. The first constructor is used for this.
    // The type of object being sent across the queue.
    struct A {
        int i;
        double d;

        A() : i(0), d(0.0) {}
        A(int i_, double d_) : i(i_), d(d_) {}

        // The serialize method, used by the queue to marshal and unmarshal the object.
        template <typename Archive> void serialize(Archive & archive, unsigned int ) {
            archive & BoostSerialization(d) & BoostSerialization(i);
        }
    };

    // Construct the shared memory queue before forking.
    SharedMemoryQueue<A> queue(100);

    // Perform the fork. The child will not return.
    Fork::performFork([&queue] () { return runChildLogic(queue); }, true);
Processes that are not related by forking may access the same shared memory queue by communicating the name to each process.
There are two possibilities for communicating the name:
With the first solution, a name is decided upon in advance or is algorithmically generated by the application. When sharing a queue between multiple instances of the same application, one way to do this is to construct a name prefix from the application's executable path. A helper function namePrefixFromAppPath() is available for this in the SharedMemoryUtils class. A set of shared memory queue names can then be created by appending predefined strings to the name prefix returned by the helper function.
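Deriving a set of queue names from a common prefix can be sketched as follows. The function `queueNames` is a hypothetical helper, and the prefix is passed in as a plain string so that the sketch stands alone. In an application, the prefix would come from SharedMemoryUtils::namePrefixFromAppPath().

```cpp
#include <string>
#include <vector>

// Build one queue name per suffix by appending each predefined suffix
// to the shared name prefix. (Hypothetical helper, for illustration.)
std::vector<std::string> queueNames(const std::string & prefix,
                                    const std::vector<std::string> & suffixes) {
    std::vector<std::string> names;
    names.reserve(suffixes.size());

    for (const std::string & suffix : suffixes) {
        names.push_back(prefix + suffix);
    }

    return names;
}
```

Each instance of the application that applies the same suffixes to the same prefix arrives at the same set of names without any runtime communication.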
Another solution is to pre-share a name that can be guaranteed not to be used by other processes, either hard wired in the application (not recommended) or via the application's configuration/options.
In order to use a peer-to-peer approach, the create-or-open constructors can be used.
    // Create the name for the shared memory queue.
    const std::string name = SharedMemoryUtils::namePrefixFromAppPath() + "_myQueue";

    // Create or open the shared memory queue with the name prefix.
    SharedMemoryQueue<A> object(OpenOrCreate, 100, name);
In order to use a manager-worker approach, one of the constructors which creates the shared memory objects can be used in the manager process and the queue open constructor can be used in the worker processes. Due to the necessity of the queue existing for the worker processes, the manager process will need to create the queue before the workers attempt to open it.
    // Manager process..

    // Create the name prefix for the shared memory queue.
    const std::string name = SharedMemoryUtils::namePrefixFromAppPath() + "_myQueue";

    // Create the shared memory queue.
    SharedMemoryQueue<A> queue(100, name);

    ///////////////////////////////////////////////////////////////////

    // Worker process..

    // Create the name prefix for the shared memory queue.
    const std::string name = SharedMemoryUtils::namePrefixFromAppPath() + "_myQueue";

    // Open the shared memory queue.
    SharedMemoryQueue<A> queue(name);
Copyright © 2008 Bora Software
Bora® and the Bora logo are registered trademarks in the European Community, the United States of America, and other countries.