1 /** 2 * Definitions for queues 3 * 4 * Authors: Tristan Brice Velloza Kildaire (deavmi) 5 */ 6 module qix.queue; 7 8 /** 9 * The type used to represent 10 * a queue id 11 */ 12 public alias QueueKey = size_t; 13 14 import core.sync.mutex : Mutex; 15 import core.sync.condition : Condition; 16 import std.datetime : Duration; 17 import qix.exceptions; 18 import niknaks.functional : Result, ok, error; 19 20 import gogga.mixins; 21 22 /** 23 * Admittance policy 24 * 25 * A delegate that takes in an `Item` 26 * and returns `true` if it should be 27 * admitted to the queue, `false` 28 * otherwise 29 */ 30 public alias AdmitPolicy(Item) = bool delegate(Item); 31 32 /** 33 * Timeout exception thrown when 34 * a time-based wait times-out 35 */ 36 public final class TimeoutException : QixException 37 { 38 private this() 39 { 40 super("Timeout after waiting"); 41 } 42 } 43 44 /** 45 * Queue type 46 */ 47 public template Queue(Item) 48 { 49 private alias AP = AdmitPolicy!(Item); 50 51 /** 52 * Queue type 53 */ 54 public struct Queue 55 { 56 private QueueKey _id; 57 58 private Mutex _l; 59 private Condition _c; 60 import std.container.slist : SList; 61 import std.range : walkLength; 62 private SList!(Item) _q; 63 // todo: list here 64 private AP _ap; // admit policy 65 66 /** 67 * Constructs a new queue with 68 * the given id and the admittance 69 * policy 70 * 71 * Params: 72 * id = the id 73 * ap = admittance policy 74 */ 75 package this(QueueKey id, AP ap) @safe 76 { 77 this._id = id; 78 this._l = new Mutex(); 79 this._c = new Condition(this._l); 80 this._ap = ap; 81 } 82 83 /** 84 * Constructs a new queue with 85 * the given id 86 * 87 * Params: 88 * id = the id 89 */ 90 package this(QueueKey id) @safe 91 { 92 this(id, null); 93 } 94 95 /** 96 * Returns this queue's id 97 * 98 * Returns: the id 99 */ 100 public QueueKey id() @safe 101 { 102 return this._id; 103 } 104 105 private bool wouldAdmit(Item i) 106 { 107 // if no policy => true 108 // else, apply policy 109 bool s = this._ap is null ? true : this._ap(i); 110 DEBUG("Admit policy returned: ", s); 111 return s; 112 } 113 114 /** 115 * Places an item into this queue and 116 * wakes up one of the waiter(s) 117 * 118 * The item is only enqueued if 119 * there is an admittance policy 120 * associated with this queue, and 121 * if so, if it evaluates to `true`. 122 * 123 * Params: 124 * i = the item to attempt to enqueue 125 * Returns: `true` if enqueued, `false` 126 * otherwise 127 */ 128 public bool receive(Item i) 129 { 130 // lock, apply filter delegate (if any), insert (if so), unlock 131 this._l.lock(); 132 133 scope(exit) 134 { 135 this._l.unlock(); 136 } 137 138 if(!wouldAdmit(i)) 139 { 140 DEBUG("Admit policy denied: '", i, "'"); 141 return false; 142 } 143 144 // todo: filter here (Document: API for filters hold lock?) 145 this._q.insertAfter(this._q[], i); // todo: placement in queue? 146 DEBUG("calling notify()..."); 147 this._c.notify(); // wake up one waiter 148 149 DEBUG("post-notify"); 150 151 return true; 152 } 153 154 /** 155 * Blocks until an item is available 156 * for dequeuing. 157 * 158 * This is akin to calling `wait(Duration)` 159 * with `Duration.zero`. 160 * 161 * Returns: the item 162 */ 163 public Item wait() 164 { 165 auto res = wait(Duration.zero()); 166 //sanity: only way an error is if timed out 167 // but that should not be possible with 168 // a timeout of 0 169 assert(res.is_okay()); 170 return res.ok(); 171 } 172 173 /** 174 * Blocks up until the timeout for an 175 * item to become available for dequeuing. 176 * 177 * However, if the timeout is reached 178 * then an exception is returned. 179 * 180 * Params: 181 * timeout = the timeout 182 * 183 * Returns: a `Result` containing the 184 * the dequeued item or a `QixException` 185 * if the timeout was exceeded 186 */ 187 public Result!(Item, QixException) wait(Duration timeout) 188 { 189 this._l.lock(); 190 191 scope(exit) 192 { 193 this._l.unlock(); 194 } 195 196 // check if item already present 197 // then there is no need to wait 198 DEBUG("calling size()"); 199 bool early_return = size() > 0; 200 if(early_return) 201 { 202 DEBUG("early return"); 203 return ok!(Item, QixException)(pop()); 204 } 205 206 // then no timeout 207 if(timeout == Duration.zero) 208 { 209 DEBUG("wait()..."); 210 this._c.wait(); 211 DEBUG("wait()... [unblock]"); 212 } 213 // handle timeouts 214 else 215 { 216 DEBUG("wait(Duration)..."); 217 bool in_time = this._c.wait(timeout); // true if `notify()`'d before timeout 218 DEBUG("wait(Duration)... [unblock]"); 219 DEBUG("timed out?: ", !in_time); 220 221 if(!in_time) 222 { 223 return error!(QixException, Item)(new TimeoutException()); // todo: log time taken 224 } 225 } 226 227 // pop single item off 228 return ok!(Item, QixException)(pop()); 229 } 230 231 // mt: assumes lock held 232 private Item pop() 233 { 234 assert(size() > 0); 235 236 import std.range; 237 auto i = this._q.front(); // store 238 this._q.removeFront(); // remove 239 240 DEBUG("popped item: ", i); 241 return i; 242 } 243 244 /** 245 * Returns the number of items 246 * in the queue 247 * 248 * Returns: the count 249 */ 250 public size_t size() // TODO: Make safe justd ebug that is bad 251 { 252 this._l.lock(); 253 254 scope(exit) 255 { 256 this._l.unlock(); 257 } 258 259 DEBUG("dd: ", walkLength(this._q[])); 260 return walkLength(this._q[]); 261 } 262 263 /** 264 * Returns a string representation 265 * of this queue 266 * 267 * Returns: a string 268 */ 269 public string toString() 270 { 271 import std.string : format; 272 return format 273 ( 274 "Queue (qid: %d)", 275 this._id 276 ); 277 } 278 } 279 } 280 281 private version(unittest) 282 { 283 import core.thread : Thread, dur; 284 } 285 286 private version(unittest) 287 { 288 // custom item type 289 class Message 290 { 291 private string _m; 292 this(string m) 293 { 294 this._m = m; 295 } 296 297 public auto m() 298 { 299 return this._m; 300 } 301 } 302 } 303 304 unittest 305 { 306 // create a single queue 307 Queue!(Message) q = Queue!(Message)(1); 308 309 // now make a thread that awaits reception 310 // of message 311 class Waiter : Thread 312 { 313 private Queue!(Message)* _q; // queue to wait on 314 private Message _m; // (eventually) dequeued message 315 // todo: make above volatile 316 317 318 this(Queue!(Message)* q) 319 { 320 super(&run); 321 this._q = q; 322 } 323 324 private void run() 325 { 326 DEBUG("waiter about to call wait()..."); 327 Message i = this._q.wait(); 328 this._m = i; 329 } 330 331 public Message m() 332 { 333 return this._m; 334 } 335 } 336 Waiter wt = new Waiter(&q); 337 wt.start(); 338 339 // todo: do version with sleep here and version without 340 341 // Push a single message in 342 Message m = new Message("Hey there"); 343 q.receive(m); 344 345 // wait for thread to end and then grab the internal 346 // value for comparison 347 wt.join(); 348 DEBUG("Expected: ", m); 349 DEBUG("Thread got: ", wt.m()); 350 assert(m == wt.m()); 351 352 // wait with timeout and knowing nothing will 353 // be enqueued 354 auto res = q.wait(dur!("seconds")(1)); 355 assert(res.is_error()); 356 assert(cast(TimeoutException)res.error()); 357 } 358 359 // test admit policy 360 unittest 361 { 362 // admit policy that accepts 363 bool accept(Message m) { return true; } 364 AdmitPolicy!(Message) ap_a = &accept; 365 366 // create a single queue with it 367 Queue!(Message) q1 = Queue!(Message)(1, ap_a); 368 369 // should accept 370 assert(q1.receive(new Message("Hi"))); 371 372 // admit policy that rejects 373 bool reject(Message m) { return false; } 374 AdmitPolicy!(Message) ap_r = &reject; 375 376 // create a single queue with it 377 Queue!(Message) q2 = Queue!(Message)(1, ap_r); 378 379 // should reject 380 assert(!q2.receive(new Message("Hi"))); 381 }