/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.system;

import core.sync.condition : Condition;
import core.sync.mutex : Mutex;
import core.thread : Thread;
import logger = std.experimental.logger;
import std.algorithm : min, max, clamp;
import std.datetime : dur, Clock, Duration;
import std.parallelism : Task, TaskPool, task;
import std.traits : Parameters, ReturnType;

import my.optional;

public import my.actor.typed;
public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl;
public import my.actor.mailbox : Address, makeAddress2, WeakAddress;
public import my.actor.msg;
import my.actor.common;
import my.actor.memory : ActorAlloc;

/** Create an actor system that runs its workers on a caller supplied pool.
 *
 * The system does not own `pool`; it is not finished when the system shuts
 * down.
 */
System makeSystem(TaskPool pool) @safe {
    return System(pool, false);
}

/** Create an actor system with its own thread pool.
 *
 * The pool is finished when the system shuts down.
 */
System makeSystem() @safe {
    // The default constructed TaskPool uses `totalCPUs - 1` workers which is
    // zero on a single core machine. The scheduler starts `pool.size` workers
    // so a pool of size zero would never execute any actor. Guarantee at
    // least one worker.
    // trusted: only reads the CPU count and constructs a pool.
    auto pool = () @trusted {
        import std.parallelism : totalCPUs;

        const cpus = totalCPUs;
        return new TaskPool(cpus > 1 ? cpus - 1 : 1);
    }();
    return System(pool, true);
}

/// Configuration of an actor system.
struct SystemConfig {
    /// Tuning of the scheduler. Unset values fall back to the defaults
    /// chosen by the workers/watchers in `Scheduler`.
    static struct Scheduler {
        // number of messages each actor is allowed to consume per scheduled run.
        Optional!ulong maxThroughput;
        // how long a worker sleeps before polling the actor queue.
        Optional!Duration pollInterval;
    }

    Scheduler scheduler;
}

/** An actor system.
 *
 * Owns the backend (scheduler + actor allocator) and is the entry point for
 * spawning actors. Copying is disabled; the destructor shuts the system down.
 */
struct System {
    import std.functional : forward;

    private {
        // true between construction and the first call to shutdown/wait.
        bool running;
        // if the pool should be finished when the system shuts down.
        bool ownsPool;
        TaskPool pool;
        Backend bg;
    }

    @disable this(this);

    /// Construct a system with the default configuration.
    this(TaskPool pool, bool ownsPool) @safe {
        this(SystemConfig.init, pool, ownsPool);
    }

    /**
     * Params:
     *  conf = configuration of the system.
     *  pool = thread pool to use for scheduling actors.
     *  ownsPool = if true the pool is finished when the system shuts down.
     */
    this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe {
        this.pool = pool;
        this.ownsPool = ownsPool;
        this.bg = Backend(new Scheduler(conf.scheduler, pool));

        this.running = true;
        // one worker per thread in the pool.
        this.bg.start(pool, pool.size);
    }

    ~this() @safe {
        shutdown;
    }

    /// Shutdown all actors as fast as possible.
    void shutdown() @safe {
        if (!running)
            return;

        bg.shutdown;
        if (ownsPool)
            pool.finish(true);
        pool = null;

        running = false;
    }

    /** Stop the system and return when it has terminated.
     *
     * NOTE(review): this currently performs exactly the same steps as
     * `shutdown`, i.e. the actors are told to exit rather than being left to
     * finish on their own.
     */
    void wait() @safe {
        shutdown;
    }

    /// spawn dynamic actor.
    WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) {
        auto actor = bg.alloc.make(makeAddress2);
        return schedule(fn(actor, forward!args));
    }

    /// spawn typed actor.
    auto spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (isTypedActorImpl!(Parameters!(Fn)[0])) {
        alias ActorT = TypedActor!(Parameters!(Fn)[0].AllowedMessages);
        auto actor = bg.alloc.make(makeAddress2);
        auto impl = fn(ActorT.Impl(actor), forward!args);
        schedule(actor);
        return impl.address;
    }

    // schedule an actor for execution in the thread pool.
    // Returns: the address of the actor.
    private WeakAddress schedule(Actor* actor) @safe {
        assert(bg.scheduler.isActive);
        setHomeSystem(actor);
        bg.scheduler.putWaiting(actor);
        return actor.address;
    }

    // set the homesystem of the actor. this is safe on the assumption that the
    // actor system is the last to terminate.
    private void setHomeSystem(Actor* actor) @trusted {
        actor.setHomeSystem(&this);
    }
}

@("shall start an actor system, execute an actor and shutdown")
@safe unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* a) => build(a).set(&fn, capture(&hasExecutedWith42)).finalize);
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    const start = Clock.currTime;
    while (hasExecutedWith42 == 0 && Clock.currTime < failAfter) {
    }
    const td = Clock.currTime - start;

    assert(hasExecutedWith42 == 1);
    assert(td < 3.dur!"seconds");
}

@("shall be possible to send a message to self during construction")
unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* self) {
        send(self, 42);
        return impl(self, &fn, capture(&hasExecutedWith42));
    });
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    while (hasExecutedWith42 < 2 && Clock.currTime < failAfter) {
    }

    assert(hasExecutedWith42 == 2);
}

@("shall spawn two typed actors which are connected, execute and shutdow")
@safe unittest {
    import std.typecons : Tuple;

    auto sys = makeSystem;

    alias A1 = typedActor!(int function(int), string function(int, int));
    alias A2 = typedActor!(int function(int));

    auto spawnA1(A1.Impl self) {
        return my.actor.typed.impl(self, (int a) { return a + 10; }, (int a, int b) => "hej");
    }

    auto a1 = sys.spawn(&spawnA1);

    // final result from A2's continuation.
    auto spawnA2(A2.Impl self) {
        return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) {
            auto p = makePromise!int;
            // dfmt off
            c.self.request(c.a1, infTimeout)
                .send(x + 10)
                .capture(p)
                .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); });
            // dfmt on
            return p;
        }, capture(self, a1));
    }

    auto a2 = sys.spawn(&spawnA2);

    auto self = scopedActor;
    int ok;
    // start msg to a2 which pass it on to a1.
    self.request(a2, infTimeout).send(10).then((int x) { ok = x; });

    assert(ok == 30);
}

private:
@safe:

/// The scheduler together with the allocator that the actors are made from.
struct Backend {
    Scheduler scheduler;
    ActorAlloc alloc;

    // trusted: the ref to alloc on the assumption that the actor system is the
    // last to terminate. the backend is owned by the system and correctly
    // shutdown.
    void start(TaskPool pool, ulong workers) @trusted {
        scheduler.start(pool, workers, &alloc);
    }

    /// Stop the scheduler, drop the reference to it and then run a GC
    /// collection followed by `malloc_trim(0)`.
    void shutdown() {
        import core.memory : GC;
        import my.libc : malloc_trim;

        scheduler.shutdown;
        scheduler = null;
        // NOTE(review): the reference was set to null on the line above so
        // this destroy has no object to finalize. Left as is because the
        // ordering may be a deliberate workaround - confirm before changing.
        //() @trusted { .destroy(scheduler); GC.collect; malloc_trim(0); }();
        () @trusted { .destroy(scheduler); }();
        () @trusted { GC.collect; }();
        () @trusted { malloc_trim(0); }();
    }
}

/** Schedule actors for execution.
 *
 * A worker pop an actor, execute it and then put it back for later scheduling.
 *
 * A watcher monitors inactive actors for either messages to have arrived or
 * timeouts to trigger. They are then moved back to the waiting queue. The
 * workers are notified that there are actors waiting to be executed.
 */
class Scheduler {
    import core.atomic : atomicOp, atomicLoad;

    SystemConfig.Scheduler conf;

    // allocator that terminated actors are returned to. Set by `start`.
    ActorAlloc* alloc;

    /// Workers will shutdown cleanly if it is false.
    bool isActive;

    /// Watcher will shutdown cleanly if this is false.
    bool isWatcher;

    /// Shutdowner will shutdown cleanly if false;
    bool isShutdown;

    // Workers waiting to be activated
    Mutex waitingWorkerMtx;
    Condition waitingWorker;

    // actors waiting to be executed by a worker.
    Queue!(Actor*) waiting;

    // Actors waiting for messages to arrive thus they are inactive.
    Queue!(Actor*) inactive;

    // Actors that are shutting down.
    Queue!(Actor*) inShutdown;

    Task!(worker, Scheduler, const ulong)*[] workers;
    Task!(watchInactive, Scheduler)* watcher;
    Task!(watchShutdown, Scheduler)* shutdowner;

    /** Create a scheduler with all three run-flags set.
     *
     * Params:
     *  conf = tuning of throughput and polling.
     *  pool = not used by the constructor; the pool is handed to `start`.
     */
    this(SystemConfig.Scheduler conf, TaskPool pool) {
        this.conf = conf;
        this.isActive = true;
        this.isWatcher = true;
        this.isShutdown = true;
        this.waiting = typeof(waiting)(new Mutex);
        this.inactive = typeof(inactive)(new Mutex);
        this.inShutdown = typeof(inShutdown)(new Mutex);

        this.waitingWorkerMtx = new Mutex;
        this.waitingWorker = new Condition(this.waitingWorkerMtx);
    }

    /// Notify one worker that is blocked in `wait`.
    void wakeup() @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.notify;
        }
    }

    /// Block the caller until `wakeup` is called or `w` has passed.
    void wait(Duration w) @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.wait(w);
        }
    }

    /// check the inactive actors for activity.
    private static void watchInactive(Scheduler sched) {
        const minPoll = 100.dur!"usecs";
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            // only inspect the actors that are in the queue at the start of
            // the round; those that are put back are handled the next round.
            const runActors = sched.inactive.length;
            ulong inactive;
            Duration nextPoll = pollInterval;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inactive.pop.unsafeMove) {
                    if (a.hasMessage) {
                        sched.putWaiting(a);
                    } else {
                        const t = a.nextTimeout(Clock.currTime, maxPoll);

                        if (t < minPoll) {
                            // the timeout is about to trigger, let a worker
                            // handle it.
                            sched.putWaiting(a);
                        } else {
                            sched.putInactive(a);
                            // sleep no longer than the nearest timeout.
                            nextPoll = inactive == 0 ? t : min(nextPoll, t);
                            inactive++;
                        }
                    }
                }
            }

            if (inactive != 0) {
                pollInterval = clamp(nextPoll, minPoll, maxPoll);
            }

            if (inactive == runActors) {
                // nothing was moved to the waiting queue.
                () @trusted { Thread.sleep(pollInterval); }();
                pollInterval = min(maxPoll, pollInterval);
            } else {
                sched.wakeup;
                pollInterval = minPoll;
            }
        }

        // the scheduler is stopping. Hand over all inactive actors to the
        // shutdown queue until told to stop and the queue is drained.
        while (sched.isWatcher || !sched.inactive.empty) {
            if (auto a = sched.inactive.pop.unsafeMove) {
                sched.inShutdown.put(a);
            }
        }
    }

    /// finish shutdown of actors that are shutting down.
    private static void watchShutdown(Scheduler sched) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inShutdown.length;
            ulong alive;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inShutdown.pop.unsafeMove) {
                    if (a.isAlive) {
                        alive++;
                        a.process(Clock.currTime);
                        sched.inShutdown.put(a);
                    } else {
                        sched.alloc.dispose(a);
                    }
                }
            }

            if (alive == 0) {
                () @trusted { Thread.sleep(pollInterval); }();
                // back off one step at a time but never sleep longer than
                // maxPoll. Without the upper bound the interval grows for as
                // long as the thread is idle which in turn delays how fast
                // this loop detects that the scheduler is stopping.
                pollInterval = max(minPoll, min(maxPoll, pollInterval + stepPoll));
            } else {
                pollInterval = minPoll;
            }
        }

        // the scheduler is stopping. Tell the remaining actors to exit and
        // process them until they have terminated.
        while (sched.isShutdown || !sched.inShutdown.empty) {
            if (auto a = sched.inShutdown.pop.unsafeMove) {
                if (a.isAlive) {
                    sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill));
                    a.process(Clock.currTime);
                    sched.inShutdown.put(a);
                } else {
                    sched.alloc.dispose(a);
                }
            }
        }
    }

    /** Execute actors from the waiting queue until the scheduler is stopped.
     *
     * Params:
     *  sched = the scheduler the worker belongs to.
     *  id = number of the worker, assigned by `start`.
     */
    private static void worker(Scheduler sched, const ulong id) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const maxThroughput = sched.conf.maxThroughput.orElse(50UL);
        const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs");
        const inactiveLimit = min(500.dur!"msecs", pollInterval * 3);

        while (sched.isActive) {
            const runActors = sched.waiting.length;
            ulong consecutiveInactive;

            foreach (_; 0 .. runActors) {
                if (auto ctx = sched.pop) {
                    ulong msgs;
                    ulong prevMsgs;
                    ulong totalMsgs;
                    // keep processing the actor until the throughput limit is
                    // reached or the message counter stops changing.
                    do {
                        // reduce clock polling
                        const now = Clock.currTime;
                        ctx.process(now);
                        prevMsgs = msgs;
                        msgs = ctx.messages;
                        totalMsgs += msgs;
                    }
                    while (totalMsgs < maxThroughput && msgs != prevMsgs);

                    if (totalMsgs == 0) {
                        sched.putInactive(ctx);
                        consecutiveInactive++;
                    } else {
                        consecutiveInactive = 0;
                        sched.putWaiting(ctx);
                    }
                } else {
                    // another worker emptied the queue.
                    sched.wait(pollInterval);
                }
            }

            // sleep if it is detected that actors are not sending messages
            if (consecutiveInactive == runActors) {
                sched.wait(inactiveLimit);
            }
        }

        // the scheduler is stopping. Tell the actors in the waiting queue to
        // exit and keep processing them until the queue is empty.
        while (!sched.waiting.empty) {
            const sleepAfter = 1 + sched.waiting.length;
            for (size_t i; i < sleepAfter; ++i) {
                if (auto ctx = sched.pop) {
                    sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill));
                    ctx.process(Clock.currTime);
                    sched.putWaiting(ctx);
                }
            }

            () @trusted { Thread.sleep(pollInterval); }();
        }
    }

    /** Start the workers.
     *
     * Params:
     *  pool = pool that the workers are put in.
     *  nr = number of workers to start.
     *  alloc = allocator that terminated actors are disposed via.
     *
     * The watcher of inactive actors and the watcher of actors that are
     * shutting down are started in their own threads with minimum priority.
     */
    void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) {
        this.alloc = alloc;
        foreach (const id; 0 .. nr) {
            auto t = task!worker(this, id);
            workers ~= t;
            pool.put(t);
        }
        watcher = task!watchInactive(this);
        watcher.executeInNewThread(Thread.PRIORITY_MIN);

        shutdowner = task!watchShutdown(this);
        shutdowner.executeInNewThread(Thread.PRIORITY_MIN);
    }

    /** Stop the workers, then the watcher and last the shutdowner.
     *
     * Each stage is joined before the next is told to stop. An exception from
     * a stage is logged and does not prevent the following stages from being
     * stopped.
     */
    void shutdown() {
        import std.exception : collectException;

        isActive = false;
        foreach (a; workers) {
            try {
                a.yieldForce;
            } catch (Exception e) {
                // collectException: a failure to log must not stop the
                // remaining tasks from being joined.
                logger.warning(e.msg).collectException;
            }
        }

        isWatcher = false;
        try {
            watcher.yieldForce;
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }

        isShutdown = false;
        try {
            shutdowner.yieldForce;
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
    }

    /// Returns: the next actor from the waiting queue. The callers treat a
    /// null result as "no actor available".
    Actor* pop() {
        return waiting.pop.unsafeMove;
    }

    /// Queue an actor for execution. An actor that no longer accepts messages
    /// is moved to the shutdown queue if alive, otherwise it is disposed.
    void putWaiting(Actor* a) @safe {
        if (a.isAccepting) {
            waiting.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }

    /// Park an actor in the inactive queue. An actor that no longer accepts
    /// messages is moved to the shutdown queue if alive, otherwise it is
    /// disposed.
    void putInactive(Actor* a) @safe {
        if (a.isAccepting) {
            inactive.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }
}