/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.system;

import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import logger = std.experimental.logger;
import std.algorithm : min, max, clamp;
import std.datetime : dur, Clock, Duration;
import std.parallelism : Task, TaskPool, task;
import std.traits : Parameters, ReturnType;

import my.optional;

public import my.actor.typed;
public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl;
public import my.actor.mailbox : Address, makeAddress2, WeakAddress;
public import my.actor.msg;
import my.actor.common;
import my.actor.memory : ActorAlloc;

System makeSystem(TaskPool pool) @safe {
    return System(pool, false);
}

System makeSystem() @safe {
    return System(new TaskPool, true);
}
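
// A minimal usage sketch: the returned System owns its TaskPool and tears
// everything down in its destructor via shutdown.
@("shall create and destroy an empty system")
@safe unittest {
    auto sys = makeSystem;
    // no actors spawned; shutdown runs when sys goes out of scope.
}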

struct SystemConfig {
    static struct Scheduler {
        // Number of messages each actor is allowed to consume per scheduled run.
        Optional!ulong maxThroughput;
        // How long a worker sleeps before polling the actor queue.
        Optional!Duration pollInterval;
    }

    Scheduler scheduler;
}
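
// A sketch of tuning the scheduler before constructing a System (assumes the
// some() helper from my.optional to populate the Optional fields). Unset
// fields fall back to the defaults chosen via orElse in the Scheduler below.
@("shall construct a system from a tuned SystemConfig")
@safe unittest {
    SystemConfig conf;
    conf.scheduler.maxThroughput = some(100UL);
    conf.scheduler.pollInterval = some(5.dur!"msecs");

    auto sys = System(conf, new TaskPool, true);
}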

struct System {
    import std.functional : forward;

    private {
        bool running;
        bool ownsPool;
        TaskPool pool;
        Backend bg;
    }

    @disable this(this);

    this(TaskPool pool, bool ownsPool) @safe {
        this(SystemConfig.init, pool, ownsPool);
    }

    /**
     * Params:
     *  conf = configuration of the system, mainly of the scheduler.
     *  pool = thread pool to use for scheduling actors.
     *  ownsPool = if true, the pool is finished when the system shuts down.
     */
    this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe {
        this.pool = pool;
        this.ownsPool = ownsPool;
        this.bg = Backend(new Scheduler(conf.scheduler, pool));

        this.running = true;
        this.bg.start(pool, pool.size);
    }

    ~this() @safe {
        shutdown;
    }

    /// Shut down all actors as fast as possible.
    void shutdown() @safe {
        if (!running)
            return;

        bg.shutdown;
        if (ownsPool)
            pool.finish(true);
        pool = null;

        running = false;
    }

    /// Wait for all actors to finish (terminate) before returning.
    void wait() @safe {
        if (!running)
            return;

        // currently performs the same teardown as shutdown: the scheduler
        // drains all actors before the workers terminate.
        bg.shutdown;
        if (ownsPool)
            pool.finish(true);
        pool = null;

        running = false;
    }

    /// Spawn a dynamic actor.
    WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) {
        auto actor = bg.alloc.make(makeAddress2);
        return schedule(fn(actor, forward!args));
    }

    /// Spawn a typed actor.
    auto spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (isTypedActorImpl!(Parameters!(Fn)[0])) {
        alias ActorT = TypedActor!(Parameters!(Fn)[0].AllowedMessages);
        auto actor = bg.alloc.make(makeAddress2);
        auto impl = fn(ActorT.Impl(actor), forward!args);
        schedule(actor);
        return impl.address;
    }

    // Schedule an actor for execution in the thread pool.
    // Returns: the address of the actor.
    private WeakAddress schedule(Actor* actor) @safe {
        assert(bg.scheduler.isActive);
        setHomeSystem(actor);
        bg.scheduler.putWaiting(actor);
        return actor.address;
    }

    // Set the home system of the actor. This is safe on the assumption that
    // the actor system is the last to terminate.
    private void setHomeSystem(Actor* actor) @trusted {
        actor.setHomeSystem(&this);
    }
}

@("shall start an actor system, execute an actor and shutdown")
@safe unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* a) => build(a).set(&fn, capture(&hasExecutedWith42)).finalize);
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    const start = Clock.currTime;
    // busy-wait for the actor to process the message or the timeout to pass.
    while (hasExecutedWith42 == 0 && Clock.currTime < failAfter) {
    }
    const td = Clock.currTime - start;

    assert(hasExecutedWith42 == 1);
    assert(td < 3.dur!"seconds");
}

@("shall be possible to send a message to self during construction")
unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* self) {
        send(self, 42);
        return impl(self, &fn, capture(&hasExecutedWith42));
    });
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    // busy-wait for both messages to be processed or the timeout to pass.
    while (hasExecutedWith42 < 2 && Clock.currTime < failAfter) {
    }

    assert(hasExecutedWith42 == 2);
}

@("shall spawn two typed actors which are connected, execute and shutdown")
@safe unittest {
    import std.typecons : Tuple;

    auto sys = makeSystem;

    alias A1 = typedActor!(int function(int), string function(int, int));
    alias A2 = typedActor!(int function(int));

    auto spawnA1(A1.Impl self) {
        return my.actor.typed.impl(self, (int a) { return a + 10; }, (int a, int b) => "hej");
    }

    auto a1 = sys.spawn(&spawnA1);

    // the final result is delivered by A2's continuation.
    auto spawnA2(A2.Impl self) {
        return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) {
            auto p = makePromise!int;
            // dfmt off
            c.self.request(c.a1, infTimeout)
                .send(x + 10)
                .capture(p)
                .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); });
            // dfmt on
            return p;
        }, capture(self, a1));
    }

    auto a2 = sys.spawn(&spawnA2);

    auto self = scopedActor;
    int ok;
    // send the start msg to a2, which passes it on to a1.
    self.request(a2, infTimeout).send(10).then((int x) { ok = x; });

    assert(ok == 30);
}

private:
@safe:

struct Backend {
    Scheduler scheduler;
    ActorAlloc alloc;

    // Trusted: the ref to alloc relies on the assumption that the actor
    // system is the last to terminate. The backend is owned by the system and
    // correctly shut down.
    void start(TaskPool pool, ulong workers) @trusted {
        scheduler.start(pool, workers, &alloc);
    }

    void shutdown() {
        import core.memory : GC;
        import my.libc : malloc_trim;

        scheduler.shutdown;
        // destroy the scheduler before dropping the reference so the
        // destructor actually runs on the instance.
        () @trusted { .destroy(scheduler); }();
        scheduler = null;
        () @trusted { GC.collect; }();
        () @trusted { malloc_trim(0); }();
    }
}

/** Schedule actors for execution.
 *
 * A worker pops an actor, executes it and then puts it back for later
 * scheduling.
 *
 * A watcher monitors inactive actors for either messages having arrived or
 * timeouts triggering. Such actors are moved back to the waiting queue and
 * the workers are notified that there are actors waiting to be executed.
 */
class Scheduler {
    import core.atomic : atomicOp, atomicLoad;

    SystemConfig.Scheduler conf;

    ActorAlloc* alloc;

    /// Workers shut down cleanly when this is false.
    bool isActive;

    /// The watcher shuts down cleanly when this is false.
    bool isWatcher;

    /// The shutdowner shuts down cleanly when this is false.
    bool isShutdown;

    // Synchronization for workers waiting to be activated.
    Mutex waitingWorkerMtx;
    Condition waitingWorker;

    // Actors waiting to be executed by a worker.
    Queue!(Actor*) waiting;

    // Actors waiting for messages to arrive; they are thus inactive.
    Queue!(Actor*) inactive;

    // Actors that are shutting down.
    Queue!(Actor*) inShutdown;

    Task!(worker, Scheduler, const ulong)*[] workers;
    Task!(watchInactive, Scheduler)* watcher;
    Task!(watchShutdown, Scheduler)* shutdowner;
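
    /* Queue flow, informally (a sketch derived from the code below):
     *   waiting    --worker-->     back to waiting, or inactive/inShutdown
     *   inactive   --watcher-->    back to waiting, or stays inactive
     *   inShutdown --shutdowner--> processed until dead, then disposed
     */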

    this(SystemConfig.Scheduler conf, TaskPool pool) {
        this.conf = conf;
        this.isActive = true;
        this.isWatcher = true;
        this.isShutdown = true;
        this.waiting = typeof(waiting)(new Mutex);
        this.inactive = typeof(inactive)(new Mutex);
        this.inShutdown = typeof(inShutdown)(new Mutex);

        this.waitingWorkerMtx = new Mutex;
        this.waitingWorker = new Condition(this.waitingWorkerMtx);
    }

    void wakeup() @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.notify;
        }
    }

    void wait(Duration w) @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.wait(w);
        }
    }

    /// Check the inactive actors for activity.
    private static void watchInactive(Scheduler sched) {
        const minPoll = 100.dur!"usecs";
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inactive.length;
            ulong inactive;
            Duration nextPoll = pollInterval;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inactive.pop.unsafeMove) {
                    if (a.hasMessage) {
                        sched.putWaiting(a);
                    } else {
                        const t = a.nextTimeout(Clock.currTime, maxPoll);

                        if (t < minPoll) {
                            sched.putWaiting(a);
                        } else {
                            sched.putInactive(a);
                            nextPoll = inactive == 0 ? t : min(nextPoll, t);
                            inactive++;
                        }
                    }
                }
            }

            if (inactive != 0) {
                pollInterval = clamp(nextPoll, minPoll, maxPoll);
            }

            if (inactive == runActors) {
                () @trusted { Thread.sleep(pollInterval); }();
                pollInterval = min(maxPoll, pollInterval);
            } else {
                sched.wakeup;
                pollInterval = minPoll;
            }
        }

        // drain the inactive queue during shutdown.
        while (sched.isWatcher || !sched.inactive.empty) {
            if (auto a = sched.inactive.pop.unsafeMove) {
                sched.inShutdown.put(a);
            }
        }
    }

    /// Finish the shutdown of actors that are shutting down.
    private static void watchShutdown(Scheduler sched) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inShutdown.length;
            ulong alive;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inShutdown.pop.unsafeMove) {
                    if (a.isAlive) {
                        alive++;
                        a.process(Clock.currTime);
                        sched.inShutdown.put(a);
                    } else {
                        sched.alloc.dispose(a);
                    }
                }
            }

            if (alive == 0) {
                () @trusted { Thread.sleep(pollInterval); }();
                // back off, but never beyond the max poll interval.
                pollInterval = min(maxPoll, pollInterval + stepPoll);
            } else {
                pollInterval = minPoll;
            }
        }

        // drain the shutdown queue; kill actors that are still alive.
        while (sched.isShutdown || !sched.inShutdown.empty) {
            if (auto a = sched.inShutdown.pop.unsafeMove) {
                if (a.isAlive) {
                    sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill));
                    a.process(Clock.currTime);
                    sched.inShutdown.put(a);
                } else {
                    sched.alloc.dispose(a);
                }
            }
        }
    }

    private static void worker(Scheduler sched, const ulong id) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const maxThroughput = sched.conf.maxThroughput.orElse(50UL);
        const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs");
        const inactiveLimit = min(500.dur!"msecs", pollInterval * 3);

        while (sched.isActive) {
            const runActors = sched.waiting.length;
            ulong consecutiveInactive;

            foreach (_; 0 .. runActors) {
                if (auto ctx = sched.pop) {
                    ulong msgs;
                    ulong prevMsgs;
                    ulong totalMsgs;
                    // process until the actor stops making progress or the
                    // throughput budget for this scheduling run is used up.
                    do {
                        // reduce clock polling
                        const now = Clock.currTime;
                        ctx.process(now);
                        prevMsgs = msgs;
                        msgs = ctx.messages;
                        totalMsgs += msgs;
                    }
                    while (totalMsgs < maxThroughput && msgs != prevMsgs);

                    if (totalMsgs == 0) {
                        sched.putInactive(ctx);
                        consecutiveInactive++;
                    } else {
                        consecutiveInactive = 0;
                        sched.putWaiting(ctx);
                    }
                } else {
                    sched.wait(pollInterval);
                }
            }

            // sleep if it is detected that actors are not sending messages.
            if (consecutiveInactive == runActors) {
                sched.wait(inactiveLimit);
            }
        }

        // drain the waiting queue during shutdown; kill actors still alive.
        while (!sched.waiting.empty) {
            const sleepAfter = 1 + sched.waiting.length;
            for (size_t i; i < sleepAfter; ++i) {
                if (auto ctx = sched.pop) {
                    sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill));
                    ctx.process(Clock.currTime);
                    sched.putWaiting(ctx);
                }
            }

            () @trusted { Thread.sleep(pollInterval); }();
        }
    }

    /// Start the workers.
    void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) {
        this.alloc = alloc;
        foreach (const id; 0 .. nr) {
            auto t = task!worker(this, id);
            workers ~= t;
            pool.put(t);
        }
        watcher = task!watchInactive(this);
        watcher.executeInNewThread(Thread.PRIORITY_MIN);

        shutdowner = task!watchShutdown(this);
        shutdowner.executeInNewThread(Thread.PRIORITY_MIN);
    }

    void shutdown() {
        isActive = false;
        foreach (a; workers) {
            try {
                a.yieldForce;
            } catch (Exception e) {
                // TODO: log exceptions?
            }
        }

        isWatcher = false;
        try {
            watcher.yieldForce;
        } catch (Exception e) {
            // TODO: log exceptions?
        }

        isShutdown = false;
        try {
            shutdowner.yieldForce;
        } catch (Exception e) {
            // TODO: log exceptions?
        }
    }

    Actor* pop() {
        return waiting.pop.unsafeMove;
    }

    void putWaiting(Actor* a) @safe {
        if (a.isAccepting) {
            waiting.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }

    void putInactive(Actor* a) @safe {
        if (a.isAccepting) {
            inactive.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }
}