/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.system;

import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import core.thread : Thread;
import std.algorithm : min, max;
import std.datetime : dur, Clock, Duration;
import std.parallelism : Task, TaskPool, task;
import std.traits : Parameters, ReturnType;

import my.optional;

public import my.actor.typed;
public import my.actor.actor : Actor, build, makePromise, Promise, scopedActor, impl;
public import my.actor.mailbox : Address, makeAddress2, WeakAddress;
public import my.actor.msg;
import my.actor.common;
import my.actor.memory : ActorAlloc;

System makeSystem(TaskPool pool) @safe {
    return System(pool, false);
}

System makeSystem() @safe {
    return System(new TaskPool, true);
}

struct SystemConfig {
    static struct Scheduler {
        // number of messages each actor is allowed to consume per scheduled run.
        Optional!ulong maxThroughput;
        // how long a worker sleeps before polling the actor queue.
        Optional!Duration pollInterval;
    }

    Scheduler scheduler;
}

struct System {
    import std.functional : forward;

    private {
        bool running;
        bool ownsPool;
        TaskPool pool;
        Backend bg;
    }

    @disable this(this);

    this(TaskPool pool, bool ownsPool) @safe {
        this(SystemConfig.init, pool, ownsPool);
    }

    /**
     * Params:
     *  conf = configuration of the system's scheduler.
     *  pool = thread pool to use for scheduling actors.
     *  ownsPool = true if the system owns the pool and shall stop it on shutdown.
     */
    this(SystemConfig conf, TaskPool pool, bool ownsPool) @safe {
        this.pool = pool;
        this.ownsPool = ownsPool;
        this.bg = Backend(new Scheduler(conf.scheduler, pool));

        this.running = true;
        this.bg.start(pool, pool.size);
    }

    ~this() @safe {
        shutdown;
    }

    /// Shutdown all actors as fast as possible.
    void shutdown() @safe {
        if (!running)
            return;

        bg.shutdown;
        if (ownsPool)
            pool.finish(true);
        pool = null;

        running = false;
    }

    /// Wait for all actors to finish (terminate) before returning.
    void wait() @safe {
        if (!running)
            return;

        bg.shutdown;
        if (ownsPool)
            pool.finish(true);
        pool = null;

        running = false;
    }

    /// Spawn a dynamic actor.
    WeakAddress spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (is(Parameters!Fn[0] == Actor*) && is(ReturnType!Fn == Actor*)) {
        auto actor = bg.alloc.make(makeAddress2);
        return schedule(fn(actor, forward!args));
    }

    /// Spawn a typed actor.
    auto spawn(Fn, Args...)(Fn fn, auto ref Args args)
            if (isTypedActorImpl!(Parameters!(Fn)[0])) {
        alias ActorT = TypedActor!(Parameters!(Fn)[0].AllowedMessages);
        auto actor = bg.alloc.make(makeAddress2);
        auto impl = fn(ActorT.Impl(actor), forward!args);
        schedule(actor);
        return impl.address;
    }

    // Schedule an actor for execution in the thread pool.
    // Returns: the address of the actor.
    private WeakAddress schedule(Actor* actor) @safe {
        assert(bg.scheduler.isActive);
        actor.setHomeSystem(&this);
        bg.scheduler.putWaiting(actor);
        return actor.address;
    }
}
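// A sketch of tuning the scheduler before constructing a System by hand.
// Assumes `my.optional` provides a `some` constructor for Optional; the
// fields are those of SystemConfig.Scheduler above.
@("shall construct a system with a custom scheduler configuration")
unittest {
    SystemConfig conf;
    // allow up to 100 messages per actor and scheduled run (assumed `some`).
    conf.scheduler.maxThroughput = some(100UL);
    // workers sleep up to 10ms between polls of the actor queue.
    conf.scheduler.pollInterval = some(10.dur!"msecs");

    auto sys = System(conf, new TaskPool, true);
    sys.shutdown;
}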
@("shall start an actor system, execute an actor and shutdown")
@safe unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* a) => build(a).set(&fn, capture(&hasExecutedWith42)).finalize);
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    const start = Clock.currTime;
    while (hasExecutedWith42 == 0 && Clock.currTime < failAfter) {
    }
    const td = Clock.currTime - start;

    assert(hasExecutedWith42 == 1);
    assert(td < 3.dur!"seconds");
}

@("shall be possible to send a message to self during construction")
unittest {
    auto sys = makeSystem;

    int hasExecutedWith42;
    static void fn(ref Capture!(int*, "hasExecutedWith42") c, int x) {
        if (x == 42)
            (*c.hasExecutedWith42)++;
    }

    auto addr = sys.spawn((Actor* self) {
        send(self, 42);
        return impl(self, &fn, capture(&hasExecutedWith42));
    });
    send(addr, 42);
    send(addr, 43);

    const failAfter = Clock.currTime + 3.dur!"seconds";
    while (hasExecutedWith42 < 2 && Clock.currTime < failAfter) {
    }

    assert(hasExecutedWith42 == 2);
}

@("shall spawn two typed actors which are connected, execute and shutdown")
@safe unittest {
    import std.typecons : Tuple;

    auto sys = makeSystem;

    alias A1 = typedActor!(int function(int), string function(int, int));
    alias A2 = typedActor!(int function(int));

    auto spawnA1(A1.Impl self) {
        return my.actor.typed.impl(self, (int a) { return a + 10; }, (int a, int b) => "hej");
    }

    auto a1 = sys.spawn(&spawnA1);

    // the final result is delivered by A2's continuation.
    auto spawnA2(A2.Impl self) {
        return my.actor.typed.impl(self, (ref Capture!(A2.Impl, "self", A1.Address, "a1") c, int x) {
            auto p = makePromise!int;
            // dfmt off
            c.self.request(c.a1, infTimeout)
                .send(x + 10)
                .capture(p)
                .then((ref Tuple!(Promise!int, "p") ctx, int a) { ctx.p.deliver(a); });
            // dfmt on
            return p;
        }, capture(self, a1));
    }

    auto a2 = sys.spawn(&spawnA2);

    auto self = scopedActor;
    int ok;
    // start a msg to a2, which passes it on to a1.
    self.request(a2, infTimeout).send(10).then((int x) { ok = x; });

    assert(ok == 30);
}
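// A follow-up sketch: requesting the second handler of a typed actor, the
// `string function(int, int)` message of A1 above, from a scoped actor.
// Everything used here (typedActor, spawn, scopedActor, request, infTimeout)
// appears in the tests above; only the test itself is new.
@("shall request a reply from a typed actor's second handler")
@safe unittest {
    auto sys = makeSystem;

    alias A1 = typedActor!(int function(int), string function(int, int));
    auto a1 = sys.spawn((A1.Impl self) => my.actor.typed.impl(self,
            (int a) => a + 10, (int a, int b) => "hej"));

    auto self = scopedActor;
    string reply;
    // blocks until a1 has answered the (int, int) -> string request.
    self.request(a1, infTimeout).send(1, 2).then((string s) { reply = s; });
    assert(reply == "hej");
}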
private:
@safe:

struct Backend {
    Scheduler scheduler;
    ActorAlloc alloc;

    void start(TaskPool pool, ulong workers) {
        scheduler.start(pool, workers, &alloc);
    }

    void shutdown() {
        import core.memory : GC;
        import my.libc : malloc_trim;

        scheduler.shutdown;
        scheduler = null;
        //() @trusted { .destroy(scheduler); GC.collect; malloc_trim(0); }();
        () @trusted { .destroy(scheduler); }();
        () @trusted { GC.collect; }();
        () @trusted { malloc_trim(0); }();
    }
}

/** Schedule actors for execution.
 *
 * A worker pops an actor, executes it and then puts it back for later
 * scheduling.
 *
 * A watcher monitors inactive actors for either messages having arrived or
 * timeouts triggering. Such actors are moved back to the waiting queue and
 * the workers are notified that there are actors waiting to be executed.
 */
class Scheduler {
    import core.atomic : atomicOp, atomicLoad;

    SystemConfig.Scheduler conf;

    ActorAlloc* alloc;

    /// Workers shut down cleanly when this is false.
    bool isActive;

    /// The watcher shuts down cleanly when this is false.
    bool isWatcher;

    /// The shutdowner shuts down cleanly when this is false.
    bool isShutdown;

    // Mutex and condition used to wake up workers waiting to be activated.
    Mutex waitingWorkerMtx;
    Condition waitingWorker;

    // Actors waiting to be executed by a worker.
    Queue!(Actor*) waiting;

    // Actors waiting for messages to arrive, thus they are inactive.
    Queue!(Actor*) inactive;

    // Actors that are shutting down.
    Queue!(Actor*) inShutdown;

    Task!(worker, Scheduler, const ulong)*[] workers;
    Task!(watchInactive, Scheduler)* watcher;
    Task!(watchShutdown, Scheduler)* shutdowner;

    this(SystemConfig.Scheduler conf, TaskPool pool) {
        this.conf = conf;
        this.isActive = true;
        this.isWatcher = true;
        this.isShutdown = true;
        this.waiting = typeof(waiting)(new Mutex);
        this.inactive = typeof(inactive)(new Mutex);
        this.inShutdown = typeof(inShutdown)(new Mutex);

        this.waitingWorkerMtx = new Mutex;
        this.waitingWorker = new Condition(this.waitingWorkerMtx);
    }

    void wakeup() @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.notify;
        }
    }

    void wait(Duration w) @trusted {
        synchronized (waitingWorkerMtx) {
            waitingWorker.wait(w);
        }
    }

    /// Check the inactive actors for activity.
    private static void watchInactive(Scheduler sched) {
        const minPoll = 100.dur!"usecs";
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inactive.length;
            ulong inactive;
            Duration nextPoll = pollInterval;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inactive.pop.unsafeMove) {
                    if (a.hasMessage) {
                        sched.putWaiting(a);
                    } else {
                        const t = a.nextTimeout(Clock.currTime, maxPoll);

                        if (t < minPoll) {
                            sched.putWaiting(a);
                        } else {
                            sched.putInactive(a);
                            nextPoll = inactive == 0 ? t : min(nextPoll, t);
                            inactive++;
                        }
                    }
                }
            }

            if (inactive != 0) {
                pollInterval = max(minPoll, nextPoll);
            }

            if (inactive == runActors) {
                () @trusted { Thread.sleep(pollInterval); }();
                pollInterval = min(maxPoll, pollInterval);
            } else {
                sched.wakeup;
                pollInterval = minPoll;
            }
        }

        while (sched.isWatcher || !sched.inactive.empty) {
            if (auto a = sched.inactive.pop.unsafeMove) {
                sched.inShutdown.put(a);
            }
        }
    }
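    // Worked example of the backoff in watchShutdown below: with the defaults,
    // minPoll = stepPoll = 100 usecs and maxPoll = 10 msecs, each idle round
    // grows the sleep linearly (100us, 200us, 300us, ...) until it caps at
    // 10ms after ~100 idle rounds; any still-alive actor resets it to 100us.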
    /// Finish the shutdown of actors that are shutting down.
    private static void watchShutdown(Scheduler sched) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const minPoll = 100.dur!"usecs";
        const stepPoll = minPoll;
        const maxPoll = sched.conf.pollInterval.orElse(10.dur!"msecs");

        Duration pollInterval = minPoll;

        while (sched.isActive) {
            const runActors = sched.inShutdown.length;
            ulong alive;

            foreach (_; 0 .. runActors) {
                if (auto a = sched.inShutdown.pop.unsafeMove) {
                    if (a.isAlive) {
                        alive++;
                        a.process(Clock.currTime);
                        sched.inShutdown.put(a);
                    } else {
                        sched.alloc.dispose(a);
                    }
                }
            }

            if (alive == 0) {
                () @trusted { Thread.sleep(pollInterval); }();
                pollInterval = min(maxPoll, pollInterval + stepPoll);
            } else {
                pollInterval = minPoll;
            }
        }

        while (sched.isShutdown || !sched.inShutdown.empty) {
            if (auto a = sched.inShutdown.pop.unsafeMove) {
                if (a.isAlive) {
                    sendSystemMsgIfEmpty(a.address, SystemExitMsg(ExitReason.kill));
                    a.process(Clock.currTime);
                    sched.inShutdown.put(a);
                } else {
                    sched.alloc.dispose(a);
                }
            }
        }
    }

    private static void worker(Scheduler sched, const ulong id) {
        import my.actor.msg : sendSystemMsgIfEmpty;
        import my.actor.common : ExitReason;
        import my.actor.mailbox : SystemExitMsg;

        const maxThroughput = sched.conf.maxThroughput.orElse(50UL);
        const pollInterval = sched.conf.pollInterval.orElse(50.dur!"msecs");
        const inactiveLimit = min(500.dur!"msecs", pollInterval * 3);

        while (sched.isActive) {
            const runActors = sched.waiting.length;
            ulong consecutiveInactive;

            foreach (_; 0 .. runActors) {
                // reduce clock polling by reading the time once per actor.
                const now = Clock.currTime;
                if (auto ctx = sched.pop) {
                    ulong msgs;
                    ulong totalMsgs;
                    do {
                        ctx.process(now);
                        msgs = ctx.messages;
                        totalMsgs += msgs;
                    }
                    while (totalMsgs < maxThroughput && msgs != 0);

                    if (totalMsgs == 0) {
                        sched.putInactive(ctx);
                        consecutiveInactive++;
                    } else {
                        consecutiveInactive = 0;
                        sched.putWaiting(ctx);
                    }
                } else {
                    sched.wait(pollInterval);
                }
            }

            // sleep if it is detected that actors are not sending messages.
            if (consecutiveInactive == runActors) {
                sched.wait(inactiveLimit);
            }
        }

        while (!sched.waiting.empty) {
            const sleepAfter = 1 + sched.waiting.length;
            for (size_t i; i < sleepAfter; ++i) {
                if (auto ctx = sched.pop) {
                    sendSystemMsgIfEmpty(ctx.address, SystemExitMsg(ExitReason.kill));
                    ctx.process(Clock.currTime);
                    sched.putWaiting(ctx);
                }
            }

            () @trusted { Thread.sleep(pollInterval); }();
        }
    }
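    // The workers above run as tasks on the shared TaskPool while the two
    // watchers each get a dedicated low-priority thread (see start below), so
    // they keep polling even when every pool thread is busy running actors.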
    /// Start the workers.
    void start(TaskPool pool, const ulong nr, ActorAlloc* alloc) {
        this.alloc = alloc;
        foreach (const id; 0 .. nr) {
            auto t = task!worker(this, id);
            workers ~= t;
            pool.put(t);
        }

        watcher = task!watchInactive(this);
        watcher.executeInNewThread(Thread.PRIORITY_MIN);

        shutdowner = task!watchShutdown(this);
        shutdowner.executeInNewThread(Thread.PRIORITY_MIN);
    }

    void shutdown() {
        isActive = false;
        foreach (a; workers) {
            try {
                a.yieldForce;
            } catch (Exception e) {
                // TODO: log exceptions?
            }
        }

        isWatcher = false;
        try {
            watcher.yieldForce;
        } catch (Exception e) {
            // TODO: log exceptions?
        }

        isShutdown = false;
        try {
            shutdowner.yieldForce;
        } catch (Exception e) {
            // TODO: log exceptions?
        }
    }

    Actor* pop() {
        return waiting.pop.unsafeMove;
    }

    void putWaiting(Actor* a) @safe {
        if (a.isAccepting) {
            waiting.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }

    void putInactive(Actor* a) @safe {
        if (a.isAccepting) {
            inactive.put(a);
        } else if (a.isAlive) {
            inShutdown.put(a);
        } else {
            // TODO: should terminated actors be logged?
            alloc.dispose(a);
        }
    }
}
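// A sketch of the makeSystem(TaskPool) overload: the caller owns the pool and
// is responsible for stopping it once the system has shut down. `totalCPUs`
// comes from std.parallelism.
@("shall run a system on a caller-owned TaskPool")
unittest {
    import std.parallelism : totalCPUs;

    auto pool = new TaskPool(totalCPUs);
    {
        auto sys = makeSystem(pool);
        // spawn actors here as in the tests above.
    } // the destructor calls shutdown but leaves the caller-owned pool running.
    pool.finish(true);
}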