1 /** 
2  * Queue management
3  *
4  * Authors: Tristan Brice Velloza Kildaire (deavmi)
5  */
6 module qix.manager;
7 
8 import qix.queue;
9 import niknaks.functional : Result, ok, error;
10 import std.string : format;
11 import gogga.mixins;
12 
13 /** 
14  * Default max iterations when
15  * trying to find an unused queue
16  * id
17  */
18 private enum NEWQUEUE_MAX_ITER = 1000;
19 
20 import qix.exceptions;
21 
22 /** 
23  * An exception that occurs during
24  * usage of the `Manager`
25  */
26 public final class ManagerException : QixException
27 {
28 	private this(string m)
29 	{
30 		super(m);
31 	}
32 }
33 
34 /** 
35  * A queue manager
36  */
37 public template Manager(Item)
38 {
39 	/** 
40 	 * A queue manager
41 	 */
42 	public class Manager
43 	{
44 		import core.sync.mutex : Mutex;
45 
46 		private alias QueueType = Queue!(Item);
47 		
48 		private QueueType[QueueKey] _q;
49 		private Mutex _ql; // todo: readers-writers lock
50 
51 		private size_t _mi;
52 
53 		/** 
54 		 * Construct a new queue manager
55 		 *
56 		 * Params:
57 		 *   maxIter = maximum number of
58 		 * iterations allowed when searching
59 		 * for a free queue id
60 		 */
61 		this(size_t maxIter)
62 		{
63 			this._ql = new Mutex();
64 			this._mi = maxIter;
65 		}
66 
67 		/** 
68 		 * Constructs a new queue manager
69 		 */
70 		this()
71 		{
72 			this(NEWQUEUE_MAX_ITER);
73 		}
74 
75 		/** 
76 		 * Finds an unused queue id and then
77 		 * instantiates a queue with that id
78 		 *
79 		 * Returns: a `Result` that contains
80 		 * a pointer to the queue if a free
81 		 * id was found, otherwise `false`
82 		 * if no free id could be found or
83 		 * we reached the maximum number
84 		 * of allowed iterations for searching
85 		 */
86 		public Result!(QueueType*, string) newQueue()
87 		{
88 			this._ql.lock();
89 
90 			scope(exit)
91 			{
92 				this._ql.unlock();
93 			}
94 
95 			// find a free queue id with the
96 			// random startergy
97 			Result!(QueueKey, string) qid_res = newQueue_randStrat();
98 			DEBUG("qid_res: ", qid_res);
99 
100 			if(qid_res.is_error())
101 			{
102 				return error!(string, QueueType*)(qid_res.error());
103 			}
104 
105 			// create new queue and insert it
106 			QueueKey qid = qid_res.ok();
107 			this._q[qid] = QueueType(qid);
108 			QueueType* q = qid in this._q;
109 			DEBUG("stored new queue: ", *q);
110 
111 			return ok!(QueueType*, string)(q);
112 		}
113 
114 		// uses a random dice roll as a startergy
115 		// for finding potentially free qids
116 		//
117 		// mt: assumes caller holds lock `this._ql`
118 		private Result!(QueueKey, string) newQueue_randStrat()
119 		{
120 			import qix.utils : rand;
121 
122 			size_t c = 0; // iterations
123 			QueueKey new_qid;
124 			bool succ = false;
125 			for(c = 0; c < NEWQUEUE_MAX_ITER; c++)
126 			{
127 				// try find free qid
128 				new_qid = rand();
129 				if((new_qid in this._q) is null)
130 				{
131 					succ = true;
132 					break;
133 				}
134 			}
135 			DEBUG("iterations: ", c);
136 
137 			// ran out of iterations before we
138 			// could fine free qid
139 			if(!succ)
140 			{
141 				return error!(string, QueueKey)
142 				(
143 					format
144 					(
145 						"Reached NEWQUEUE_MAX_ITER of %d before finding free queue id",
146 						NEWQUEUE_MAX_ITER
147 					)
148 				);
149 			}
150 			// found a free qid
151 			else
152 			{
153 				return ok!(QueueKey, string)(new_qid);
154 			}
155 		}
156 
157 		/** 
158 		 * Removes the provided queue from the manager
159 		 *
160 		 * Params:
161 		 *   queue = a pointer to the queue
162 		 * Returns: `true` if the queue existed, `false`
163 		 * if not or if `null` was provided
164 		 */
165 		public bool removeQueue(QueueType* queue)
166 		{
167 			return queue is null ? false : removeQueue(queue.id());
168 		}
169 
170 		/** 
171 		 * Removes the queue by the provided id
172 		 * from the manager
173 		 *
174 		 * Params:
175 		 *   key = the queue's id
176 		 * Returns: `true` if the queue existed, `false`
177 		 * otherwise
178 		 */
179 		public bool removeQueue(QueueKey key)
180 		{
181 			this._ql.lock();
182 
183 			scope(exit)
184 			{
185 				this._ql.unlock();
186 			}
187 
188 			QueueType* f = key in this._q;
189 
190 			if(f is null)
191 			{
192 				return false;
193 			}
194 
195 			this._q.remove(key);
196 			return true;
197 		}
198 
199 		private QueueType* getQueue0(QueueKey id)
200 		{
201 			this._ql.lock();
202 
203 			scope(exit)
204 			{
205 				this._ql.unlock();
206 			}
207 
208 			return id in this._q;
209 		}
210 
211 		private Result!(QueueType*, QixException) getQueue(QueueKey id)
212 		{
213 			auto q = getQueue0(id);
214 			if(q is null)
215 			{
216 				return error!(QixException, QueueType*)
217 				(
218 					new ManagerException
219 					(
220 						format
221 						(
222 							"Could not find a queue with id %d",
223 							id
224 						)
225 					)
226 				);
227 			}
228 
229 			return ok!(QueueType*, QixException)(q);
230 		}
231 
232 		// TODO: In future version let's add:
233 		//
234 		// 2. receive(QueueKey, T)
235 		// 4. wait(QueueKey)
236 		// 6. wait(QueueKey, Duration)
237 		//
238 
239 		/** 
240 		 * Pushes a new message into the queue,
241 		 * waking up one of the threads currently
242 		 * blocking to dequeue an item from it
243 		 *
244 		 * Params:
245 		 *   id = the queue's id
246 		 *   item = the item to enqueue
247 		 * Returns: a `Result` either containing
248 		 * a boolean flag about whether the admit
249 		 * policy allowed the enqueueing to occur
250 		 * or a `QixException` if the id does not
251 		 * refer to a queue registered with this
252 		 * manager
253 		 */
254 		public Result!(bool, QixException) receive(QueueKey id, Item item)
255 		{
256 			auto q_r = getQueue(id);
257 			if(!q_r)
258 			{
259 				return error!(QixException, bool)(q_r.error());
260 			}
261 
262 			auto q = q_r.ok();
263 			return ok!(bool, QixException)(q.receive(item));
264 		}
265 
266 		/** 
267 		 * Wait indefinately to dequeue an item
268 		 * from the queue given by the provided
269 		 * id
270 		 *
271 		 * Params:
272 		 *   id = the queue's id
273 		 * Returns: a `Result` either containing
274 		 * the dequeued item or a `QixException`
275 		 * if the id does not refer to a queue
276 		 * registered with this manager
277 		 */
278 		public Result!(Item, QixException) wait(QueueKey id)
279 		{
280 			return wait(id, Duration.zero);
281 		}
282 
283 		import std.datetime : Duration;
284 
285 		/** 
286 		 * Wait up until a specified maximum
287 		 * amount of time to dequeue an item
288 		 * from the queue given by the provided
289 		 * id
290 		 *
291 		 * Params:
292 		 *   id = the queue's id
293 		 *   timeout = the maximum time to wait
294 		 * whilst blocking/waiting to dequeue
295 		 * an item from the queue
296 		 * Returns: a `Result` either containing
297 		 * the dequeued item or a `QixException`
298 		 * if the id does not refer to a queue
299 		 * registered with this manager or the
300 		 * timeout was exceeded
301 		 */
302 		public Result!(Item, QixException) wait(QueueKey id, Duration timeout)
303 		{
304 			auto q_r = getQueue(id);
305 			if(!q_r)
306 			{
307 				return error!(QixException, Item)(q_r.error());
308 			}
309 
310 			auto q = q_r.ok();
311 			return q.wait(timeout);
312 		}
313 	}
314 }
315 
316 unittest
317 {
318 	// item type
319 	struct Message
320 	{
321 		private string _t;
322 		this(string t)
323 		{
324 			this._t = t;
325 		}
326 
327 		public string t()
328 		{
329 			return this._t;
330 		}
331 	} 
332 	
333 	// queue manager for queues that hold messages
334 	auto m = new Manager!(Message)();
335 
336 	// no queues present
337 	assert(m.removeQueue(0) == false);
338 	assert(m.removeQueue(1) == false);
339 
340 	// create two new queues
341 	Result!(Queue!(Message)*, string) q1_r = m.newQueue();
342 	Result!(Queue!(Message)*, string) q2_r = m.newQueue();
343 
344 	assert(q1_r.is_okay());
345 	assert(q2_r.is_okay());
346 	auto q1 = q1_r.ok();
347 	auto q2 = q2_r.ok();
348 
349 	// enqueue two messages, one per queue, then read them off
350 	//
351 	// we won't block as the messages are already arrived
352 	Message m1_in = Message("First message");
353 	Message m2_in = Message("Second message");
354 	assert(m.receive(q1.id(), m1_in)); // (indirect usage via manager) should not be rejected
355 	assert(q2.receive(m2_in)); // (direct usage via queue itself) should not be rejected
356 	assert(q1.wait() == m1_in); // should be the same message we sent in
357 	assert(q2.wait() == m2_in); // should be the same message we sent in
358 
359 	// remove queues
360 	assert(m.removeQueue(q1)); // by QueueType*
361 	assert(m.removeQueue(q2.id())); // by id
362 
363 	// handle nulls for queue removal
364 	assert(m.removeQueue(null) == false);
365 
366 	// no queues present
367 	assert(m.removeQueue(0) == false);
368 	assert(m.removeQueue(1) == false);
369 }