1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.msg;
7 
8 import std.meta : staticMap, AliasSeq;
9 import std.traits : Unqual, Parameters, isFunction, isFunctionPointer;
10 import std.typecons : Tuple, tuple;
11 import std.variant : Variant;
12 
13 public import std.datetime : SysTime, Duration, dur;
14 
15 import my.actor.mailbox;
16 import my.actor.common : ExitReason, makeSignature, SystemError;
17 import my.actor.actor : Actor, makeAction, makeRequest, makeReply, makePromise,
18     ErrorHandler, Promise, RequestResult;
19 import my.actor.system_msg;
20 import my.actor.typed : isTypedAddress, isTypedActor, isTypedActorImpl,
21     typeCheckMsg, ParamsToTuple, ReturnToTupleOrVoid,
22     underlyingActor, underlyingAddress, underlyingTypedAddress, underlyingWeakAddress;
23 
/// A point in time that is never reached: the largest value a `SysTime` can hold.
SysTime infTimeout() @safe pure nothrow {
    return typeof(return).max;
}
27 
/** Convert a relative duration to an absolute point in time.
 *
 * Params:
 *  d = how far ahead of the current clock time the returned value should be.
 *
 * Returns: the current time as reported by `Clock.currTime` plus `d`.
 */
SysTime timeout(Duration d) @safe nothrow {
    import std.datetime : Clock;

    auto now = Clock.currTime;
    return now + d;
}
33 
/// Alias of `timeout`; call sites of `delayedSend` read better when they say `delay`.
alias delay = timeout;

/// True if `T` is one of the untyped address kinds (`WeakAddress` or `StrongAddress`).
enum isDynamicAddress(T) = is(T == WeakAddress) || is(T == StrongAddress);

/// True if `T` is any address: an untyped one or a typed address.
enum isAddress(T) = isDynamicAddress!T || isTypedAddress!T;

/// True if `T` is an actor: a raw `Actor*`, a typed actor or a typed actor implementation.
enum isActor(T) = is(T == Actor*) || isTypedActor!T || isTypedActorImpl!T;
40 
/** Link the lifetime of `self` to the actor using `sendTo`.
 *
 * Both arguments are resolved with `underlyingAddress`. Each side is then sent
 * a `LinkRequest` carrying a weak reference to the other side. Nothing is sent
 * if either resolved address is empty.
 *
 * An `ExitMsg` is sent to `self` if `sendTo` is terminated and vice versa.
 *
 * `ExitMsg` triggers `exitHandler`.
 */
void linkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : LinkRequest;

    auto own = underlyingAddress(self);
    auto peer = underlyingAddress(sendTo);

    // a link needs both ends to exist
    if (!own.empty && !peer.empty) {
        sendSystemMsg(own, LinkRequest(peer.weakRef));
        sendSystemMsg(peer, LinkRequest(own.weakRef));
    }
}
61 
/** Remove the link between `self` and the actor using `sendTo`.
 *
 * Both arguments are resolved with `underlyingAddress` and each side is sent
 * an `UnlinkRequest` carrying a weak reference to the other side.
 */
void unlinkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : UnlinkRequest;

    auto own = underlyingAddress(self);
    auto peer = underlyingAddress(sendTo);

    // Deliberately NO check that the addresses exist because it doesn't
    // matter. Just remove the link.

    sendSystemMsg(own, UnlinkRequest(peer.weakRef));
    sendSystemMsg(peer, UnlinkRequest(own.weakRef));
}
77 
/** Actor `self` will receive a `DownMsg` when `sendTo` shutdown.
 *
 * `DownMsg` triggers `downHandler`.
 *
 * A `MonitorRequest` carrying a weak reference to `self` is sent as a system
 * message to `sendTo`. Nothing is sent if the address of `self` evaluates to
 * false.
 *
 * Params:
 *  self = the actor or address that wants to be notified.
 *  sendTo = the actor or address to watch.
 */
void monitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.system_msg : MonitorRequest;

    // `sendSystemMsg` only accepts addresses while the constraint above also
    // admits actors, thus resolve `sendTo` to its address before sending.
    if (auto self_ = underlyingAddress(self))
        sendSystemMsg(underlyingAddress(sendTo), MonitorRequest(self_.weakRef));
}
90 
/** Remove `self` as a monitor of the actor using `sendTo`.
 *
 * A `DemonitorRequest` carrying a weak reference to `self` is sent as a system
 * message to `sendTo`. Nothing is sent if the address of `self` evaluates to
 * false.
 *
 * Params:
 *  self = the actor or address that no longer wants to be notified.
 *  sendTo = the actor or address that is being watched.
 */
void demonitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    // import the symbol that is actually used in the body.
    import my.actor.system_msg : DemonitorRequest;

    // `sendSystemMsg` only accepts addresses while the constraint above also
    // admits actors, thus resolve `sendTo` to its address before sending.
    if (auto self_ = underlyingAddress(self))
        sendSystemMsg(underlyingAddress(sendTo), DemonitorRequest(self_.weakRef));
}
100 
// Wrap `msg` in a `SystemMsg` and put it in the mailbox of `sendTo`, but only
// if the mailbox reports that it holds no `SystemMsg`. Nothing is sent when
// the address does not resolve to a mailbox.
package void sendSystemMsgIfEmpty(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT)
in (!sendTo.empty, "cannot send to an empty address") {
    auto owner = underlyingAddress(sendTo);
    auto mbox = owner.get;
    if (mbox) {
        if (mbox.empty!SystemMsg)
            mbox.put(SystemMsg(msg));
    }
}
110 
// Wrap `msg` in a `SystemMsg` and put it in the mailbox of `sendTo`. The
// message is silently dropped when the address does not resolve to a mailbox.
package void sendSystemMsg(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT) {
    auto owner = underlyingAddress(sendTo);
    auto mbox = owner.get;
    if (mbox) {
        mbox.put(SystemMsg(msg));
    }
}
118 
/** Trigger the message in the future.
 *
 * The arguments are packed, with qualifiers stripped, into a one-shot message
 * which is wrapped together with `delayTo` in a `DelayedMsg` and put in the
 * mailbox of `sendTo`. Nothing happens if `sendTo` does not resolve to a
 * mailbox.
 *
 * Params:
 *  sendTo = receiver of the message.
 *  delayTo = the point in time attached to the message, see `delay`.
 *  args = the content of the message.
 */
void delayedSend(AddressT, Args...)(AddressT sendTo, SysTime delayTo, auto ref Args args) @trusted
        if (is(AddressT == WeakAddress) || is(AddressT == StrongAddress) || is(AddressT == Actor*)) {
    alias Payload = staticMap!(Unqual, Args);
    if (auto mbox = underlyingAddress(sendTo).get) {
        auto msg = Msg(makeSignature!Payload, MsgType(MsgOneShot(Variant(Tuple!Payload(args)))));
        mbox.put(DelayedMsg(msg, delayTo));
    }
}
127 
/** Send a `SystemExitMsg` carrying `reason` as a system message to `sendTo`.
 *
 * Params:
 *  sendTo = address of the receiver.
 *  reason = the exit reason put in the message.
 */
void sendExit(AddressT)(AddressT sendTo, const ExitReason reason) @safe
        if (isAddress!AddressT) {
    import my.actor.system_msg : SystemExitMsg;

    auto exitMsg = SystemExitMsg(reason);
    sendSystemMsg(sendTo, exitMsg);
}
134 
/** Send a one-shot message to an untyped address or an actor.
 *
 * The arguments are packed, with qualifiers stripped, into a `Tuple` which is
 * stored in a `Variant`. Nothing happens if `sendTo` does not resolve to a
 * mailbox.
 *
 * Params:
 *  sendTo = receiver of the message.
 *  args = the content of the message.
 */
// TODO: add verification that args do not have interior pointers
void send(AddressT, Args...)(AddressT sendTo, auto ref Args args) @trusted
        if (isDynamicAddress!AddressT || is(AddressT == Actor*)) {
    alias Payload = staticMap!(Unqual, Args);
    if (auto mbox = underlyingAddress(sendTo).get) {
        auto msg = Msg(makeSignature!Payload, MsgType(MsgOneShot(Variant(Tuple!Payload(args)))));
        mbox.put(msg);
    }
}
142 
/// The first stage of a request: who asks, whom to ask and the deadline. It is
/// created by `request` and turned into a `RequestSendThen` by `send`.
package struct RequestSend {
    /// The actor that makes the request; its address is used as the reply address.
    Actor* self;
    /// The receiver of the request.
    WeakAddress requestTo;
    /// The timeout that is registered together with the reply handler.
    SysTime timeout;
    /// Id put in the request and used when the reply handler is registered.
    ulong replyId;

    @disable this(this);

    /// Copy constructor, a member-wise copy of `rhs`.
    this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
        this.self = rhs.self;
        this.requestTo = rhs.requestTo;
        this.timeout = rhs.timeout;
        this.replyId = rhs.replyId;
    }
}
159 
/// The second stage of a request: a `RequestSend` together with the message
/// to send. It is consumed by `then`/`thenUnsafe`.
package struct RequestSendThen {
    /// Who asks, whom to ask and the deadline.
    RequestSend rs;
    /// The fully constructed request message.
    Msg msg;

    /// Copy constructor, a member-wise copy of `rhs`.
    this(ref return typeof(this) rhs) {
        this.rs = rhs.rs;
        this.msg = rhs.msg;
    }

    @disable this(this);
}
172 
/** Begin a request from the actor `self` to `requestTo`.
 *
 * Params:
 *  self = the actor that makes the request. Its current `replyId` is stored in the result.
 *  requestTo = the receiver of the request.
 *  timeout = stored in the result and later registered with the reply handler.
 *
 * Returns: a `RequestSend` to call `send` on.
 */
RequestSend request(ActorT)(ActorT self, WeakAddress requestTo, SysTime timeout)
        if (is(ActorT == Actor*)) {
    return typeof(return)(self, requestTo, timeout, self.replyId);
}
177 
/** Attach the content of the request.
 *
 * The arguments are packed, with qualifiers stripped, into a `MsgRequest`
 * together with the weak address of the requesting actor and the reply id.
 * Nothing is sent by this function.
 *
 * Params:
 *  r = the request as returned by `request`.
 *  args = the content of the request.
 *
 * Returns: a `RequestSendThen` to call `then` or `capture` on.
 */
RequestSendThen send(Args...)(RequestSend r, auto ref Args args) {
    alias Payload = staticMap!(Unqual, Args);

    // the reply is sent back to the requesting actor.
    auto origin = r.self.addr.weakRef;

    auto packed = () @trusted {
        return Msg(makeSignature!Payload,
                MsgType(MsgRequest(origin, r.replyId, Variant(Tuple!Payload(args)))));
    }();

    return () @trusted { return RequestSendThen(r, packed); }();
}
193 
/// A pending request together with a pointer to the values captured for the
/// reply handler. It is created by `capture`.
private struct ThenContext(Captures...) {
    /// The type of the captured values.
    alias Ctx = Tuple!Captures;

    /// The request to send.
    RequestSendThen r;
    /// The captured values. Set to null after they have been handed over in `then`.
    Ctx* ctx;

    /** Register `handler` for the reply and send the request.
     *
     * Params:
     *  handler = called with the reply.
     *  onError = optional error handler, passed on to `thenUnsafe`.
     */
    void then(T)(T handler, ErrorHandler onError = null)
            if (isFunction!T || isFunctionPointer!T) {
        auto erased = cast(void*) ctx;
        thenUnsafe!(T, Ctx)(r, handler, erased, onError);
        // the pointer has been handed over, do not keep it around.
        ctx = null;
    }
}
206 
// Register `handler` for the reply to the request `r` and then send the
// request.
//
// If the receiver can not be locked nothing is registered or sent. `onError`,
// if given, is called with an `ErrorMsg` holding
// `SystemError.requestReceiverDown`.
//
// allows delegates but the context for them may be corrupted by the GC if they
// are used in another thread thus use of `thenUnsafe` must ensure it is not
// escaped.
package void thenUnsafe(T, CtxT = void)(scope RequestSendThen r, T handler,
        void* ctx, ErrorHandler onError = null) @trusted {
    if (auto receiver = r.rs.requestTo.lock.get) {
        // TODO: compiler bug? how can SysTime be inferred to being scoped?
        SysTime deadline = () @trusted { return r.rs.timeout; }();

        // The handler must be registered BEFORE the request is sent. This
        // order ensures that there is always a handler that can receive the
        // reply.
        () @safe {
            auto replyHandler = makeReply!(T, CtxT)(handler);
            replyHandler.ctx = ctx;
            r.rs.self.register(r.rs.replyId, deadline, replyHandler, onError);
        }();

        // then send it
        receiver.put(r.msg);
    } else if (onError) {
        onError(*r.rs.self, ErrorMsg(r.rs.requestTo, SystemError.requestReceiverDown));
    }
}
234 
/** Register `handler` for the reply and send the request `r`.
 *
 * No captured context is passed to the handler.
 *
 * Params:
 *  r = the request as returned by `send`.
 *  handler = called with the reply.
 *  onError = optional error handler, passed on to `thenUnsafe`.
 */
void then(T, CtxT = void)(scope RequestSendThen r, T handler, ErrorHandler onError = null) @trusted
        if (isFunction!T || isFunctionPointer!T) {
    void* noCtx = null;
    thenUnsafe!(T, CtxT)(r, handler, noCtx, onError);
}
239 
/** Send a message to a typed address or typed actor.
 *
 * The constraint checks the arguments against the typed interface with
 * `typeCheckMsg`. The message is then sent via the underlying address.
 */
void send(T, Args...)(T sendTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    auto untyped = underlyingAddress(sendTo);
    send(untyped, args);
}
244 
/** Send a delayed message to a typed address or typed actor.
 *
 * The constraint checks the arguments against the typed interface with
 * `typeCheckMsg`. The message is then sent via the underlying address.
 */
void delayedSend(T, Args...)(T sendTo, SysTime delayTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    auto untyped = underlyingAddress(sendTo);
    delayedSend(untyped, delayTo, args);
}
249 
/// A `RequestSend` tagged with the typed address the request is made to.
private struct TypedRequestSend(TAddress) {
    /// The untyped request.
    RequestSend rs;

    /// The typed address the request is made to.
    alias TypeAddress = TAddress;
}
254 
/** Begin a request from `self` to a typed address or typed actor.
 *
 * Both `self` and `sendTo` are resolved to their untyped counterparts and
 * passed to the untyped `request`.
 *
 * Returns: the untyped request wrapped in a `TypedRequestSend!TAddress`.
 */
TypedRequestSend!TAddress request(TActor, TAddress)(ref TActor self, TAddress sendTo,
        SysTime timeout)
        if (isActor!TActor && (isTypedActorImpl!TAddress || isTypedAddress!TAddress)) {
    auto requester = underlyingActor(self);
    auto receiver = underlyingWeakAddress(sendTo);
    return typeof(return)(.request(requester, receiver, timeout));
}
260 
/// A `RequestSendThen` tagged with the typed address the request is made to
/// and the types of the arguments of the request.
private struct TypedRequestSendThen(TAddress, Params_...) {
    /// The typed address the request is made to.
    alias TypeAddress = TAddress;
    /// The types of the arguments of the request.
    alias Params = Params_;

    /// The untyped request.
    RequestSendThen rs;

    @disable this(this);

    /// Copy constructor, copies the wrapped request.
    this(ref return scope typeof(this) rhs) {
        this.rs = rhs.rs;
    }
}
273 
/** Attach the content of a typed request.
 *
 * Returns: the result of the untyped `send` wrapped in a
 * `TypedRequestSendThen` that records the argument types.
 */
auto send(TR, Args...)(scope TR tr, auto ref Args args)
        if (is(TR == TypedRequestSend!TAddress, TAddress)) {
    alias Result = TypedRequestSendThen!(TR.TypeAddress, Args);
    return Result(send(tr.rs, args));
}
278 
/** Register `handler` for the reply to a typed request and send the request.
 *
 * The constraint checks with `typeCheckMsg` the parameters of `handler` and
 * the request arguments against the typed interface. The actual work is done
 * by the untyped `then`.
 */
void then(TR, T, CtxT = void)(scope TR tr, T handler, ErrorHandler onError = null)
        if ((isFunction!T || isFunctionPointer!T) && is(TR : TypedRequestSendThen!(TAddress,
            Params), TAddress, Params...) && typeCheckMsg!(TAddress,
            ParamsToTuple!(Parameters!T), Params)) {
    // all type checks are done in the constraint.
    .then(tr.rs, handler, onError);
}
285 
/// A pending typed request together with a pointer to the values captured for
/// the reply handler. It is created by `capture`.
private struct TypedThenContext(TR, Captures...) {
    import my.actor.actor : checkRefForContext, checkMatchingCtx;

    /// The type of the captured values.
    alias Ctx = Tuple!Captures;

    /// The typed request to send.
    TR r;
    /// The captured values. Set to null after they have been handed over in `then`.
    Ctx* ctx;

    /** Register `handler` for the reply and send the request.
     *
     * The first parameter of `handler` is checked against the context. The
     * remaining ones are checked against the typed interface by the
     * constraint.
     */
    void then(T)(T handler, ErrorHandler onError = null)
            if ((isFunction!T || isFunctionPointer!T) && typeCheckMsg!(TR.TypeAddress,
                ParamsToTuple!(Parameters!T[1 .. $]), TR.Params)) {
        // These checks are in the body and not in the constraint to give the
        // user a better error message. A constraint gags the messages from a
        // static assert.
        checkMatchingCtx!(Parameters!T[0], Ctx);
        checkRefForContext!handler;

        auto erased = cast(void*) ctx;
        .thenUnsafe!(T, Ctx)(r.rs, handler, erased, onError);
        // the pointer has been handed over, do not keep it around.
        ctx = null;
    }
}
306 
/// A capture is a `Tuple` of the captured values. Fields can be named in the
/// same way as for a `Tuple`.
alias Capture(T...) = Tuple!T;

/// True if `T` is an instance of `Tuple` and thus of `Capture`.
///
/// The parameter list is variadic so captures with more than one field and
/// captures with named fields, such as `Capture!(int, "x")`, are matched too.
enum isCapture(T) = is(T == Tuple!U, U...);

/// True if the first parameter of the function `Fn` is exactly `CtxT`.
enum isFirstParamCtx(Fn, CtxT) = is(Parameters!Fn[0] == CtxT);
310 
/// Convenient capture type for handing an actor to itself when spawning. It
/// has one field, named `self`, which by default is an `Actor*`.
alias CSelf(T = Actor*) = Tuple!(T, "self");
313 
/** Pack `args` into a `Capture`.
 *
 * This overload is not used when the first argument is a request
 * (`RequestSendThen` or `TypedRequestSendThen`).
 */
Capture!T capture(T...)(auto ref T args)
        if (!is(T[0] == RequestSendThen)
            && !is(T[0] == TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    return typeof(return)(args);
}
319 
/** Attach captured values to the request `r`.
 *
 * The values are copied to a `Tuple` allocated with `new`.
 *
 * Returns: a `ThenContext` to call `then` on.
 */
auto capture(Captures...)(RequestSendThen r, auto ref Captures captures) {
    // TODO: how to read the identifiers from captures? Using
    // ParameterIdentifierTuple didn't work.
    alias Ctx = Tuple!Captures;
    Ctx* heapCtx = new Ctx(captures);
    return ThenContext!Captures(r, heapCtx);
}
326 
/** Attach captured values to the typed request `r`.
 *
 * The values are copied to a `Tuple` allocated with `new`.
 *
 * Returns: a `TypedThenContext` to call `then` on.
 */
auto capture(TR, Captures...)(TR r, auto ref Captures captures)
        if (is(TR : TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    alias Ctx = Tuple!Captures;
    Ctx* heapCtx = new Ctx(captures);
    return TypedThenContext!(TR, Captures)(r, heapCtx);
}