1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.mbox;
7 
8 import core.atomic : cas, atomicStore, MemoryOrder;
9 import core.sync.condition : Condition;
10 import core.sync.mutex : Mutex;
11 import core.thread : Thread;
12 import core.time : Duration, dur;
13 import logger = std.experimental.logger;
14 import std.array : appender, empty;
15 import std.datetime : SysTime, Clock;
16 import std.format : formattedWrite;
17 import std.range : isOutputRange;
18 import std.traits;
19 import std.typecons : Tuple;
20 import std.variant : Variant;
21 
22 import my.actor : Aid;
23 
24 @("shall retrieve the message from the mailbox")
25 unittest {
26     auto mbox = new MessageBox;
27     mbox.put(Message(42));
28     Message m;
29     assert(mbox.get(10.dur!"msecs", m));
30     assert(m != Message.init);
31 }
32 
33 @("shall retrieve the message from the mailbox that has trigged because of the clock")
34 unittest {
35     const clock = Clock.currTime;
36     auto mbox = new DelayedMessageBox;
37     mbox.put(DelayedMessage(clock + 20.dur!"seconds", Message(42)));
38     mbox.put(DelayedMessage(clock + 5.dur!"seconds", Message(52)));
39     mbox.put(DelayedMessage(clock + 10.dur!"seconds", Message(62)));
40     Message m;
41     assert(mbox.get(10.dur!"msecs", clock + 6.dur!"seconds", m));
42     assert(m != Message.init);
43     auto v = m.data.get!int;
44     assert(v == 52);
45 }
46 
47 /** A MessageBox is a message queue for one actor.
48  *
49  * Other actors may send messages to this owner by calling put(), and the owner
50  * receives them by calling get().  The put() call is therefore effectively
51  * shared and the get() call is effectively local. `setMaxMsgs` may be used by
52  * any actor to limit the size of the message queue.
53  */
54 class MessageBox {
55     /* TODO: make @safe after relevant druntime PR gets merged */
56     this() @trusted nothrow {
57         m_lock = new Mutex;
58         m_closed = false;
59 
60         m_putMsg = new Condition(m_lock);
61         m_notFull = new Condition(m_lock);
62     }
63 
64     ///
65     final @property bool isClosed() @safe @nogc pure {
66         synchronized (m_lock) {
67             return m_closed;
68         }
69     }
70 
71     /*
72      * Sets a limit on the maximum number of user messages allowed in the
73      * mailbox.  If this limit is reached, the caller attempting to add a
74      * new message will execute `call`.  If num is zero, there is no limit
75      * on the message queue.
76      *
77      * Params:
78      *  num  = The maximum size of the queue or zero if the queue is
79      *         unbounded.
80      *  call = The routine to call when the queue is full.
81      */
82     final void setMaxMsgs(size_t num) @safe @nogc pure {
83         synchronized (m_lock) {
84             m_maxMsgs = num;
85         }
86     }
87 
88     /*
89      * If maxMsgs is not set, the message is added to the queue and the
90      * owner is notified.  If the queue is full, the message will still be
91      * accepted if it is a control message, otherwise onCrowdingDoThis is
92      * called.  If the routine returns true, this call will block until
93      * the owner has made space available in the queue.  If it returns
94      * false, this call will abort.
95      *
96      * Params:
97      *  fromActor = the actor that is sending the message
98      *  msg = The message to put in the queue.
99      *
100      *  Returns: true if the message where successfully added to the mailbox.
101      *
102      * Throws:
103      *  An exception if the queue is full and onCrowdingDoThis throws.
104      */
105     final bool put(ref Message msg) {
106         synchronized (m_lock) {
107             if (m_closed) {
108                 // TODO: Generate an error here if m_closed is true, or maybe
109                 //       put a message in the caller's queue?
110                 return false;
111             }
112 
113             // try only a limited number of times then give up.
114             for (int i = 0; i < 3; ++i) {
115                 if (mboxFull) {
116                     m_putQueue++;
117                     m_notFull.wait();
118                     m_putQueue--;
119                 } else {
120                     m_sharedBox.put(msg);
121                     m_putMsg.notify();
122                     return true;
123                 }
124             }
125 
126             return false;
127         }
128     }
129 
130     /// ditto
131     final void put(Message msg) {
132         this.put(msg);
133     }
134 
135     /** Try to pop a message from the mailbox.
136      *
137      * Params:
138      *  timeout = max time to wait for a message to arrive.
139      *  msg = the retrieved message is written here.
140      *
141      * Returns:
142      *  true if a message was retrieved and false if not (such as if a
143      *  timeout occurred).
144      */
145     bool get(Duration timeout, ref Message msg) {
146         import core.time : MonoTime;
147 
148         const limit = () {
149             if (timeout <= Duration.zero)
150                 return MonoTime.currTime;
151             return MonoTime.currTime + timeout;
152         }();
153 
154         static bool tryMsg(ref ListT list, ref Message msg) {
155             if (list.empty)
156                 return false;
157             auto range = list[];
158             msg = range.front;
159             list.removeAt(range);
160             return true;
161         }
162 
163         while (true) {
164             if (tryMsg(m_localBox, msg)) {
165                 return true;
166             }
167 
168             ListT arrived;
169 
170             synchronized (m_lock) {
171                 updateMsgCount();
172                 while (m_sharedBox.empty) {
173                     // NOTE: We're notifying all waiters here instead of just
174                     //       a few because the onCrowding behavior may have
175                     //       changed and we don't want to block sender threads
176                     //       unnecessarily if the new behavior is not to block.
177                     //       This will admittedly result in spurious wakeups
178                     //       in other situations, but what can you do?
179                     if (m_putQueue && !mboxFull()) {
180                         m_notFull.notifyAll();
181                     }
182                     if (timeout <= Duration.zero || !m_putMsg.wait(timeout)) {
183                         return false;
184                     }
185                 }
186                 arrived.put(m_sharedBox);
187             }
188 
189             scope (exit)
190                 m_localBox.put(arrived);
191             if (tryMsg(arrived, msg)) {
192                 return true;
193             } else {
194                 timeout = limit - MonoTime.currTime;
195             }
196         }
197     }
198 
199     /*
200      * Called on thread termination.  This routine processes any remaining
201      * control messages, clears out message queues, and sets a flag to
202      * reject any future messages.
203      */
204     final void close() {
205         synchronized (m_lock) {
206             m_sharedBox.clear;
207             m_closed = true;
208         }
209         m_localBox.clear();
210     }
211 
212 private:
213     // Routines involving local data only, no lock needed.
214 
215     bool mboxFull() @safe @nogc pure nothrow {
216         return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
217     }
218 
219     void updateMsgCount() @safe @nogc pure nothrow {
220         m_localMsgs = m_localBox.length;
221     }
222 
223     alias ListT = List!(Message);
224 
225     ListT m_localBox;
226 
227     Mutex m_lock;
228     Condition m_putMsg;
229     Condition m_notFull;
230     size_t m_putQueue;
231     ListT m_sharedBox;
232     size_t m_localMsgs;
233     size_t m_maxMsgs;
234     bool m_closed;
235 }
236 
237 /** A MessageBox that keep messages sorted by the time they should be processed.
238  *
239  * Other actors may send messages to this owner by calling put(), and the owner
240  * receives them by calling get().  The put() call is therefore effectively
241  * shared and the get() call is effectively local. `setMaxMsgs` may be used by
242  * any actor to limit the size of the message queue.
243  */
244 class DelayedMessageBox {
245     /* TODO: make @safe after relevant druntime PR gets merged */
246     this() @trusted nothrow {
247         m_lock = new Mutex;
248         m_closed = false;
249 
250         m_putMsg = new Condition(m_lock);
251         m_notFull = new Condition(m_lock);
252     }
253 
254     ///
255     final @property bool isClosed() @safe @nogc pure {
256         synchronized (m_lock) {
257             return m_closed;
258         }
259     }
260 
261     /*
262      * Sets a limit on the maximum number of user messages allowed in the
263      * mailbox.  If this limit is reached, the caller attempting to add a
264      * new message will execute `call`.  If num is zero, there is no limit
265      * on the message queue.
266      *
267      * Params:
268      *  num  = The maximum size of the queue or zero if the queue is
269      *         unbounded.
270      *  call = The routine to call when the queue is full.
271      */
272     final void setMaxMsgs(size_t num) @safe @nogc pure {
273         synchronized (m_lock) {
274             m_maxMsgs = num;
275         }
276     }
277 
278     /** Add a message that will trigger in the future.
279      *
280      * If maxMsgs is not set, the message is added to the queue and the
281      * owner is notified.  If the queue is full, the message will still be
282      * accepted if it is a control message, otherwise onCrowdingDoThis is
283      * called.  If the routine returns true, this call will block until
284      * the owner has made space available in the queue.  If it returns
285      * false, this call will abort.
286      *
287      * Params:
288      *  fromActor = the actor that is sending the message
289      *  msg = The message to put in the queue.
290      *
291      *  Returns: true if the message where successfully added to the mailbox.
292      *
293      * Throws:
294      *  An exception if the queue is full and onCrowdingDoThis throws.
295      */
296     final bool put(ref DelayedMessage msg) {
297         synchronized (m_lock) {
298             if (m_closed) {
299                 // TODO: Generate an error here if m_closed is true, or maybe
300                 //       put a message in the caller's queue?
301                 return false;
302             }
303 
304             // try only a limited number of times then give up.
305             for (int i = 0; i < 3; ++i) {
306                 if (mboxFull) {
307                     m_putQueue++;
308                     m_notFull.wait();
309                     m_putQueue--;
310                 } else {
311                     m_sharedBox.put(msg);
312                     m_putMsg.notify();
313                     return true;
314                 }
315             }
316 
317             return false;
318         }
319     }
320 
321     /// ditto
322     final void put(DelayedMessage msg) {
323         this.put(msg);
324     }
325 
326     /** Try to pop a message from the mailbox.
327      *
328      * The messages aren
329      *
330      * Params:
331      *  timeout = max time to wait for a message to arrive.
332      *  clock = retrieve the first message that are delayed until this clock
333      *  msg = the retrieved message is written here.
334      *
335      * Returns:
336      *  true if a message was retrieved and false if not (such as if a
337      *  timeout occurred).
338      */
339     bool get(Duration timeout, const SysTime clock, ref Message msg) {
340         import core.time : MonoTime;
341 
342         // Move the front message if it has longer until it triggers than the
343         // message after it. This is so messages "far" in the future are at the
344         // end of the list "eventually".
345         static void shiftOldToBack(ref ListT list, const SysTime clock) {
346             if (list.length < 3)
347                 return;
348             auto r0 = list[];
349             // the next message will trigger a timeout thus do nothing.
350             if (r0.front.delayUntil < clock)
351                 return;
352 
353             auto r1 = r0;
354             r1.popFront;
355             if (r0.front.delayUntil - clock < r1.front.delayUntil - clock)
356                 return;
357 
358             auto msg = r0.front;
359             list.removeAt(r0);
360             list.put(msg);
361         }
362 
363         static bool tryScanForMsg(SysTime clock, ref ListT list, ref Message msg) {
364             for (auto range = list[]; !range.empty;) {
365                 if (range.front.delayUntil < clock) {
366                     msg = range.front.value;
367                     list.removeAt(range);
368                     return true;
369                 }
370                 range.popFront();
371             }
372             return false;
373         }
374 
375         // max time to wait for a message.
376         const limit = () {
377             if (timeout <= Duration.zero)
378                 return MonoTime.currTime;
379             return MonoTime.currTime + timeout;
380         }();
381 
382         while (true) {
383             if (tryScanForMsg(clock, m_localBox, msg)) {
384                 return true;
385             }
386 
387             ListT arrived;
388 
389             synchronized (m_lock) {
390                 updateMsgCount();
391                 while (m_sharedBox.empty) {
392                     // NOTE: We're notifying all waiters here instead of just
393                     //       a few because the onCrowding behavior may have
394                     //       changed and we don't want to block sender threads
395                     //       unnecessarily if the new behavior is not to block.
396                     //       This will admittedly result in spurious wakeups
397                     //       in other situations, but what can you do?
398                     if (m_putQueue && !mboxFull()) {
399                         m_notFull.notifyAll();
400                     }
401                     if (timeout <= Duration.zero || !m_putMsg.wait(timeout)) {
402                         return false;
403                     }
404                 }
405                 arrived.put(m_sharedBox);
406             }
407 
408             scope (exit)
409                 m_localBox.put(arrived);
410             if (tryScanForMsg(clock, arrived, msg)) {
411                 return true;
412             } else {
413                 timeout = limit - MonoTime.currTime;
414                 shiftOldToBack(m_localBox, clock);
415             }
416         }
417     }
418 
419     /*
420      * Called on thread termination.  This routine processes any remaining
421      * control messages, clears out message queues, and sets a flag to
422      * reject any future messages.
423      */
424     final void close() {
425         synchronized (m_lock) {
426             m_sharedBox.clear;
427             m_closed = true;
428         }
429         m_localBox.clear();
430     }
431 
432 private:
433     // Routines involving local data only, no lock needed.
434 
435     bool mboxFull() @safe @nogc pure nothrow {
436         return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
437     }
438 
439     void updateMsgCount() @safe @nogc pure nothrow {
440         m_localMsgs = m_localBox.length;
441     }
442 
443     alias ListT = List!(DelayedMessage);
444 
445     ListT m_localBox;
446 
447     Mutex m_lock;
448     Condition m_putMsg;
449     Condition m_notFull;
450     size_t m_putQueue;
451     ListT m_sharedBox;
452     size_t m_localMsgs;
453     size_t m_maxMsgs;
454     bool m_closed;
455 }
456 
457 enum MsgType {
458     normal,
459     priority,
460     delayed,
461     system,
462 }
463 
464 struct Message {
465     Variant data;
466 
467     this(T...)(T vals) if (T.length > 0 && !is(T[0] == MsgType)) {
468         static if (T.length == 1) {
469             data = vals[0];
470         } else {
471             data = Tuple!(T)(vals);
472         }
473     }
474 
475     string toString() @safe const {
476         auto buf = appender!string;
477         toString(buf);
478         return buf.data;
479     }
480 
481     void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
482         formattedWrite(w, "Message(%s)", data.type);
483     }
484 
485     @property auto convertsTo(T...)() {
486         static if (T.length == 1) {
487             return is(T[0] == Variant) || data.convertsTo!(T);
488         } else {
489             import std.typecons : Tuple;
490 
491             return data.convertsTo!(Tuple!(T));
492         }
493     }
494 
495     @property auto get(T...)() {
496         static if (T.length == 1) {
497             static if (is(T[0] == Variant))
498                 return data;
499             else
500                 return data.get!(T);
501         } else {
502             import std.typecons : Tuple;
503 
504             return data.get!(Tuple!(T));
505         }
506     }
507 
508     auto map(alias Op)() {
509         alias OpArgs = Parameters!Op;
510 
511         static if (OpArgs.length == 1) {
512             static if (is(OpArgs[0] == Variant)) {
513                 return Op(data);
514             } else {
515                 return Op(data.get!(OpArgs));
516             }
517         } else {
518             import std.typecons : Tuple;
519 
520             return Op(data.get!(Tuple!(OpArgs)).expand);
521         }
522     }
523 
524     auto map(alias Op, StateT)(ref StateT state) {
525         alias OpArgs = Parameters!Op[1 .. $];
526 
527         static if (OpArgs.length == 1) {
528             static if (is(OpArgs[0] == Variant)) {
529                 return Op(state, data);
530             } else {
531                 return Op(state, data.get!(OpArgs));
532             }
533         } else {
534             import std.typecons : Tuple;
535 
536             return Op(state, data.get!(Tuple!(OpArgs)).expand);
537         }
538     }
539 }
540 
541 struct List(T) {
542     struct Range {
543         import std.exception : enforce;
544 
545         @property bool empty() const {
546             return !m_prev.next;
547         }
548 
549         @property ref T front() {
550             enforce(m_prev.next, "invalid list node");
551             return m_prev.next.val;
552         }
553 
554         @property void front(T val) {
555             enforce(m_prev.next, "invalid list node");
556             m_prev.next.val = val;
557         }
558 
559         void popFront() {
560             enforce(m_prev.next, "invalid list node");
561             m_prev = m_prev.next;
562         }
563 
564         private this(Node* p) {
565             m_prev = p;
566         }
567 
568         private Node* m_prev;
569     }
570 
571     void put(T val) {
572         put(newNode(val));
573     }
574 
575     void put(ref List!(T) rhs) {
576         if (!rhs.empty) {
577             put(rhs.m_first);
578             while (m_last.next !is null) {
579                 m_last = m_last.next;
580                 m_count++;
581             }
582             rhs.m_first = null;
583             rhs.m_last = null;
584             rhs.m_count = 0;
585         }
586     }
587 
588     Range opSlice() {
589         return Range(cast(Node*)&m_first);
590     }
591 
592     void removeAt(Range r) {
593         import std.exception : enforce;
594 
595         assert(m_count, "Can not remove from empty Range");
596         Node* n = r.m_prev;
597         enforce(n && n.next, "attempting to remove invalid list node");
598 
599         if (m_last is m_first)
600             m_last = null;
601         else if (m_last is n.next)
602             m_last = n; // nocoverage
603         Node* to_free = n.next;
604         n.next = n.next.next;
605         freeNode(to_free);
606         m_count--;
607     }
608 
609     @property size_t length() {
610         return m_count;
611     }
612 
613     void clear() {
614         m_first = m_last = null;
615         m_count = 0;
616     }
617 
618     @property bool empty() {
619         return m_first is null;
620     }
621 
622 private:
623     struct Node {
624         Node* next;
625         T val;
626 
627         this(T v) {
628             val = v;
629         }
630     }
631 
632     static shared struct SpinLock {
633         void lock() {
634             while (!cas(&locked, false, true)) {
635                 Thread.yield();
636             }
637         }
638 
639         void unlock() {
640             atomicStore!(MemoryOrder.rel)(locked, false);
641         }
642 
643         bool locked;
644     }
645 
646     static shared SpinLock sm_lock;
647     static shared Node* sm_head;
648 
649     Node* newNode(T v) {
650         Node* n;
651         {
652             sm_lock.lock();
653             scope (exit)
654                 sm_lock.unlock();
655 
656             if (sm_head) {
657                 n = cast(Node*) sm_head;
658                 sm_head = sm_head.next;
659             }
660         }
661         if (n) {
662             import std.conv : emplace;
663 
664             emplace!Node(n, v);
665         } else {
666             n = new Node(v);
667         }
668         return n;
669     }
670 
671     void freeNode(Node* n) {
672         // destroy val to free any owned GC memory
673         destroy(n.val);
674 
675         sm_lock.lock();
676         scope (exit)
677             sm_lock.unlock();
678 
679         auto sn = cast(shared(Node)*) n;
680         sn.next = sm_head;
681         sm_head = sn;
682     }
683 
684     void put(Node* n) {
685         m_count++;
686         if (!empty) {
687             m_last.next = n;
688             m_last = n;
689             return;
690         }
691         m_first = n;
692         m_last = n;
693     }
694 
695     Node* m_first;
696     Node* m_last;
697     size_t m_count;
698 }
699 
700 struct DelayedMessage {
701     SysTime delayUntil;
702     Message value;
703 }