1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.system;
7 
8 import core.sync.condition : Condition;
9 import core.sync.mutex : Mutex;
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : min, max, clamp;
13 import std.datetime : dur, Clock, Duration;
14 import std.parallelism : Task, TaskPool, task;
15 import std.traits : Parameters, ReturnType;
16 
17 import my.optional;
18 
19 public import my.actor.typed;
20 public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl;
21 public import my.actor.mailbox : Address, makeAddress2, WeakAddress;
22 public import my.actor.msg;
23 import my.actor.common;
24 import my.actor.memory : ActorAlloc;
25 
/// Create an actor system that schedules its actors on the caller-provided `pool`.
///
/// The system is constructed with `ownsPool == false`.
System makeSystem(TaskPool pool) @safe {
    enum ownsPool = false;
    return System(pool, ownsPool);
}
29 
/// Create an actor system backed by a freshly allocated `TaskPool`.
///
/// The system is constructed with `ownsPool == true`.
System makeSystem() @safe {
    enum ownsPool = true;
    return System(new TaskPool, ownsPool);
}
33 
/// Tunables for an actor system. Fields left unset fall back to defaults
/// picked by the code that reads them.
struct SystemConfig {
    /// Settings read by the scheduler's worker and watcher tasks.
    static struct Scheduler {
        /// Number of messages each actor is allowed to consume per scheduled run.
        Optional!ulong maxThroughput;
        /// How long a worker sleeps before polling the actor queue. The
        /// watcher tasks also use it as the upper bound of their poll interval.
        Optional!Duration pollInterval;
    }

    /// Scheduler settings; passed to the `Scheduler` when a `System` is constructed.
    Scheduler scheduler;
}
44 
/** An actor system: owns the scheduling backend and spawns actors onto it.
 *
 * The struct is not copyable. Its destructor calls `shutdown`.
 */
struct System {
    import std.functional : forward;

    private {
        bool running;
        bool ownsPool;
        TaskPool pool;
        Backend bg;
    }

    @disable this(this);

    /// Construct a system with the default `SystemConfig`.
    this(TaskPool pool, bool ownsPool) @safe {
        this(SystemConfig.init, pool, ownsPool);
    }

    /**
     * Params:
     *  conf = configuration; only `conf.scheduler` is read here.
     *  pool = thread pool to use for scheduling actors.
     *  ownsPool = when true the pool is finished as part of `shutdown`.
     */
    this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe {
        this.pool = pool;
        this.ownsPool = ownsPool;
        this.bg = Backend(new Scheduler(conf.scheduler, pool));

        // mark as running before the backend starts so a later shutdown is not skipped.
        this.running = true;
        this.bg.start(pool, pool.size);
    }

    ~this() @safe {
        this.shutdown();
    }

    /// Shutdown all actors as fast as possible.
    ///
    /// Does nothing when the system is not running, so repeated calls are harmless.
    void shutdown() @safe {
        if (running) {
            bg.shutdown;
            // only a pool that this system owns is finished.
            if (ownsPool)
                pool.finish(true);
            pool = null;

            running = false;
        }
    }

    /// Wait for all actors to finish (terminate) before returning.
    ///
    /// Performs exactly the same steps as `shutdown`.
    void wait() @safe {
        shutdown();
    }

    /// spawn dynamic actor.
    ///
    /// `fn` receives a freshly allocated actor followed by `args` and returns
    /// the actor to schedule. Returns: the address of the scheduled actor.
    WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) {
        auto fresh = bg.alloc.make(makeAddress2);
        auto built = fn(fresh, forward!args);
        return schedule(built);
    }

    /// spawn typed actor.
    ///
    /// `fn` receives the typed wrapper around a freshly allocated actor
    /// followed by `args`. Returns: the `address` of whatever `fn` returned.
    auto spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (isTypedActorImpl!(Parameters!(Fn)[0])) {
        alias ImplT = Parameters!(Fn)[0];
        alias ActorT = TypedActor!(ImplT.AllowedMessages);
        auto rawActor = bg.alloc.make(makeAddress2);
        auto typed = fn(ActorT.Impl(rawActor), forward!args);
        // the untyped actor is what gets scheduled; the typed value only supplies the address.
        schedule(rawActor);
        return typed.address;
    }

    // schedule an actor for execution in the thread pool.
    // Returns: the address of the actor.
    private WeakAddress schedule(Actor* actor) @safe {
        assert(bg.scheduler.isActive);
        actor.setHomeSystem(&this);
        bg.scheduler.putWaiting(actor);
        return actor.address;
    }
}
130 
@("shall start an actor system, execute an actor and shutdown")
@safe unittest {
    auto sys = makeSystem;

    // incremented by the handler each time it receives the value 42.
    int hasExecutedWith42;
    static void countFortyTwo(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x != 42)
            return;
        (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* a) {
        return build(a).set(&countFortyTwo, capture(&hasExecutedWith42)).finalize;
    });
    send(addr, 42);
    send(addr, 43);

    const timeout = 3.dur!"seconds";
    const failAfter = Clock.currTime + timeout;
    const start = Clock.currTime;
    // spin until the handler has observed 42 or the deadline has passed.
    while (hasExecutedWith42 == 0 && Clock.currTime < failAfter) {
    }
    const td = Clock.currTime - start;

    assert(hasExecutedWith42 == 1);
    assert(td < timeout);
}
154 
@("shall be possible to send a message to self during construction")
unittest {
    auto sys = makeSystem;

    // incremented by the handler each time it receives the value 42.
    int hasExecutedWith42;
    static void countFortyTwo(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x != 42)
            return;
        (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* self) {
        // first 42: sent by the actor to itself before its behavior is installed.
        send(self, 42);
        return impl(self, &countFortyTwo, capture(&hasExecutedWith42));
    });
    // second 42 comes from the outside; 43 must be ignored by the handler.
    send(addr, 42);
    send(addr, 43);

    const deadline = Clock.currTime + 3.dur!"seconds";
    // spin until both 42s have been counted or the deadline has passed.
    while (hasExecutedWith42 < 2 && Clock.currTime < deadline) {
    }

    assert(hasExecutedWith42 == 2);
}
178 
@("shall spawn two typed actors which are connected, execute and shutdow")
@safe unittest {
    import std.typecons : Tuple;

    auto sys = makeSystem;

    alias A1 = typedActor!(int function(int), string function(int, int));
    alias A2 = typedActor!(int function(int));

    // A1 answers a single int with that int plus ten.
    auto spawnA1(A1.Impl self) {
        return my.actor.typed.impl(self, (int a) => a + 10, (int a, int b) => "hej");
    }

    auto a1 = sys.spawn(&spawnA1);

    // A2 adds ten, forwards the value to A1 and delivers A1's answer through a promise.
    auto spawnA2(A2.Impl self) {
        return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) {
            auto p = makePromise!int;
            // dfmt off
            c.self.request(c.a1, infTimeout)
                .send(x + 10)
                .capture(p)
                .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); });
            // dfmt on
            return p;
        }, capture(self, a1));
    }

    auto a2 = sys.spawn(&spawnA2);

    auto self = scopedActor;
    int reply;
    // start msg to a2 which pass it on to a1: 10 + 10 (A2) + 10 (A1).
    self.request(a2, infTimeout).send(10).then((int x) { reply = x; });

    assert(reply == 30);
}
217 
218 private:
219 @safe:
220 
/// Bundles the scheduler with the allocator that actors are created from.
struct Backend {
    Scheduler scheduler;
    ActorAlloc alloc;

    /// Start the scheduler with `workers` worker tasks on `pool`, handing it
    /// a pointer to `alloc` so it can dispose of actors.
    void start(TaskPool pool, ulong workers) {
        scheduler.start(pool, workers, &alloc);
    }

    /// Stop the scheduler, destroy it and release memory.
    ///
    /// Calling it again after the scheduler has been released is a no-op.
    void shutdown() {
        import core.memory : GC;
        import my.libc : malloc_trim;

        // already shut down (or never started): nothing to stop or destroy.
        if (scheduler is null)
            return;

        scheduler.shutdown;
        // destroy *before* clearing the reference; destroying a null
        // reference does nothing, which made the destroy ineffective.
        () @trusted { .destroy(scheduler); }();
        scheduler = null;
        () @trusted { GC.collect; }();
        () @trusted { malloc_trim(0); }();
    }
}
241 
/** Schedule actors for execution.
 *
 * A worker pops an actor, executes it and then puts it back for later
 * scheduling.
 *
 * A watcher monitors inactive actors for either messages to have arrived or
 * timeouts to trigger. They are then moved back to the waiting queue. The
 * workers are notified that there are actors waiting to be executed.
 *
 * A shutdowner keeps processing actors that have stopped accepting messages
 * until they are no longer alive and then disposes of them.
 *
 * NOTE(review): `isActive`, `isWatcher` and `isShutdown` are plain `bool`s
 * that are written by `shutdown` and read by the worker/watcher tasks without
 * atomics or locks -- confirm that this is acceptable on the target platforms.
 */
class Scheduler {
    import core.atomic : atomicOp, atomicLoad;

    SystemConfig.Scheduler conf;

    /// Allocator used to dispose of terminated actors. Set by `start`.
    ActorAlloc* alloc;

    /// Workers will shutdown cleanly if it is false.
    bool isActive;

    /// Watcher will shutdown cleanly if this is false.
    bool isWatcher;

    /// Shutdowner will shutdown cleanly if false;
    bool isShutdown;

    // Workers waiting to be activated
    Mutex waitingWorkerMtx;
    Condition waitingWorker;

    // actors waiting to be executed by a worker.
    Queue!(Actor*) waiting;

    // Actors waiting for messages to arrive thus they are inactive.
    Queue!(Actor*) inactive;

    // Actors that are shutting down.
    Queue!(Actor*) inShutdown;

    Task!(worker, Scheduler, const ulong)*[] workers;
    Task!(watchInactive, Scheduler)* watcher;
    Task!(watchShutdown, Scheduler)* shutdowner;

    /**
     * Params:
     *  conf = scheduler settings; unset values fall back to built-in defaults.
     *  pool = not used by the constructor; the pool to run on is given to `start`.
     */
    this(SystemConfig.Scheduler conf, TaskPool pool) {
        this.conf = conf;
        this.isActive = true;
        this.isWatcher = true;
        this.isShutdown = true;
        this.waiting = typeof(waiting)(new Mutex);
        this.inactive = typeof(inactive)(new Mutex);
        this.inShutdown = typeof(inShutdown)(new Mutex);

        this.waitingWorkerMtx = new Mutex;
        this.waitingWorker = new Condition(this.waitingWorkerMtx);
    }

    /// Notify one worker blocked in `wait`.
    void wakeup() @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.notify;
        }
    }

    /// Block the calling worker until it is notified or `w` has elapsed.
    void wait(Duration w) @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.wait(w);
        }
    }

    /// check the inactive actors for activity.
    private static void watchInactive(Scheduler sched) {
        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        // a configured interval below minPoll would make the clamp below
        // receive inverted bounds, so the upper bound is never less than minPoll.
        const maxPoll = max(minPoll, sched.conf.pollInterval.orElse(10.dur!"msecs"));

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            // only look at the actors present at the start of this round;
            // actors put back during the round are handled in the next one.
            const runActors = sched.inactive.length;
            ulong inactive;
            Duration nextPoll = pollInterval;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inactive.pop.unsafeMove) {
                    if (a.hasMessage) {
                        sched.putWaiting(a);
                    } else {
                        const t = a.nextTimeout(Clock.currTime, maxPoll);

                        if (t < minPoll) {
                            // timeout is closer than the finest poll
                            // resolution: let a worker handle it right away.
                            sched.putWaiting(a);
                        } else {
                            sched.putInactive(a);
                            // remember the shortest timeout among the actors
                            // that remain inactive.
                            nextPoll = inactive == 0 ? t : min(nextPoll, t);
                            inactive++;
                        }
                    }
                }
            }

            if (inactive != 0) {
                pollInterval = clamp(nextPoll, minPoll, maxPoll);
            }

            if (inactive == runActors) {
                // nothing was moved to the waiting queue this round.
                () @trusted { Thread.sleep(pollInterval); }();
                // back off step by step, but never beyond maxPoll.
                pollInterval = min(maxPoll, pollInterval + stepPoll);
            } else {
                sched.wakeup;
                pollInterval = minPoll;
            }
        }

        // the scheduler is stopping: hand every remaining inactive actor over
        // to the shutdown queue until told to stop and the queue is empty.
        while (sched.isWatcher || !sched.inactive.empty) {
            if (auto a = sched.inactive.pop.unsafeMove) {
                sched.inShutdown.put(a);
            }
        }
    }

    /// finish shutdown of actors that are shutting down.
    private static void watchShutdown(Scheduler sched) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        // upper bound for the idle back-off; never less than minPoll.
        const maxPoll = max(minPoll, sched.conf.pollInterval.orElse(10.dur!"msecs"));

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inShutdown.length;
            ulong alive;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inShutdown.pop.unsafeMove) {
                    if (a.isAlive) {
                        alive++;
                        a.process(Clock.currTime);
                        sched.inShutdown.put(a);
                    } else {
                        sched.alloc.dispose(a);
                    }
                }
            }

            if (alive == 0) {
                () @trusted { Thread.sleep(pollInterval); }();
                // back off while idle, capped at maxPoll so the interval
                // cannot grow without bound.
                pollInterval = min(maxPoll, pollInterval + stepPoll);
            } else {
                pollInterval = minPoll;
            }
        }

        // the scheduler is stopping: send an exit message to actors that are
        // still alive (if their mailbox is empty), keep processing them and
        // dispose of them once they are no longer alive.
        while (sched.isShutdown || !sched.inShutdown.empty) {
            if (auto a = sched.inShutdown.pop.unsafeMove) {
                if (a.isAlive) {
                    sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill));
                    a.process(Clock.currTime);
                    sched.inShutdown.put(a);
                } else {
                    sched.alloc.dispose(a);
                }
            }
        }
    }

    /// Body of a worker task: execute waiting actors until the scheduler is
    /// deactivated, then drain the waiting queue.
    ///
    /// `id` is the index assigned in `start`; it is not read by the body.
    private static void worker(Scheduler sched, const ulong id) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const maxThroughput = sched.conf.maxThroughput.orElse(50UL);
        const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs");
        const inactiveLimit = min(500.dur!"msecs", pollInterval * 3);

        while (sched.isActive) {
            const runActors = sched.waiting.length;
            ulong consecutiveInactive;

            foreach (_; 0 .. runActors) {
                if (auto ctx = sched.pop) {
                    ulong msgs;
                    ulong prevMsgs;
                    ulong totalMsgs;
                    // keep processing the same actor until the throughput
                    // budget is used up or `messages` reports the same value
                    // twice in a row.
                    do {
                        // reduce clock polling
                        const now = Clock.currTime;
                        ctx.process(now);
                        prevMsgs = msgs;
                        msgs = ctx.messages;
                        totalMsgs += msgs;
                    }
                    while (totalMsgs < maxThroughput && msgs != prevMsgs);

                    if (totalMsgs == 0) {
                        sched.putInactive(ctx);
                        consecutiveInactive++;
                    } else {
                        consecutiveInactive = 0;
                        sched.putWaiting(ctx);
                    }
                } else {
                    // queue was emptied by someone else; wait for a wakeup.
                    sched.wait(pollInterval);
                }
            }

            // sleep if it is detected that actors are not sending messages
            if (consecutiveInactive == runActors) {
                sched.wait(inactiveLimit);
            }
        }

        // the scheduler is stopping: send an exit message to every waiting
        // actor (if its mailbox is empty) and process it. `putWaiting` moves
        // actors that no longer accept messages out of the waiting queue.
        while (!sched.waiting.empty) {
            const sleepAfter = 1 + sched.waiting.length;
            for (size_t i; i < sleepAfter; ++i) {
                if (auto ctx = sched.pop) {
                    sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill));
                    ctx.process(Clock.currTime);
                    sched.putWaiting(ctx);
                }
            }

            () @trusted { Thread.sleep(pollInterval); }();
        }
    }

    /// Start the workers.
    ///
    /// Params:
    ///  pool = pool that the `nr` worker tasks are put on.
    ///  nr = number of worker tasks to create.
    ///  alloc = allocator used to dispose of terminated actors.
    ///
    /// The inactive-watcher and the shutdown-watcher are each executed in a
    /// new thread with minimum priority.
    void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) {
        this.alloc = alloc;
        foreach (const id; 0 .. nr) {
            auto t = task!worker(this, id);
            workers ~= t;
            pool.put(t);
        }
        watcher = task!watchInactive(this);
        watcher.executeInNewThread(Thread.PRIORITY_MIN);

        shutdowner = task!watchShutdown(this);
        shutdowner.executeInNewThread(Thread.PRIORITY_MIN);
    }

    /// Stop the workers, then the watcher, then the shutdowner, waiting for
    /// each to finish before the next one is told to stop.
    ///
    /// Exceptions from the tasks are deliberately ignored (best effort).
    void shutdown() {
        isActive = false;
        foreach (a; workers) {
            try {
                a.yieldForce;
            } catch (Exception e) {
                // TODO: log exceptions?
            }
        }

        isWatcher = false;
        try {
            watcher.yieldForce;
        } catch (Exception e) {
        }

        isShutdown = false;
        try {
            shutdowner.yieldForce;
        } catch (Exception e) {
        }
    }

    /// Pop the next actor waiting to be executed.
    /// The workers treat a null result as "nothing to run".
    Actor* pop() {
        return waiting.pop.unsafeMove;
    }

    /// Queue `a` for execution if it is accepting messages; otherwise move it
    /// to the shutdown queue (still alive) or dispose of it.
    void putWaiting(Actor* a) @safe {
        if (a.isAccepting) {
            waiting.put(a);
        } else {
            retire(a);
        }
    }

    /// Park `a` as inactive if it is accepting messages; otherwise move it to
    /// the shutdown queue (still alive) or dispose of it.
    void putInactive(Actor* a) @safe {
        if (a.isAccepting) {
            inactive.put(a);
        } else {
            retire(a);
        }
    }

    // Common tail for actors that no longer accept messages.
    private void retire(Actor* a) @safe {
        if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }
}