/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.mbox;

import core.atomic : cas, atomicStore, MemoryOrder;
import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import core.time : Duration, dur;
import logger = std.experimental.logger;
import std.array : appender, empty;
import std.datetime : SysTime, Clock;
import std.format : formattedWrite;
import std.range : isOutputRange;
import std.traits;
import std.typecons : Tuple;
import std.variant : Variant;

import my.actor : Aid;

@("shall retrieve the message from the mailbox")
unittest {
    auto mbox = new MessageBox;
    mbox.put(Message(42));
    Message m;
    assert(mbox.get(10.dur!"msecs", m));
    assert(m != Message.init);
}

@("shall retrieve the message from the mailbox that has trigged because of the clock")
unittest {
    const clock = Clock.currTime;
    auto mbox = new DelayedMessageBox;
    mbox.put(DelayedMessage(clock + 20.dur!"seconds", Message(42)));
    mbox.put(DelayedMessage(clock + 5.dur!"seconds", Message(52)));
    mbox.put(DelayedMessage(clock + 10.dur!"seconds", Message(62)));
    Message m;
    assert(mbox.get(10.dur!"msecs", clock + 6.dur!"seconds", m));
    assert(m != Message.init);
    auto v = m.data.get!int;
    assert(v == 52);
}

/** A MessageBox is a message queue for one actor.
 *
 * Other actors may send messages to this owner by calling put(), and the owner
 * receives them by calling get(). The put() call is therefore effectively
 * shared and the get() call is effectively local. `setMaxMsgs` may be used by
 * any actor to limit the size of the message queue.
 */
class MessageBox {
    /* TODO: make @safe after relevant druntime PR gets merged */
    this() @trusted nothrow {
        m_lock = new Mutex;
        m_closed = false;

        // Both conditions share the mailbox lock.
        m_putMsg = new Condition(m_lock);
        m_notFull = new Condition(m_lock);
    }

    /// Returns: true if `close` has been called on this mailbox.
    final @property bool isClosed() @safe @nogc pure {
        synchronized (m_lock) {
            return m_closed;
        }
    }

    /*
     * Sets a limit on the maximum number of messages allowed in the mailbox.
     * When the limit is reached a caller of `put` waits for the owner to make
     * room. If num is zero, there is no limit on the message queue.
     *
     * Params:
     *  num = The maximum size of the queue or zero if the queue is
     *        unbounded.
     */
    final void setMaxMsgs(size_t num) @safe @nogc pure {
        synchronized (m_lock) {
            m_maxMsgs = num;
        }
    }

    /*
     * Add a message to the mailbox and notify the owner.
     *
     * If the mailbox is full the caller waits for the owner to signal that
     * room is available. The wait is attempted a limited number of times
     * (`maxPutAttempts`) and then the call gives up.
     *
     * Params:
     *  msg = The message to put in the queue.
     *
     * Returns: true if the message was successfully added to the mailbox,
     * false if the mailbox is closed or still full after the last attempt.
     */
    final bool put(ref Message msg) {
        synchronized (m_lock) {
            // try only a limited number of times then give up.
            for (int attempt = 0; attempt < maxPutAttempts; ++attempt) {
                // Checked on every iteration, not only on entry: the mailbox
                // may have been closed while this sender was waiting below.
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (m_closed) {
                    return false;
                }

                if (!mboxFull()) {
                    m_sharedBox.put(msg);
                    m_putMsg.notify();
                    return true;
                }

                m_putQueue++;
                m_notFull.wait();
                m_putQueue--;
            }

            return false;
        }
    }

    /// ditto
    final void put(Message msg) {
        // `msg` is an lvalue here so this resolves to the `ref` overload.
        this.put(msg);
    }

    /** Try to pop a message from the mailbox.
     *
     * Params:
     *  timeout = max time to wait for a message to arrive.
     *  msg = the retrieved message is written here.
     *
     * Returns:
     *  true if a message was retrieved and false if not (such as if a
     *  timeout occurred).
     */
    bool get(Duration timeout, ref Message msg) {
        import core.time : MonoTime;

        // Absolute deadline; the remaining time is derived from it after
        // every wakeup.
        const limit = () {
            if (timeout <= Duration.zero)
                return MonoTime.currTime;
            return MonoTime.currTime + timeout;
        }();

        // Pop the first message of `list` into `msg`, if there is one.
        static bool tryMsg(ref ListT list, ref Message msg) {
            if (list.empty)
                return false;
            auto range = list[];
            msg = range.front;
            list.removeAt(range);
            return true;
        }

        while (true) {
            if (tryMsg(m_localBox, msg)) {
                return true;
            }

            ListT arrived;

            synchronized (m_lock) {
                updateMsgCount();
                while (m_sharedBox.empty) {
                    // NOTE: All waiting senders are notified instead of just
                    //       one. This may result in spurious wakeups for
                    //       senders that still find the mailbox full.
                    if (m_putQueue && !mboxFull()) {
                        m_notFull.notifyAll();
                    }
                    if (timeout <= Duration.zero || !m_putMsg.wait(timeout)) {
                        return false;
                    }
                    // A wakeup does not guarantee a message is present; only
                    // wait for what remains until the deadline.
                    timeout = limit - MonoTime.currTime;
                }
                // Move everything that has arrived out of the shared list.
                arrived.put(m_sharedBox);
            }

            // Whatever is left of `arrived` ends up in the local box.
            scope (exit)
                m_localBox.put(arrived);
            if (tryMsg(arrived, msg)) {
                return true;
            } else {
                timeout = limit - MonoTime.currTime;
            }
        }
    }

    /*
     * Called on thread termination. This routine clears out the message
     * queues, sets a flag to reject any future messages and wakes up senders
     * that are waiting for room so they can observe that the mailbox is
     * closed.
     */
    final void close() {
        synchronized (m_lock) {
            m_sharedBox.clear;
            m_closed = true;
            // Senders blocked in put() would otherwise never be woken up.
            m_notFull.notifyAll();
        }
        m_localBox.clear();
    }

private:
    // Number of times put() checks for room before giving up.
    enum maxPutAttempts = 3;

    // True if a limit is set and the number of queued messages has reached it.
    // Reads m_sharedBox and m_localMsgs; call with m_lock held.
    bool mboxFull() @safe @nogc pure nothrow {
        return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
    }

    // Cache the size of the local box so mboxFull() does not have to read
    // m_localBox directly.
    void updateMsgCount() @safe @nogc pure nothrow {
        m_localMsgs = m_localBox.length;
    }

    alias ListT = List!(Message);

    // Messages moved out of the shared box; accessed by get() and close().
    ListT m_localBox;

    Mutex m_lock;
    // Signaled by put() when a message has been added.
    Condition m_putMsg;
    // Signaled by get() and close() to wake up waiting senders.
    Condition m_notFull;
    // Number of senders currently waiting on m_notFull.
    size_t m_putQueue;
    ListT m_sharedBox;
    size_t m_localMsgs;
    size_t m_maxMsgs;
    bool m_closed;
}

/** A MessageBox that holds messages until the time they should be processed.
 *
 * Other actors may send messages to this owner by calling put(), and the owner
 * receives them by calling get(). The put() call is therefore effectively
 * shared and the get() call is effectively local. `setMaxMsgs` may be used by
 * any actor to limit the size of the message queue.
 */
class DelayedMessageBox {
    /* TODO: make @safe after relevant druntime PR gets merged */
    this() @trusted nothrow {
        m_lock = new Mutex;
        m_closed = false;

        // Both conditions share the mailbox lock.
        m_putMsg = new Condition(m_lock);
        m_notFull = new Condition(m_lock);
    }

    /// Returns: true if `close` has been called on this mailbox.
    final @property bool isClosed() @safe @nogc pure {
        synchronized (m_lock) {
            return m_closed;
        }
    }

    /*
     * Sets a limit on the maximum number of messages allowed in the mailbox.
     * When the limit is reached a caller of `put` waits for the owner to make
     * room. If num is zero, there is no limit on the message queue.
     *
     * Params:
     *  num = The maximum size of the queue or zero if the queue is
     *        unbounded.
     */
    final void setMaxMsgs(size_t num) @safe @nogc pure {
        synchronized (m_lock) {
            m_maxMsgs = num;
        }
    }

    /** Add a message that will trigger in the future.
     *
     * If the mailbox is full the caller waits for the owner to signal that
     * room is available. The wait is attempted a limited number of times
     * (`maxPutAttempts`) and then the call gives up.
     *
     * Params:
     *  msg = The message to put in the queue.
     *
     * Returns: true if the message was successfully added to the mailbox,
     * false if the mailbox is closed or still full after the last attempt.
     */
    final bool put(ref DelayedMessage msg) {
        synchronized (m_lock) {
            // try only a limited number of times then give up.
            for (int attempt = 0; attempt < maxPutAttempts; ++attempt) {
                // Checked on every iteration, not only on entry: the mailbox
                // may have been closed while this sender was waiting below.
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (m_closed) {
                    return false;
                }

                if (!mboxFull()) {
                    m_sharedBox.put(msg);
                    m_putMsg.notify();
                    return true;
                }

                m_putQueue++;
                m_notFull.wait();
                m_putQueue--;
            }

            return false;
        }
    }

    /// ditto
    final void put(DelayedMessage msg) {
        // `msg` is an lvalue here so this resolves to the `ref` overload.
        this.put(msg);
    }

    /** Try to pop a message from the mailbox.
     *
     * A message is only delivered when its `delayUntil` is strictly before
     * `clock`. The first such message found is returned.
     *
     * Params:
     *  timeout = max time to wait for a message to arrive.
     *  clock = retrieve the first message that is delayed until before this clock
     *  msg = the retrieved message is written here.
     *
     * Returns:
     *  true if a message was retrieved and false if not (such as if a
     *  timeout occurred).
     */
    bool get(Duration timeout, const SysTime clock, ref Message msg) {
        import core.time : MonoTime;

        // Move the front message if it has longer until it triggers than the
        // message after it. This is so messages "far" in the future are at the
        // end of the list "eventually".
        static void shiftOldToBack(ref ListT list, const SysTime clock) {
            if (list.length < 3)
                return;
            auto r0 = list[];
            // the next message will trigger a timeout thus do nothing.
            if (r0.front.delayUntil < clock)
                return;

            auto r1 = r0;
            r1.popFront;
            if (r0.front.delayUntil - clock < r1.front.delayUntil - clock)
                return;

            auto msg = r0.front;
            list.removeAt(r0);
            list.put(msg);
        }

        // Remove and return the first message in `list` that has triggered.
        static bool tryScanForMsg(SysTime clock, ref ListT list, ref Message msg) {
            for (auto range = list[]; !range.empty;) {
                if (range.front.delayUntil < clock) {
                    msg = range.front.value;
                    list.removeAt(range);
                    return true;
                }
                range.popFront();
            }
            return false;
        }

        // max time to wait for a message.
        const limit = () {
            if (timeout <= Duration.zero)
                return MonoTime.currTime;
            return MonoTime.currTime + timeout;
        }();

        while (true) {
            if (tryScanForMsg(clock, m_localBox, msg)) {
                return true;
            }

            ListT arrived;

            synchronized (m_lock) {
                updateMsgCount();
                while (m_sharedBox.empty) {
                    // NOTE: All waiting senders are notified instead of just
                    //       one. This may result in spurious wakeups for
                    //       senders that still find the mailbox full.
                    if (m_putQueue && !mboxFull()) {
                        m_notFull.notifyAll();
                    }
                    if (timeout <= Duration.zero || !m_putMsg.wait(timeout)) {
                        return false;
                    }
                    // A wakeup does not guarantee a message is present; only
                    // wait for what remains until the deadline.
                    timeout = limit - MonoTime.currTime;
                }
                // Move everything that has arrived out of the shared list.
                arrived.put(m_sharedBox);
            }

            // Whatever is left of `arrived` ends up in the local box.
            scope (exit)
                m_localBox.put(arrived);
            if (tryScanForMsg(clock, arrived, msg)) {
                return true;
            } else {
                timeout = limit - MonoTime.currTime;
                shiftOldToBack(m_localBox, clock);
            }
        }
    }

    /*
     * Called on thread termination. This routine clears out the message
     * queues, sets a flag to reject any future messages and wakes up senders
     * that are waiting for room so they can observe that the mailbox is
     * closed.
     */
    final void close() {
        synchronized (m_lock) {
            m_sharedBox.clear;
            m_closed = true;
            // Senders blocked in put() would otherwise never be woken up.
            m_notFull.notifyAll();
        }
        m_localBox.clear();
    }

private:
    // Number of times put() checks for room before giving up.
    enum maxPutAttempts = 3;

    // True if a limit is set and the number of queued messages has reached it.
    // Reads m_sharedBox and m_localMsgs; call with m_lock held.
    bool mboxFull() @safe @nogc pure nothrow {
        return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
    }

    // Cache the size of the local box so mboxFull() does not have to read
    // m_localBox directly.
    void updateMsgCount() @safe @nogc pure nothrow {
        m_localMsgs = m_localBox.length;
    }

    alias ListT = List!(DelayedMessage);

    // Messages moved out of the shared box; accessed by get() and close().
    ListT m_localBox;

    Mutex m_lock;
    // Signaled by put() when a message has been added.
    Condition m_putMsg;
    // Signaled by get() and close() to wake up waiting senders.
    Condition m_notFull;
    // Number of senders currently waiting on m_notFull.
    size_t m_putQueue;
    ListT m_sharedBox;
    size_t m_localMsgs;
    size_t m_maxMsgs;
    bool m_closed;
}

enum MsgType {
    normal,
    priority,
    delayed,
    system,
}

/** A message stored in a mailbox.
 *
 * A single value is stored as is in `data`. Multiple values are stored as a
 * `Tuple` of the values.
 */
struct Message {
    Variant data;

    this(T...)(T vals) if (T.length > 0 && !is(T[0] == MsgType)) {
        static if (T.length == 1) {
            data = vals[0];
        } else {
            data = Tuple!(T)(vals);
        }
    }

    string toString() @safe const {
        auto buf = appender!string;
        toString(buf);
        return buf.data;
    }

    /// Writes "Message(<type of the stored data>)" to `w`.
    void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
        formattedWrite(w, "Message(%s)", data.type);
    }

    /// Returns: true if the stored data can be retrieved as `T` (a `Tuple!T` for multiple types).
    @property auto convertsTo(T...)() {
        static if (T.length == 1) {
            return is(T[0] == Variant) || data.convertsTo!(T);
        } else {
            import std.typecons : Tuple;

            return data.convertsTo!(Tuple!(T));
        }
    }

    /// Returns: the stored data as `T`, as a `Tuple!T` for multiple types or the `Variant` itself.
    @property auto get(T...)() {
        static if (T.length == 1) {
            static if (is(T[0] == Variant))
                return data;
            else
                return data.get!(T);
        } else {
            import std.typecons : Tuple;

            return data.get!(Tuple!(T));
        }
    }

    /// Call `Op` with the stored data converted to the parameter types of `Op`.
    auto map(alias Op)() {
        alias OpArgs = Parameters!Op;

        static if (OpArgs.length == 1) {
            static if (is(OpArgs[0] == Variant)) {
                return Op(data);
            } else {
                return Op(data.get!(OpArgs));
            }
        } else {
            import std.typecons : Tuple;

            return Op(data.get!(Tuple!(OpArgs)).expand);
        }
    }

    /// Call `Op` with `state` as the first argument followed by the stored data.
    auto map(alias Op, StateT)(ref StateT state) {
        // The first parameter of Op is the state.
        alias OpArgs = Parameters!Op[1 .. $];

        static if (OpArgs.length == 1) {
            static if (is(OpArgs[0] == Variant)) {
                return Op(state, data);
            } else {
                return Op(state, data.get!(OpArgs));
            }
        } else {
            import std.typecons : Tuple;

            return Op(state, data.get!(Tuple!(OpArgs)).expand);
        }
    }
}

/** A singly linked list with append at the end and removal via a `Range`.
 *
 * Removed nodes are kept in a free list that is shared by all instances of
 * `List!T` and guarded by a spin lock. New nodes are taken from the free list
 * when possible.
 */
struct List(T) {
    /** Forward range over the list.
     *
     * The range holds a pointer to the node *before* the current one, which is
     * what makes it possible to unlink the current node in `removeAt`.
     */
    struct Range {
        import std.exception : enforce;

        @property bool empty() const {
            return !m_prev.next;
        }

        @property ref T front() {
            enforce(m_prev.next, "invalid list node");
            return m_prev.next.val;
        }

        @property void front(T val) {
            enforce(m_prev.next, "invalid list node");
            m_prev.next.val = val;
        }

        void popFront() {
            enforce(m_prev.next, "invalid list node");
            m_prev = m_prev.next;
        }

        private this(Node* p) {
            m_prev = p;
        }

        private Node* m_prev;
    }

    /// Append `val` to the end of the list.
    void put(T val) {
        put(newNode(val));
    }

    /// Move all nodes of `rhs` to the end of this list. `rhs` is left empty.
    void put(ref List!(T) rhs) {
        if (!rhs.empty) {
            // Link in the first node, then walk to the real end while counting
            // the remaining nodes.
            put(rhs.m_first);
            while (m_last.next !is null) {
                m_last = m_last.next;
                m_count++;
            }
            rhs.m_first = null;
            rhs.m_last = null;
            rhs.m_count = 0;
        }
    }

    /// Returns: a range positioned at the first element.
    Range opSlice() {
        // `next` is the first field of Node so the address of m_first is used
        // as the "node before the first node".
        return Range(cast(Node*)&m_first);
    }

    /// Remove the element that is the front of `r`.
    void removeAt(Range r) {
        import std.exception : enforce;

        assert(m_count, "Can not remove from empty Range");
        Node* n = r.m_prev;
        enforce(n && n.next, "attempting to remove invalid list node");

        if (m_last is m_first)
            m_last = null;
        else if (m_last is n.next)
            m_last = n; // nocoverage
        Node* to_free = n.next;
        n.next = n.next.next;
        freeNode(to_free);
        m_count--;
    }

    @property size_t length() {
        return m_count;
    }

    /// Drop all elements. The nodes are not returned to the free list.
    void clear() {
        m_first = m_last = null;
        m_count = 0;
    }

    @property bool empty() {
        return m_first is null;
    }

private:
    struct Node {
        // Must be the first field, see opSlice.
        Node* next;
        T val;

        this(T v) {
            val = v;
        }
    }

    static shared struct SpinLock {
        void lock() {
            while (!cas(&locked, false, true)) {
                Thread.yield();
            }
        }

        void unlock() {
            atomicStore!(MemoryOrder.rel)(locked, false);
        }

        bool locked;
    }

    // Free list of nodes and the lock that guards it.
    static shared SpinLock sm_lock;
    static shared Node* sm_head;

    // Take a node from the free list if there is one, otherwise allocate.
    Node* newNode(T v) {
        Node* n;
        {
            sm_lock.lock();
            scope (exit)
                sm_lock.unlock();

            if (sm_head) {
                n = cast(Node*) sm_head;
                sm_head = sm_head.next;
            }
        }
        if (n) {
            import std.conv : emplace;

            emplace!Node(n, v);
        } else {
            n = new Node(v);
        }
        return n;
    }

    // Put the node at the head of the free list.
    void freeNode(Node* n) {
        // destroy val to free any owned GC memory
        destroy(n.val);

        sm_lock.lock();
        scope (exit)
            sm_lock.unlock();

        auto sn = cast(shared(Node)*) n;
        sn.next = sm_head;
        sm_head = sn;
    }

    // Link `n` in as the last node.
    void put(Node* n) {
        m_count++;
        if (!empty) {
            m_last.next = n;
            m_last = n;
            return;
        }
        m_first = n;
        m_last = n;
    }

    Node* m_first;
    Node* m_last;
    size_t m_count;
}

/// A message together with the time it is delayed until.
struct DelayedMessage {
    SysTime delayUntil;
    Message value;
}