1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.system;
7 
8 import core.sync.mutex : Mutex;
9 import core.sync.condition : Condition;
10 import core.thread : Thread;
11 import std.algorithm : min, max;
12 import std.datetime : dur, Clock, Duration;
13 import std.parallelism : Task, TaskPool, task;
14 import std.traits : Parameters, ReturnType;
15 
16 import my.optional;
17 
18 public import my.actor.typed;
19 public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl;
20 public import my.actor.mailbox : Address, makeAddress2, WeakAddress;
21 public import my.actor.msg;
22 import my.actor.common;
23 import my.actor.memory : ActorAlloc;
24 
/** Create an actor system that schedules its actors on a caller supplied pool.
 *
 * The system is constructed with `ownsPool` set to false, thus
 * `System.shutdown` does not finish `pool`.
 *
 * Params:
 *  pool = thread pool used for scheduling actors.
 */
System makeSystem(TaskPool pool) @safe {
    enum callerKeepsPool = false;
    return System(pool, callerKeepsPool);
}
28 
/** Create an actor system that runs on a thread pool owned by the system.
 *
 * The pool is finished when the system is shut down (`ownsPool` is true).
 *
 * The default constructed `TaskPool` uses `totalCPUs - 1` worker threads,
 * which is zero on a single core machine. `System` starts one worker per pool
 * thread so no actor would ever be executed. At least one thread is therefore
 * always requested; on machines with two or more cores the pool size is the
 * same as the default.
 */
System makeSystem() @safe {
    import std.parallelism : totalCPUs;

    const size_t nrThreads = () @trusted { return max(1u, totalCPUs - 1); }();
    return System(new TaskPool(nrThreads), true);
}
32 
/// Configuration of an actor `System`.
struct SystemConfig {
    /// Tuning parameters read by the `Scheduler`'s workers and watchers.
    static struct Scheduler {
        // number of messages each actor is allowed to consume per scheduled run.
        // The worker falls back to 50 when it is not set.
        Optional!ulong maxThroughput;
        // how long a worker sleeps before polling the actor queue.
        // When not set the worker falls back to 50 msecs and the watchers use
        // 10 msecs as their upper bound for the poll interval.
        Optional!Duration pollInterval;
    }

    /// Scheduler specific configuration.
    Scheduler scheduler;
}
43 
/** An actor system. It owns the scheduler backend and holds the thread pool
 * that the actors are executed in.
 *
 * The system is not copyable. It is shut down when it goes out of scope.
 */
struct System {
    import std.functional : forward;

    private {
        bool running;
        bool ownsPool;
        TaskPool pool;
        Backend bg;
    }

    @disable this(this);

    /// Construct a system with the default configuration.
    this(TaskPool pool, bool ownsPool) @safe {
        this(SystemConfig.init, pool, ownsPool);
    }

    /**
     * Params:
     *  conf = configuration, the scheduler part is passed on to the scheduler.
     *  pool = thread pool to use for scheduling actors.
     *  ownsPool = if the pool should be finished when the system is shut down.
     */
    this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe {
        this.bg = Backend(new Scheduler(conf.scheduler, pool));
        this.ownsPool = ownsPool;
        this.pool = pool;

        this.running = true;
        // one worker is started per thread in the pool.
        this.bg.start(pool, pool.size);
    }

    ~this() @safe {
        shutdown();
    }

    /// Shutdown all actors as fast as possible.
    void shutdown() @safe {
        if (running) {
            bg.shutdown;
            if (ownsPool) {
                pool.finish(true);
            }
            pool = null;

            running = false;
        }
    }

    /** Stop the system.
     *
     * NOTE: this performs exactly the same steps as `shutdown`, it does not
     * wait for the actors to terminate by themself.
     */
    void wait() @safe {
        shutdown();
    }

    /// spawn dynamic actor.
    WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) {
        auto fresh = bg.alloc.make(makeAddress2);
        auto built = fn(fresh, forward!args);
        return schedule(built);
    }

    /// spawn typed actor.
    auto spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (isTypedActorImpl!(Parameters!(Fn)[0])) {
        alias ActorT = TypedActor!(Parameters!(Fn)[0].AllowedMessages);
        auto fresh = bg.alloc.make(makeAddress2);
        // the user function configures the typed wrapper around the new actor.
        auto typedImpl = fn(ActorT.Impl(fresh), forward!args);
        schedule(fresh);
        return typedImpl.address;
    }

    // Hand over an actor to the scheduler for execution in the thread pool.
    // The actor is told that this system is its home system.
    // Returns: the address of the actor.
    private WeakAddress schedule(Actor* actor) @safe {
        assert(bg.scheduler.isActive);
        actor.setHomeSystem(&this);
        bg.scheduler.putWaiting(actor);
        return actor.address;
    }
}
129 
@("shall start an actor system, execute an actor and shutdown")
@safe unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    // message handler, counts the number of received messages that are 42.
    static void onInt(ref Capture!(int*, "hasExecutedWith42") ctx, int value) {
        if (value != 42)
            return;
        (*ctx.hasExecutedWith42)++;
    }

    auto receiver = sys.spawn((Actor* a) => build(a).set(&onInt,
            capture(&hasExecutedWith42)).finalize);
    send(receiver, 42);
    send(receiver, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    const start = Clock.currTime;
    // spin until the handler has seen 42 or the deadline has passed.
    for (;;) {
        if (hasExecutedWith42 != 0 || Clock.currTime >= failAfter)
            break;
    }
    const elapsed = Clock.currTime - start;

    // only one of the two messages is 42.
    assert(hasExecutedWith42 == 1);
    assert(elapsed < 3.dur!"seconds");
}
153 
@("shall be possible to send a message to self during construction")
unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    // message handler, counts the number of received messages that are 42.
    static void onInt(ref Capture!(int*, "hasExecutedWith42") ctx, int value) {
        if (value != 42)
            return;
        (*ctx.hasExecutedWith42)++;
    }

    auto receiver = sys.spawn((Actor* self) {
        // first 42, sent by the actor to itself while it is being constructed.
        send(self, 42);
        return impl(self, &onInt, capture(&hasExecutedWith42));
    });
    // second 42 followed by a value that the handler ignores.
    send(receiver, 42);
    send(receiver, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    // spin until both 42 have been handled or the deadline has passed.
    for (;;) {
        if (hasExecutedWith42 >= 2 || Clock.currTime >= failAfter)
            break;
    }

    assert(hasExecutedWith42 == 2);
}
177 
// Two typed actors are chained: a request to A2 is forwarded to A1 and the
// reply from A1 is delivered back to the requester via a promise.
@("shall spawn two typed actors which are connected, execute and shutdow")
@safe unittest {
    import std.typecons : Tuple;

    auto sys = makeSystem;

    // A1 handles (int) -> int and (int, int) -> string, A2 only (int) -> int.
    alias A1 = typedActor!(int function(int), string function(int, int));
    alias A2 = typedActor!(int function(int));

    // A1 answers an int with the value increased by 10.
    auto spawnA1(A1.Impl self) {
        return my.actor.typed.impl(self, (int a) { return a + 10; }, (int a, int b) => "hej");
    }

    auto a1 = sys.spawn(&spawnA1);

    // final result from A2's continuation.
    // A2 adds 10 to the received value, requests A1 to handle it and returns a
    // promise which is fulfilled with A1's answer.
    auto spawnA2(A2.Impl self) {
        return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) {
            auto p = makePromise!int;
            // dfmt off
            c.self.request(c.a1, infTimeout)
                .send(x + 10)
                .capture(p)
                .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); });
            // dfmt on
            return p;
        }, capture(self, a1));
    }

    auto a2 = sys.spawn(&spawnA2);

    auto self = scopedActor;
    int ok;
    // start msg to a2 which pass it on to a1.
    // NOTE(review): presumably the scoped actor's request blocks until the
    // reply has been handled, the assert below relies on it - confirm.
    self.request(a2, infTimeout).send(10).then((int x) { ok = x; });

    // 10 (sent) + 10 (added by A2) + 10 (added by A1).
    assert(ok == 30);
}
216 
217 private:
218 @safe:
219 
/// The scheduler together with the allocator that the actors are created by.
struct Backend {
    Scheduler scheduler;
    ActorAlloc alloc;

    /** Start the scheduler.
     *
     * Params:
     *  pool = thread pool the workers are executed in.
     *  workers = number of workers to start.
     */
    void start(TaskPool pool, ulong workers) {
        scheduler.start(pool, workers, &alloc);
    }

    /** Stop the scheduler, finalize it and return unused memory.
     *
     * Calling it when the scheduler has already been shut down is a no-op.
     */
    void shutdown() {
        import core.memory : GC;
        import my.libc : malloc_trim;

        // already shut down (or never constructed with a scheduler).
        if (scheduler is null)
            return;

        scheduler.shutdown;
        // the scheduler must be destroyed *before* the reference is cleared,
        // destroying a null reference does nothing.
        () @trusted { .destroy(scheduler); }();
        scheduler = null;
        () @trusted { GC.collect; }();
        () @trusted { malloc_trim(0); }();
    }
}
240 
/** Schedule actors for execution.
 *
 * A worker pops an actor, executes it and then puts it back for later
 * scheduling.
 *
 * A watcher monitors inactive actors for either messages to have arrived or
 * timeouts to trigger. They are then moved back to the waiting queue. The
 * workers are notified that there are actors waiting to be executed.
 *
 * A shutdowner processes the actors that no longer accept messages until they
 * are no longer alive and then disposes them.
 */
class Scheduler {
    import core.atomic : atomicOp, atomicLoad;

    SystemConfig.Scheduler conf;

    /// Allocator used to dispose terminated actors. Set by `start`.
    ActorAlloc* alloc;

    // NOTE(review): the three flags below are plain bools that are written by
    // `shutdown` and read by the worker/watcher threads without any atomic
    // operation - confirm that this is acceptable.

    /// Workers will shutdown cleanly if it is false.
    bool isActive;

    /// Watcher will shutdown cleanly if this is false.
    bool isWatcher;

    /// Shutdowner will shutdown cleanly if false;
    bool isShutdown;

    // Workers waiting to be activated
    Mutex waitingWorkerMtx;
    Condition waitingWorker;

    // actors waiting to be executed by a worker.
    Queue!(Actor*) waiting;

    // Actors waiting for messages to arrive thus they are inactive.
    Queue!(Actor*) inactive;

    // Actors that are shutting down.
    Queue!(Actor*) inShutdown;

    Task!(worker, Scheduler, const ulong)*[] workers;
    Task!(watchInactive, Scheduler)* watcher;
    Task!(watchShutdown, Scheduler)* shutdowner;

    /**
     * Params:
     *  conf = scheduler configuration.
     *  pool = not used by the constructor, the pool is passed to `start`.
     */
    this(SystemConfig.Scheduler conf, TaskPool pool) {
        this.conf = conf;
        this.isActive = true;
        this.isWatcher = true;
        this.isShutdown = true;
        this.waiting = typeof(waiting)(new Mutex);
        this.inactive = typeof(inactive)(new Mutex);
        this.inShutdown = typeof(inShutdown)(new Mutex);

        this.waitingWorkerMtx = new Mutex;
        this.waitingWorker = new Condition(this.waitingWorkerMtx);
    }

    /// Notify a worker that is blocked in `wait`.
    void wakeup() @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.notify;
        }
    }

    /// Block the caller until it is notified via `wakeup` or `w` has elapsed.
    void wait(Duration w) @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.wait(w);
        }
    }

    /// check the inactive actors for activity.
    private static void watchInactive(Scheduler sched) {
        const minPoll = 100.dur!"usecs";
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            // only inspect the actors that are in the queue right now, actors
            // that are put back during the pass are handled by the next pass.
            const runActors = sched.inactive.length;
            ulong inactive;
            Duration nextPoll = pollInterval;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inactive.pop.unsafeMove) {

                    if (a.hasMessage) {
                        sched.putWaiting(a);
                    } else {
                        const t = a.nextTimeout(Clock.currTime, maxPoll);

                        if (t < minPoll) {
                            // the timeout is so close that it is treated as triggered.
                            sched.putWaiting(a);
                        } else {
                            sched.putInactive(a);
                            // track the shortest timeout among the inactive actors.
                            nextPoll = inactive == 0 ? t : min(nextPoll, t);
                            inactive++;
                        }
                    }
                }
            }

            if (inactive != 0) {
                pollInterval = max(minPoll, nextPoll);
            }

            if (inactive == runActors) {
                // no actor was moved to a worker, sleep until the next poll.
                () @trusted { Thread.sleep(pollInterval); }();
                pollInterval = min(maxPoll, pollInterval);
            } else {
                sched.wakeup;
                pollInterval = minPoll;
            }
        }

        // the system is shutting down, hand over all inactive actors to the
        // shutdowner until told to stop.
        while (sched.isWatcher || !sched.inactive.empty) {
            if (auto a = sched.inactive.pop.unsafeMove) {
                sched.inShutdown.put(a);
            } else {
                // nothing to move, do not spin at full speed while waiting
                // for the workers to finish.
                () @trusted { Thread.sleep(minPoll); }();
            }
        }
    }

    /// finish shutdown of actors that are shutting down.
    private static void watchShutdown(Scheduler sched) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inShutdown.length;
            ulong alive;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inShutdown.pop.unsafeMove) {
                    if (a.isAlive) {
                        alive++;
                        a.process(Clock.currTime);
                        sched.inShutdown.put(a);
                    } else {
                        sched.alloc.dispose(a);
                    }
                }
            }

            if (alive == 0) {
                () @trusted { Thread.sleep(pollInterval); }();
                // back off one step at a time but never sleep longer than
                // maxPoll. An unbounded interval delays both the handling of
                // new actors and the shutdown of the scheduler.
                pollInterval = max(minPoll, min(maxPoll, pollInterval + stepPoll));
            } else {
                pollInterval = minPoll;
            }
        }

        // the system is shutting down, tell the remaining actors to exit and
        // process them until they have terminated.
        while (sched.isShutdown || !sched.inShutdown.empty) {
            if (auto a = sched.inShutdown.pop.unsafeMove) {
                if (a.isAlive) {
                    sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill));
                    a.process(Clock.currTime);
                    sched.inShutdown.put(a);
                } else {
                    sched.alloc.dispose(a);
                }
            } else {
                // nothing to process, do not spin at full speed while waiting
                // for the workers and the watcher to finish.
                () @trusted { Thread.sleep(minPoll); }();
            }
        }
    }

    /** Execute actors from the waiting queue until the scheduler is deactivated.
     *
     * Params:
     *  sched = the scheduler the worker belongs to.
     *  id = identity of the worker (currently unused).
     */
    private static void worker(Scheduler sched, const ulong id) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const maxThroughput = sched.conf.maxThroughput.orElse(50UL);
        const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs");
        const inactiveLimit = min(500.dur!"msecs", pollInterval * 3);

        while (sched.isActive) {
            const runActors = sched.waiting.length;
            ulong consecutiveInactive;

            foreach (_; 0 .. runActors) {
                // reduce clock polling
                const now = Clock.currTime;
                if (auto ctx = sched.pop) {
                    ulong msgs;
                    ulong totalMsgs;
                    // let the actor run until it has nothing more to do or has
                    // used up its share.
                    do {
                        ctx.process(now);
                        msgs = ctx.messages;
                        totalMsgs += msgs;
                    }
                    while (totalMsgs < maxThroughput && msgs != 0);

                    if (totalMsgs == 0) {
                        sched.putInactive(ctx);
                        consecutiveInactive++;
                    } else {
                        consecutiveInactive = 0;
                        sched.putWaiting(ctx);
                    }
                } else {
                    // the queue was drained by someone else.
                    sched.wait(pollInterval);
                }
            }

            // sleep if it is detected that actors are not sending messages
            if (consecutiveInactive == runActors) {
                sched.wait(inactiveLimit);
            }
        }

        // the system is shutting down, tell the waiting actors to exit.
        // `putWaiting` moves them out of the waiting queue when they no
        // longer accept messages.
        while (!sched.waiting.empty) {
            const sleepAfter = 1 + sched.waiting.length;
            for (size_t i; i < sleepAfter; ++i) {
                if (auto ctx = sched.pop) {
                    sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill));
                    ctx.process(Clock.currTime);
                    sched.putWaiting(ctx);
                }
            }

            () @trusted { Thread.sleep(pollInterval); }();
        }
    }

    /** Start the workers, the watcher and the shutdowner.
     *
     * Params:
     *  pool = thread pool that the workers are put in.
     *  nr = number of workers to start.
     *  alloc = allocator used to dispose terminated actors.
     */
    void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) {
        this.alloc = alloc;
        foreach (const id; 0 .. nr) {
            auto t = task!worker(this, id);
            workers ~= t;
            pool.put(t);
        }
        // the watcher and shutdowner run in their own threads outside the pool.
        watcher = task!watchInactive(this);
        watcher.executeInNewThread(Thread.PRIORITY_MIN);

        shutdowner = task!watchShutdown(this);
        shutdowner.executeInNewThread(Thread.PRIORITY_MIN);
    }

    /** Stop the workers, then the watcher and last the shutdowner.
     *
     * Each stage is waited for before the next is told to stop. Exceptions
     * from the tasks are deliberately ignored, the shutdown is best effort.
     */
    void shutdown() {
        isActive = false;
        foreach (a; workers) {
            try {
                a.yieldForce;
            } catch (Exception e) {
                // TODO: log exceptions?
            }
        }

        isWatcher = false;
        // the tasks are null if `start` has not been called.
        if (watcher !is null) {
            try {
                watcher.yieldForce;
            } catch (Exception e) {
            }
        }

        isShutdown = false;
        if (shutdowner !is null) {
            try {
                shutdowner.yieldForce;
            } catch (Exception e) {
            }
        }
    }

    /// Returns: the next actor waiting to be executed, null if there is none.
    Actor* pop() {
        return waiting.pop.unsafeMove;
    }

    /** Queue an actor for execution by a worker.
     *
     * An actor that no longer accepts messages is handed to the shutdowner if
     * it is alive, otherwise it is disposed.
     */
    void putWaiting(Actor* a) @safe {
        if (a.isAccepting) {
            waiting.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }

    /** Park an actor until the watcher detects activity.
     *
     * An actor that no longer accepts messages is handed to the shutdowner if
     * it is alive, otherwise it is disposed.
     */
    void putInactive(Actor* a) @safe {
        if (a.isAccepting) {
            inactive.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }
}