/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

An actor that can limit the flow of messages between consumer/producer.

The limiter is initialized with a number of tokens.

Producers try to take a token from the limiter. Either one is free and they
get it right away or a promise is returned. The promise will trigger whenever
a token is returned by the consumer. The waiting producers are triggered in
LIFO order (just because that is a more efficient data structure).

A consumer receives a message from a producer containing the token and data.
When the consumer has finished processing the message it returns the token to
the limiter.
*/
module my.actor.utility.limiter;

import std.array : empty;
import std.typecons : Tuple, tuple;

import my.actor.actor;
import my.actor.typed;
import my.actor.msg;
import my.gc.refc;

/// A token of work. Carries no data; holding one is the permission to proceed.
struct Token {
}

/// Request sent to the limiter to take a token if there are any free.
/// The reply is a `Token`, delivered right away or once one has been returned.
struct TakeTokenMsg {
}

/// Sent to the limiter to return a previously taken token.
struct ReturnTokenMsg {
}

/// Module-private message that makes the limiter re-check whether waiting
/// take requests can be served with the tokens that are currently free.
private struct RefreshMsg {
}

/// Typed interface of the limiter actor: answers `TakeTokenMsg` with a `Token`
/// and accepts `ReturnTokenMsg` as well as the module-private `RefreshMsg`.
alias FlowControlActor = typedActor!(Token function(TakeTokenMsg),
        void function(ReturnTokenMsg), void function(RefreshMsg));

/// Initialize the flow controller to total cpu's + 1.
FlowControlActor.Impl spawnFlowControlTotalCPUs(FlowControlActor.Impl self) {
    import std.parallelism : totalCPUs;

    // One token per CPU reported by std.parallelism, plus one spare.
    return spawnFlowControl(self, totalCPUs + 1);
}

/** Configure `self` as a flow controller that starts with `tokens` free tokens.
 *
 * Behavior of the installed handlers:
 * $(UL
 *   $(LI `TakeTokenMsg`: replies with a `Token` immediately if one is free,
 *        otherwise with a promise that is stored until a token is available.)
 *   $(LI `ReturnTokenMsg`: puts a token back and serves waiting promises.)
 *   $(LI `RefreshMsg`: periodic safety net that serves waiting promises.)
 * )
 *
 * Waiting promises are served last-in-first-out.
 *
 * Params:
 *  self = the actor to configure; its name is set to "limiter".
 *  tokens = number of tokens that are free from the start.
 *
 * Returns: `self` with the three message handlers installed.
 */
FlowControlActor.Impl spawnFlowControl(FlowControlActor.Impl self, const uint tokens) {
    static struct State {
        // Number of tokens that are currently free.
        uint tokens;
        // Promises of producers waiting for a token; served from the back.
        Promise!Token[] takeReq;
        // True once the periodic safety refresh has been started. Guards
        // against starting more than one refresh chain.
        bool refreshArmed;
    }

    self.name = "limiter";
    auto st = tuple!("self", "state")(self, refCounted(State(tokens)));
    alias CT = typeof(st);

    // Hand free tokens to waiting promises until either runs out.
    static void handOutTokens(ref CT ctx) {
        while (ctx.state.get.tokens > 0 && !ctx.state.get.takeReq.empty) {
            ctx.state.get.tokens--;
            ctx.state.get.takeReq[$ - 1].deliver(Token.init);
            ctx.state.get.takeReq = ctx.state.get.takeReq[0 .. $ - 1];
        }
    }

    // Schedule the next periodic safety refresh.
    static void scheduleRefresh(ref CT ctx) {
        delayedSend(ctx.self, delay(50.dur!"msecs"), RefreshMsg.init);
    }

    static RequestResult!Token takeMsg(ref CT ctx, TakeTokenMsg) {
        typeof(return) rval;

        if (ctx.state.get.tokens > 0) {
            ctx.state.get.tokens--;
            rval = typeof(return)(Token.init);
        } else {
            // No free token: park the request until one is returned.
            auto p = makePromise!Token;
            ctx.state.get.takeReq ~= p;
            rval = typeof(return)(p);
        }
        return rval;
    }

    static void returnMsg(ref CT ctx, ReturnTokenMsg) {
        ctx.state.get.tokens++;
        // Serve waiting producers right away instead of posting a RefreshMsg.
        // Every handled RefreshMsg re-arms a 50 ms timer, so posting one per
        // returned token would start a new never-ending refresh chain for
        // each token that is ever returned.
        handOutTokens(ctx);

        // Start the periodic safety refresh exactly once.
        if (!ctx.state.get.refreshArmed) {
            ctx.state.get.refreshArmed = true;
            scheduleRefresh(ctx);
        }
    }

    static void refreshMsg(ref CT ctx, RefreshMsg) {
        handOutTokens(ctx);

        // extra caution to refresh in case something is missed.
        // RefreshMsg is only posted by the single chain started in returnMsg,
        // so re-arming here keeps exactly one chain alive.
        scheduleRefresh(ctx);
    }

    return impl(self, &takeMsg, capture(st), &returnMsg, capture(st), &refreshMsg, capture(st));
}

@("shall limit the message rate of senders by using a limiter to control the flow")
unittest {
    import core.thread : Thread;
    import core.time : dur;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import my.actor.system;

    auto sys = makeSystem;

    // 40 tokens shared by 100 senders, so most senders have to wait.
    auto limiter = sys.spawn(&spawnFlowControl, 40);

    immutable SenderRate = 1.dur!"msecs";
    immutable ReaderRate = 100.dur!"msecs";

    static struct Tick {
    }

    WeakAddress[] senders;
    foreach (_; 0 .. 100) {
        static struct State {
            WeakAddress recv;
            FlowControlActor.Address limiter;
        }

        static struct SendMsg {
        }

        senders ~= sys.spawn((Actor* self) {
            auto st = tuple!("self", "state")(self, refCounted(State(WeakAddress.init, limiter)));
            alias CT = typeof(st);

            // A sender learns its receiver, then on every Tick asks the
            // limiter for a token and forwards token + payload once granted.
            return build(self).set((ref CT ctx, WeakAddress recv) {
                ctx.state.get.recv = recv;
                send(ctx.self.address, Tick.init);
            }, capture(st)).set((ref CT ctx, Tick _) {
                ctx.self.request(ctx.state.get.limiter, infTimeout)
                    .send(TakeTokenMsg.init).capture(ctx).then((ref CT ctx, Token t) {
                        send(ctx.self, Tick.init);
                        send(ctx.state.get.recv, t, 42);
                    });
            }, capture(st)).finalize;
        });
    }

    auto counter = refCounted(0);
    auto consumer = sys.spawn((Actor* self) {
        auto st = tuple!("self", "limiter", "count")(self, limiter, counter);
        alias CT = typeof(st);

        return impl(self, (ref CT ctx, Tick _) {
            if (ctx.count.get == 100)
                ctx.self.shutdown;
            else
                delayedSend(ctx.self, delay(100.dur!"msecs"), Tick.init);
        }, capture(st), (ref CT ctx, Token t, int _) {
            // The token is handed back to the limiter after a 100 ms delay.
            delayedSend(ctx.limiter, delay(100.dur!"msecs"), ReturnTokenMsg.init);
            ctx.count.get++;
            send(ctx.self, Tick.init);
        }, capture(st));
    });

    foreach (s; senders)
        s.linkTo(consumer);
    limiter.linkTo(consumer);

    auto sw = StopWatch(AutoStart.yes);
    foreach (s; senders)
        send(s, consumer);

    // Poll until 100 messages have been consumed or four seconds have passed.
    while (counter.get < 100 && sw.peek < 4.dur!"seconds") {
        Thread.sleep(1.dur!"msecs");
    }

    assert(counter.get >= 100);
    // 40 tokens mean that it will trigger at least two "slowdown" which is at least 200 ms.
    assert(sw.peek > 200.dur!"msecs");
}