1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 A simple, featureless actor framework. It allows you to write an async
7 application using the actor pattern. It is suitable for applications that need
8 async:ness but not the highest achievable performance.
9 
10 It is modelled after [C++ Actor Framework]() which I have used and am very
11 happy with how my applications turned out. Credit to Dominik Charousset, author
12 of CAF.
13 
14 Most of the code is copied from Phobos std.concurrency.
15 
16 A thread executes one actor at a time. The actor ID of the current actor is
17 accessible via `thisAid()`.
18 */
19 module my.actor;
20 
21 import core.atomic : cas, atomicStore, MemoryOrder, atomicLoad, atomicOp;
22 import core.sync.condition : Condition;
23 import core.sync.mutex : Mutex;
24 import core.thread : Thread;
25 import core.time : dur, Duration;
26 import logger = std.experimental.logger;
27 import std.array : appender, empty, array;
28 import std.datetime : SysTime, Clock;
29 import std.exception : collectException;
30 import std.format : formattedWrite;
31 import std.range : isOutputRange;
32 import std.stdio;
33 import std.traits;
34 import std.typecons : Tuple;
35 import std.variant : Variant;
36 
37 import my.actor.mbox : MessageBox, Message, DelayedMessageBox, DelayedMessage;
38 import my.actor.typed;
39 import my.gc.refc;
40 
41 class ActorException : Exception {
42     this(string msg) @safe pure nothrow @nogc {
43         super(msg);
44     }
45 }
46 
47 alias RcActorInfo = RefCounted!ActorInfo;
48 
49 struct ActorSystem {
50     /// Statistics about the actor system that is only updated in debug build.
51     static struct Stat {
52         // Method calls.
53         long putCnt;
54         long removeCnt;
55         long spawnCnt;
56     }
57 
58     private {
59         Mutex lock_;
60         ulong nextId_;
61         shared Stat stat_;
62 
63         RcActorInfo[ulong] actors_;
64         RcActorInfo[] toBeRemoved_;
65     }
66 
67     this(this) @disable;
68 
69     /// Spawn a scoped actor that can be used by the local thread/context to send/receive messages.
70     Aid scopedActor() @trusted {
71         synchronized (lock_) {
72             return Aid(++nextId_);
73         }
74     }
75 
76     /** Spawn a new actor with the behavior `Behavior`.
77      *
78      * Params:
79      *  TActor = the messages that the spawned actor must implement behaviors for.
80      *  Behavior = behavior for each message.
81      *
82      * Returns:
83      *  An `Aid` representing the new actor.
84      */
85     Aid spawn(TActor, Behavior...)() if (isTypedActor!TActor) {
86         debug atomicOp!"+="(stat_.spawnCnt, 1);
87 
88         auto aid = () {
89             synchronized (lock_) {
90                 return Aid(++nextId_);
91             }
92         }();
93 
94         _spawnDetached(this, aid, false, makeTypedActor!(TActor, Behavior)(aid));
95         return aid;
96     }
97 
98     /** Spawn a new stateful actor with the behavior `Behavior` and state `State`.
99      *
100      * Params:
101      *  TActor = the messages that the spawned actor must implement behaviors for.
102      *  Behavior = behavior for each message.
103      *
104      * Returns:
105      *  An `Aid` representing the new actor.
106      */
107     Aid spawn(TActor, StateT, Behavior...)(StateT state = StateT.init)
108             if (isTypedActor!TActor) {
109         debug atomicOp!"+="(stat_.spawnCnt, 1);
110 
111         auto aid = () {
112             synchronized (lock_) {
113                 return Aid(++nextId_);
114             }
115         }();
116 
117         _spawnDetached(this, aid, false, makeStatefulTypedActor!(TActor,
118                 StateT, Behavior)(aid, state));
119         return aid;
120     }
121 
122     size_t length() @safe pure const @nogc {
123         synchronized (lock_) {
124             return actors_.length;
125         }
126     }
127 
128     /** Statistics about the actor system.
129      *
130      * Only updated in debug build.
131      */
132     Stat systemStat() @safe pure nothrow const @nogc {
133         return stat_;
134     }
135 
136     private RcActorInfo actor(Aid aid) @safe {
137         synchronized (lock_) {
138             if (auto v = aid.id in actors_) {
139                 return *v;
140             }
141         }
142         throw new ActorException(null);
143     }
144 
145     // TODO: change to safe
146     private void put(RcActorInfo actor) @safe {
147         debug atomicOp!"+="(stat_.putCnt, 1);
148 
149         const id = actor.ident.id;
150         synchronized (lock_) {
151             actors_[id] = actor;
152         }
153     }
154 
155     private void remove(RcActorInfo actor) @safe {
156         debug atomicOp!"+="(stat_.removeCnt, 1);
157 
158         const id = actor.ident.id;
159         synchronized (lock_) {
160             if (auto v = id in actors_) {
161                 toBeRemoved_ ~= *v;
162                 (*v).release;
163                 actors_.remove(id);
164             }
165         }
166     }
167 
168     /** Periodic cleanup of stopped actors.
169      *
170      */
171     private void cleanup() @trusted {
172         synchronized (lock_) {
173             foreach (ref a; toBeRemoved_) {
174                 a.cleanup;
175                 a.release;
176             }
177             toBeRemoved_ = null;
178         }
179     }
180 }
181 
182 RefCounted!ActorSystem makeActorSystem() {
183     return ActorSystem(new Mutex).refCounted;
184 }
185 
186 @("shall spawn a typed actor, process messages and cleanup")
187 unittest {
188     alias MyActor = TypedActor!(Tuple!(immutable(int)*));
189 
190     int cnt;
191     static void incr(immutable(int)* cnt) {
192         (*(cast(int*) cnt)) = 1;
193         thisAid.stop;
194     }
195 
196     auto sys = makeActorSystem;
197     {
198         auto a = sys.spawn!(MyActor, incr);
199         auto self = sys.scopedActor;
200         send(self, a, cast(immutable(int*))&cnt);
201         delayedSend(self, a, 50.dur!"msecs", cast(immutable(int*))&cnt);
202         delayedSend(self, a, Clock.currTime + 50.dur!"msecs", cast(immutable(int*))&cnt);
203 
204         Thread.sleep(200.dur!"msecs");
205 
206         assert(sys.systemStat.spawnCnt == 1);
207         assert(sys.systemStat.putCnt == 1);
208         assert(sys.systemStat.removeCnt == 1);
209         assert(cnt == 1);
210     }
211 
212     assert(sys.length == 0);
213     assert(sys.toBeRemoved_.length == 1);
214     auto x = sys.toBeRemoved_[0];
215     assert(x.refCount > 0);
216     sys.cleanup;
217     assert(sys.toBeRemoved_.length == 0);
218     assert(x.refCount == 1);
219 }
220 
221 @("shall spawn a stateful typed actor and cleanup")
222 unittest {
223     alias MyActor = TypedActor!(Tuple!(int));
224 
225     struct State {
226         int cnt;
227     }
228 
229     static void incr(ref RefCounted!State state, int value) {
230         state.cnt += value;
231         thisAid.stop;
232     }
233 
234     auto sys = makeActorSystem;
235     {
236         auto st = State(8).refCounted;
237         auto a = sys.spawn!(MyActor, typeof(st), incr)(st);
238         auto self = sys.scopedActor;
239         send(self, a, 2);
240         delayedSend(self, a, 50.dur!"msecs", 2);
241         delayedSend(self, a, Clock.currTime + 50.dur!"msecs", 2);
242 
243         Thread.sleep(200.dur!"msecs");
244 
245         assert(sys.systemStat.spawnCnt == 1);
246         assert(sys.systemStat.putCnt == 1);
247         assert(sys.systemStat.removeCnt == 1);
248         assert(st.cnt == 10);
249     }
250 
251     assert(sys.length == 0);
252     assert(sys.toBeRemoved_.length == 1);
253     auto x = sys.toBeRemoved_[0];
254     assert(x.refCount > 0);
255     sys.cleanup;
256     assert(sys.toBeRemoved_.length == 0);
257     assert(x.refCount == 1);
258 }
259 
260 @("shall process messages in a stateful typed actor as fast as possible")
261 unittest {
262     import std.datetime : Clock, SysTime;
263 
264     alias MyActor = TypedActor!(Tuple!(int));
265 
266     enum maxCnt = 1000;
267 
268     struct State {
269         int cnt;
270         SysTime stopAt;
271     }
272 
273     static void incr(ref RefCounted!State state, int value) {
274         state.cnt += value;
275         if (state.cnt == maxCnt) {
276             state.stopAt = Clock.currTime;
277             thisAid.stop;
278         }
279     }
280 
281     auto sys = makeActorSystem;
282     {
283         const startAt = Clock.currTime;
284         auto st = State(0).refCounted;
285         auto a = sys.spawn!(MyActor, typeof(st), incr)(st);
286 
287         auto self = sys.scopedActor;
288         foreach (_; 0 .. maxCnt) {
289             send(self, a, 1);
290         }
291 
292         while (a.isAlive) {
293             Thread.sleep(1.dur!"msecs");
294         }
295 
296         assert(st.cnt == maxCnt);
297         const diff = (st.stopAt - startAt);
298         debug writefln!"%s:%s %s msg in %s (%s msg/s)"(__FUNCTION__, __LINE__, st.cnt,
299                 diff, cast(double) st.cnt / (cast(double) diff.total!"nsecs" / 1000000000.0));
300     }
301 
302     sys.cleanup;
303 }
304 
305 /// Actor ID
306 struct Aid {
307     private static struct Value {
308         shared ulong id;
309         Mutex lock;
310         MessageBox normal;
311         MessageBox priority;
312         DelayedMessageBox delayed;
313         MessageBox system;
314         shared ActorState state;
315 
316         this(this) @disable;
317 
318         ~this() @trusted {
319             import std.algorithm : filter;
320             import std.range : only;
321 
322             if (lock is null)
323                 return;
324 
325             synchronized (lock) {
326                 if (normal !is null && !normal.isClosed)
327                     normal.close;
328                 if (priority !is null && !priority.isClosed)
329                     priority.close;
330                 if (delayed !is null && !delayed.isClosed)
331                     delayed.close;
332                 if (system !is null && !system.isClosed)
333                     system.close;
334             }
335         }
336     }
337 
338     private RefCounted!Value value_;
339 
340     this(ulong id) {
341         this.value_ = Value(id, new Mutex, new MessageBox, new MessageBox,
342                 new DelayedMessageBox, new MessageBox);
343     }
344 
345     /// Copy constructor
346     this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
347         this.value_ = rhs.value_;
348     }
349 
350     string toString() @safe pure const {
351         auto buf = appender!string;
352         toString(buf);
353         return buf.data;
354     }
355 
356     void toString(Writer)(ref Writer w) const if (isOutputRange!(Writer, char)) {
357         if (value_.refCount == 0) {
358             formattedWrite(w, "%s(<uninitialized>)", typeof(this).stringof);
359         } else {
360             formattedWrite(w, "%s(refCount:%s, id:%s, state:%s)", typeof(this)
361                     .stringof, value_.refCount, id, state);
362         }
363     }
364 
365     bool isAlive() @safe pure const @nogc {
366         import std.algorithm : among;
367 
368         if (value_.refCount == 0)
369             return false;
370 
371         synchronized (value_.lock) {
372             return value_.state.among(ActorState.waiting, ActorState.running) != 0;
373         }
374     }
375 
376     /// Mark an actor as running if it where previously waiting.
377     void running() @safe pure @nogc {
378         synchronized (value_.lock) {
379             if (value_.state == ActorState.waiting) {
380                 value_.state = ActorState.running;
381             }
382         }
383     }
384 
385     /** Stop an actor.
386      *
387      * The actor will be marked as stopping which will mean that it will at
388      * most process one more message and then be terminated.
389      */
390     void stop() @safe pure @nogc {
391         synchronized (value_.lock) {
392             final switch (value_.state) with (ActorState) {
393             case waiting:
394                 goto case;
395             case running:
396                 value_.state = ActorState.stopping;
397                 break;
398             case stopping:
399                 goto case;
400             case terminated:
401                 break;
402             }
403         }
404     }
405 
406     /** Move the actor to the terminated state only if it where in the stopped state previously.
407      */
408     package(my.actor) void terminated() @safe pure @nogc {
409         synchronized (value_.lock) {
410             if (value_.state == ActorState.stopping) {
411                 value_.state = ActorState.terminated;
412             }
413         }
414     }
415 
416     /// Release the references counted values of the actor id.
417     package(my.actor) void release() @trusted {
418         if (value_.refCount == 0)
419             return;
420 
421         auto lock = value_.lock;
422         synchronized (lock) {
423             value_.release;
424         }
425     }
426 
427     private ulong id() @safe pure nothrow @nogc const {
428         return atomicLoad(value_.get.id);
429     }
430 
431     package(my.actor) MessageBox normalMbox() @safe pure @nogc {
432         return value_.normal;
433     }
434 
435     package(my.actor) MessageBox priorityMbox() @safe pure @nogc {
436         return value_.priority;
437     }
438 
439     package(my.actor) DelayedMessageBox delayedMbox() @safe pure @nogc {
440         return value_.delayed;
441     }
442 
443     package(my.actor) MessageBox systemMbox() @safe pure @nogc {
444         return value_.system;
445     }
446 
447     private ActorState state() @safe pure nothrow @nogc const {
448         return atomicLoad(value_.state);
449     }
450 
451     private void setState(ActorState st) @safe pure nothrow @nogc {
452         atomicStore(value_.state, st);
453     }
454 }
455 
456 enum ActorState : ubyte {
457     /// the actor has spawned and is waiting to start executing.
458     waiting,
459     /// the actor is running/active
460     running,
461     /// the actor is signaled to stop. It will process at most one more message.
462     stopping,
463     /// the actor has terminated and thus all its resources can be freed.
464     terminated,
465 }
466 
467 /// Configure how an actor, when spawned, will be executed.
468 enum SpawnMode : ubyte {
469     /// the spawned actor is executed in the worker pool
470     pool,
471     /// executing in its own thread
472     detached
473 }
474 
475 /** Send a message `msg` to actor `aid`.
476  */
477 void send(T...)(Aid from, Aid to, T params) {
478     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
479     auto msg = Message(params);
480     if (!to.normalMbox.put(msg)) {
481         // TODO: add error handling when it fail.
482     }
483 }
484 
485 /** Send a delayed message `msg` to actor `aid`.
486  *
487  * Params:
488  *  delay = how much to delay the message with.
489  */
490 void delayedSend(T...)(Aid from, Aid to, Duration delay, T params) {
491     import std.datetime : Clock;
492 
493     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
494     auto msg = DelayedMessage(Clock.currTime + delay, Message(params));
495     if (!to.delayedMbox.put(msg)) {
496         // TODO: add error handling when it fail.
497     }
498 }
499 
500 /** Delay the message being processed until `delay`.
501  *
502  * Params:
503  *  delay = how much to delay the message with.
504  */
505 void delayedSend(T...)(Aid from, Aid to, SysTime delay, T params) {
506     import std.datetime : Clock;
507 
508     static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
509     auto msg = DelayedMessage(delay, Message(params));
510     if (!to.delayedMbox.put(msg)) {
511         // TODO: add error handling when it fail.
512     }
513 }
514 
515 /// Currently active actor..
516 Aid thisAid() @safe {
517     auto info = thisInfo;
518     if (info.refCount != 0) {
519         return info.ident;
520     }
521     return Aid.init;
522 }
523 
524 ref ActorSystem thisActorSystem() @safe {
525     auto info = thisInfo;
526     if (info.refCount != 0) {
527         return *info.system;
528     }
529     throw new ActorException(null);
530 }
531 
532 /** Encapsulates all implementation-level data needed for scheduling.
533  *
534  * When defining a Scheduler, an instance of this struct must be associated
535  * with each logical thread.  It contains all implementation-level information
536  * needed by the internal API.
537  */
538 struct ActorInfo {
539     /// The system the actor belongs to.
540     ActorSystem* system;
541     Aid ident;
542     Aid owner;
543     bool[Aid] links;
544 
545     /** Cleans up this ThreadInfo.
546      *
547      * This must be called when a scheduled thread terminates.  It tears down
548      * the messaging system for the thread and notifies interested parties of
549      * the thread's termination.
550      */
551     void cleanup() {
552         Aid discard;
553         foreach (aid; links.byKey) {
554             if (aid.isAlive) {
555                 send(discard, aid, SystemMsgType.linkDead, ident);
556             }
557         }
558         if (owner.isAlive) {
559             send(discard, owner, SystemMsgType.linkDead, ident);
560         }
561 
562         ident.release;
563         owner.release;
564     }
565 }
566 
567 enum SystemMsgType {
568     linkDead,
569 }
570 
571 private:
572 
573 static ~this() {
574     thisInfo.release;
575 }
576 
577 ref RefCounted!ActorInfo thisInfo() @safe nothrow @nogc {
578     static RefCounted!ActorInfo self;
579     return self;
580 }
581 
582 /*
583  *
584  */
585 void _spawnDetached(ref ActorSystem system, ref Aid newAid, bool linked, ActorRuntime actor) {
586     import std.stdio;
587 
588     auto ownerAid = thisAid;
589 
590     void exec() {
591         auto info = ActorInfo().refCounted;
592 
593         info.ident = newAid;
594         info.ident.setState(ActorState.running);
595         info.owner = ownerAid;
596         system.put(info);
597         thisInfo() = info;
598 
599         scope (exit)
600             () { info.ident.stop; system.remove(info); }();
601 
602         while (info.ident.state != ActorState.stopping) {
603             try {
604                 actor.act(100.dur!"msecs");
605             } catch (Exception e) {
606                 debug logger.warning(e.msg);
607             }
608         }
609     }
610 
611     auto t = new Thread(&exec);
612     t.start();
613 
614     auto info = thisInfo;
615     if (info.refCount != 0) {
616         thisInfo.links[newAid] = linked;
617     }
618 }
619 
620 bool hasLocalAliasing(Types...)() {
621     import std.typecons : Rebindable;
622 
623     // Works around "statement is not reachable"
624     bool doesIt = false;
625     static foreach (T; Types) {
626         static if (is(T == Aid)) { /* Allowed */ } else static if (is(T : Rebindable!R, R))
627             doesIt |= hasLocalAliasing!R;
628         else static if (is(T == struct))
629             doesIt |= hasLocalAliasing!(typeof(T.tupleof));
630         else
631             doesIt |= std.traits.hasUnsharedAliasing!(T);
632     }
633     return doesIt;
634 }
635 
636 template isSpawnable(F, T...) {
637     template isParamsImplicitlyConvertible(F1, F2, int i = 0) {
638         alias param1 = Parameters!F1;
639         alias param2 = Parameters!F2;
640         static if (param1.length != param2.length)
641             enum isParamsImplicitlyConvertible = false;
642         else static if (param1.length == i)
643             enum isParamsImplicitlyConvertible = true;
644         else static if (isImplicitlyConvertible!(param2[i], param1[i]))
645             enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1, F2, i + 1);
646         else
647             enum isParamsImplicitlyConvertible = false;
648     }
649 
650     enum isSpawnable = isCallable!F && is(ReturnType!F == void) && isParamsImplicitlyConvertible!(F,
651                 void function(T)) && (isFunctionPointer!F || !hasUnsharedAliasing!F);
652 }