1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.mailbox;
7 
8 import logger = std.experimental.logger;
9 import core.sync.mutex : Mutex;
10 import std.datetime : SysTime;
11 import std.variant : Variant;
12 
13 import sumtype;
14 
15 import my.actor.common;
16 import my.gc.refc;
17 public import my.actor.system_msg;
18 
19 struct MsgOneShot {
20     Variant data;
21 }
22 
23 struct MsgRequest {
24     WeakAddress replyTo;
25     ulong replyId;
26     Variant data;
27 }
28 
29 alias MsgType = SumType!(MsgOneShot, MsgRequest);
30 
31 struct Msg {
32     ulong signature;
33     MsgType type;
34 
35     this(ref return typeof(this) a) @trusted {
36         signature = a.signature;
37         type = a.type;
38     }
39 
40     @disable this(this);
41 }
42 
43 alias SystemMsg = SumType!(ErrorMsg, DownMsg, ExitMsg, SystemExitMsg,
44         MonitorRequest, DemonitorRequest, LinkRequest, UnlinkRequest);
45 
46 struct Reply {
47     ulong id;
48     Variant data;
49 
50     this(ref return typeof(this) a) {
51         id = a.id;
52         data = a.data;
53     }
54 
55     @disable this(this);
56 }
57 
58 struct DelayedMsg {
59     Msg msg;
60     SysTime triggerAt;
61 
62     this(ref return DelayedMsg a) @trusted {
63         msg = a.msg;
64         triggerAt = a.triggerAt;
65     }
66 
67     this(const ref return DelayedMsg a) inout @safe {
68         assert(0, "not supported");
69     }
70 
71     @disable this(this);
72 }
73 
74 struct Address {
75     private {
76         // If the actor that use the address is active and processing messages.
77         bool open_;
78         ulong id_;
79         Mutex mtx;
80     }
81 
82     package {
83         Queue!Msg incoming;
84 
85         Queue!SystemMsg sysMsg;
86 
87         // Delayed messages for this actor that will be triggered in the future.
88         Queue!DelayedMsg delayed;
89 
90         // Incoming replies on requests.
91         Queue!Reply replies;
92     }
93 
94     invariant {
95         assert(mtx !is null,
96                 "mutex must always be set or the address will fail on sporadic method calls");
97     }
98 
99     private this(Mutex mtx) @safe
100     in (mtx !is null) {
101         this.mtx = mtx;
102 
103         // lazy way of generating an ID. a mutex is a class thus allocated on
104         // the heap at a unique location. just... use the pointer as the ID.
105         () @trusted { id_ = cast(ulong) cast(void*) mtx; }();
106         incoming = typeof(incoming)(mtx);
107         sysMsg = typeof(sysMsg)(mtx);
108         delayed = typeof(delayed)(mtx);
109         replies = typeof(replies)(mtx);
110     }
111 
112     @disable this(this);
113 
114     void shutdown() @safe nothrow {
115         try {
116             synchronized (mtx) {
117                 open_ = false;
118                 incoming.teardown((ref Msg a) { a.type = MsgType.init; });
119                 sysMsg.teardown((ref SystemMsg a) { a = SystemMsg.init; });
120                 delayed.teardown((ref DelayedMsg a) { a.msg.type = MsgType.init; });
121                 replies.teardown((ref Reply a) { a.data = a.data.type.init; });
122             }
123         } catch (Exception e) {
124             assert(0, "this should never happen");
125         }
126     }
127 
128     package bool put(T)(T msg) {
129         synchronized (mtx) {
130             if (!open_)
131                 return false;
132 
133             static if (is(T : Msg))
134                 return incoming.put(msg);
135             else static if (is(T : SystemMsg))
136                 return sysMsg.put(msg);
137             else static if (is(T : DelayedMsg))
138                 return delayed.put(msg);
139             else static if (is(T : Reply))
140                 return replies.put(msg);
141             else
142                 static assert(0, "msg type not supported " ~ T.stringof);
143         }
144     }
145 
146     package auto pop(T)() @safe {
147         synchronized (mtx) {
148             static if (is(T : Msg)) {
149                 if (!open_)
150                     return incoming.PopReturnType.init;
151                 return incoming.pop;
152             } else static if (is(T : SystemMsg)) {
153                 if (!open_)
154                     return sysMsg.PopReturnType.init;
155                 return sysMsg.pop;
156             } else static if (is(T : DelayedMsg)) {
157                 if (!open_)
158                     return delayed.PopReturnType.init;
159                 return delayed.pop;
160             } else static if (is(T : Reply)) {
161                 if (!open_)
162                     return replies.PopReturnType.init;
163                 return replies.pop;
164             } else {
165                 static assert(0, "msg type not supported " ~ T.stringof);
166             }
167         }
168     }
169 
170     package bool empty(T)() @safe {
171         synchronized (mtx) {
172             if (!open_)
173                 return true;
174 
175             static if (is(T : Msg))
176                 return incoming.empty;
177             else static if (is(T : SystemMsg))
178                 return sysMsg.empty;
179             else static if (is(T : DelayedMsg))
180                 return delayed.empty;
181             else static if (is(T : Reply))
182                 return replies.empty;
183             else
184                 static assert(0, "msg type not supported " ~ T.stringof);
185         }
186     }
187 
188     package bool hasMessage() @safe pure nothrow const @nogc {
189         try {
190             synchronized (mtx) {
191                 return !(incoming.empty && sysMsg.empty && delayed.empty && replies.empty);
192             }
193         } catch (Exception e) {
194         }
195         return false;
196     }
197 
198     package void setOpen() @safe pure nothrow @nogc {
199         open_ = true;
200     }
201 
202     package void setClosed() @safe pure nothrow @nogc {
203         open_ = false;
204     }
205 }
206 
207 struct WeakAddress {
208     private Address* addr;
209 
210     StrongAddress lock() @safe nothrow @nogc {
211         return StrongAddress(addr);
212     }
213 
214     T opCast(T : bool)() @safe nothrow const @nogc {
215         return cast(bool) addr;
216     }
217 
218     bool empty() @safe nothrow const @nogc {
219         return addr is null;
220     }
221 
222     void opAssign(WeakAddress rhs) @safe nothrow @nogc {
223         this.addr = rhs.addr;
224     }
225 
226     size_t toHash() @safe pure nothrow const @nogc scope {
227         return cast(size_t) addr;
228     }
229 
230     void release() @safe nothrow @nogc {
231         addr = null;
232     }
233 }
234 
235 /** Messages can be sent to a strong address.
236  */
237 struct StrongAddress {
238     package {
239         Address* addr;
240     }
241 
242     private this(Address* addr) @safe nothrow @nogc {
243         this.addr = addr;
244     }
245 
246     void release() @safe nothrow @nogc {
247         addr = null;
248     }
249 
250     ulong id() @safe pure nothrow const @nogc {
251         return cast(ulong) addr;
252     }
253 
254     size_t toHash() @safe pure nothrow const @nogc scope {
255         return cast(size_t) addr;
256     }
257 
258     void opAssign(StrongAddress rhs) @safe nothrow @nogc {
259         this.addr = rhs.addr;
260     }
261 
262     T opCast(T : bool)() @safe nothrow const @nogc {
263         return cast(bool) addr;
264     }
265 
266     bool empty() @safe pure nothrow const @nogc {
267         return addr is null;
268     }
269 
270     WeakAddress weakRef() @safe nothrow {
271         return WeakAddress(addr);
272     }
273 
274     package Address* get() @safe pure nothrow @nogc scope return  {
275         return addr;
276     }
277 }
278 
279 StrongAddress makeAddress2() @safe {
280     return StrongAddress(new Address(new Mutex));
281 }