1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 An actor that can limit the flow of messages between consumer/producer.
7 
8 The limiter is initialized with a number of tokens.
9 
Producers try to take a token from the limiter. Either one is free and they
get it right away, or a promise is returned. The promise will trigger whenever
a token is returned by the consumer. The waiting producers are triggered in
LIFO order (just because that is a more efficient data structure).
14 
A consumer receives a message from a producer containing the token and data.
When the consumer has finished processing the message it returns the token to
the limiter.
18 */
19 module my.actor.utility.limiter;
20 
21 import std.array : empty;
22 import std.typecons : Tuple, tuple;
23 
24 import my.actor.actor;
25 import my.actor.typed;
26 import my.actor.msg;
27 import my.gc.refc;
28 
/// Marker value representing one unit of permitted work.
struct Token {}

/// Request sent to the limiter to obtain a token, if one is free.
struct TakeTokenMsg {}

/// Message sent to the limiter to hand a token back.
struct ReturnTokenMsg {}

/// Module-internal message that makes the limiter re-check its waiting requests.
private struct RefreshMsg {}

/** Typed interface of the limiter actor.
 *
 * It answers a `TakeTokenMsg` with a `Token` and accepts `ReturnTokenMsg` and
 * `RefreshMsg` without producing a reply.
 */
alias FlowControlActor = typedActor!(
        Token function(TakeTokenMsg),
        void function(ReturnTokenMsg),
        void function(RefreshMsg));
46 
/** Spawn a flow controller whose token count is the number of CPUs plus one.
 *
 * Params:
 *  self = the typed actor to set up as a limiter
 *
 * Returns: whatever `spawnFlowControl` returns for that token count.
 */
FlowControlActor.Impl spawnFlowControlTotalCPUs(FlowControlActor.Impl self) {
    import std.parallelism : totalCPUs;

    const uint tokenCount = totalCPUs + 1;
    return spawnFlowControl(self, tokenCount);
}
53 
/** Spawn a limiter actor that starts out with `tokens` free tokens.
 *
 * A `TakeTokenMsg` is answered right away while free tokens remain. Otherwise
 * a promise is stored and fulfilled later, when a token becomes available.
 * Waiting requests are served from the back of the list, i.e. the most recent
 * request is served first.
 *
 * A `ReturnTokenMsg` makes one more token available and immediately hands free
 * tokens to waiting requests.
 *
 * As a safety net the limiter also re-checks its waiting requests every 50 ms.
 * That periodic check is started once, by the first returned token. Earlier
 * every returned token started its own never-ending chain of refresh messages,
 * so the number of periodic messages grew with every token that was returned.
 *
 * Params:
 *  self = the typed actor to set up as a limiter
 *  tokens = number of tokens that are free from the start
 *
 * Returns: the result of `impl` with the three message handlers installed.
 */
FlowControlActor.Impl spawnFlowControl(FlowControlActor.Impl self, const uint tokens) {
    static struct State {
        // Tokens that are free to hand out right now.
        uint tokens;
        // Promises of producers that wait for a token; served from the back.
        Promise!Token[] takeReq;
        // True once the single periodic refresh chain has been started.
        bool refreshStarted;
    }

    self.name = "limiter";
    auto st = tuple!("self", "state")(self, refCounted(State(tokens)));
    alias CT = typeof(st);

    // Fulfil waiting promises, newest first, for as long as free tokens remain.
    static void handOutTokens(ref CT ctx) {
        while (ctx.state.get.tokens > 0 && !ctx.state.get.takeReq.empty) {
            ctx.state.get.tokens--;
            ctx.state.get.takeReq[$ - 1].deliver(Token.init);
            ctx.state.get.takeReq = ctx.state.get.takeReq[0 .. $ - 1];
        }
    }

    // Answer with a token directly if one is free, otherwise with a promise.
    static RequestResult!Token takeMsg(ref CT ctx, TakeTokenMsg) {
        typeof(return) rval;

        if (ctx.state.get.tokens > 0) {
            ctx.state.get.tokens--;
            rval = typeof(return)(Token.init);
        } else {
            auto p = makePromise!Token;
            ctx.state.get.takeReq ~= p;
            rval = typeof(return)(p);
        }
        return rval;
    }

    // Take a token back and pass it on to a waiting producer, if there is one.
    static void returnMsg(ref CT ctx, ReturnTokenMsg) {
        ctx.state.get.tokens++;
        handOutTokens(ctx);

        // Start the periodic refresh exactly once. It keeps itself alive in
        // refreshMsg, so starting it again here would only add duplicates.
        if (!ctx.state.get.refreshStarted) {
            ctx.state.get.refreshStarted = true;
            delayedSend(ctx.self, delay(50.dur!"msecs"), RefreshMsg.init);
        }
    }

    // Periodic safety net: re-check the waiting requests, then re-arm.
    static void refreshMsg(ref CT ctx, RefreshMsg) {
        handOutTokens(ctx);

        // extra caution to refresh in case something is missed.
        delayedSend(ctx.self, delay(50.dur!"msecs"), RefreshMsg.init);
    }

    return impl(self, &takeMsg, capture(st), &returnMsg, capture(st), &refreshMsg, capture(st));
}
96 
// Spawns a limiter with 40 tokens, 100 producers and one consumer that returns
// each token 100 ms after receiving it, then checks that at least 100 messages
// arrive and that this took longer than 200 ms.
@("shall limit the message rate of senders by using a limiter to control the flow")
unittest {
    import core.thread : Thread;
    import core.time : dur;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import my.actor.system;

    // Manifest constants, so the handler lambdas below can use them without
    // capturing this unittest's stack frame.
    enum tokenCount = 40;
    enum senderCount = 100;
    enum messageGoal = 100;
    // How long the consumer holds a token before returning it.
    enum readerRate = 100.dur!"msecs";

    auto sys = makeSystem;

    auto limiter = sys.spawn(&spawnFlowControl, tokenCount);

    static struct Tick {
    }

    WeakAddress[] senders;
    foreach (_; 0 .. senderCount) {
        static struct State {
            WeakAddress recv;
            FlowControlActor.Address limiter;
        }

        // Producer: once it knows the consumer it keeps asking the limiter
        // for a token and forwards every granted token to the consumer.
        senders ~= sys.spawn((Actor* self) {
            auto st = tuple!("self", "state")(self, refCounted(State(WeakAddress.init, limiter)));
            alias CT = typeof(st);

            return build(self).set((ref CT ctx, WeakAddress recv) {
                ctx.state.get.recv = recv;
                send(ctx.self.address, Tick.init);
            }, capture(st)).set((ref CT ctx, Tick _) {
                ctx.self.request(ctx.state.get.limiter, infTimeout)
                .send(TakeTokenMsg.init).capture(ctx).then((ref CT ctx, Token t) {
                    send(ctx.self, Tick.init);
                    send(ctx.state.get.recv, t, 42);
                });
            }, capture(st)).finalize;
        });
    }

    auto counter = refCounted(0);
    // Consumer: counts received messages and returns each token after
    // readerRate has passed.
    auto consumer = sys.spawn((Actor* self) {
        auto st = tuple!("self", "limiter", "count")(self, limiter, counter);
        alias CT = typeof(st);

        return impl(self, (ref CT ctx, Tick _) {
            // ">=" rather than "==": the count may already be past the goal
            // when this Tick is handled, if several tokens were processed
            // before it.
            if (ctx.count.get >= messageGoal)
                ctx.self.shutdown;
            else
                delayedSend(ctx.self, delay(readerRate), Tick.init);
        }, capture(st), (ref CT ctx, Token t, int _) {
            delayedSend(ctx.limiter, delay(readerRate), ReturnTokenMsg.init);
            ctx.count.get++;
            send(ctx.self, Tick.init);
        }, capture(st));
    });

    foreach (s; senders)
        s.linkTo(consumer);
    limiter.linkTo(consumer);

    auto sw = StopWatch(AutoStart.yes);
    foreach (s; senders)
        send(s, consumer);

    // Wait until the goal is reached, but give up after four seconds.
    while (counter.get < messageGoal && sw.peek < 4.dur!"seconds") {
        Thread.sleep(1.dur!"msecs");
    }

    assert(counter.get >= messageGoal);
    // 40 tokens mean that it will trigger at least two "slowdown" which is at least 200 ms.
    assert(sw.peek > 200.dur!"msecs");
}