/**
 * Queue management
 *
 * Authors: Tristan Brice Velloza Kildaire (deavmi)
 */
module qix.manager;

import qix.queue;
import niknaks.functional : Result, ok, error;
import std.string : format;
import gogga.mixins;

/**
 * Default max iterations when
 * trying to find an unused queue
 * id
 */
private enum NEWQUEUE_MAX_ITER = 1000;

import qix.exceptions;

/**
 * An exception that occurs during
 * usage of the `Manager`
 */
public final class ManagerException : QixException
{
    // private: only code in this module constructs these
    private this(string m)
    {
        super(m);
    }
}

/**
 * A queue manager
 */
public template Manager(Item)
{
    /**
     * A queue manager
     *
     * Keeps a table of queues indexed by their
     * id. Every access to the table is performed
     * whilst holding an internal mutex.
     */
    public class Manager
    {
        import core.sync.mutex : Mutex;
        import std.datetime : Duration;

        private alias QueueType = Queue!(Item);

        // registered queues, keyed by their id
        private QueueType[QueueKey] _q;
        // guards `_q`
        private Mutex _ql; // todo: readers-writers lock

        // maximum number of attempts made when
        // searching for a free queue id
        private size_t _mi;

        /**
         * Construct a new queue manager
         *
         * Params:
         *   maxIter = maximum number of
         * iterations allowed when searching
         * for a free queue id
         */
        this(size_t maxIter)
        {
            this._ql = new Mutex();
            this._mi = maxIter;
        }

        /**
         * Constructs a new queue manager
         * which uses the default maximum
         * number of iterations
         * (`NEWQUEUE_MAX_ITER`) when searching
         * for a free queue id
         */
        this()
        {
            this(NEWQUEUE_MAX_ITER);
        }

        /**
         * Finds an unused queue id and then
         * instantiates a queue with that id
         *
         * Returns: a `Result` that contains
         * a pointer to the queue if a free
         * id was found, otherwise an error
         * message (a `string`) if no free id
         * could be found within the maximum
         * number of allowed iterations for
         * searching
         */
        public Result!(QueueType*, string) newQueue()
        {
            this._ql.lock();

            scope(exit)
            {
                this._ql.unlock();
            }

            // find a free queue id with the
            // random strategy
            Result!(QueueKey, string) qid_res = newQueue_randStrat();
            DEBUG("qid_res: ", qid_res);

            if(qid_res.is_error())
            {
                return error!(string, QueueType*)(qid_res.error());
            }

            // create new queue and insert it
            QueueKey qid = qid_res.ok();
            this._q[qid] = QueueType(qid);

            // hand out a pointer to the entry stored in the table
            QueueType* q = qid in this._q;
            DEBUG("stored new queue: ", *q);

            return ok!(QueueType*, string)(q);
        }

        // uses a random dice roll as a strategy
        // for finding potentially free qids
        //
        // makes at most `this._mi` attempts (the limit
        // given at construction) before giving up
        //
        // mt: assumes caller holds lock `this._ql`
        private Result!(QueueKey, string) newQueue_randStrat()
        {
            import qix.utils : rand;

            size_t c; // iterations performed
            QueueKey new_qid;
            bool succ = false;
            for(c = 0; c < this._mi; c++)
            {
                // try find free qid
                new_qid = rand();
                if((new_qid in this._q) is null)
                {
                    succ = true;
                    break;
                }
            }
            DEBUG("iterations: ", c);

            // ran out of iterations before we
            // could find a free qid
            if(!succ)
            {
                return error!(string, QueueKey)
                (
                    format
                    (
                        "Reached maximum of %d iterations before finding free queue id",
                        this._mi
                    )
                );
            }

            // found a free qid
            return ok!(QueueKey, string)(new_qid);
        }

        /**
         * Removes the provided queue from the manager
         *
         * Params:
         *   queue = a pointer to the queue
         * Returns: `true` if the queue existed, `false`
         * if not or if `null` was provided
         */
        public bool removeQueue(QueueType* queue)
        {
            return queue is null ? false : removeQueue(queue.id());
        }

        /**
         * Removes the queue by the provided id
         * from the manager
         *
         * Params:
         *   key = the queue's id
         * Returns: `true` if the queue existed, `false`
         * otherwise
         */
        public bool removeQueue(QueueKey key)
        {
            this._ql.lock();

            scope(exit)
            {
                this._ql.unlock();
            }

            QueueType* f = key in this._q;

            if(f is null)
            {
                return false;
            }

            this._q.remove(key);
            return true;
        }

        // looks up the queue with the given id whilst
        // holding the lock; yields `null` if there is
        // no such queue
        //
        // note: the lock is released before the pointer
        // is handed back to the caller
        private QueueType* getQueue0(QueueKey id)
        {
            this._ql.lock();

            scope(exit)
            {
                this._ql.unlock();
            }

            return id in this._q;
        }

        // like `getQueue0` but wraps the outcome in a
        // `Result`, mapping a missing queue to a
        // `ManagerException`
        private Result!(QueueType*, QixException) getQueue(QueueKey id)
        {
            auto q = getQueue0(id);
            if(q is null)
            {
                return error!(QixException, QueueType*)
                (
                    new ManagerException
                    (
                        format
                        (
                            "Could not find a queue with id %d",
                            id
                        )
                    )
                );
            }

            return ok!(QueueType*, QixException)(q);
        }

        // TODO: In future version let's add:
        //
        // 2. receive(QueueKey, T)
        // 4. wait(QueueKey)
        // 6. wait(QueueKey, Duration)
        //

        /**
         * Pushes a new message into the queue,
         * waking up one of the threads currently
         * blocking to dequeue an item from it
         *
         * Params:
         *   id = the queue's id
         *   item = the item to enqueue
         * Returns: a `Result` either containing
         * a boolean flag about whether the admit
         * policy allowed the enqueueing to occur
         * or a `QixException` if the id does not
         * refer to a queue registered with this
         * manager
         */
        public Result!(bool, QixException) receive(QueueKey id, Item item)
        {
            auto q_r = getQueue(id);
            if(!q_r)
            {
                return error!(QixException, bool)(q_r.error());
            }

            auto q = q_r.ok();
            return ok!(bool, QixException)(q.receive(item));
        }

        /**
         * Wait indefinitely to dequeue an item
         * from the queue given by the provided
         * id
         *
         * Params:
         *   id = the queue's id
         * Returns: a `Result` either containing
         * the dequeued item or a `QixException`
         * if the id does not refer to a queue
         * registered with this manager
         */
        public Result!(Item, QixException) wait(QueueKey id)
        {
            // NOTE(review): presumably the queue treats a zero
            // timeout as "no timeout" - confirm against qix.queue
            return wait(id, Duration.zero);
        }

        /**
         * Wait up until a specified maximum
         * amount of time to dequeue an item
         * from the queue given by the provided
         * id
         *
         * Params:
         *   id = the queue's id
         *   timeout = the maximum time to wait
         * whilst blocking/waiting to dequeue
         * an item from the queue
         * Returns: a `Result` either containing
         * the dequeued item or a `QixException`
         * if the id does not refer to a queue
         * registered with this manager or the
         * timeout was exceeded
         */
        public Result!(Item, QixException) wait(QueueKey id, Duration timeout)
        {
            auto q_r = getQueue(id);
            if(!q_r)
            {
                return error!(QixException, Item)(q_r.error());
            }

            auto q = q_r.ok();
            return q.wait(timeout);
        }
    }
}

unittest
{
    // item type
    struct Message
    {
        private string _t;
        this(string t)
        {
            this._t = t;
        }

        public string t()
        {
            return this._t;
        }
    }

    // queue manager for queues that hold messages
    auto m = new Manager!(Message)();

    // no queues present
    assert(m.removeQueue(0) == false);
    assert(m.removeQueue(1) == false);

    // create two new queues
    Result!(Queue!(Message)*, string) q1_r = m.newQueue();
    Result!(Queue!(Message)*, string) q2_r = m.newQueue();

    assert(q1_r.is_okay());
    assert(q2_r.is_okay());
    auto q1 = q1_r.ok();
    auto q2 = q2_r.ok();

    // enqueue two messages, one per queue, then read them off
    //
    // we won't block as the messages are already arrived
    Message m1_in = Message("First message");
    Message m2_in = Message("Second message");
    assert(m.receive(q1.id(), m1_in)); // (indirect usage via manager) should not be rejected
    assert(q2.receive(m2_in)); // (direct usage via queue itself) should not be rejected
    assert(q1.wait() == m1_in); // should be the same message we sent in
    assert(q2.wait() == m2_in); // should be the same message we sent in

    // remove queues
    assert(m.removeQueue(q1)); // by QueueType*
    assert(m.removeQueue(q2.id())); // by id

    // handle nulls for queue removal
    assert(m.removeQueue(null) == false);

    // no queues present
    assert(m.removeQueue(0) == false);
    assert(m.removeQueue(1) == false);

    // the iteration limit given at construction must be
    // honoured: with zero allowed iterations no free id
    // can ever be found, so queue creation has to fail
    auto m_limited = new Manager!(Message)(0);
    assert(m_limited.newQueue().is_error());
}