1 /** 2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module my.actor.system; 7 8 import core.sync.condition : Condition; 9 import core.sync.mutex : Mutex; 10 import core.thread : Thread; 11 import logger = std.experimental.logger; 12 import std.algorithm : min, max, clamp; 13 import std.datetime : dur, Clock, Duration; 14 import std.parallelism : Task, TaskPool, task; 15 import std.traits : Parameters, ReturnType; 16 17 import my.optional; 18 19 public import my.actor.typed; 20 public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl; 21 public import my.actor.mailbox : Address, makeAddress2, WeakAddress; 22 public import my.actor.msg; 23 import my.actor.common; 24 import my.actor.memory : ActorAlloc; 25 26 System makeSystem(TaskPool pool) @safe { 27 return System(pool, false); 28 } 29 30 System makeSystem() @safe { 31 return System(new TaskPool, true); 32 } 33 34 struct SystemConfig { 35 static struct Scheduler { 36 // number of messages each actor is allowed to consume per scheduled run. 37 Optional!ulong maxThroughput; 38 // how long a worker sleeps before polling the actor queue. 39 Optional!Duration pollInterval; 40 } 41 42 Scheduler scheduler; 43 } 44 45 struct System { 46 import std.functional : forward; 47 48 private { 49 bool running; 50 bool ownsPool; 51 TaskPool pool; 52 Backend bg; 53 } 54 55 @disable this(this); 56 57 this(TaskPool pool, bool ownsPool) @safe { 58 this(SystemConfig.init, pool, ownsPool); 59 } 60 61 /** 62 * Params: 63 * pool = thread pool to use for scheduling actors. 64 */ 65 this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe { 66 this.pool = pool; 67 this.ownsPool = ownsPool; 68 this.bg = Backend(new Scheduler(conf.scheduler, pool)); 69 70 this.running = true; 71 this.bg.start(pool, pool.size); 72 } 73 74 ~this() @safe { 75 shutdown; 76 } 77 78 /// Shutdown all actors as fast as possible. 79 void shutdown() @safe { 80 if (!running) 81 return; 82 83 bg.shutdown; 84 if (ownsPool) 85 pool.finish(true); 86 pool = null; 87 88 running = false; 89 } 90 91 /// Wait for all actors to finish (terminate) before returning. 92 void wait() @safe { 93 if (!running) 94 return; 95 96 bg.shutdown; 97 if (ownsPool) 98 pool.finish(true); 99 pool = null; 100 101 running = false; 102 } 103 104 /// spawn dynamic actor. 105 WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args) 106 if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) { 107 auto actor = bg.alloc.make(makeAddress2); 108 return schedule(fn(actor, forward!args)); 109 } 110 111 /// spawn typed actor. 112 auto spawn(Fn, Args...)(Fn fn, auto ref Args args) 113 if (isTypedActorImpl!(Parameters!(Fn)[0])) { 114 alias ActorT = TypedActor!(Parameters!(Fn)[0].AllowedMessages); 115 auto actor = bg.alloc.make(makeAddress2); 116 auto impl = fn(ActorT.Impl(actor), forward!args); 117 schedule(actor); 118 return impl.address; 119 } 120 121 // schedule an actor for execution in the thread pool. 122 // Returns: the address of the actor. 123 private WeakAddress schedule(Actor* actor) @safe { 124 assert(bg.scheduler.isActive); 125 actor.setHomeSystem(&this); 126 bg.scheduler.putWaiting(actor); 127 return actor.address; 128 } 129 } 130 131 @("shall start an actor system, execute an actor and shutdown") 132 @safe unittest { 133 auto sys = makeSystem; 134 135 int hasExecutedWith42; 136 static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) { 137 if (x == 42) 138 (*c.hasExecutedWith42)++; 139 } 140 141 auto addr = sys.spawn((Actor* a) => build(a).set(&fn, capture(&hasExecutedWith42)).finalize); 142 send(addr, 42); 143 send(addr, 43); 144 145 const failAfter = Clock.currTime + 3.dur!"seconds"; 146 const start = Clock.currTime; 147 while (hasExecutedWith42 == 0 && Clock.currTime < failAfter) { 148 } 149 const td = Clock.currTime - start; 150 151 assert(hasExecutedWith42 == 1); 152 assert(td < 3.dur!"seconds"); 153 } 154 155 @("shall be possible to send a message to self during construction") 156 unittest { 157 auto sys = makeSystem; 158 159 int hasExecutedWith42; 160 static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) { 161 if (x == 42) 162 (*c.hasExecutedWith42)++; 163 } 164 165 auto addr = sys.spawn((Actor* self) { 166 send(self, 42); 167 return impl(self, &fn, capture(&hasExecutedWith42)); 168 }); 169 send(addr, 42); 170 send(addr, 43); 171 172 const failAfter = Clock.currTime + 3.dur!"seconds"; 173 while (hasExecutedWith42 < 2 && Clock.currTime < failAfter) { 174 } 175 176 assert(hasExecutedWith42 == 2); 177 } 178 179 @("shall spawn two typed actors which are connected, execute and shutdow") 180 @safe unittest { 181 import std.typecons : Tuple; 182 183 auto sys = makeSystem; 184 185 alias A1 = typedActor!(int function(int), string function(int, int)); 186 alias A2 = typedActor!(int function(int)); 187 188 auto spawnA1(A1.Impl self) { 189 return my.actor.typed.impl(self, (int a) { return a + 10; }, (int a, int b) => "hej"); 190 } 191 192 auto a1 = sys.spawn(&spawnA1); 193 194 // final result from A2's continuation. 195 auto spawnA2(A2.Impl self) { 196 return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) { 197 auto p = makePromise!int; 198 // dfmt off 199 c.self.request(c.a1, infTimeout) 200 .send(x + 10) 201 .capture(p) 202 .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); }); 203 // dfmt on 204 return p; 205 }, capture(self, a1)); 206 } 207 208 auto a2 = sys.spawn(&spawnA2); 209 210 auto self = scopedActor; 211 int ok; 212 // start msg to a2 which pass it on to a1. 213 self.request(a2, infTimeout).send(10).then((int x) { ok = x; }); 214 215 assert(ok == 30); 216 } 217 218 private: 219 @safe: 220 221 struct Backend { 222 Scheduler scheduler; 223 ActorAlloc alloc; 224 225 void start(TaskPool pool, ulong workers) { 226 scheduler.start(pool, workers, &alloc); 227 } 228 229 void shutdown() { 230 import core.memory : GC; 231 import my.libc : malloc_trim; 232 233 scheduler.shutdown; 234 scheduler = null; 235 //() @trusted { .destroy(scheduler); GC.collect; malloc_trim(0); }(); 236 () @trusted { .destroy(scheduler); }(); 237 () @trusted { GC.collect; }(); 238 () @trusted { malloc_trim(0); }(); 239 } 240 } 241 242 /** Schedule actors for execution. 243 * 244 * A worker pop an actor, execute it and then put it back for later scheduling. 245 * 246 * A watcher monitors inactive actors for either messages to have arrived or 247 * timeouts to trigger. They are then moved back to the waiting queue. The 248 * workers are notified that there are actors waiting to be executed. 249 */ 250 class Scheduler { 251 import core.atomic : atomicOp, atomicLoad; 252 253 SystemConfig.Scheduler conf; 254 255 ActorAlloc* alloc; 256 257 /// Workers will shutdown cleanly if it is false. 258 bool isActive; 259 260 /// Watcher will shutdown cleanly if this is false. 261 bool isWatcher; 262 263 /// Shutdowner will shutdown cleanly if false; 264 bool isShutdown; 265 266 // Workers waiting to be activated 267 Mutex waitingWorkerMtx; 268 Condition waitingWorker; 269 270 // actors waiting to be executed by a worker. 271 Queue!(Actor*) waiting; 272 273 // Actors waiting for messages to arrive thus they are inactive. 274 Queue!(Actor*) inactive; 275 276 // Actors that are shutting down. 277 Queue!(Actor*) inShutdown; 278 279 Task!(worker, Scheduler, const ulong)*[] workers; 280 Task!(watchInactive, Scheduler)* watcher; 281 Task!(watchShutdown, Scheduler)* shutdowner; 282 283 this(SystemConfig.Scheduler conf, TaskPool pool) { 284 this.conf = conf; 285 this.isActive = true; 286 this.isWatcher = true; 287 this.isShutdown = true; 288 this.waiting = typeof(waiting)(new Mutex); 289 this.inactive = typeof(inactive)(new Mutex); 290 this.inShutdown = typeof(inShutdown)(new Mutex); 291 292 this.waitingWorkerMtx = new Mutex; 293 this.waitingWorker = new Condition(this.waitingWorkerMtx); 294 } 295 296 void wakeup() @trusted { 297 synchronized (waitingWorkerMtx) { 298 waitingWorker.notify; 299 } 300 } 301 302 void wait(Duration w) @trusted { 303 synchronized (waitingWorkerMtx) { 304 waitingWorker.wait(w); 305 } 306 } 307 308 /// check the inactive actors for activity. 309 private static void watchInactive(Scheduler sched) { 310 const maxThroughput = sched.conf.maxThroughput.orElse(50UL); 311 const shutdownPoll = sched.conf.pollInterval.orElse(20.dur!"msecs"); 312 313 const minPoll = 100.dur!"usecs"; 314 const stepPoll = minPoll; 315 const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs"); 316 317 Duration pollInterval = minPoll; 318 319 while (sched.isActive) { 320 const runActors = sched.inactive.length; 321 ulong inactive; 322 Duration nextPoll = pollInterval; 323 324 foreach (_; 0 .. runActors) { 325 if (auto a = sched.inactive.pop.unsafeMove) { 326 if (a.hasMessage) { 327 sched.putWaiting(a); 328 } else { 329 const t = a.nextTimeout(Clock.currTime, maxPoll); 330 331 if (t < minPoll) { 332 sched.putWaiting(a); 333 } else { 334 sched.putInactive(a); 335 nextPoll = inactive == 0 ? t : min(nextPoll, t); 336 inactive++; 337 } 338 } 339 } 340 } 341 342 if (inactive != 0) { 343 pollInterval = clamp(nextPoll, minPoll, maxPoll); 344 } 345 346 if (inactive == runActors) { 347 () @trusted { Thread.sleep(pollInterval); }(); 348 pollInterval = min(maxPoll, pollInterval); 349 } else { 350 sched.wakeup; 351 pollInterval = minPoll; 352 } 353 } 354 355 while (sched.isWatcher || !sched.inactive.empty) { 356 if (auto a = sched.inactive.pop.unsafeMove) { 357 sched.inShutdown.put(a); 358 } 359 } 360 } 361 362 /// finish shutdown of actors that are shutting down. 363 private static void watchShutdown(Scheduler sched) { 364 import my.actor.msg : sendSystemMsgIfEmpty; 365 import my.actor.common : ExitReason; 366 import my.actor.mailbox : SystemExitMsg; 367 368 const shutdownPoll = sched.conf.pollInterval.orElse(20.dur!"msecs"); 369 370 const minPoll = 100.dur!"usecs"; 371 const stepPoll = minPoll; 372 const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs"); 373 374 Duration pollInterval = minPoll; 375 376 while (sched.isActive) { 377 const runActors = sched.inShutdown.length; 378 ulong alive; 379 380 foreach (_; 0 .. runActors) { 381 if (auto a = sched.inShutdown.pop.unsafeMove) { 382 if (a.isAlive) { 383 alive++; 384 a.process(Clock.currTime); 385 sched.inShutdown.put(a); 386 } else { 387 sched.alloc.dispose(a); 388 } 389 } 390 } 391 392 if (alive == 0) { 393 () @trusted { Thread.sleep(pollInterval); }(); 394 pollInterval = max(minPoll, pollInterval + stepPoll); 395 } else { 396 pollInterval = minPoll; 397 } 398 } 399 400 while (sched.isShutdown || !sched.inShutdown.empty) { 401 if (auto a = sched.inShutdown.pop.unsafeMove) { 402 if (a.isAlive) { 403 sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill)); 404 a.process(Clock.currTime); 405 sched.inShutdown.put(a); 406 } else { 407 sched.alloc.dispose(a); 408 } 409 } 410 } 411 } 412 413 private static void worker(Scheduler sched, const ulong id) { 414 import my.actor.msg : sendSystemMsgIfEmpty; 415 import my.actor.common : ExitReason; 416 import my.actor.mailbox : SystemExitMsg; 417 418 const maxThroughput = sched.conf.maxThroughput.orElse(50UL); 419 const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs"); 420 const inactiveLimit = min(500.dur!"msecs", pollInterval * 3); 421 422 while (sched.isActive) { 423 const runActors = sched.waiting.length; 424 ulong consecutiveInactive; 425 426 foreach (_; 0 .. runActors) { 427 if (auto ctx = sched.pop) { 428 ulong msgs; 429 ulong prevMsgs; 430 ulong totalMsgs; 431 do { 432 // reduce clock polling 433 const now = Clock.currTime; 434 ctx.process(now); 435 prevMsgs = msgs; 436 msgs = ctx.messages; 437 totalMsgs += msgs; 438 } 439 while (totalMsgs < maxThroughput && msgs != prevMsgs); 440 441 if (totalMsgs == 0) { 442 sched.putInactive(ctx); 443 consecutiveInactive++; 444 } else { 445 consecutiveInactive = 0; 446 sched.putWaiting(ctx); 447 } 448 } else { 449 sched.wait(pollInterval); 450 } 451 } 452 453 // sleep if it is detected that actors are not sending messages 454 if (consecutiveInactive == runActors) { 455 sched.wait(inactiveLimit); 456 } 457 } 458 459 while (!sched.waiting.empty) { 460 const sleepAfter = 1 + sched.waiting.length; 461 for (size_t i; i < sleepAfter; ++i) { 462 if (auto ctx = sched.pop) { 463 sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill)); 464 ctx.process(Clock.currTime); 465 sched.putWaiting(ctx); 466 } 467 } 468 469 () @trusted { Thread.sleep(pollInterval); }(); 470 } 471 } 472 473 /// Start the workers. 474 void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) { 475 this.alloc = alloc; 476 foreach (const id; 0 .. nr) { 477 auto t = task!worker(this, id); 478 workers ~= t; 479 pool.put(t); 480 } 481 watcher = task!watchInactive(this); 482 watcher.executeInNewThread(Thread.PRIORITY_MIN); 483 484 shutdowner = task!watchShutdown(this); 485 shutdowner.executeInNewThread(Thread.PRIORITY_MIN); 486 } 487 488 void shutdown() { 489 isActive = false; 490 foreach (a; workers) { 491 try { 492 a.yieldForce; 493 } catch (Exception e) { 494 // TODO: log exceptions? 495 } 496 } 497 498 isWatcher = false; 499 try { 500 watcher.yieldForce; 501 } catch (Exception e) { 502 } 503 504 isShutdown = false; 505 try { 506 shutdowner.yieldForce; 507 } catch (Exception e) { 508 } 509 } 510 511 Actor* pop() { 512 return waiting.pop.unsafeMove; 513 } 514 515 void putWaiting(Actor* a) @safe { 516 if (a.isAccepting) { 517 waiting.put(a); 518 } else if (a.isAlive) { 519 inShutdown.put(a); 520 } else { 521 // TODO: should terminated actors be logged? 522 alloc.dispose(a); 523 } 524 } 525 526 void putInactive(Actor* a) @safe { 527 if (a.isAccepting) { 528 inactive.put(a); 529 } else if (a.isAlive) { 530 inShutdown.put(a); 531 } else { 532 // TODO: should terminated actors be logged? 533 alloc.dispose(a); 534 } 535 } 536 }