1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.actor;
7 
8 import std.stdio : writeln, writefln;
9 
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : schwartzSort, max, min, among;
13 import std.array : empty;
14 import std.datetime : SysTime, Clock, dur;
15 import std.exception : collectException;
16 import std.functional : toDelegate;
17 import std.meta : staticMap;
18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
19 import std.typecons : Tuple, tuple;
20 import std.variant : Variant;
21 
22 import my.actor.common : ExitReason, SystemError, makeSignature;
23 import my.actor.mailbox;
24 import my.actor.msg;
25 import my.actor.system : System;
26 import my.actor.typed : isTypedAddress, isTypedActorImpl;
27 import my.gc.refc;
28 import sumtype;
29 
/// State shared between the copies of a `Promise`: where the reply goes and which request it answers.
private struct PromiseData {
    /// Address that receives the `Reply` when the promise is delivered.
    WeakAddress replyTo;
    /// Id stored in the `Reply`; `Promise.empty` treats zero as "not initialized".
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
        replyTo = rhs.replyTo;
        replyId = rhs.replyId;
    }

    @disable this(this);
}
42 
/** A reply of type `T` that is sent at a later point in time.
 *
 * `deliver` can only be called one time; the shared promise data is released
 * when the delivery has finished.
 */
struct Promise(T) {
    package {
        RefCounted!PromiseData data;
    }

    /** Deliver a `reply` that was passed by value.
     *
     * Inside this function `reply` is an lvalue, thus the call below forwards
     * to the `ref` overload.
     */
    void deliver(T reply) {
        deliver(reply);
    }

    /** Deliver the message `reply`.
     *
     * A promise can only be delivered once.
     */
    void deliver(ref T reply) @trusted
    in (!data.empty, "promise must be initialized") {
        // contracts may be compiled out, thus the explicit check.
        if (data.empty)
            return;
        scope (exit)
            data.release;

        // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
        if (auto replyTo = data.get.replyTo.lock.get) {
            // a reply that is not already a tuple is wrapped in one before it is sent.
            enum wrapInTuple = !is(T : Tuple!U, U);
            static if (wrapInTuple)
                replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
            else
                replyTo.put(Reply(data.get.replyId, Variant(reply)));
        }
    }

    /// Share the promise data of `rhs`.
    void opAssign(Promise!T rhs) {
        data = rhs.data;
    }

    /// True if the promise is not initialized.
    bool empty() {
        return data.empty || data.get.replyId == 0;
    }

    /// Clear the promise.
    void clear() {
        data.release;
    }
}
89 
/// Returns: a `Promise!T` backed by default initialized, reference counted promise data.
auto makePromise(T)() {
    alias Result = Promise!T;
    return Result(refCounted(PromiseData.init));
}
93 
/** The outcome of a request handler.
 *
 * Holds one of: the value, an `ErrorMsg` or a `Promise!T`.
 */
struct RequestResult(T) {
    SumType!(T, ErrorMsg, Promise!T) value;

    /// The request produced a value.
    this(T v) {
        value = SumType!(T, ErrorMsg, Promise!T)(v);
    }

    /// The request failed.
    this(ErrorMsg v) {
        value = SumType!(T, ErrorMsg, Promise!T)(v);
    }

    /// The request is answered through a promise.
    this(Promise!T v) {
        value = SumType!(T, ErrorMsg, Promise!T)(v);
    }
}
109 
/// Type erased behavior for a one-shot message: called with the context pointer and the message data.
private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
/// Type erased behavior for a request: in addition receives the reply id and the address to reply to.
private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
        ulong replyId, WeakAddress replyTo) @safe;
/// Type erased behavior for an awaited reply: called with the context pointer and the reply data.
private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;

/// Called for a message or reply for which the actor has no matching behavior.
alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;

/** Actors send error messages to others by returning an error (see Errors)
 * from a message handler. Similar to exit messages, error messages usually
 * cause the receiving actor to terminate, unless a custom handler was
 * installed. The default handler is used as a fallback if a request is made
 * without an error handler.
 */
alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;

/** Bidirectional monitoring with a strong lifetime coupling is established by
 * sending a `LinkRequest` to an address. This will cause the runtime to send
 * an `ExitMsg` if either this or the other actor dies. Per default, actors terminate
 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
 * This mechanism propagates failure states in an actor system. Linked actors
 * form a sub system in which an error causes all actors to fail collectively.
 */
alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;

/// An exception has been thrown while processing a message.
alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;

/** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
 * to an address. This will cause the runtime system to send a `DownMsg` for
 * the other actor if it dies.
 *
 * Actors drop down messages unless they provide a custom handler.
 */
alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;
144 
/// Default `DefaultHandler`: silently drops the message.
void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
}
147 
/** Print the name of the actor followed by the message to stdout, then drop the message.
 *
 * Failures while printing are ignored.
 */
void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
    import std.stdio : writeln;

    try {
        writeln("UNKNOWN message sent to actor ", self.name);
        writeln(msg.toString);
    } catch (Exception ignored) {
        // best effort logging, nothing to do if stdout fails.
    }
}
158 
/// Default `ErrorHandler`: remember the reason of the error and request a clean shutdown.
void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.shutdown();
}
163 
/// Default `ExitHandler`: remember the reason of the exit and force a shutdown.
void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
    self.lastError = msg.reason;
    self.forceShutdown();
}
168 
/// Default `ExceptionHandler`: flag a runtime error and force a shutdown. The exception is not logged.
void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    // TODO: should log?
    self.lastError = SystemError.runtimeError;
    self.forceShutdown();
}
174 
/** Flag a runtime error, print the name of the actor and the exception
 * message to stdout and then force a shutdown.
 *
 * Failures while printing are ignored.
 */
void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
    import std.stdio : writeln;

    self.lastError = SystemError.runtimeError;

    try {
        writeln("EXCEPTION thrown by actor ", self.name);
        writeln(e.msg);
        writeln("TERMINATING");
    } catch (Exception printFailure) {
        // best effort logging, the shutdown below must still happen.
    }

    self.forceShutdown();
}
190 
/// Timeout for an outstanding request.
struct ReplyHandlerTimeout {
    /// Id of the awaited response that the timeout belongs to.
    ulong id;
    /// Point in time after which the request is considered timed out.
    SysTime timeout;
}
196 
/// The life cycle of an actor.
package enum ActorState {
    /// waiting to be started.
    waiting,
    /// active and processing messages.
    active,
    /// wait for all awaited responses to finish.
    shutdown,
    /// also discard the awaited responses, just shut down fast.
    forceShutdown,
    /// in the process of shutting down.
    finishShutdown,
    /// stopped.
    stopped,
}
211 
/// Callbacks for one awaited response.
private struct AwaitReponse {
    /// Called with the reply when it arrives.
    Closure!(ReplyHandler, void*) behavior;
    /// Called if the request times out.
    ErrorHandler onError;
}
216 
/** An actor: an address (mailbox) together with the behaviors that are
 * triggered by the messages that arrive at the address.
 *
 * The actor is driven by repeated calls to `process` which handle system
 * messages, timeouts, delayed messages, incoming messages and replies and
 * which advance the actor through the states of `ActorState`.
 */
struct Actor {
    import std.container.rbtree : RedBlackTree, redBlackTree;

    package StrongAddress addr;
    // visible in the package for logging purpose.
    package ActorState state_ = ActorState.stopped;

    private {
        // TODO: rename to behavior.
        Closure!(MsgHandler, void*)[ulong] incoming;
        Closure!(RequestHandler, void*)[ulong] reqBehavior;

        // callbacks for awaited responses key:ed on their id.
        AwaitReponse[ulong] awaitedResponses;
        // kept sorted on the timeout, earliest first (see `register`).
        ReplyHandlerTimeout[] replyTimeouts;

        // important that it start at 1 because then zero is known to not be initialized.
        ulong nextReplyId = 1;

        /// Delayed messages ordered by their trigger time.
        RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;

        /// Used during shutdown to signal monitors and links why this actor is terminating.
        SystemError lastError;

        /// monitoring the actor lifetime.
        WeakAddress[size_t] monitors;

        /// strong, bidirectional link of the actors lifetime.
        WeakAddress[size_t] links;

        // Number of messages that has been processed in the latest call to `process`.
        ulong messages_;

        /// System the actor belongs to.
        System* homeSystem_;

        string name_;

        ErrorHandler errorHandler_;

        /// callback when a monitored actor goes down.
        DownHandler downHandler_;

        ExitHandler exitHandler_;

        ExceptionHandler exceptionHandler_;

        DefaultHandler defaultHandler_;
    }

    invariant () {
        if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
            assert(errorHandler_);
            assert(exitHandler_);
            assert(exceptionHandler_);
            assert(defaultHandler_);
        }
    }

    /** Create an actor, in the state `waiting`, that receives its messages via `a`.
     *
     * The address is opened and the default handlers are installed. No down
     * handler is installed.
     */
    this(StrongAddress a) @trusted
    in (!a.empty, "address is empty") {
        state_ = ActorState.waiting;

        addr = a;
        addr.get.setOpen;
        delayed = new typeof(delayed);

        errorHandler_ = toDelegate(&defaultErrorHandler);
        downHandler_ = null;
        exitHandler_ = toDelegate(&defaultExitHandler);
        exceptionHandler_ = toDelegate(&defaultExceptionHandler);
        defaultHandler_ = toDelegate(&.defaultHandler);
    }

    /// Returns: a weak reference to the address of the actor.
    WeakAddress address() @safe {
        return addr.weakRef;
    }

    package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
        return addr;
    }

    /// Returns: the system the actor belongs to. `setHomeSystem` must have been called.
    ref System homeSystem() @safe pure nothrow @nogc {
        return *homeSystem_;
    }

    /** Clean shutdown of the actor
     *
     * Stopping incoming messages from triggering new behavior and finish all
     * awaited responses.
     */
    void shutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active))
            state_ = ActorState.shutdown;
    }

    /** Force an immediate shutdown.
     *
     * Stopping incoming messages from triggering new behavior and discard
     * the awaited responses instead of waiting for them.
     */
    void forceShutdown() @safe nothrow {
        if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
            state_ = ActorState.forceShutdown;
    }

    /// Returns: the id of the address of the actor.
    ulong id() @safe pure nothrow const @nogc {
        return addr.id;
    }

    /// Returns: the name of the actor.
    string name() @safe pure nothrow const @nogc {
        return name_;
    }

    // dfmt off

    /// Set the name of the actor.
    void name(string n) @safe pure nothrow @nogc {
        this.name_ = n;
    }

    /// Replace the handler for error messages.
    void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
        errorHandler_ = v;
    }

    /// Replace the handler for down messages.
    void downHandler(DownHandler v) @safe pure nothrow @nogc {
        downHandler_ = v;
    }

    /// Replace the handler for exit messages.
    void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
        exitHandler_ = v;
    }

    /// Replace the handler for exceptions thrown while processing.
    void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
        exceptionHandler_ = v;
    }

    /// Replace the handler for messages without a matching behavior.
    void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
        defaultHandler_ = v;
    }

    // dfmt on

package:
    bool hasMessage() @safe pure nothrow @nogc {
        return addr && addr.get.hasMessage;
    }

    /// How long until a delayed message or a timeout fires.
    Duration nextTimeout(const SysTime now, const Duration default_) @safe {
        return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
                replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
    }

    /// True if there are responses that the actor is still waiting for.
    bool waitingForReply() @safe pure nothrow const @nogc {
        return !awaitedResponses.empty;
    }

    /// Number of messages that has been processed in the latest call to `process`.
    ulong messages() @safe pure nothrow const @nogc {
        return messages_;
    }

    void setHomeSystem(System* sys) @safe pure nothrow @nogc {
        homeSystem_ = sys;
    }

    /// Free the contexts of all message and request behaviors and forget the behaviors.
    void cleanupBehavior() @trusted nothrow {
        foreach (ref a; incoming.byValue) {
            try {
                a.free;
            } catch (Exception e) {
                // TODO: call exceptionHandler?
            }
        }
        incoming = null;
        foreach (ref a; reqBehavior.byValue) {
            try {
                a.free;
            } catch (Exception e) {
            }
        }
        reqBehavior = null;
    }

    /// Free the contexts of all awaited responses and forget them.
    void cleanupAwait() @trusted nothrow {
        foreach (ref a; awaitedResponses.byValue) {
            try {
                a.behavior.free;
            } catch (Exception e) {
            }
        }
        awaitedResponses = null;
    }

    /// Drop all delayed messages and destroy the container.
    void cleanupDelayed() @trusted nothrow {
        foreach (const _; 0 .. delayed.length) {
            try {
                delayed.front.msg = Msg.init;
                delayed.removeFront;
            } catch (Exception e) {
            }
        }
        .destroy(delayed);
    }

    /// True for all states except `stopped`.
    bool isAlive() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            goto case;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            return true;
        case ActorState.stopped:
            return false;
        }
    }

    /// Accepting messages.
    bool isAccepting() @safe pure nothrow const @nogc {
        final switch (state_) {
        case ActorState.waiting:
            goto case;
        case ActorState.active:
            goto case;
        case ActorState.shutdown:
            return true;
        case ActorState.forceShutdown:
            goto case;
        case ActorState.finishShutdown:
            goto case;
        case ActorState.stopped:
            return false;
        }
    }

    /// Returns: a new id to use for a request. Never zero as long as the counter has not wrapped.
    ulong replyId() @safe {
        return nextReplyId++;
    }

    /** Run one step of the actor.
     *
     * Resets the message counter, then, depending on the state, processes
     * messages and/or advances the shutdown sequence.
     */
    void process(const SysTime now) @safe nothrow {
        import core.memory : GC;

        assert(!GC.inFinalizer);

        messages_ = 0;

        void tick() {
            // philosophy of the order is that a timeout should only trigger if it
            // is really required thus it is checked last. This order then mean
            // that a request may have triggered a timeout but because
            // `processReply` is called before `checkReplyTimeout` it is *ignored*.
            // Thus "better to accept even if it is timeout rather than fail".
            //
            // NOTE: the assumption that a message that has timed out should be
            // processed turned out to be... wrong. It is annoying that
            // sometimes a timeout message triggers even though it shouldn't,
            // because it is now too old to be useful!
            // Thus the order is changed to first check for timeout, then process.
            try {
                processSystemMsg();
                checkReplyTimeout(now);
                processDelayed(now);
                processIncoming();
                processReply();
            } catch (Exception e) {
                exceptionHandler_(this, e);
            }
        }

        assert(state_ == ActorState.stopped || addr, "no address");

        final switch (state_) {
        case ActorState.waiting:
            state_ = ActorState.active;
            tick;
            // the state can be changed before the actor have executed.
            break;
        case ActorState.active:
            tick;
            // self terminate if the actor has no behavior.
            if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
                state_ = ActorState.forceShutdown;
            break;
        case ActorState.shutdown:
            tick;
            if (awaitedResponses.empty)
                state_ = ActorState.finishShutdown;
            cleanupBehavior;
            break;
        case ActorState.forceShutdown:
            state_ = ActorState.finishShutdown;
            cleanupBehavior;
            addr.get.setClosed;
            break;
        case ActorState.finishShutdown:
            state_ = ActorState.stopped;

            sendToMonitors(DownMsg(addr.weakRef, lastError));

            sendToLinks(ExitMsg(addr.weakRef, lastError));

            replyTimeouts = null;
            cleanupDelayed;
            cleanupAwait;

            // must be last because sendToLinks and sendToMonitors uses addr.
            addr.get.shutdown();
            addr.release;
            break;
        case ActorState.stopped:
            break;
        }
    }

    /// Send `msg` to all monitors that are still reachable, then forget the monitors.
    void sendToMonitors(DownMsg msg) @safe nothrow {
        foreach (ref a; monitors.byValue) {
            try {
                auto tmp = a.lock;
                auto rc = tmp.get;
                if (rc)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        monitors = null;
    }

    /// Send `msg` to all links that are still reachable, then forget the links.
    void sendToLinks(ExitMsg msg) @safe nothrow {
        foreach (ref a; links.byValue) {
            try {
                auto tmp = a.lock;
                auto rc = tmp.get;
                if (rc)
                    rc.put(SystemMsg(msg));
                a.release;
            } catch (Exception e) {
            }
        }

        links = null;
    }

    /** Trigger the error handler of every awaited response that has timed out.
     *
     * Relies on `replyTimeouts` being sorted, the scan stops at the first
     * timeout that has not passed.
     */
    void checkReplyTimeout(const SysTime now) @safe {
        if (replyTimeouts.empty)
            return;

        size_t removeTo;
        foreach (const i; 0 .. replyTimeouts.length) {
            if (now > replyTimeouts[i].timeout) {
                const id = replyTimeouts[i].id;
                if (auto v = id in awaitedResponses) {
                    messages_++;
                    v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
                    try {
                        () @trusted { v.behavior.free; }();
                    } catch (Exception e) {
                    }
                    awaitedResponses.remove(id);
                }
                removeTo = i + 1;
            } else {
                break;
            }
        }

        if (removeTo >= replyTimeouts.length) {
            replyTimeouts = null;
        } else if (removeTo != 0) {
            replyTimeouts = replyTimeouts[removeTo .. $];
        }
    }

    /** Handle at most one incoming message.
     *
     * The behavior is looked up via the signature of the message. The default
     * handler is called if there is no matching behavior.
     */
    void processIncoming() @safe {
        if (addr.get.empty!Msg)
            return;
        messages_++;

        auto front = addr.get.pop!Msg;
        scope (exit)
            .destroy(front);

        void doSend(ref MsgOneShot msg) {
            if (auto v = front.get.signature in incoming) {
                (*v)(msg.data);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        void doRequest(ref MsgRequest msg) @trusted {
            if (auto v = front.get.signature in reqBehavior) {
                (*v)(msg.data, msg.replyId, msg.replyTo);
            } else {
                defaultHandler_(this, msg.data);
            }
        }

        front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
            doRequest(a);
        });
    }

    /** All system messages are handled.
     *
     * Assuming:
     *  * they are not heavy to process
     *  * very important that if there are any they should be handled as soon as possible
     *  * ignoring the case when there is a "storm" of system messages which
     *    "could" overload the actor system and lead to a crash. I classify this,
     *    for now, as intentional, malicious coding by the developer themself.
     *    External inputs that could trigger such a behavior should be controlled
     *    and limited. Other types of input such as a developer trying to break
     *    the actor system is out of scope.
     */
    void processSystemMsg() @safe {
        //() @trusted {
        //logger.infof("run %X", cast(void*) &this);
        //}();
        while (!addr.get.empty!SystemMsg) {
            messages_++;
            //logger.infof("%X %s %s", addr.toHash, state_, messages_);
            auto front = addr.get.pop!SystemMsg;
            scope (exit)
                .destroy(front);

            front.get.match!((ref DownMsg a) {
                if (downHandler_)
                    downHandler_(this, a);
            }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
                if (auto v = a.addr.toHash in monitors)
                    v.release;
                monitors.remove(a.addr.toHash);
            }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
                if (auto v = a.addr.toHash in links)
                    v.release;
                links.remove(a.addr.toHash);
            }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
                exitHandler_(this, a);
            }, (ref SystemExitMsg a) {
                final switch (a.reason) {
                case ExitReason.normal:
                    break;
                case ExitReason.unhandledException:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.unknown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.userShutdown:
                    exitHandler_(this, ExitMsg.init);
                    break;
                case ExitReason.kill:
                    exitHandler_(this, ExitMsg.init);
                    // the user do NOT have an option here
                    forceShutdown;
                    break;
                }
            });
        }
    }

    /** Handle at most one reply.
     *
     * A reply with an id that is awaited triggers the registered behavior
     * which is then freed and removed together with its timeout. Other
     * replies are passed to the default handler.
     */
    void processReply() @safe {
        if (addr.get.empty!Reply)
            return;
        messages_++;

        auto front = addr.get.pop!Reply;
        scope (exit)
            .destroy(front);

        if (auto v = front.get.id in awaitedResponses) {
            // TODO: reduce the lookups on front.id
            v.behavior(front.get.data);
            try {
                () @trusted { v.behavior.free; }();
            } catch (Exception e) {
            }
            awaitedResponses.remove(front.get.id);
            removeReplyTimeout(front.get.id);
        } else {
            // TODO: should probably be SystemError.unexpectedResponse?
            defaultHandler_(this, front.get.data);
        }
    }

    /** Move at most one delayed message from the mailbox to the ordered
     * container, then put all delayed messages whose trigger time has passed
     * back in the mailbox for normal processing.
     */
    void processDelayed(const SysTime now) @trusted {
        if (!addr.get.empty!DelayedMsg) {
            // count as a message because handling them are "expensive".
            // Ignoring the case that the message right away is moved to the
            // incoming queue. This lead to "double accounting" but ohh well.
            // Don't use delayedSend when you should have used send.
            messages_++;
            delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
        } else if (delayed.empty) {
            return;
        }

        foreach (const _; 0 .. delayed.length) {
            if (now > delayed.front.triggerAt) {
                addr.get.put(delayed.front.msg);
                delayed.removeFront;
            } else {
                break;
            }
        }
    }

    /// Remove the first timeout registered for `id`, if there is one.
    private void removeReplyTimeout(ulong id) @safe nothrow {
        import std.algorithm : remove;

        foreach (const i; 0 .. replyTimeouts.length) {
            if (replyTimeouts[i].id == id) {
                // `remove` returns the shortened range. The result must be
                // stored, otherwise the array keeps its length with a stale
                // entry at the end.
                replyTimeouts = remove(replyTimeouts, i);
                break;
            }
        }
    }

    /// Register a behavior for one-shot messages with `signature`, replacing (and freeing) an existing one.
    void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in incoming) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        incoming[signature] = handler;
    }

    /// Register a behavior for requests with `signature`, replacing (and freeing) an existing one.
    void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
        if (!isAccepting)
            return;

        if (auto v = signature in reqBehavior) {
            try {
                v.free;
            } catch (Exception e) {
            }
        }
        reqBehavior[signature] = handler;
    }

    /** Register a behavior for the reply with id `replyId` together with its timeout.
     *
     * The error handler of the actor is used if `onError` is null.
     */
    void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
            void*) reply, ErrorHandler onError) @safe {
        if (!isAccepting)
            return;

        awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
        replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
        // keep the earliest timeout first, `checkReplyTimeout` and `nextTimeout` depend on it.
        schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
    }
}
781 
/** A callback `fn` bundled with a context `ctx` that is passed as the first
 * argument on every call, and a function `cleanup` that releases the context.
 */
struct Closure(Fn, CtxT) {
    alias FreeFn = void function(CtxT);

    Fn fn;
    CtxT ctx;
    FreeFn cleanup;

    /// A closure without a context.
    this(Fn fn) {
        this.fn = fn;
    }

    /** A closure with the context `ctx` which is released by `cleanup`.
     *
     * The context is taken as `CtxT`, the type of the field it is stored in.
     */
    this(Fn fn, CtxT ctx, FreeFn cleanup) {
        this.fn = fn;
        this.ctx = ctx;
        this.cleanup = cleanup;
    }

    /// Call `fn` with the context followed by `args`.
    void opCall(Args...)(auto ref Args args) {
        assert(fn !is null);
        fn(ctx, args);
    }

    /// Release the context, if there is one, and reset it.
    void free() {
        // will crash, on purpose, if there is a ctx and no cleanup registered.
        // maybe a bad idea? dunno... lets see
        if (ctx)
            cleanup(ctx);
        ctx = CtxT.init;
    }
}
812 
@("shall register a behavior to be called when msg received matching signature")
unittest {
    auto addr = makeAddress2;
    auto actor = Actor(addr);

    // set by the behavior to prove that it was called.
    bool processedIncoming;
    void fn(void* ctx, ref Variant msg) {
        processedIncoming = true;
    }

    // the behavior and the message use the same signature, 1.
    actor.register(1, Closure!(MsgHandler, void*)(&fn));
    addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));

    // the first call moves the actor from waiting to active and runs one tick.
    actor.process(Clock.currTime);

    assert(processedIncoming);
}
830 
/** Release the members of the context that `ctx` points to.
 *
 * Every member of the tuple `CtxT` with an unqualified type is destroyed.
 * Members that are const/immutable are left as is. Nothing is done when
 * `CtxT` is `void`.
 *
 * Params:
 *  ctx = must point to a `CtxT`, this is not checked.
 */
private void cleanupCtx(CtxT)(void* ctx)
        if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
    import std.traits;
    import my.actor.typed;

    static if (!is(CtxT == void)) {
        // trust that any use of this also pass on the correct context type.
        auto typedCtx = () @trusted { return cast(CtxT*) ctx; }();
        // release the context such as if it holds a rc object.
        alias Members = CtxT.Types;

        static foreach (const idx; 0 .. Members.length) {
            {
                alias Member = Members[idx];
                alias Bare = Unqual!Member;
                // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?
                static if (is(Bare == Member)) {
                    .destroy((*typedCtx)[idx]);
                } else {
                    static assert(!is(Bare : WeakAddress),
                            "WeakAddress must NEVER be const or immutable");
                    static assert(!is(Bare : TypedAddress!M, M...),
                            "WeakAddress must NEVER be const or immutable: " ~ Member.stringof);
                }
            }
        }
    }
}
861 
@("shall default initialize when possible, skipping const/immutable")
unittest {
    // a tuple of a const and a mutable value type.
    {
        auto x = tuple(cast(const) 42, 43);
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == 42); // can't assign to const
        assert(x[1] == 0);
    }

    // the same but with a struct, mutable member first.
    {
        import my.path : Path;

        auto x = tuple(Path.init, cast(const) Path("foo"));
        alias T = typeof(x);
        cleanupCtx!T(cast(void*)&x);
        assert(x[0] == Path.init);
        assert(x[1] == Path("foo"));
    }
}
882 
/// A behavior for one-shot messages together with the signature of the messages it handles.
package struct Action {
    /// The type erased handler.
    Closure!(MsgHandler, void*) action;
    /// Signature computed by `makeSignature` from the handler's message parameters.
    ulong signature;
}
887 
/** Wrap `handler` in an `Action`, a behavior for an actor when it receives a
 * message that matches the signature of the handler's message parameters.
 *
 * When `CtxT` is not `void` the first parameter of `handler` is the context
 * and the remaining parameters are the message.
 */
package auto makeAction(T, CtxT = void)(T handler) @safe
        if (isFunction!T || isFunctionPointer!T) {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpacks the type erased message and forwards it to the handler.
    void dispatch(void* rawCtx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) rawCtx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    // the context is attached later, thus null here.
    auto closure = typeof(Action.action)(&dispatch, null, &cleanupCtx!CtxT);
    return Action(closure, makeSignature!HArgs);
}
913 
/** Wrap `handler` in a closure that is called with the reply to a request.
 *
 * When `CtxT` is not `void` the first parameter of `handler` is the context
 * and the remaining parameters are the reply.
 */
package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
    enum hasCtx = !is(CtxT == void);

    static if (hasCtx) {
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    } else {
        alias Params = Parameters!T;
    }

    alias HArgs = staticMap!(Unqual, Params);

    // unpacks the type erased reply and forwards it to the handler.
    void dispatch(void* rawCtx, ref Variant msg) @trusted {
        static if (hasCtx) {
            auto typedCtx = cast(CtxParam*) cast(CtxT*) rawCtx;
            handler(*typedCtx, msg.get!(Tuple!HArgs).expand);
        } else {
            handler(msg.get!(Tuple!HArgs).expand);
        }
    }

    // the context is attached later, thus null here.
    return typeof(return)(&dispatch, null, &cleanupCtx!CtxT);
}
937 
/// A behavior for requests together with the signature of the requests it handles.
package struct Request {
    /// The type erased handler.
    Closure!(RequestHandler, void*) request;
    /// Signature of the request parameters that the handler accepts.
    ulong signature;
}
942 
/// Returns: the three parts of `Loc` formatted as "first:second:third".
private string locToString(Loc...)() {
    import std.conv : to;

    auto second = to!string(Loc[1]);
    auto third = to!string(Loc[2]);
    return Loc[0] ~ ":" ~ second ~ ":" ~ third;
}
948 
/** Compile time check that the first parameter of `handler`, the context, is
 * declared `ref`.
 *
 * When it is not, the handler type is printed with `pragma(msg)` and the
 * compilation fails via `static assert`.
 */
package void checkRefForContext(alias handler)() {
    import std.traits : ParameterStorageClass, ParameterStorageClassTuple;

    enum firstParamStorage = ParameterStorageClassTuple!(typeof(handler))[0];
    enum contextIsRef = firstParamStorage == ParameterStorageClass.ref_;

    static if (!contextIsRef) {
        // print the offending type first so the assert message is easier to act on.
        pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
        static assert(contextIsRef, "The context must be `ref` to avoid unnecessary copying");
    }
}
961 
/** Compile time check that a handler's context parameter type `CtxParam` is
 * compatible with the captured context type `CtxT`.
 *
 * The types are accepted when they are identical or when a `CtxParam` can be
 * constructed from the expanded fields of a `CtxT`.
 */
package void checkMatchingCtx(CtxParam, CtxT)() {
    enum identical = is(CtxT == CtxParam);

    static if (!identical) {
        enum constructible = __traits(compiles, {
                auto probe = CtxParam(CtxT.init.expand);
            });
        static assert(constructible, "mismatch between the context type " ~ CtxT.stringof
                ~ " and the first parameter " ~ CtxParam.stringof);
    }
}
969 
/** Build the `Request` an actor runs when it receives a request matching the
 * message parameters of `handler`.
 *
 * What happens with the value returned by `handler` depends on its type:
 *  - `RequestResult!T`: an `ErrorMsg` is sent to `replyTo` as a system
 *    message, a `Promise!T` is armed with `replyId`/`replyTo`, any other value
 *    is put as a `Reply` in the mailbox of `replyTo`.
 *  - `Promise!T`: the promise is armed with `replyId`/`replyTo`.
 *  - anything else: the value is put as a `Reply` in the mailbox of `replyTo`.
 *
 * A reply value that does not match `Tuple!U` is wrapped in a tuple before it
 * is stored in the `Variant`. Nothing is sent when `replyTo` can not be locked.
 *
 * Params:
 *  T = type of the handler; it must not return `void`.
 *  CtxT = type of the captured context, `void` when the handler takes none.
 *  handler = the callback to wrap.
 *
 * Returns: a `Request` made of the type erased closure and the signature
 * computed by `makeSignature` from the unqualified message parameter types.
 */
package auto makeRequest(T, CtxT = void)(T handler) @safe {
    static assert(!is(ReturnType!T == void), "handler returns void, not allowed");

    alias RType = ReturnType!T;
    enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
    enum isPromise = is(RType : Promise!PromT, PromT);

    static if (is(CtxT == void))
        alias Params = Parameters!T;
    else {
        // the first parameter is the context, the remaining ones are the message.
        alias CtxParam = Parameters!T[0];
        alias Params = Parameters!T[1 .. $];
        checkMatchingCtx!(CtxParam, CtxT);
        checkRefForContext!handler;
    }

    alias HArgs = staticMap!(Unqual, Params);

    void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
        static if (is(CtxT == void)) {
            auto r = handler(msg.get!(Tuple!HArgs).expand);
        } else {
            auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
            auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
        }

        static if (isReqResult) {
            r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
                assert(!a.data.empty, "the promise MUST be constructed before it is returned");
                a.data.get.replyId = replyId;
                a.data.get.replyTo = replyTo;
            }, (data) {
                enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
                if (auto rc = replyTo.lock.get) {
                    static if (wrapInTuple)
                        rc.put(Reply(replyId, Variant(tuple(data))));
                    else
                        rc.put(Reply(replyId, Variant(data)));
                }
            });
        } else static if (isPromise) {
            // same precondition as for a promise wrapped in a RequestResult.
            assert(!r.data.empty, "the promise MUST be constructed before it is returned");
            r.data.get.replyId = replyId;
            r.data.get.replyTo = replyTo;
        } else {
            // NOTE(review): `U` is a single template type parameter; if it does
            // not match multi-field tuples those are wrapped once more. A
            // variadic match would be written `U...` - confirm the intended
            // behavior before changing it because reply handlers depend on the
            // stored type.
            enum wrapInTuple = !is(RType : Tuple!U, U);
            if (auto rc = replyTo.lock.get) {
                static if (wrapInTuple)
                    rc.put(Reply(replyId, Variant(tuple(r))));
                else
                    rc.put(Reply(replyId, Variant(r)));
            }
        }
    }

    return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
}
1027 
@("shall link two actors lifetime")
unittest {
    // number of times the exit handler has been invoked, by either actor.
    int exits;
    void onExit(ref Actor self, ExitMsg msg) @safe nothrow {
        exits++;
        self.shutdown;
    }

    auto storage1 = Actor(makeAddress2);
    auto first = build(&storage1).set((int x) {}).exitHandler_(&onExit).finalize;
    auto storage2 = Actor(makeAddress2);
    auto second = build(&storage2).set((int x) {}).exitHandler_(&onExit).finalize;

    first.linkTo(second.address);
    first.process(Clock.currTime);
    second.process(Clock.currTime);

    // linking alone must not terminate any of them.
    assert(first.isAlive);
    assert(second.isAlive);

    sendExit(first.address, ExitReason.userShutdown);
    // give both actors a couple of rounds to handle the exit messages.
    for (int round = 0; round < 5; ++round) {
        first.process(Clock.currTime);
        second.process(Clock.currTime);
    }

    assert(!first.isAlive);
    assert(!second.isAlive);
    assert(exits == 2);
}
1058 
@("shall let one actor monitor the lifetime of the other one")
unittest {
    // number of down messages delivered to the monitoring actor.
    int downs;
    void onDown(ref Actor self, DownMsg msg) @safe nothrow {
        downs++;
    }

    auto watcherStorage = Actor(makeAddress2);
    auto watcher = build(&watcherStorage).set((int x) {}).downHandler_(&onDown).finalize;
    auto watchedStorage = Actor(makeAddress2);
    auto watched = build(&watchedStorage).set((int x) {}).finalize;

    watcher.monitor(watched.address);
    watcher.process(Clock.currTime);
    watched.process(Clock.currTime);

    assert(watcher.isAlive);
    assert(watched.isAlive);

    sendExit(watched.address, ExitReason.userShutdown);
    // give both actors a couple of rounds to handle the messages.
    for (int round = 0; round < 5; ++round) {
        watcher.process(Clock.currTime);
        watched.process(Clock.currTime);
    }

    // only the monitored actor terminates; the monitor is told exactly once.
    assert(watcher.isAlive);
    assert(!watched.isAlive);
    assert(downs == 1);
}
1088 
/** Fluent helper for configuring an `Actor` through a pointer to it.
 *
 * Every setter returns a copy of the builder so calls can be chained. Call
 * `finalize` last to get the actor back.
 */
private struct BuildActor {
    /// The actor being configured; `null` after `finalize`.
    Actor* actor;

    /// Return the configured actor and detach the builder from it.
    Actor* finalize() @safe {
        Actor* configured = actor;
        actor = null;
        return configured;
    }

    /// Replace the error handler of the actor.
    auto errorHandler(ErrorHandler a) {
        actor.errorHandler = a;
        return this;
    }

    /// Replace the down handler of the actor.
    auto downHandler_(DownHandler a) {
        actor.downHandler_ = a;
        return this;
    }

    /// Replace the exit handler of the actor.
    auto exitHandler_(ExitHandler a) {
        actor.exitHandler_ = a;
        return this;
    }

    /// Replace the exception handler of the actor.
    auto exceptionHandler_(ExceptionHandler a) {
        actor.exceptionHandler_ = a;
        return this;
    }

    /// Replace the default handler of the actor.
    auto defaultHandler_(DefaultHandler a) {
        actor.defaultHandler_ = a;
        return this;
    }

    /// Register a behavior that returns a value, i.e. a request handler.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto req = makeRequest(behavior);
        actor.register(req.signature, req.request);
        return this;
    }

    /// Register a request handler together with the context `c` it captures.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && !is(ReturnType!BehaviorT == void)) {
        auto req = makeRequest!(BehaviorT, CT)(behavior);
        // the context is, for now, a copy allocated with the GC.
        // TODO: use an allocator.
        req.request.ctx = cast(void*) new CT(c);
        actor.register(req.signature, req.request);
        return this;
    }

    /// Register a behavior that returns `void`, i.e. an action.
    auto set(BehaviorT)(BehaviorT behavior)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto action = makeAction(behavior);
        actor.register(action.signature, action.action);
        return this;
    }

    /// Register an action together with the context `c` it captures.
    auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
            if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
                && is(ReturnType!BehaviorT == void)) {
        auto action = makeAction!(BehaviorT, CT)(behavior);
        // the context is, for now, a copy allocated with the GC.
        // TODO: use an allocator.
        action.action.ctx = cast(void*) new CT(c);
        actor.register(action.signature, action.action);
        return this;
    }
}
1161 
/// Start configuring the actor `a` with a `BuildActor`.
package BuildActor build(Actor* a) @safe {
    return typeof(return)(a);
}
1165 
/** Implement an actor by registering `behaviors` on `self`.
 *
 * Each behavior must be a function or function pointer. A behavior that is
 * directly followed by a capture is registered together with that capture as
 * its context.
 *
 * Params:
 *  self = the actor to configure.
 *  behaviors = behaviors, each optionally followed by its capture.
 *
 * Returns: `self`, as returned by the builder's `finalize`.
 */
Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
    import my.actor.msg : isCapture;

    auto bactor = build(self);
    static foreach (const i; 0 .. Behavior.length) {
        {
            alias b = Behavior[i];

            // a capture is consumed together with the behavior preceding it.
            // NOTE(review): a capture that does not follow a behavior (first
            // argument, or after another capture) is skipped without a
            // diagnostic - confirm whether that should be a compile error.
            static if (!isCapture!b) {
                static assert(isFunction!(b) || isFunctionPointer!(b),
                        "behavior may only be functions, not delegates: " ~ b.stringof);

                static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
                    bactor.set(behaviors[i], behaviors[i + 1]);
                } else
                    bactor.set(behaviors[i]);
            }
        }
    }

    return bactor.finalize;
}
1189 
@("build dynamic actor from functions")
unittest {
    // an action: returns nothing.
    static void onInt(int s) @safe {
    }

    // a request returning a plain value.
    static string describeInt(int s) @safe {
        return "foo";
    }

    // a request returning a tuple.
    static Tuple!(int, string) fromString(const string s) @safe {
        return tuple(42, "hej");
    }

    auto storage = Actor(makeAddress2);
    auto built = build(&storage).set(&onInt).set(&describeInt).set(&fromString).finalize;
}
1206 
unittest {
    // set by the handler of the message whose trigger time is in the past.
    bool dueDelivered;
    static void onDue(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
        *c.delayOk = true;
    }

    // set by the handler of the message whose trigger time is far in the future.
    bool futureDelivered;
    static void onFuture(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
        *c.delayShouldNeverHappen = true;
    }

    auto storage = Actor(makeAddress2);
    auto receiver = build(&storage).set(&onDue, capture(&dueDelivered))
        .set(&onFuture, capture(&futureDelivered)).finalize;
    delayedSend(receiver.address, Clock.currTime - 1.dur!"seconds", "foo");
    delayedSend(receiver.address, Clock.currTime + 1.dur!"hours", 42);

    // both messages start out in the delayed queue.
    assert(!receiver.addressRef.get.empty!DelayedMsg);
    assert(receiver.addressRef.get.empty!Msg);
    assert(receiver.addressRef.get.empty!Reply);

    receiver.process(Clock.currTime);

    assert(!receiver.addressRef.get.empty!DelayedMsg);
    assert(receiver.addressRef.get.empty!Msg);
    assert(receiver.addressRef.get.empty!Reply);

    receiver.process(Clock.currTime);
    receiver.process(Clock.currTime);

    assert(receiver.addressRef.get.empty!DelayedMsg);
    assert(receiver.addressRef.get.empty!Msg);
    assert(receiver.addressRef.get.empty!Reply);

    assert(dueDelivered);
    assert(!futureDelivered);
}
1244 
@("shall process a request->then chain xyz")
@system unittest {
    // checking capture is correctly setup/teardown by using captured rc.

    // reference counted value captured by the request handler.
    auto rcReq = refCounted(42);
    bool calledOk;
    static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
            const string b) {
        // one reference held by the test, one by the stored capture.
        assert(2 == ctx[1].refCount);
        if (s == "apa")
            *ctx.calledOk = true;
        return "foo";
    }

    // reference counted value captured by the reply handler.
    auto rcReply = refCounted(42);
    bool calledReply;
    static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
        // records whether the reply carried the value returned by `fn`.
        *ctx[0] = s == "foo";
        assert(2 == ctx[1].refCount);
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;

    // registering the behavior added one reference to rcReq; rcReply is untouched.
    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount);

    // the actor sends the request to itself.
    actor.request(actor.address, infTimeout).send("apa", "foo")
        .capture(&calledReply, rcReply).then(&reply);
    assert(2 == rcReply.refCount);

    // the request is queued, nothing has been replied yet.
    assert(!actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    // after one round of processing both the message and the reply queues are empty.
    actor.process(Clock.currTime);
    assert(actor.addr.get.empty!Msg);
    assert(actor.addr.get.empty!Reply);

    assert(2 == rcReq.refCount);
    assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");

    assert(calledOk);
    assert(calledReply);

    // terminate the actor before the test scope ends.
    actor.shutdown;
    while (actor.isAlive)
        actor.process(Clock.currTime);
}
1293 
@("shall process a request->then chain using promises")
unittest {
    // two distinct message types so the two handlers get different signatures.
    static struct A {
        string v;
    }

    static struct B {
        string v;
    }

    // incremented by both request handlers.
    int calledOk;
    auto fn1p = makePromise!string;
    // returns the captured promise wrapped in a RequestResult.
    static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
        if (a.v == "apa")
            (*c.calledOk)++;
        return typeof(return)(c.p);
    }

    auto fn2p = makePromise!string;
    // returns the captured promise directly.
    static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) {
        (*c.calledOk)++;
        return c.p;
    }

    // incremented once for every reply with the value "foo".
    int calledReply;
    static void reply(ref Tuple!(int*) ctx, const string s) {
        if (s == "foo")
            *ctx[0] += 1;
    }

    auto aa1 = Actor(makeAddress2);
    auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2,
            capture(&calledOk, fn2p)).finalize;

    // the actor sends both requests to itself.
    actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply);
    actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply);

    actor.process(Clock.currTime);
    assert(calledOk == 1); // first request
    assert(calledReply == 0);

    fn1p.deliver("foo");

    // delivering only queues the reply; it is handled by the next process call.
    assert(calledReply == 0);

    actor.process(Clock.currTime);
    assert(calledOk == 2); // second request triggered
    assert(calledReply == 1);

    fn2p.deliver("foo");
    actor.process(Clock.currTime);

    assert(calledReply == 2);

    // terminate the actor before the test scope ends.
    actor.shutdown;
    while (actor.isAlive) {
        actor.process(Clock.currTime);
    }
}
1353 
/** Thrown when a request made through a `ScopedActor` can not be completed.
 *
 * `error` tells why; the exception message is a short description of it.
 */
class ScopedActorException : Exception {
    /**
     * Params:
     *  err = the reason the request failed.
     *  file = source file reported by the exception.
     *  line = source line reported by the exception.
     */
    this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow {
        // a non-empty message makes logs and uncaught exceptions readable.
        super(describe(err), file, line);
        error = err;
    }

    /// The reason the request failed.
    ScopedActorError error;

    /// Human readable description of `err`.
    private static string describe(ScopedActorError err) @safe pure nothrow {
        switch (err) {
        case ScopedActorError.none:
            return "scoped actor: no error";
        case ScopedActorError.down:
            return "scoped actor: the actor address is down";
        case ScopedActorError.timeout:
            return "scoped actor: the request timed out";
        case ScopedActorError.unknownMsg:
            return "scoped actor: unable to process the received message";
        case ScopedActorError.fatal:
            return "scoped actor: a fatal error occurred";
        default:
            return "scoped actor: unknown error";
        }
    }
}

/// The reason a request made through a `ScopedActor` failed.
enum ScopedActorError : ubyte {
    /// no error has been recorded.
    none,
    /// actor address is down
    down,
    /// request timeout
    timeout,
    /// the address was unable to process the received message
    unknownMsg,
    /// some type of fatal error occurred.
    fatal,
}
1375 
/** Intended to be used in a local scope by a user.
 *
 * A request is made with `request(...).send(...).then(...)`. `then` blocks the
 * calling thread, processing the actor's mailbox, until the actor no longer
 * waits for a reply or throws a `ScopedActorException` if an error is recorded.
 *
 * `ScopedActor` is not thread safe.
 */
struct ScopedActor {
    import my.actor.typed : underlyingAddress, underlyingWeakAddress;

    private {
        /// State shared, via reference counting, between copies of the scoped actor.
        static struct Data {
            /// The actor used to send requests and receive replies.
            Actor self;
            /// Error recorded by the handlers installed in `then`.
            ScopedActorError errSt;

            /// Shut down the actor and process it until it is no longer alive.
            ~this() @safe {
                // nothing to tear down when the actor has no address.
                if (self.addr.empty)
                    return;

                // restore the handlers so none installed by `then` is left in place.
                () @trusted {
                    self.downHandler = null;
                    self.defaultHandler = toDelegate(&.defaultHandler);
                    self.errorHandler = toDelegate(&defaultErrorHandler);
                }();

                self.shutdown;
                while (self.isAlive) {
                    self.process(Clock.currTime);
                }
            }
        }

        RefCounted!Data data;
    }

    /** Create a scoped actor.
     *
     * Params:
     *  addr = address used to construct the underlying actor.
     *  name = name given to the underlying actor.
     */
    this(StrongAddress addr, string name) @safe {
        data = refCounted(Data(Actor(addr)));
        data.get.self.name = name;
    }

    /// Clear the recorded error.
    private void reset() @safe nothrow {
        data.get.errSt = ScopedActorError.none;
    }

    /** Start a request to `requestTo`.
     *
     * The recorded error is cleared before the request is created.
     *
     * Params:
     *  requestTo = address to send the request to.
     *  timeout = passed on to the module level `request`.
     *
     * Returns: an object whose `send` specifies the message to send.
     */
    SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout)
            if (isAddress!TAddress) {
        reset;
        auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout);
        return SRequestSend(rs, this);
    }

    /// First step of a request; pairs the pending request with the scoped actor.
    private static struct SRequestSend {
        RequestSend rs;
        ScopedActor self;

        /// Copy constructor
        this(ref return typeof(this) rhs) @safe pure nothrow @nogc {
            rs = rhs.rs;
            self = rhs.self;
        }

        @disable this(this);

        /// Forward `args` to the module level `send` and return the next step.
        SRequestSendThen send(Args...)(auto ref Args args) {
            return SRequestSendThen(.send(rs, args), self);
        }
    }

    /// Second step of a request; `then` registers the reply handler and waits.
    private static struct SRequestSendThen {
        RequestSendThen rs;
        ScopedActor self;
        /// Time to sleep, in microseconds, in the next call to `dynIntervalSleep`.
        uint backoff;

        /// Copy constructor
        this(ref return typeof(this) rhs) {
            rs = rhs.rs;
            self = rhs.self;
            backoff = rhs.backoff;
        }

        @disable this(this);

        /// Sleep `backoff` microseconds, then grow `backoff` by 100 up to at most 20000.
        void dynIntervalSleep() @trusted {
            // +100 usecs "feels good", magic number. current OS and
            // implementation of message passing isn't that much faster than
            // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They
            // aren't expected to be used for "time critical" sections.
            Thread.sleep(backoff.dur!"usecs");
            backoff = min(backoff + 100, 20000);
        }

        /// Handlers that record what went wrong in the shared `Data.errSt`.
        private static struct ValueCapture {
            RefCounted!Data data;

            /// Record that the monitored actor is down.
            void downHandler(ref Actor, DownMsg) @safe nothrow {
                data.get.errSt = ScopedActorError.down;
            }

            /// Record a request timeout as `timeout` and any other error as `fatal`.
            void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow {
                if (msg.reason == SystemError.requestTimeout)
                    data.get.errSt = ScopedActorError.timeout;
                else
                    data.get.errSt = ScopedActorError.fatal;
            }

            /// Pass the message to `logAndDropHandler` and record `unknownMsg`.
            void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow {
                logAndDropHandler(a, msg);
                data.get.errSt = ScopedActorError.unknownMsg;
            }
        }

        /** Register `handler` for the reply and block until the actor no
         * longer waits for a reply.
         *
         * Params:
         *  handler = called with the reply.
         *  onError = passed on to `thenUnsafe`.
         *
         * Throws: `ScopedActorException` if the address of the receiver can
         * not be locked or if an error is recorded while waiting.
         */
        void then(T)(T handler, ErrorHandler onError = null) {
            // monitor the receiver for the duration of the call.
            scope (exit)
                demonitor(rs.rs.self, rs.rs.requestTo);
            monitor(rs.rs.self, rs.rs.requestTo);

            // route down/unknown/error events to the shared error state.
            auto callback = new ValueCapture(self.data);
            self.data.get.self.downHandler = &callback.downHandler;
            self.data.get.self.defaultHandler = &callback.unknownMsgHandler;
            self.data.get.self.errorHandler = &callback.errorHandler;

            () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }();

            // NOTE(review): this guard is registered after the call to
            // thenUnsafe, so the handlers are presumably not restored here if
            // thenUnsafe throws - confirm whether that can happen.
            scope (exit)
                () @trusted {
                self.data.get.self.downHandler = null;
                self.data.get.self.defaultHandler = toDelegate(&.defaultHandler);
                self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler);
            }();

            // a receiver whose address can not be locked is reported as down.
            auto requestTo = rs.rs.requestTo.lock;
            if (!requestTo)
                throw new ScopedActorException(ScopedActorError.down);

            // TODO: this loop is stupid... should use a conditional variable
            // instead but that requires changing the mailbox. later
            do {
                rs.rs.self.process(Clock.currTime);
                // force the actor to be alive even though there are no behaviors.
                rs.rs.self.state_ = ActorState.waiting;

                // back off while nothing has gone wrong, otherwise report the error.
                if (self.data.get.errSt == ScopedActorError.none) {
                    dynIntervalSleep;
                } else {
                    throw new ScopedActorException(self.data.get.errSt);
                }

            }
            while (self.data.get.self.waitingForReply);
        }
    }
}
1525 
/** Create a `ScopedActor` with a new address.
 *
 * The actor is named `ScopedActor.<file>:<line>` where `file` and `line`
 * default to the location of the call.
 */
ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe {
    import std.format : format;

    auto actorName = format!"ScopedActor.%s:%s"(file, line);
    return typeof(return)(makeAddress2, actorName);
}
1531 
@(
        "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed")
unittest {
    import my.actor.system;

    auto sys = makeSystem;

    // a0 answers an int after sleeping 50 ms, ignores a double and shuts
    // itself down when it receives a string.
    auto a0 = sys.spawn((Actor* self) {
        return impl(self, (ref CSelf!() ctx, int x) {
            Thread.sleep(50.dur!"msecs");
            return 42;
        }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self),
            (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self));
    });

    // part 1: requests made with a 1 ns delay argument are expected to end in
    // a timeout error. Retried for at most 3 seconds.
    {
        auto self = scopedActor;
        bool excThrown;
        auto stopAt = Clock.currTime + 3.dur!"seconds";
        while (!excThrown && Clock.currTime < stopAt) {
            try {
                self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {});
            } catch (ScopedActorException e) {
                excThrown = e.error == ScopedActorError.timeout;
            } catch (Exception e) {
                logger.info(e.msg);
            }
        }
        assert(excThrown, "timeout did not trigger as expected");
    }

    // part 2: the string message makes a0 shut down; a request is expected to
    // end in a down error. Retried for at most 3 seconds.
    {
        auto self = scopedActor;
        bool excThrown;
        auto stopAt = Clock.currTime + 3.dur!"seconds";
        while (!excThrown && Clock.currTime < stopAt) {
            try {
                self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) {
                });
            } catch (ScopedActorException e) {
                excThrown = e.error == ScopedActorError.down;
            } catch (Exception e) {
                logger.info(e.msg);
            }
        }
        assert(excThrown, "detecting terminated actor did not trigger as expected");
    }
}