1 /** 
2  * Definitions for queues
3  *
4  * Authors: Tristan Brice Velloza Kildaire (deavmi)
5  */
6 module qix.queue;
7 
/**
 * The type used to represent
 * a queue id
 *
 * This is an alias of `size_t`; a `Queue`
 * stores one of these and hands it back
 * from `Queue.id()`.
 */
public alias QueueKey = size_t;
13 
14 import core.sync.mutex : Mutex;
15 import core.sync.condition : Condition;
16 import std.datetime : Duration;
17 import qix.exceptions;
18 import niknaks.functional : Result, ok, error;
19 
20 import gogga.mixins;
21 
/** 
 * Admittance policy
 *
 * A delegate that takes in an `Item`
 * and returns `true` if it should be
 * admitted to the queue, `false`
 * otherwise.
 *
 * A `null` policy is treated by `Queue`
 * as "admit every item".
 *
 * Params:
 *   Item = the type of item the policy inspects
 */
public alias AdmitPolicy(Item) = bool delegate(Item);
31 
/** 
 * Exception used to signal that a
 * time-bounded wait elapsed before
 * it could complete.
 *
 * The constructor is `private`, so
 * instances can only be created from
 * within this module.
 */
public final class TimeoutException : QixException
{
	/// Builds the exception with its fixed message
	private this() { super("Timeout after waiting"); }
}
43 
/** 
 * Queue type
 *
 * A mutex-protected FIFO of `Item`s that consumers can block on,
 * with an optional admittance policy deciding which items are
 * accepted.
 *
 * Params:
 *   Item = the type of the items held by the queue
 */
public template Queue(Item)
{
	private alias AP = AdmitPolicy!(Item);

	/** 
	 * Queue type
	 *
	 * All access to the underlying list happens with the
	 * queue's mutex held; waiters block on a condition
	 * variable bound to that same mutex.
	 */
	public struct Queue
	{
		private QueueKey _id;

		private Mutex _l; // guards `_q`
		private Condition _c; // signalled by `receive` when an item is added
		import std.container.slist : SList;
		import std.range : walkLength;
		private SList!(Item) _q;
		// todo: list here
		private AP _ap; // admit policy (`null` => admit everything)

		/** 
		 * Constructs a new queue with
		 * the given id and the admittance
		 * policy
		 *
		 * Params:
		 *   id = the id
		 *   ap = admittance policy, or `null`
		 *        to admit every item
		 */
		package this(QueueKey id, AP ap) @safe
		{
			this._id = id;
			this._l = new Mutex();
			this._c = new Condition(this._l);
			this._ap = ap;
		}

		/** 
		 * Constructs a new queue with
		 * the given id and no admittance
		 * policy (every item is admitted)
		 *
		 * Params:
		 *   id = the id
		 */
		package this(QueueKey id) @safe
		{
			this(id, null);
		}

		/** 
		 * Returns this queue's id
		 *
		 * Returns: the id
		 */
		public QueueKey id() @safe
		{
			return this._id;
		}

		/** 
		 * Evaluates the admittance policy
		 * against the given item
		 *
		 * Params:
		 *   i = the candidate item
		 * Returns: `true` if there is no
		 * policy or if the policy accepts
		 * the item, `false` otherwise
		 */
		private bool wouldAdmit(Item i)
		{
			// if no policy => true
			// else, apply policy
			bool s = this._ap is null ? true : this._ap(i);
			DEBUG("Admit policy returned: ", s);
			return s;
		}

		/** 
		 * Places an item at the back of this
		 * queue and wakes up one of the waiter(s)
		 *
		 * If no admittance policy is associated
		 * with this queue then the item is always
		 * enqueued. Otherwise it is enqueued only
		 * if the policy evaluates to `true` for it.
		 *
		 * The policy is invoked while the queue's
		 * lock is held.
		 *
		 * Params:
		 *   i = the item to attempt to enqueue
		 * Returns: `true` if enqueued, `false`
		 * otherwise
		 */
		public bool receive(Item i)
		{
			// lock, apply filter delegate (if any), insert (if so), unlock
			this._l.lock();

			scope(exit)
			{
				this._l.unlock();
			}

			if(!wouldAdmit(i))
			{
				DEBUG("Admit policy denied: '", i, "'");
				return false;
			}

			// todo: filter here (Document: API for filters hold lock?)
			// inserting after the full range appends => FIFO order
			this._q.insertAfter(this._q[], i); // todo: placement in queue?
			DEBUG("calling notify()...");
			this._c.notify(); // wake up one waiter

			DEBUG("post-notify");
			
			return true;
		}

		/** 
		 * Blocks until an item is available
		 * for dequeuing.
		 *
		 * This is akin to calling `wait(Duration)`
		 * with `Duration.zero`, which means
		 * "no timeout".
		 *
		 * Returns: the item
		 */
		public Item wait()
		{
			auto res = wait(Duration.zero());
			//sanity: only way an error is if timed out
			// but that should not be possible with
			// a timeout of 0
			assert(res.is_okay()); 
			return res.ok();
		}

		/** 
		 * Blocks up until the timeout for an
		 * item to become available for dequeuing.
		 *
		 * If an item is already present it is
		 * returned immediately. A timeout of
		 * `Duration.zero` means "wait indefinitely".
		 * A negative timeout is treated as already
		 * expired.
		 *
		 * The emptiness check is repeated after
		 * every wake-up, so a wake-up that finds
		 * the queue empty (spurious, or another
		 * consumer dequeued the item first) simply
		 * resumes waiting for the remaining time.
		 *
		 * Params:
		 *   timeout = the timeout
		 * 
		 * Returns: a `Result` containing the
		 * the dequeued item or a `QixException`
		 * (a `TimeoutException`) if the timeout
		 * was exceeded
		 */
		public Result!(Item, QixException) wait(Duration timeout)
		{
			import core.time : MonoTime;

			this._l.lock();

			scope(exit)
			{
				this._l.unlock();
			}

			immutable bool no_timeout = timeout == Duration.zero;
			immutable MonoTime started = MonoTime.currTime;

			// loop on the predicate: being woken up does not
			// guarantee that an item is (still) there for us
			while(this._q.empty)
			{
				// then no timeout
				if(no_timeout)
				{
					DEBUG("wait()...");
					this._c.wait();
					DEBUG("wait()... [unblock]");
					continue;
				}

				// handle timeouts: only wait for what is left of the budget
				Duration remaining = timeout - (MonoTime.currTime - started);
				if(remaining <= Duration.zero)
				{
					DEBUG("timed out?: ", true);
					return error!(QixException, Item)(new TimeoutException()); // todo: log time taken
				}

				DEBUG("wait(Duration)...");
				bool in_time = this._c.wait(remaining); // true if `notify()`'d before timeout
				DEBUG("wait(Duration)... [unblock]");
				DEBUG("timed out?: ", !in_time);

				// an item may have arrived right as the timeout fired;
				// only report a timeout if there is still nothing to take
				if(!in_time && this._q.empty)
				{
					return error!(QixException, Item)(new TimeoutException()); // todo: log time taken
				}
			}
			
			// pop single item off
			return ok!(Item, QixException)(pop());
		}

		/** 
		 * Removes and returns the item at
		 * the front of the queue
		 *
		 * The caller must hold the lock and
		 * the queue must not be empty.
		 *
		 * Returns: the removed item
		 */
		private Item pop()
		{
			assert(!this._q.empty);

			auto i = this._q.front(); // store
			this._q.removeFront(); // remove 

			DEBUG("popped item: ", i);
			return i;
		}

		/** 
		 * Returns the number of items
		 * in the queue
		 *
		 * The list is walked to count its
		 * elements, so this is linear in
		 * the number of queued items.
		 *
		 * Returns: the count
		 */
		public size_t size() // TODO: make @safe (only the debug print prevents it)
		{
			this._l.lock();
			
			scope(exit)
			{
				this._l.unlock();
			}

			// walk the list once and reuse the result for logging
			size_t count = walkLength(this._q[]);
			DEBUG("dd: ", count);
			return count;
		}

		/** 
		 * Returns a string representation
		 * of this queue, containing its id
		 *
		 * Returns: a string
		 */
		public string toString()
		{
			import std.string : format;
			return format
			(
				"Queue (qid: %d)",
				this._id	
			);
		}
	}
}
280 
281 private version(unittest)
282 {
283 	import core.thread : Thread, dur;
284 }
285 
private version(unittest)
{
	/// Simple item type for the unit tests: wraps a single string
	class Message
	{
		private string _m;

		/// Stores the given text
		this(string m) { _m = m; }

		/// Returns the text given at construction
		public auto m() { return _m; }
	}
}
303 
unittest
{
	// the queue under test
	auto q = Queue!(Message)(1);

	// consumer thread: blocks in `wait()` on the given
	// queue and remembers whatever it dequeued
	class Waiter : Thread
	{
		private Queue!(Message)* _q; // queue to wait on
		private Message _m; // (eventually) dequeued message
		// todo: make above volatile

		this(Queue!(Message)* q)
		{
			super(&run);
			this._q = q;
		}

		private void run()
		{
			DEBUG("waiter about to call wait()...");
			this._m = this._q.wait();
		}

		public Message m()
		{
			return this._m;
		}
	}

	auto wt = new Waiter(&q);
	wt.start();

	// todo: do version with sleep here and version without

	// hand a single message to the queue
	auto m = new Message("Hey there");
	q.receive(m);

	// once the consumer has finished, it must hold
	// exactly the message that was enqueued
	wt.join();
	DEBUG("Expected: ", m);
	DEBUG("Thread got: ", wt.m());
	assert(m == wt.m());

	// the queue is empty again and nothing else is enqueued,
	// so a bounded wait has to end in a timeout error
	auto res = q.wait(dur!("seconds")(1));
	assert(res.is_error());
	assert(cast(TimeoutException)res.error() !is null);
}
358 
// admit policy: an accepting policy lets items in,
// a rejecting policy keeps them out
unittest
{
	// policy that admits every message
	AdmitPolicy!(Message) ap_a = delegate bool(Message m) { return true; };

	// a queue using it must accept the item
	auto q1 = Queue!(Message)(1, ap_a);
	assert(q1.receive(new Message("Hi")));

	// policy that admits no message
	AdmitPolicy!(Message) ap_r = delegate bool(Message m) { return false; };

	// a queue using it must reject the item
	auto q2 = Queue!(Message)(1, ap_r);
	assert(!q2.receive(new Message("Hi")));
}