1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.typed;
7 
8 import core.time : Duration, dur;
9 import logger = std.experimental.logger;
10 import std.datetime : Clock;
11 import std.meta : AliasSeq;
12 import std.stdio : writefln;
13 import std.traits : Parameters;
14 
15 public import std.typecons : Tuple;
16 
17 import my.actor : Aid;
18 import my.actor.mbox : Message;
19 import my.gc.refc;
20 
@("shall construct a typed actor and process two messages")
unittest {
    // The actor accepts exactly two message shapes: (int) and (int, double).
    alias MyActor = TypedActor!(Tuple!(int), Tuple!(int, double));

    // Handler for the (int, double) message.
    static void msg0(int x, double y) @safe {
        writefln!"%s:%s yeey %s %s"(__FUNCTION__, __LINE__, x, y);
    }

    // Handler for the (int) message.
    static void msg1(int x) @safe {
        writefln!"%s:%s yeey %s"(__FUNCTION__, __LINE__, x);
    }

    immutable timeout = dur!"seconds"(1);

    auto id = Aid(42);
    auto runtime = makeTypedActor!(MyActor, msg0, msg1)(id);

    // Queue one message per handler in the normal mailbox.
    runtime.actorId.normalMbox.put(Message(42, 84.0));
    runtime.actorId.normalMbox.put(Message(42));

    // One call to `act` per queued message.
    foreach (_; 0 .. 2) {
        runtime.act(timeout);
    }
}
41 
@("shall construct a stateful typed actor and process two messages")
unittest {
    // The actor accepts exactly two message shapes: (int) and (int, double).
    alias MyActor = TypedActor!(Tuple!(int), Tuple!(int, double));

    // State owned by the runtime; each handler bumps the counter by a
    // distinct amount so the final value shows that both have run.
    struct State {
        int cnt;
    }

    // Handler for the (int, double) message.
    static void msg0(ref State st, int x, double y) @safe {
        st.cnt += 1;
        writefln!"%s:%s yeey %s %s"(__FUNCTION__, __LINE__, x, y);
    }

    // Handler for the (int) message.
    static void msg1(ref State st, int x) @safe {
        st.cnt += 2;
        writefln!"%s:%s yeey %s"(__FUNCTION__, __LINE__, x);
    }

    immutable timeout = dur!"seconds"(1);

    auto id = Aid(42);
    auto runtime = makeStatefulTypedActor!(MyActor, State, msg0, msg1)(id, State.init);

    // Queue one message per handler in the normal mailbox.
    runtime.actorId.normalMbox.put(Message(42, 84.0));
    runtime.actorId.normalMbox.put(Message(42));

    // One call to `act` per queued message.
    foreach (_; 0 .. 2) {
        runtime.act(timeout);
    }

    // 1 (from msg0) + 2 (from msg1)
    assert(runtime.state.cnt == 3);
}
70 
/** Compile-time description of the messages a typed actor accepts.
 *
 * The struct carries no data; it only exposes the template arguments it was
 * instantiated with.
 *
 * Params:
 *  AllowedMsg = the message signatures of the actor. The tests in this module
 *               pass one `Tuple` per message.
 */
struct TypedActor(AllowedMsg...) {
    /// The message signatures this actor was declared with.
    alias AllowedMessages = AllowedMsg;
}
74 
/// Evaluates to `true` when `T` matches the pattern `TypedActor!U`.
template isTypedActor(T) {
    enum bool isTypedActor = is(T : TypedActor!U, U);
}
76 
/** Create a typed actor description that accepts everything `TActor` accepts
 * plus the additional messages `AllowedMsg`.
 *
 * The returned value carries no data; only its type is of interest. Use
 * `typeof(extend!(TActor, AllowedMsg)())` to obtain the extended actor type.
 *
 * Params:
 *  TActor = the typed actor to extend.
 *  AllowedMsg = the message signatures to append to those of `TActor`.
 *
 * Returns:
 *  the `.init` value of `TypedActor!(TActor.AllowedMessages, AllowedMsg)`.
 */
auto extend(TActor, AllowedMsg...)() if (isTypedActor!TActor) {
    // A function must return a value, not a type, and the messages of the
    // base actor are reachable through its `AllowedMessages` alias.
    return TypedActor!(TActor.AllowedMessages, AllowedMsg).init;
}
80 
/** Interface that all actors that execute in an ActorSystem must implement.
 *
 * It exposes the three operations declared below: processing a single
 * message, reaching the actor's ID and releasing resources.
 */
interface ActorRuntime {
    /** Act on one message in the mailbox.
     *
     * Params:
     *  timeout = max time to wait for a message to arrive and execute the
     *            behavior for it.
     */
    void act(Duration timeout);

    /** The actor ID of the runtime.
     *
     * Returned by reference, so the caller accesses the runtime's own `Aid`
     * rather than a copy.
     */
    ref Aid actorId();

    /// Release any held resources. The runtimes in this module release their `Aid` here.
    void release();
}
97 
/** Runtime for a typed actor that keeps no state between messages.
 *
 * It pairs an actor ID with one dispatch function which is handed every
 * message taken out of the actor's mailboxes.
 */
class TypedActorRuntime : ActorRuntime {
    /// Signature of the dispatcher; the result tells if the message was handled.
    alias Behavior = bool function(ref Message msg);

    /// The dispatcher called for each message that is retrieved.
    Behavior behavior;
    /// ID of the actor; `act` reads the mailboxes through it.
    Aid aid;

    /**
     * Params:
     *  bh = dispatcher to call for each message.
     *  aid = ID of the actor this runtime executes.
     */
    this(Behavior bh, Aid aid) {
        this.aid = aid;
        this.behavior = bh;
    }

    /// Returns: the actor ID by reference.
    override ref Aid actorId() {
        return aid;
    }

    /// Release the actor ID.
    override void release() {
        aid.release;
    }

    /** Retrieve at most one message and hand it to the behavior.
     *
     * The mailboxes are tried in the order system, priority, delayed and
     * normal. The first three are queried with a zero timeout, only the
     * normal mailbox is given `timeout`.
     *
     * Params:
     *  timeout = passed on to the normal mailbox when the others had nothing.
     */
    override void act(Duration timeout) {
        Message incoming;

        // `||` short-circuits, thus a mailbox is only queried when all the
        // ones before it came up empty.
        const bool received = aid.systemMbox.get(Duration.zero, incoming)
            || aid.priorityMbox.get(Duration.zero, incoming)
            || aid.delayedMbox.get(Duration.zero, Clock.currTime, incoming)
            || aid.normalMbox.get(timeout, incoming);
        if (!received)
            return;

        if (behavior(incoming))
            return;

        logger.tracef("%s:%s no behavior for '%s'", __FUNCTION__, __LINE__, incoming);
    }
}
138 
/** Runtime for a typed actor that owns a state of type `StateT`.
 *
 * Every message taken out of the actor's mailboxes is handed to the dispatch
 * function together with a reference to the state.
 *
 * Params:
 *  StateT = type of the state stored in the runtime.
 */
class StatefulActorRuntime(StateT) : ActorRuntime {
    /// Signature of the dispatcher; the result tells if the message was handled.
    alias Behavior = bool function(ref StateT state, ref Message msg);

    /// The dispatcher called for each message that is retrieved.
    Behavior behavior;
    /// ID of the actor; `act` reads the mailboxes through it.
    Aid aid;
    /// The state passed by reference to the dispatcher.
    StateT state;

    /**
     * Params:
     *  bh = dispatcher to call for each message.
     *  aid = ID of the actor this runtime executes.
     *  state = initial value of the state.
     */
    this(Behavior bh, Aid aid, StateT state) {
        this.state = state;
        this.aid = aid;
        this.behavior = bh;
    }

    /// Returns: the actor ID by reference.
    override ref Aid actorId() {
        return aid;
    }

    /// Release the actor ID.
    override void release() {
        aid.release;
    }

    /** Retrieve at most one message and hand it, with the state, to the behavior.
     *
     * The mailboxes are tried in the order system, priority, delayed and
     * normal. The first three are queried with a zero timeout, only the
     * normal mailbox is given `timeout`.
     *
     * Params:
     *  timeout = passed on to the normal mailbox when the others had nothing.
     */
    override void act(Duration timeout) {
        Message incoming;

        // Each branch is only evaluated when the mailboxes before it were empty.
        if (aid.systemMbox.get(Duration.zero, incoming)) {
            // got a system message
        } else if (aid.priorityMbox.get(Duration.zero, incoming)) {
            // got a priority message
        } else if (aid.delayedMbox.get(Duration.zero, Clock.currTime, incoming)) {
            // got a delayed message
        } else if (!aid.normalMbox.get(timeout, incoming)) {
            // nothing arrived
            return;
        }

        const handled = behavior(state, incoming);
        if (!handled) {
            logger.tracef("%s:%s no behavior for '%s'", __FUNCTION__, __LINE__, incoming);
        }
    }
}
181 
/** Construct a runtime for a stateless typed actor.
 *
 * The template constraint verifies, via `typeCheck`, that `Behavior`
 * implements the actor interface `TActor`.
 *
 * Params:
 *  TActor = the typed actor describing the allowed messages.
 *  Behavior = the functions that handle the messages.
 *  aid = ID of the actor.
 *
 * Returns:
 *  a new `TypedActorRuntime` that dispatches to `Behavior`.
 */
auto makeTypedActor(TActor, Behavior...)(Aid aid)
        if (isTypedActor!TActor && typeCheck!(0, TActor, Behavior)) {
    TypedActorRuntime.Behavior dispatcher = &actorBehaviorImpl!Behavior;
    return new TypedActorRuntime(dispatcher, aid);
}
187 
/** Construct a runtime for a typed actor that carries a state.
 *
 * The template constraint verifies, via `typeCheck`, that `Behavior`
 * implements the actor interface `TActor`. The first parameter of each
 * behavior is skipped in that check because it is the state.
 *
 * Params:
 *  TActor = the typed actor describing the allowed messages.
 *  StateT = type of the state.
 *  Behavior = the functions that handle the messages.
 *  aid = ID of the actor.
 *  state = initial value of the state.
 *
 * Returns:
 *  a new `StatefulActorRuntime!StateT` that dispatches to `Behavior`.
 */
auto makeStatefulTypedActor(TActor, StateT, Behavior...)(Aid aid, StateT state)
        if (isTypedActor!TActor && typeCheck!(1, TActor, Behavior)) {
    alias Runtime = StatefulActorRuntime!StateT;

    Runtime.Behavior dispatcher = &statefulActorBehaviorImpl!(StateT, Behavior);
    return new Runtime(dispatcher, aid, state);
}
194 
/** Verify that each message allowed by `TActor` has exactly one behavior.
 *
 * Used in the template constraints of the factory functions in this module,
 * which means it is evaluated at compile time there.
 *
 * Params:
 *  paramFromIndex = number of leading parameters of each behavior to skip
 *                   before comparing against the message signature.
 *  TActor = the typed actor describing the allowed messages.
 *  Behavior = the functions that are to handle the messages.
 *
 * Returns:
 *  true when every allowed message is handled by one and only one behavior.
 *  Otherwise an assert is triggered that names the offending message type.
 */
package(my.actor) bool typeCheck(size_t paramFromIndex, TActor, Behavior...)()
        if (isTypedActor!TActor) {
    foreach (Expected; TActor.AllowedMessages) {
        // Number of behaviors whose parameters are exactly `Expected`.
        size_t handlers;

        foreach (handler; Behavior) {
            alias Params = Parameters!handler[paramFromIndex .. $];
            alias Signature = Tuple!Params;

            static if (is(Signature == Expected)) {
                ++handlers;
                if (handlers > 1) {
                    assert(false, "duplicate overload specified for type '" ~ Expected.stringof ~ "'");
                }
            }
        }

        if (handlers == 0) {
            assert(false, "unhandled message type '" ~ Expected.stringof ~ "'");
        }
    }

    return true;
}
219 
/* Apply the first behavior in `Behaviors` that matches the message.
 *
 * The behaviors are tried in the order they are listed. The first one whose
 * parameters the message converts to is applied through `msg.map` and the
 * search stops.
 *
 * Params:
 *  Behaviors = the behaviors (functions) to match the message against.
 *  msg = the message to dispatch.
 *
 * Returns:
 *  true if a behavior matched the message and false if none did.
 */
bool actorBehaviorImpl(Behaviors...)(ref Message msg) {
    static assert(Behaviors.length, "Behaviors must not be empty");

    foreach (idx, handler; Behaviors) {
        alias Payload = Parameters!(handler);

        if (msg.convertsTo!(Payload)) {
            msg.map!(Behaviors[idx]);
            return true;
        }
    }

    // no behavior accepts this message
    return false;
}
248 
/* Apply the first behavior in `Behaviors` that matches the message, passing
 * it the state.
 *
 * The behaviors are tried in the order they are listed. The first parameter
 * of a behavior is the state and is ignored when matching. The first behavior
 * whose remaining parameters the message converts to is applied through
 * `msg.map` together with `state` and the search stops.
 *
 * Params:
 *  StateT = type of the state.
 *  Behaviors = the behaviors (functions) to match the message against.
 *  state = the state handed to the matching behavior.
 *  msg = the message to dispatch.
 *
 * Returns:
 *  true if a behavior matched the message and false if none did.
 */
bool statefulActorBehaviorImpl(StateT, Behaviors...)(ref StateT state, ref Message msg) {
    static assert(Behaviors.length, "Behaviors must not be empty");

    foreach (idx, handler; Behaviors) {
        // Skip the leading parameter, it is the state and not part of the message.
        alias Payload = Parameters!(handler)[1 .. $];

        if (msg.convertsTo!(Payload)) {
            msg.map!(Behaviors[idx])(state);
            return true;
        }
    }

    // no behavior accepts this message
    return false;
}