/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Typed actors: an actor declares the set of message types it accepts and the
behaviors (functions) supplied for it are checked against that set at compile
time.
*/
module my.actor.typed;

import core.time : Duration, dur;
import logger = std.experimental.logger;
import std.datetime : Clock;
import std.meta : AliasSeq;
import std.stdio : writefln;
import std.traits : Parameters;

public import std.typecons : Tuple;

import my.actor : Aid;
import my.actor.mbox : Message;
import my.gc.refc;

@("shall construct a typed actor and process two messages")
unittest {
    alias MyActor = TypedActor!(Tuple!(int), Tuple!(int, double));

    static void msg0(int x, double y) @safe {
        writefln!"%s:%s yeey %s %s"(__FUNCTION__, __LINE__, x, y);
    }

    static void msg1(int x) @safe {
        writefln!"%s:%s yeey %s"(__FUNCTION__, __LINE__, x);
    }

    auto aid = Aid(42);
    auto actor = makeTypedActor!(MyActor, msg0, msg1)(aid);
    actor.actorId.normalMbox.put(Message(42, 84.0));
    actor.actorId.normalMbox.put(Message(42));

    actor.act(1.dur!"seconds");
    actor.act(1.dur!"seconds");
}

@("shall construct a stateful typed actor and process two messages")
unittest {
    alias MyActor = TypedActor!(Tuple!(int), Tuple!(int, double));

    struct State {
        int cnt;
    }

    static void msg0(ref State st, int x, double y) @safe {
        st.cnt += 1;
        writefln!"%s:%s yeey %s %s"(__FUNCTION__, __LINE__, x, y);
    }

    static void msg1(ref State st, int x) @safe {
        st.cnt += 2;
        writefln!"%s:%s yeey %s"(__FUNCTION__, __LINE__, x);
    }

    auto aid = Aid(42);
    auto actor = makeStatefulTypedActor!(MyActor, State, msg0, msg1)(aid, State.init);
    actor.actorId.normalMbox.put(Message(42, 84.0));
    actor.actorId.normalMbox.put(Message(42));

    actor.act(1.dur!"seconds");
    actor.act(1.dur!"seconds");

    assert(actor.state.cnt == 3);
}

@("shall extend a typed actor with additional message types")
unittest {
    alias Base = TypedActor!(Tuple!(int));
    alias Extended = typeof(extend!(Base, Tuple!(int, double))());

    static assert(isTypedActor!Extended);
    static assert(is(Extended == TypedActor!(Tuple!(int), Tuple!(int, double))));
}

/** Compile-time description of the messages an actor accepts.
 *
 * Params:
 *  AllowedMsg = the message types; in this module each one is compared
 *      against a `Tuple` of a behavior's parameter types (see `typeCheck`).
 */
struct TypedActor(AllowedMsg...) {
    alias AllowedMessages = AliasSeq!AllowedMsg;
}

/// True if `T` is (or implicitly converts to) an instance of `TypedActor`.
// `U...` so the pattern explicitly covers any number of message types.
enum isTypedActor(T) = is(T : TypedActor!U, U...);

/** Create a typed actor description that accepts everything `TActor` accepts
 * plus `AllowedMsg`.
 *
 * Params:
 *  TActor = the typed actor to extend.
 *  AllowedMsg = the additional message types.
 *
 * Returns: a default initialized value of the extended `TypedActor` type. Use
 * `typeof(extend!(TActor, Msgs)())` to retrieve the type.
 */
auto extend(TActor, AllowedMsg...)() if (isTypedActor!TActor) {
    alias Extended = TypedActor!(TActor.AllowedMessages, AllowedMsg);
    return Extended.init;
}

/// Interface that all actors that execute in an ActorSystem must implement.
interface ActorRuntime {
    /** Act on one message in the mailbox.
     *
     * Params:
     *  timeout = max time to wait for a message to arrive and execute the
     *            behavior for it.
     */
    void act(Duration timeout);

    /// The actor ID of the runtime.
    ref Aid actorId();

    /// Release any held resources.
    void release();
}

/// Runtime for a stateless typed actor.
class TypedActorRuntime : ActorRuntime {
    /// Signature of the dispatcher; returns true if the message was handled.
    alias Behavior = bool function(ref Message msg);

    /// The behavior of the actor for messages.
    Behavior behavior;
    Aid aid;

    this(Behavior bh, Aid aid) {
        this.behavior = bh;
        this.aid = aid;
    }

    override void release() {
        aid.release;
    }

    override ref Aid actorId() {
        return aid;
    }

    /** Act on one message in the mailbox.
     *
     * The system, priority and delayed mailboxes are polled, in that order,
     * without waiting. Only the normal mailbox is waited on for up to
     * `timeout`.
     */
    override void act(Duration timeout) {
        Message m;
        // `do { } while (false)` is used as a breakable scope: the first
        // mailbox that delivers a message wins.
        do {
            if (aid.systemMbox.get(Duration.zero, m))
                break;
            if (aid.priorityMbox.get(Duration.zero, m))
                break;
            if (aid.delayedMbox.get(Duration.zero, Clock.currTime, m))
                break;
            if (!aid.normalMbox.get(timeout, m))
                return;
        }
        while (false);

        if (!behavior(m)) {
            logger.tracef("%s:%s no behavior for '%s'", __FUNCTION__, __LINE__, m);
        }
    }
}

/** Runtime for a typed actor that carries a state which is passed by
 * reference to the behavior for every message.
 */
class StatefulActorRuntime(StateT) : ActorRuntime {
    /// Signature of the dispatcher; returns true if the message was handled.
    alias Behavior = bool function(ref StateT state, ref Message msg);

    /// The behavior of the actor for messages.
    Behavior behavior;
    Aid aid;
    StateT state;

    this(Behavior bh, Aid aid, StateT state) {
        this.behavior = bh;
        this.aid = aid;
        this.state = state;
    }

    override void release() {
        aid.release;
    }

    override ref Aid actorId() {
        return aid;
    }

    /** Act on one message in the mailbox.
     *
     * The system, priority and delayed mailboxes are polled, in that order,
     * without waiting. Only the normal mailbox is waited on for up to
     * `timeout`.
     */
    override void act(Duration timeout) {
        Message m;
        // `do { } while (false)` is used as a breakable scope: the first
        // mailbox that delivers a message wins.
        do {
            if (aid.systemMbox.get(Duration.zero, m))
                break;
            if (aid.priorityMbox.get(Duration.zero, m))
                break;
            if (aid.delayedMbox.get(Duration.zero, Clock.currTime, m))
                break;
            if (!aid.normalMbox.get(timeout, m))
                return;
        }
        while (false);

        if (!behavior(state, m)) {
            logger.tracef("%s:%s no behavior for '%s'", __FUNCTION__, __LINE__, m);
        }
    }
}

/** Construct a stateless actor runtime.
 *
 * The template constraint checks that `Behavior` implement the actor
 * interface `TActor`.
 *
 * Params:
 *  TActor = the typed actor interface to implement.
 *  Behavior = functions that handle the messages.
 *  aid = the actor ID the runtime reads messages from.
 */
auto makeTypedActor(TActor, Behavior...)(Aid aid)
        if (isTypedActor!TActor && typeCheck!(0, TActor, Behavior)) {
    return new TypedActorRuntime(&actorBehaviorImpl!Behavior, aid);
}

/** Construct a stateful actor runtime.
 *
 * The template constraint checks that `Behavior` implement the actor
 * interface `TActor`. The first parameter of each behavior is the state and
 * is skipped by the check.
 *
 * Params:
 *  TActor = the typed actor interface to implement.
 *  StateT = type of the state.
 *  Behavior = functions that handle the messages.
 *  aid = the actor ID the runtime reads messages from.
 *  state = initial state of the actor.
 */
auto makeStatefulTypedActor(TActor, StateT, Behavior...)(Aid aid, StateT state)
        if (isTypedActor!TActor && typeCheck!(1, TActor, Behavior)) {
    return new StatefulActorRuntime!StateT(&statefulActorBehaviorImpl!(StateT,
            Behavior), aid, state);
}

/** Check that every message type allowed by `TActor` is handled by exactly
 * one of `Behavior`.
 *
 * Used by the template constraints in this module, thus evaluated at compile
 * time. A behavior that does not correspond to any allowed message type is
 * not reported by this check.
 *
 * Params:
 *  paramFromIndex = index of the first behavior parameter that is part of the
 *      message (0 for stateless behaviors, 1 when the first is the state).
 *  TActor = the typed actor interface.
 *  Behavior = the functions to check.
 *
 * Returns: true if the check passed; otherwise an assert is triggered.
 */
package(my.actor) bool typeCheck(size_t paramFromIndex, TActor, Behavior...)()
        if (isTypedActor!TActor) {
    alias AllowedTypes = TActor.AllowedMessages;

    foreach (T; AllowedTypes) {
        bool added = false;
        foreach (bh; Behavior) {
            alias Args = Parameters!bh[paramFromIndex .. $];
            alias BehaviorTuple = Tuple!Args;

            static if (is(BehaviorTuple == T)) {
                if (added) {
                    assert(false, "duplicate overload specified for type '" ~ T.stringof ~ "'");
                }
                added = true;
            }
        }

        if (!added) {
            assert(false, "unhandled message type '" ~ T.stringof ~ "'");
        }
    }
    return true;
}

/** Apply the first behavior that matches the message.
 *
 * The behaviors are tried in the order they are specified. The first one
 * whose parameters the message converts to is called with the content of the
 * message.
 *
 * Params:
 *  Behaviors = the behaviors (functions) to match the message against.
 *  msg = the message to dispatch.
 *
 * Returns:
 *  true if a behavior matched and was called, false otherwise.
 */
bool actorBehaviorImpl(Behaviors...)(ref Message msg) {
    import std.meta : AliasSeq;

    static assert(Behaviors.length, "Behaviors must not be empty");

    alias Ops = AliasSeq!(Behaviors);

    foreach (i, t; Ops) {
        alias Args = Parameters!(t);

        if (msg.convertsTo!(Args)) {
            msg.map!(Ops[i]);
            return true;
        }
    }
    return false;
}

/** Apply the first behavior that matches the message, passing along `state`.
 *
 * The behaviors are tried in the order they are specified. The first
 * parameter of each behavior is the state and is ignored when matching.
 *
 * Params:
 *  StateT = type of the state.
 *  Behaviors = the behaviors (functions) to match the message against.
 *  state = the state handed to the matching behavior.
 *  msg = the message to dispatch.
 *
 * Returns:
 *  true if a behavior matched and was called, false otherwise.
 */
bool statefulActorBehaviorImpl(StateT, Behaviors...)(ref StateT state, ref Message msg) {
    import std.meta : AliasSeq;

    static assert(Behaviors.length, "Behaviors must not be empty");

    alias Ops = AliasSeq!(Behaviors);

    foreach (i, t; Ops) {
        // dropping the first parameter because it is the state.
        alias Args = Parameters!(t)[1 .. $];

        if (msg.convertsTo!(Args)) {
            msg.map!(Ops[i])(state);
            return true;
        }
    }
    return false;
}