/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.msg;

import std.meta : staticMap, AliasSeq;
import std.traits : Unqual, Parameters, isFunction, isFunctionPointer;
import std.typecons : Tuple, tuple;
import std.variant : Variant;

public import std.datetime : SysTime, Duration, dur;

import my.actor.mailbox;
import my.actor.common : ExitReason, makeSignature, SystemError;
import my.actor.actor : Actor, makeAction, makeRequest, makeReply, makePromise,
    ErrorHandler, Promise, RequestResult;
import my.actor.system_msg;
import my.actor.typed : isTypedAddress, isTypedActor, isTypedActorImpl,
    typeCheckMsg, ParamsToTuple, ReturnToTupleOrVoid,
    underlyingActor, underlyingAddress, underlyingTypedAddress, underlyingWeakAddress;

/// Returns: `SysTime.max`, i.e. the latest point in time a `SysTime` can hold.
SysTime infTimeout() @safe pure nothrow {
    return SysTime.max;
}

/** Convert a relative duration to an absolute point in time.
 *
 * Params:
 *  d = how far from now the returned time should be.
 *
 * Returns: the current time (`Clock.currTime`) plus `d`.
 */
SysTime timeout(Duration d) @safe nothrow {
    import std.datetime : Clock;

    return Clock.currTime + d;
}

/// Code looks better if it says delay when using delayedSend.
alias delay = timeout;

/// True if `T` is an `Actor*`, a typed actor or a typed actor implementation.
enum isActor(T) = is(T == Actor*) || isTypedActor!T || isTypedActorImpl!T;
/// True if `T` is a `WeakAddress`, a `StrongAddress` or a typed address.
enum isAddress(T) = is(T == WeakAddress) || is(T == StrongAddress) || isTypedAddress!T;
/// True if `T` is one of the untyped address types (`WeakAddress`, `StrongAddress`).
enum isDynamicAddress(T) = is(T == WeakAddress) || is(T == StrongAddress);

/** Link the lifetime of `self` to the actor using `sendTo`.
 *
 * An `ExitMsg` is sent to `self` if `sendTo` is terminated and vice versa.
 *
 * `ExitMsg` triggers `exitHandler`.
 *
 * Nothing is sent if either of the resolved addresses is empty.
 */
void linkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : LinkRequest;

    auto self_ = underlyingAddress(self);
    auto addr = underlyingAddress(sendTo);

    if (self_.empty || addr.empty)
        return;

    // a link is symmetric, each side is told about the other.
    sendSystemMsg(self_, LinkRequest(addr.weakRef));
    sendSystemMsg(addr, LinkRequest(self_.weakRef));
}

/// Remove the link between `self` and the actor using `sendTo`.
void unlinkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : UnlinkRequest;

    auto self_ = underlyingAddress(self);
    auto addr = underlyingAddress(sendTo);

    // do NOT check if the addresses exist because it doesn't matter. Just
    // remove the link.

    sendSystemMsg(self_, UnlinkRequest(addr.weakRef));
    sendSystemMsg(addr, UnlinkRequest(self_.weakRef));
}

/** Actor `self` will receive a `DownMsg` when `sendTo` shutdown.
 *
 * `DownMsg` triggers `downHandler`.
 *
 * Nothing is sent if the address of `self` evaluates to false.
 */
void monitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.system_msg : MonitorRequest;

    // `sendSystemMsg` only accepts address types while this function also
    // accepts actors for `sendTo`, thus resolve it to an address first in the
    // same way as `linkTo`/`unlinkTo` do.
    if (auto self_ = underlyingAddress(self))
        sendSystemMsg(underlyingAddress(sendTo), MonitorRequest(self_.weakRef));
}

/// Remove `self` as a monitor of the actor using `sendTo`.
void demonitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    // `sendSystemMsg` only accepts address types while this function also
    // accepts actors for `sendTo`, thus resolve it to an address first in the
    // same way as `linkTo`/`unlinkTo` do.
    // Nothing is sent if the address of `self` evaluates to false.
    if (auto self_ = underlyingAddress(self))
        sendSystemMsg(underlyingAddress(sendTo), DemonitorRequest(self_.weakRef));
}

// Only send the message if the system message queue is empty.
//
// The contract requires that `sendTo` is not empty.
package void sendSystemMsgIfEmpty(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT)
in (!sendTo.empty, "cannot send to an empty address") {
    auto tmp = underlyingAddress(sendTo);
    auto addr = tmp.get;
    if (addr && addr.empty!SystemMsg)
        addr.put(SystemMsg(msg));
}

// Wrap `msg` in a `SystemMsg` and put it in the mailbox of `sendTo`. The
// message is silently dropped if the address do not resolve to a mailbox.
package void sendSystemMsg(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT) {
    auto tmp = underlyingAddress(sendTo);
    auto addr = tmp.get;
    if (addr)
        addr.put(SystemMsg(msg));
}

/** Trigger the message in the future.
 *
 * Params:
 *  sendTo = receiver of the message.
 *  delayTo = point in time that is stored together with the message in the `DelayedMsg`.
 *  args = the message. Stored as a tuple of the unqualified types of `args`.
 *
 * Nothing is sent if `sendTo` do not resolve to a mailbox.
 */
void delayedSend(AddressT, Args...)(AddressT sendTo, SysTime delayTo, auto ref Args args) @trusted
        if (is(AddressT == WeakAddress) || is(AddressT == StrongAddress) || is(AddressT == Actor*)) {
    alias UArgs = staticMap!(Unqual, Args);
    if (auto addr = underlyingAddress(sendTo).get)
        addr.put(DelayedMsg(Msg(makeSignature!UArgs,
                MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))), delayTo));
}

/// Send a `SystemExitMsg` carrying `reason` to `sendTo` as a system message.
void sendExit(AddressT)(AddressT sendTo, const ExitReason reason) @safe
        if (isAddress!AddressT) {
    import my.actor.system_msg : SystemExitMsg;

    sendSystemMsg(sendTo, SystemExitMsg(reason));
}

/** Send a one shot message to `sendTo`.
 *
 * The arguments are stored as a tuple of the unqualified types of `args`
 * together with the signature made from those types.
 *
 * Nothing is sent if `sendTo` do not resolve to a mailbox.
 */
// TODO: add verification that args do not have interior pointers
void send(AddressT, Args...)(AddressT sendTo, auto ref Args args) @trusted
        if (isDynamicAddress!AddressT || is(AddressT == Actor*)) {
    alias UArgs = staticMap!(Unqual, Args);
    if (auto addr = underlyingAddress(sendTo).get)
        addr.put(Msg(makeSignature!UArgs, MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))));
}

// First stage of a request. Holds who is asking (`self`), who is asked
// (`requestTo`), the timeout and the id that the reply is matched against.
package struct RequestSend {
    Actor* self;
    WeakAddress requestTo;
    SysTime timeout;
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
        self = rhs.self;
        requestTo = rhs.requestTo;
        timeout = rhs.timeout;
        replyId = rhs.replyId;
    }

    @disable this(this);
}

// Second stage of a request. The request together with the message to send.
package struct RequestSendThen {
    RequestSend rs;
    Msg msg;

    @disable this(this);

    /// Copy constructor
    this(ref return typeof(this) rhs) {
        rs = rhs.rs;
        msg = rhs.msg;
    }
}

/** Start a request from `self` to `requestTo`.
 *
 * Params:
 *  self = the actor that makes the request. Its `replyId` is stored in the result.
 *  requestTo = the address the request is to be sent to.
 *  timeout = stored in the result and later used when the reply handler is registered.
 *
 * Returns: a `RequestSend` to call `send` on.
 */
RequestSend request(ActorT)(ActorT self, WeakAddress requestTo, SysTime timeout)
        if (is(ActorT == Actor*)) {
    return RequestSend(self, requestTo, timeout, self.replyId);
}

/** Construct the request message.
 *
 * The message is only constructed here. It is put in the receivers mailbox
 * by `thenUnsafe`.
 *
 * Params:
 *  r = the request as returned by `request`.
 *  args = the message. Stored as a tuple of the unqualified types of `args`.
 *
 * Returns: a `RequestSendThen` holding `r` and the constructed message.
 */
RequestSendThen send(Args...)(RequestSend r, auto ref Args args) {
    alias UArgs = staticMap!(Unqual, Args);

    // the address the reply is to be sent to.
    auto replyTo = r.self.addr.weakRef;

    // dfmt off
    auto msg = () @trusted {
        return Msg(
            makeSignature!UArgs,
            MsgType(MsgRequest(replyTo, r.replyId, Variant(Tuple!UArgs(args)))));
    }();
    // dfmt on

    return () @trusted { return RequestSendThen(r, msg); }();
}

// A request together with a pointer to the values that have been captured
// for the reply handler.
private struct ThenContext(Captures...) {
    alias Ctx = Tuple!Captures;

    RequestSendThen r;
    Ctx* ctx;

    /// Register `handler` for the reply, with `ctx` as its context, and send the request.
    void then(T)(T handler, ErrorHandler onError = null)
            if (isFunction!T || isFunctionPointer!T) {
        thenUnsafe!(T, Ctx)(r, handler, cast(void*) ctx, onError);
        // the pointer has been handed over, do not keep it around.
        ctx = null;
    }
}

// allows delegates but the context for them may be corrupted by the GC if they
// are used in another thread thus use of `thenUnsafe` must ensure it is not
// escaped.
/* Register `handler` as the receiver of the reply and send the request.
 *
 * Params:
 *  r = the request and the message to send.
 *  handler = called with the reply.
 *  ctx = opaque pointer that is stored in the reply handler.
 *  onError = called, if it is set, when the receiver of the request can't be
 *      locked. It is also passed on when the reply handler is registered.
 */
package void thenUnsafe(T, CtxT = void)(scope RequestSendThen r, T handler,
        void* ctx, ErrorHandler onError = null) @trusted {
    auto receiver = r.rs.requestTo.lock.get;
    if (!receiver) {
        // nobody to send the request to, report it and give up.
        if (onError)
            onError(*r.rs.self, ErrorMsg(r.rs.requestTo, SystemError.requestReceiverDown));
        return;
    }

    // TODO: compiler bug? how can SysTime be inferred to being scoped?
    SysTime deadline = () @trusted { return r.rs.timeout; }();

    // The handler for the reply is registered before the request is sent.
    // This order ensure that there is always a handler that can receive the
    // reply.

    () @safe {
        auto replyHandler = makeReply!(T, CtxT)(handler);
        replyHandler.ctx = ctx;
        r.rs.self.register(r.rs.replyId, deadline, replyHandler, onError);
    }();

    // the handler is in place, send the request.
    receiver.put(r.msg);
}

/** Send the request and call `handler` with the reply.
 *
 * Forwards to `thenUnsafe` with a `null` context.
 *
 * Params:
 *  r = the request and the message to send.
 *  handler = function or function pointer that is called with the reply.
 *  onError = optional error handler.
 */
void then(T, CtxT = void)(scope RequestSendThen r, T handler, ErrorHandler onError = null) @trusted
        if (isFunction!T || isFunctionPointer!T) {
    thenUnsafe!(T, CtxT)(r, handler, null, onError);
}

/// Type checked `send`. `args` are forwarded to the `send` for the underlying address.
void send(T, Args...)(T sendTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    send(underlyingAddress(sendTo), args);
}

/// Type checked `delayedSend`. Forwards to the `delayedSend` for the underlying address.
void delayedSend(T, Args...)(T sendTo, SysTime delayTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    delayedSend(underlyingAddress(sendTo), delayTo, args);
}

// A `RequestSend` tagged with the type of the address that is the receiver.
private struct TypedRequestSend(TAddress) {
    alias TypeAddress = TAddress;
    RequestSend rs;
}

/** Start a request from `self` to the typed address/actor `sendTo`.
 *
 * Returns: the untyped request for the underlying actor and weak address,
 * wrapped in a `TypedRequestSend`.
 */
TypedRequestSend!TAddress request(TActor, TAddress)(ref TActor self, TAddress sendTo,
        SysTime timeout)
        if (isActor!TActor && (isTypedActorImpl!TAddress || isTypedAddress!TAddress)) {
    return TypedRequestSend!TAddress(.request(underlyingActor(self),
            underlyingWeakAddress(sendTo), timeout));
}

// A `RequestSendThen` tagged with the type of the receiving address and the
// types of the arguments in the request.
private struct TypedRequestSendThen(TAddress, Params_...) {
    alias TypeAddress = TAddress;
    alias Params = Params_;
    RequestSendThen rs;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) {
        rs = rhs.rs;
    }

    @disable this(this);
}

/// Construct the request message from `args` and remember the types of `args`.
auto send(TR, Args...)(scope TR tr, auto ref Args args)
        if (is(TR == TypedRequestSend!TAddress, TAddress)) {
    alias Result = TypedRequestSendThen!(TR.TypeAddress, Args);
    return Result(send(tr.rs, args));
}

/** Send the typed request and call `handler` with the reply.
 *
 * The constraint type checks the parameters of `handler` together with the
 * types of the request against the typed address.
 */
void then(TR, T, CtxT = void)(scope TR tr, T handler, ErrorHandler onError = null)
        if ((isFunction!T || isFunctionPointer!T) && is(TR : TypedRequestSendThen!(TAddress,
            Params), TAddress, Params...) && typeCheckMsg!(TAddress,
            ParamsToTuple!(Parameters!T), Params)) {
    then(tr.rs, handler, onError);
}

// A typed request together with a pointer to the values that have been
// captured for the reply handler.
private struct TypedThenContext(TR, Captures...) {
    import my.actor.actor : checkRefForContext, checkMatchingCtx;

    alias Ctx = Tuple!Captures;

    TR r;
    Ctx* ctx;

    /** Register `handler` for the reply, with `ctx` as its context, and send the request.
     *
     * The first parameter of `handler` is the context. The remaining
     * parameters are what is type checked against the typed address.
     */
    void then(T)(T handler, ErrorHandler onError = null)
            if ((isFunction!T || isFunctionPointer!T) && typeCheckMsg!(TR.TypeAddress,
                ParamsToTuple!(Parameters!T[1 .. $]), TR.Params)) {
        // These checks live in the body and not in the constraint because a
        // constraint gags the messages from a static assert. This gives the
        // user a better error message.
        checkMatchingCtx!(Parameters!T[0], Ctx);
        checkRefForContext!handler;
        .thenUnsafe!(T, Ctx)(r.rs, handler, cast(void*) ctx, onError);
        // the pointer has been handed over, do not keep it around.
        ctx = null;
    }
}

/// The type of a set of captured values, a `Tuple` of `T`.
alias Capture(T...) = Tuple!T;
/// True if `T` is an instance of `Tuple`.
enum isCapture(T) = is(T == Tuple!U, U);
/// True if the first parameter of `Fn` is exactly `CtxT`.
enum isFirstParamCtx(Fn, CtxT) = is(Parameters!Fn[0] == CtxT);

/// Convenient function for capturing the actor itself when spawning.
alias CSelf(T = Actor*) = Capture!(T, "self");

/** Bundle `args` in a `Capture`.
 *
 * This overload is not used when the first argument is a request
 * (`RequestSendThen` or `TypedRequestSendThen`).
 *
 * Returns: a tuple constructed from `args`.
 */
Capture!T capture(T...)(auto ref T args)
        if (!is(T[0] == RequestSendThen)
            && !is(T[0] == TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    return typeof(return)(args);
}

/** Attach captured values to the request `r`.
 *
 * The values are copied to a tuple that is allocated with `new`.
 *
 * Returns: a `ThenContext` holding `r` and the pointer to the tuple.
 */
auto capture(Captures...)(RequestSendThen r, auto ref Captures captures) {
    // TODO: how to read the identifiers from captures? Using
    // ParameterIdentifierTuple didn't work.
    alias Ctx = Tuple!Captures;
    auto heapCtx = new Ctx(captures);
    return ThenContext!Captures(r, heapCtx);
}

/** Attach captured values to the typed request `r`.
 *
 * The values are copied to a tuple that is allocated with `new`.
 *
 * Returns: a `TypedThenContext` holding `r` and the pointer to the tuple.
 */
auto capture(TR, Captures...)(TR r, auto ref Captures captures)
        if (is(TR : TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    alias Ctx = Tuple!Captures;
    auto heapCtx = new Ctx(captures);
    return TypedThenContext!(TR, Captures)(r, heapCtx);
}