1 /** 2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module my.actor.actor; 7 8 import std.stdio : writeln, writefln; 9 10 import core.thread : Thread; 11 import logger = std.experimental.logger; 12 import std.algorithm : schwartzSort, max, min, among; 13 import std.array : empty; 14 import std.datetime : SysTime, Clock, dur; 15 import std.exception : collectException; 16 import std.functional : toDelegate; 17 import std.meta : staticMap; 18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction; 19 import std.typecons : Tuple, tuple; 20 import std.variant : Variant; 21 22 import my.actor.common : ExitReason, SystemError, makeSignature; 23 import my.actor.mailbox; 24 import my.actor.msg; 25 import my.actor.system : System; 26 import my.actor.typed : isTypedAddress, isTypedActorImpl; 27 import my.gc.refc; 28 import sumtype; 29 30 private struct PromiseData { 31 WeakAddress replyTo; 32 ulong replyId; 33 34 /// Copy constructor 35 this(ref return scope typeof(this) rhs) @safe nothrow @nogc { 36 replyTo = rhs.replyTo; 37 replyId = rhs.replyId; 38 } 39 40 @disable this(this); 41 } 42 43 // deliver can only be called one time. 44 struct Promise(T) { 45 package { 46 RefCounted!PromiseData data; 47 } 48 49 void deliver(T reply) { 50 auto tmp = reply; 51 deliver(reply); 52 } 53 54 /** Deliver the message `reply`. 55 * 56 * A promise can only be delivered once. 57 */ 58 void deliver(ref T reply) @trusted 59 in (!data.empty, "promise must be initialized") { 60 if (data.empty) 61 return; 62 scope (exit) 63 data.release; 64 65 // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed. 66 if (auto replyTo = data.get.replyTo.lock.get) { 67 enum wrapInTuple = !is(T : Tuple!U, U); 68 static if (wrapInTuple) 69 replyTo.put(Reply(data.get.replyId, Variant(tuple(reply)))); 70 else 71 replyTo.put(Reply(data.get.replyId, Variant(reply))); 72 } 73 } 74 75 void opAssign(Promise!T rhs) { 76 data = rhs.data; 77 } 78 79 /// True if the promise is not initialized. 80 bool empty() { 81 return data.empty || data.get.replyId == 0; 82 } 83 84 /// Clear the promise. 85 void clear() { 86 data.release; 87 } 88 } 89 90 auto makePromise(T)() { 91 return Promise!T(refCounted(PromiseData.init)); 92 } 93 94 struct RequestResult(T) { 95 this(T v) { 96 value = typeof(value)(v); 97 } 98 99 this(ErrorMsg v) { 100 value = typeof(value)(v); 101 } 102 103 this(Promise!T v) { 104 value = typeof(value)(v); 105 } 106 107 SumType!(T, ErrorMsg, Promise!T) value; 108 } 109 110 private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe; 111 private alias RequestHandler = void delegate(void* ctx, ref Variant msg, 112 ulong replyId, WeakAddress replyTo) @safe; 113 private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe; 114 115 alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow; 116 117 /** Actors send error messages to others by returning an error (see Errors) 118 * from a message handler. Similar to exit messages, error messages usually 119 * cause the receiving actor to terminate, unless a custom handler was 120 * installed. The default handler is used as fallback if request is used 121 * without error handler. 122 */ 123 alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow; 124 125 /** Bidirectional monitoring with a strong lifetime coupling is established by 126 * calling a `LinkRequest` to an address. This will cause the runtime to send 127 * an `ExitMsg` if either this or other dies. Per default, actors terminate 128 * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal. 129 * This mechanism propagates failure states in an actor system. Linked actors 130 * form a sub system in which an error causes all actors to fail collectively. 131 */ 132 alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow; 133 134 /// An exception has been thrown while processing a message. 135 alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow; 136 137 /** Actors can monitor the lifetime of other actors by sending a `MonitorRequest` 138 * to an address. This will cause the runtime system to send a `DownMsg` for 139 * other if it dies. 140 * 141 * Actors drop down messages unless they provide a custom handler. 142 */ 143 alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow; 144 145 void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow { 146 } 147 148 /// Write the name of the actor and the message type to the console. 149 void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow { 150 import std.stdio : writeln; 151 152 try { 153 writeln("UNKNOWN message sent to actor ", self.name); 154 writeln(msg.toString); 155 } catch (Exception e) { 156 } 157 } 158 159 void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow { 160 self.lastError = msg.reason; 161 self.shutdown; 162 } 163 164 void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow { 165 self.lastError = msg.reason; 166 self.forceShutdown; 167 } 168 169 void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow { 170 self.lastError = SystemError.runtimeError; 171 // TODO: should log? 172 self.forceShutdown; 173 } 174 175 // Write the name of the actor and the exception to stdout. 176 void logExceptionHandler(ref Actor self, Exception e) @safe nothrow { 177 import std.stdio : writeln; 178 179 self.lastError = SystemError.runtimeError; 180 181 try { 182 writeln("EXCEPTION thrown by actor ", self.name); 183 writeln(e.msg); 184 writeln("TERMINATING"); 185 } catch (Exception e) { 186 } 187 188 self.forceShutdown; 189 } 190 191 /// Timeout for an outstanding request. 192 struct ReplyHandlerTimeout { 193 ulong id; 194 SysTime timeout; 195 } 196 197 package enum ActorState { 198 /// waiting to be started. 199 waiting, 200 /// active and processing messages. 201 active, 202 /// wait for all awaited responses to finish 203 shutdown, 204 /// discard also the awaite responses, just shutdown fast 205 forceShutdown, 206 /// in process of shutting down 207 finishShutdown, 208 /// stopped. 209 stopped, 210 } 211 212 private struct AwaitReponse { 213 Closure!(ReplyHandler, void*) behavior; 214 ErrorHandler onError; 215 } 216 217 struct Actor { 218 import std.container.rbtree : RedBlackTree, redBlackTree; 219 220 package StrongAddress addr; 221 // visible in the package for logging purpose. 222 package ActorState state_ = ActorState.stopped; 223 224 private { 225 // TODO: rename to behavior. 226 Closure!(MsgHandler, void*)[ulong] incoming; 227 Closure!(RequestHandler, void*)[ulong] reqBehavior; 228 229 // callbacks for awaited responses key:ed on their id. 230 AwaitReponse[ulong] awaitedResponses; 231 ReplyHandlerTimeout[] replyTimeouts; 232 233 // important that it start at 1 because then zero is known to not be initialized. 234 ulong nextReplyId = 1; 235 236 /// Delayed messages ordered by their trigger time. 237 RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed; 238 239 /// Used during shutdown to signal monitors and links why this actor is terminating. 240 SystemError lastError; 241 242 /// monitoring the actor lifetime. 243 WeakAddress[size_t] monitors; 244 245 /// strong, bidirectional link of the actors lifetime. 246 WeakAddress[size_t] links; 247 248 // Number of messages that has been processed. 249 ulong messages_; 250 251 /// System the actor belongs to. 252 System* homeSystem_; 253 254 string name_; 255 256 ErrorHandler errorHandler_; 257 258 /// callback when a link goes down. 259 DownHandler downHandler_; 260 261 ExitHandler exitHandler_; 262 263 ExceptionHandler exceptionHandler_; 264 265 DefaultHandler defaultHandler_; 266 } 267 268 invariant () { 269 if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) { 270 assert(errorHandler_); 271 assert(exitHandler_); 272 assert(exceptionHandler_); 273 assert(defaultHandler_); 274 } 275 } 276 277 this(StrongAddress a) @trusted 278 in (!a.empty, "address is empty") { 279 state_ = ActorState.waiting; 280 281 addr = a; 282 addr.get.setOpen; 283 delayed = new typeof(delayed); 284 285 errorHandler_ = toDelegate(&defaultErrorHandler); 286 downHandler_ = null; 287 exitHandler_ = toDelegate(&defaultExitHandler); 288 exceptionHandler_ = toDelegate(&defaultExceptionHandler); 289 defaultHandler_ = toDelegate(&.defaultHandler); 290 } 291 292 WeakAddress address() @safe { 293 return addr.weakRef; 294 } 295 296 package ref StrongAddress addressRef() return @safe pure nothrow @nogc { 297 return addr; 298 } 299 300 ref System homeSystem() @safe pure nothrow @nogc { 301 return *homeSystem_; 302 } 303 304 /** Clean shutdown of the actor 305 * 306 * Stopping incoming messages from triggering new behavior and finish all 307 * awaited respones. 308 */ 309 void shutdown() @safe nothrow { 310 if (state_.among(ActorState.waiting, ActorState.active)) 311 state_ = ActorState.shutdown; 312 } 313 314 /** Force an immediate shutdown. 315 * 316 * Stopping incoming messages from triggering new behavior and finish all 317 * awaited respones. 318 */ 319 void forceShutdown() @safe nothrow { 320 if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown)) 321 state_ = ActorState.forceShutdown; 322 } 323 324 ulong id() @safe pure nothrow const @nogc { 325 return addr.id; 326 } 327 328 /// Returns: the name of the actor. 329 string name() @safe pure nothrow const @nogc { 330 return name_; 331 } 332 333 // dfmt off 334 335 /// Set name name of the actor. 336 void name(string n) @safe pure nothrow @nogc { 337 this.name_ = n; 338 } 339 340 void errorHandler(ErrorHandler v) @safe pure nothrow @nogc { 341 errorHandler_ = v; 342 } 343 344 void downHandler(DownHandler v) @safe pure nothrow @nogc { 345 downHandler_ = v; 346 } 347 348 void exitHandler(ExitHandler v) @safe pure nothrow @nogc { 349 exitHandler_ = v; 350 } 351 352 void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc { 353 exceptionHandler_ = v; 354 } 355 356 void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc { 357 defaultHandler_ = v; 358 } 359 360 // dfmt on 361 362 package: 363 bool hasMessage() @safe pure nothrow @nogc { 364 return addr && addr.get.hasMessage; 365 } 366 367 /// How long until a delayed message or a timeout fires. 368 Duration nextTimeout(const SysTime now, const Duration default_) @safe { 369 return min(delayed.empty ? default_ : (delayed.front.triggerAt - now), 370 replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now)); 371 } 372 373 bool waitingForReply() @safe pure nothrow const @nogc { 374 return !awaitedResponses.empty; 375 } 376 377 /// Number of messages that has been processed. 378 ulong messages() @safe pure nothrow const @nogc { 379 return messages_; 380 } 381 382 void setHomeSystem(System* sys) @safe pure nothrow @nogc { 383 homeSystem_ = sys; 384 } 385 386 void cleanupBehavior() @trusted nothrow { 387 foreach (ref a; incoming.byValue) { 388 try { 389 a.free; 390 } catch (Exception e) { 391 // TODO: call exceptionHandler? 392 } 393 } 394 incoming = null; 395 foreach (ref a; reqBehavior.byValue) { 396 try { 397 a.free; 398 } catch (Exception e) { 399 } 400 } 401 reqBehavior = null; 402 } 403 404 void cleanupAwait() @trusted nothrow { 405 foreach (ref a; awaitedResponses.byValue) { 406 try { 407 a.behavior.free; 408 } catch (Exception e) { 409 } 410 } 411 awaitedResponses = null; 412 } 413 414 void cleanupDelayed() @trusted nothrow { 415 foreach (const _; 0 .. delayed.length) { 416 try { 417 delayed.front.msg = Msg.init; 418 delayed.removeFront; 419 } catch (Exception e) { 420 } 421 } 422 .destroy(delayed); 423 } 424 425 bool isAlive() @safe pure nothrow const @nogc { 426 final switch (state_) { 427 case ActorState.waiting: 428 goto case; 429 case ActorState.active: 430 goto case; 431 case ActorState.shutdown: 432 goto case; 433 case ActorState.forceShutdown: 434 goto case; 435 case ActorState.finishShutdown: 436 return true; 437 case ActorState.stopped: 438 return false; 439 } 440 } 441 442 /// Accepting messages. 443 bool isAccepting() @safe pure nothrow const @nogc { 444 final switch (state_) { 445 case ActorState.waiting: 446 goto case; 447 case ActorState.active: 448 goto case; 449 case ActorState.shutdown: 450 return true; 451 case ActorState.forceShutdown: 452 goto case; 453 case ActorState.finishShutdown: 454 goto case; 455 case ActorState.stopped: 456 return false; 457 } 458 } 459 460 ulong replyId() @safe { 461 return nextReplyId++; 462 } 463 464 void process(const SysTime now) @safe nothrow { 465 import core.memory : GC; 466 467 assert(!GC.inFinalizer); 468 469 messages_ = 0; 470 471 void tick() { 472 // philosophy of the order is that a timeout should only trigger if it 473 // is really required thus it is checked last. This order then mean 474 // that a request may have triggered a timeout but because 475 // `processReply` is called before `checkReplyTimeout` it is *ignored*. 476 // Thus "better to accept even if it is timeout rather than fail". 477 // 478 // NOTE: the assumption that a message that has timed out should be 479 // processed turned out to be... wrong. It is annoying that 480 // sometimes a timeout message triggers even though it shouldn't, 481 // because it is now too old to be useful! 482 // Thus the order is changed to first check for timeout, then process. 483 try { 484 processSystemMsg(); 485 checkReplyTimeout(now); 486 processDelayed(now); 487 processIncoming(); 488 processReply(); 489 } catch (Exception e) { 490 exceptionHandler_(this, e); 491 } 492 } 493 494 assert(state_ == ActorState.stopped || addr, "no address"); 495 496 final switch (state_) { 497 case ActorState.waiting: 498 state_ = ActorState.active; 499 tick; 500 // the state can be changed before the actor have executed. 501 break; 502 case ActorState.active: 503 tick; 504 // self terminate if the actor has no behavior. 505 if (incoming.empty && awaitedResponses.empty && reqBehavior.empty) 506 state_ = ActorState.forceShutdown; 507 break; 508 case ActorState.shutdown: 509 tick; 510 if (awaitedResponses.empty) 511 state_ = ActorState.finishShutdown; 512 cleanupBehavior; 513 break; 514 case ActorState.forceShutdown: 515 state_ = ActorState.finishShutdown; 516 cleanupBehavior; 517 addr.get.setClosed; 518 break; 519 case ActorState.finishShutdown: 520 state_ = ActorState.stopped; 521 522 sendToMonitors(DownMsg(addr.weakRef, lastError)); 523 524 sendToLinks(ExitMsg(addr.weakRef, lastError)); 525 526 replyTimeouts = null; 527 cleanupDelayed; 528 cleanupAwait; 529 530 // must be last because sendToLinks and sendToMonitors uses addr. 531 addr.get.shutdown(); 532 addr.release; 533 break; 534 case ActorState.stopped: 535 break; 536 } 537 } 538 539 void sendToMonitors(DownMsg msg) @safe nothrow { 540 foreach (ref a; monitors.byValue) { 541 try { 542 if (auto rc = a.lock.get) 543 rc.put(SystemMsg(msg)); 544 a.release; 545 } catch (Exception e) { 546 } 547 } 548 549 monitors = null; 550 } 551 552 void sendToLinks(ExitMsg msg) @safe nothrow { 553 foreach (ref a; links.byValue) { 554 try { 555 if (auto rc = a.lock.get) 556 rc.put(SystemMsg(msg)); 557 a.release; 558 } catch (Exception e) { 559 } 560 } 561 562 links = null; 563 } 564 565 void checkReplyTimeout(const SysTime now) @safe { 566 if (replyTimeouts.empty) 567 return; 568 569 size_t removeTo; 570 foreach (const i; 0 .. replyTimeouts.length) { 571 if (now > replyTimeouts[i].timeout) { 572 const id = replyTimeouts[i].id; 573 if (auto v = id in awaitedResponses) { 574 messages_++; 575 v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout)); 576 try { 577 () @trusted { v.behavior.free; }(); 578 } catch (Exception e) { 579 } 580 awaitedResponses.remove(id); 581 } 582 removeTo = i + 1; 583 } else { 584 break; 585 } 586 } 587 588 if (removeTo >= replyTimeouts.length) { 589 replyTimeouts = null; 590 } else if (removeTo != 0) { 591 replyTimeouts = replyTimeouts[removeTo .. $]; 592 } 593 } 594 595 void processIncoming() @safe { 596 if (addr.get.empty!Msg) 597 return; 598 messages_++; 599 600 auto front = addr.get.pop!Msg; 601 scope (exit) 602 .destroy(front); 603 604 void doSend(ref MsgOneShot msg) { 605 if (auto v = front.get.signature in incoming) { 606 (*v)(msg.data); 607 } else { 608 defaultHandler_(this, msg.data); 609 } 610 } 611 612 void doRequest(ref MsgRequest msg) @trusted { 613 if (auto v = front.get.signature in reqBehavior) { 614 (*v)(msg.data, msg.replyId, msg.replyTo); 615 } else { 616 defaultHandler_(this, msg.data); 617 } 618 } 619 620 front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) { 621 doRequest(a); 622 }); 623 } 624 625 /** All system messages are handled. 626 * 627 * Assuming: 628 * * they are not heavy to process 629 * * very important that if there are any they should be handled as soon as possible 630 * * ignoring the case when there is a "storm" of system messages which 631 * "could" overload the actor system and lead to a crash. I classify this, 632 * for now, as intentional, malicious coding by the developer themself. 633 * External inputs that could trigger such a behavior should be controlled 634 * and limited. Other types of input such as a developer trying to break 635 * the actor system is out of scope. 636 */ 637 void processSystemMsg() @safe { 638 //() @trusted { 639 //logger.infof("run %X", cast(void*) &this); 640 //}(); 641 while (!addr.get.empty!SystemMsg) { 642 messages_++; 643 //logger.infof("%X %s %s", addr.toHash, state_, messages_); 644 auto front = addr.get.pop!SystemMsg; 645 scope (exit) 646 .destroy(front); 647 648 front.get.match!((ref DownMsg a) { 649 if (downHandler_) 650 downHandler_(this, a); 651 }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) { 652 if (auto v = a.addr.toHash in monitors) 653 v.release; 654 monitors.remove(a.addr.toHash); 655 }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) { 656 if (auto v = a.addr.toHash in links) 657 v.release; 658 links.remove(a.addr.toHash); 659 }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) { 660 exitHandler_(this, a); 661 }, (ref SystemExitMsg a) { 662 final switch (a.reason) { 663 case ExitReason.normal: 664 break; 665 case ExitReason.unhandledException: 666 exitHandler_(this, ExitMsg.init); 667 break; 668 case ExitReason.unknown: 669 exitHandler_(this, ExitMsg.init); 670 break; 671 case ExitReason.userShutdown: 672 exitHandler_(this, ExitMsg.init); 673 break; 674 case ExitReason.kill: 675 exitHandler_(this, ExitMsg.init); 676 // the user do NOT have an option here 677 forceShutdown; 678 break; 679 } 680 }); 681 } 682 } 683 684 void processReply() @safe { 685 if (addr.get.empty!Reply) 686 return; 687 messages_++; 688 689 auto front = addr.get.pop!Reply; 690 scope (exit) 691 .destroy(front); 692 693 if (auto v = front.get.id in awaitedResponses) { 694 // TODO: reduce the lookups on front.id 695 v.behavior(front.get.data); 696 try { 697 () @trusted { v.behavior.free; }(); 698 } catch (Exception e) { 699 } 700 awaitedResponses.remove(front.get.id); 701 removeReplyTimeout(front.get.id); 702 } else { 703 // TODO: should probably be SystemError.unexpectedResponse? 704 //defaultHandler_(this, front.get.data); 705 // TODO: ignoring responses without a handler for now. It results 706 // in spam when the timeout triggers. 707 } 708 } 709 710 void processDelayed(const SysTime now) @trusted { 711 if (!addr.get.empty!DelayedMsg) { 712 // count as a message because handling them are "expensive". 713 // Ignoring the case that the message right away is moved to the 714 // incoming queue. This lead to "double accounting" but ohh well. 715 // Don't use delayedSend when you should have used send. 716 messages_++; 717 delayed.insert(addr.get.pop!DelayedMsg.unsafeMove); 718 } else if (delayed.empty) { 719 return; 720 } 721 722 foreach (const i; 0 .. delayed.length) { 723 if (now > delayed.front.triggerAt) { 724 addr.get.put(delayed.front.msg); 725 delayed.removeFront; 726 } else { 727 break; 728 } 729 } 730 } 731 732 private void removeReplyTimeout(ulong id) @safe nothrow { 733 import std.algorithm : remove; 734 735 foreach (const i; 0 .. replyTimeouts.length) { 736 if (replyTimeouts[i].id == id) { 737 remove(replyTimeouts, i); 738 break; 739 } 740 } 741 } 742 743 void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted { 744 if (!isAccepting) 745 return; 746 747 if (auto v = signature in incoming) { 748 try { 749 v.free; 750 } catch (Exception e) { 751 } 752 } 753 incoming[signature] = handler; 754 } 755 756 void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted { 757 if (!isAccepting) 758 return; 759 760 if (auto v = signature in reqBehavior) { 761 try { 762 v.free; 763 } catch (Exception e) { 764 } 765 } 766 reqBehavior[signature] = handler; 767 } 768 769 void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler, 770 void*) reply, ErrorHandler onError) @safe { 771 if (!isAccepting) 772 return; 773 774 awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError); 775 replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout); 776 schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts); 777 } 778 } 779 780 struct Closure(Fn, CtxT) { 781 alias FreeFn = void function(CtxT); 782 783 Fn fn; 784 CtxT ctx; 785 FreeFn cleanup; 786 787 this(Fn fn) { 788 this.fn = fn; 789 } 790 791 this(Fn fn, CtxT* ctx, FreeFn cleanup) { 792 this.fn = fn; 793 this.ctx = ctx; 794 this.cleanup = cleanup; 795 } 796 797 void opCall(Args...)(auto ref Args args) { 798 assert(fn !is null); 799 fn(ctx, args); 800 } 801 802 void free() { 803 // will crash, on purpuse, if there is a ctx and no cleanup registered. 804 // maybe a bad idea? dunno... lets see 805 if (ctx) 806 cleanup(ctx); 807 ctx = CtxT.init; 808 } 809 } 810 811 @("shall register a behavior to be called when msg received matching signature") 812 unittest { 813 auto addr = makeAddress2; 814 auto actor = Actor(addr); 815 816 bool processedIncoming; 817 void fn(void* ctx, ref Variant msg) { 818 processedIncoming = true; 819 } 820 821 actor.register(1, Closure!(MsgHandler, void*)(&fn)); 822 addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42))))); 823 824 actor.process(Clock.currTime); 825 826 assert(processedIncoming); 827 } 828 829 private void cleanupCtx(CtxT)(void* ctx) 830 if (is(CtxT == Tuple!T, T) || is(CtxT == void)) { 831 import std.traits; 832 import my.actor.typed; 833 834 static if (!is(CtxT == void)) { 835 // trust that any use of this also pass on the correct context type. 836 auto userCtx = () @trusted { return cast(CtxT*) ctx; }(); 837 // release the context such as if it holds a rc object. 838 alias Types = CtxT.Types; 839 840 static foreach (const i; 0 .. CtxT.Types.length) { 841 { 842 alias T = CtxT.Types[i]; 843 alias UT = Unqual!T; 844 static if (!is(T == UT)) { 845 static assert(!is(UT : WeakAddress), 846 "WeakAddress must NEVER be const or immutable"); 847 static assert(!is(UT : TypedAddress!M, M...), 848 "WeakAddress must NEVER be const or immutable: " ~ T.stringof); 849 } 850 // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit? 851 852 static if (is(UT == T)) { 853 .destroy((*userCtx)[i]); 854 } 855 } 856 } 857 } 858 } 859 860 @("shall default initialize when possible, skipping const/immutable") 861 unittest { 862 { 863 auto x = tuple(cast(const) 42, 43); 864 alias T = typeof(x); 865 cleanupCtx!T(cast(void*)&x); 866 assert(x[0] == 42); // can't assign to const 867 assert(x[1] == 0); 868 } 869 870 { 871 import my.path : Path; 872 873 auto x = tuple(Path.init, cast(const) Path("foo")); 874 alias T = typeof(x); 875 cleanupCtx!T(cast(void*)&x); 876 assert(x[0] == Path.init); 877 assert(x[1] == Path("foo")); 878 } 879 } 880 881 package struct Action { 882 Closure!(MsgHandler, void*) action; 883 ulong signature; 884 } 885 886 /// An behavior for an actor when it receive a message of `signature`. 887 package auto makeAction(T, CtxT = void)(T handler) @safe 888 if (isFunction!T || isFunctionPointer!T) { 889 static if (is(CtxT == void)) 890 alias Params = Parameters!T; 891 else { 892 alias CtxParam = Parameters!T[0]; 893 alias Params = Parameters!T[1 .. $]; 894 checkMatchingCtx!(CtxParam, CtxT); 895 checkRefForContext!handler; 896 } 897 898 alias HArgs = staticMap!(Unqual, Params); 899 900 void fn(void* ctx, ref Variant msg) @trusted { 901 static if (is(CtxT == void)) { 902 handler(msg.get!(Tuple!HArgs).expand); 903 } else { 904 auto userCtx = cast(CtxParam*) cast(CtxT*) ctx; 905 handler(*userCtx, msg.get!(Tuple!HArgs).expand); 906 } 907 } 908 909 return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs); 910 } 911 912 package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe { 913 static if (is(CtxT == void)) 914 alias Params = Parameters!T; 915 else { 916 alias CtxParam = Parameters!T[0]; 917 alias Params = Parameters!T[1 .. $]; 918 checkMatchingCtx!(CtxParam, CtxT); 919 checkRefForContext!handler; 920 } 921 922 alias HArgs = staticMap!(Unqual, Params); 923 924 void fn(void* ctx, ref Variant msg) @trusted { 925 static if (is(CtxT == void)) { 926 handler(msg.get!(Tuple!HArgs).expand); 927 } else { 928 auto userCtx = cast(CtxParam*) cast(CtxT*) ctx; 929 handler(*userCtx, msg.get!(Tuple!HArgs).expand); 930 } 931 } 932 933 return typeof(return)(&fn, null, &cleanupCtx!CtxT); 934 } 935 936 package struct Request { 937 Closure!(RequestHandler, void*) request; 938 ulong signature; 939 } 940 941 private string locToString(Loc...)() { 942 import std.conv : to; 943 944 return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string; 945 } 946 947 /// Check that the context parameter is `ref` otherwise issue a warning. 948 package void checkRefForContext(alias handler)() { 949 import std.traits : ParameterStorageClass, ParameterStorageClassTuple; 950 951 alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0]; 952 953 static if (CtxParam != ParameterStorageClass.ref_) { 954 pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof); 955 static assert(CtxParam == ParameterStorageClass.ref_, 956 "The context must be `ref` to avoid unnecessary copying"); 957 } 958 } 959 960 package void checkMatchingCtx(CtxParam, CtxT)() { 961 static if (!is(CtxT == CtxParam)) { 962 static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }), 963 "mismatch between the context type " ~ CtxT.stringof 964 ~ " and the first parameter " ~ CtxParam.stringof); 965 } 966 } 967 968 package auto makeRequest(T, CtxT = void)(T handler) @safe { 969 static assert(!is(ReturnType!T == void), "handler returns void, not allowed"); 970 971 alias RType = ReturnType!T; 972 enum isReqResult = is(RType : RequestResult!ReqT, ReqT); 973 enum isPromise = is(RType : Promise!PromT, PromT); 974 975 static if (is(CtxT == void)) 976 alias Params = Parameters!T; 977 else { 978 alias CtxParam = Parameters!T[0]; 979 alias Params = Parameters!T[1 .. $]; 980 checkMatchingCtx!(CtxParam, CtxT); 981 checkRefForContext!handler; 982 } 983 984 alias HArgs = staticMap!(Unqual, Params); 985 986 void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted { 987 static if (is(CtxT == void)) { 988 auto r = handler(msg.get!(Tuple!HArgs).expand); 989 } else { 990 auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx; 991 auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand); 992 } 993 994 static if (isReqResult) { 995 r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) { 996 assert(!a.data.empty, "the promise MUST be constructed before it is returned"); 997 a.data.get.replyId = replyId; 998 a.data.get.replyTo = replyTo; 999 }, (data) { 1000 enum wrapInTuple = !is(typeof(data) : Tuple!U, U); 1001 if (auto rc = replyTo.lock.get) { 1002 static if (wrapInTuple) 1003 rc.put(Reply(replyId, Variant(tuple(data)))); 1004 else 1005 rc.put(Reply(replyId, Variant(data))); 1006 } 1007 }); 1008 } else static if (isPromise) { 1009 r.data.get.replyId = replyId; 1010 r.data.get.replyTo = replyTo; 1011 } else { 1012 // TODO: is this syntax for U one variable or variable. I want it to be variable. 1013 enum wrapInTuple = !is(RType : Tuple!U, U); 1014 if (auto rc = replyTo.lock.get) { 1015 static if (wrapInTuple) 1016 rc.put(Reply(replyId, Variant(tuple(r)))); 1017 else 1018 rc.put(Reply(replyId, Variant(r))); 1019 } 1020 } 1021 } 1022 1023 return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs); 1024 } 1025 1026 @("shall link two actors lifetime") 1027 unittest { 1028 int count; 1029 void countExits(ref Actor self, ExitMsg msg) @safe nothrow { 1030 count++; 1031 self.shutdown; 1032 } 1033 1034 auto aa1 = Actor(makeAddress2); 1035 auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize; 1036 auto aa2 = Actor(makeAddress2); 1037 auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize; 1038 1039 a1.linkTo(a2.address); 1040 a1.process(Clock.currTime); 1041 a2.process(Clock.currTime); 1042 1043 assert(a1.isAlive); 1044 assert(a2.isAlive); 1045 1046 sendExit(a1.address, ExitReason.userShutdown); 1047 foreach (_; 0 .. 5) { 1048 a1.process(Clock.currTime); 1049 a2.process(Clock.currTime); 1050 } 1051 1052 assert(!a1.isAlive); 1053 assert(!a2.isAlive); 1054 assert(count == 2); 1055 } 1056 1057 @("shall let one actor monitor the lifetime of the other one") 1058 unittest { 1059 int count; 1060 void downMsg(ref Actor self, DownMsg msg) @safe nothrow { 1061 count++; 1062 } 1063 1064 auto aa1 = Actor(makeAddress2); 1065 auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize; 1066 auto aa2 = Actor(makeAddress2); 1067 auto a2 = build(&aa2).set((int x) {}).finalize; 1068 1069 a1.monitor(a2.address); 1070 a1.process(Clock.currTime); 1071 a2.process(Clock.currTime); 1072 1073 assert(a1.isAlive); 1074 assert(a2.isAlive); 1075 1076 sendExit(a2.address, ExitReason.userShutdown); 1077 foreach (_; 0 .. 5) { 1078 a1.process(Clock.currTime); 1079 a2.process(Clock.currTime); 1080 } 1081 1082 assert(a1.isAlive); 1083 assert(!a2.isAlive); 1084 assert(count == 1); 1085 } 1086 1087 private struct BuildActor { 1088 Actor* actor; 1089 1090 Actor* finalize() @safe { 1091 auto rval = actor; 1092 actor = null; 1093 return rval; 1094 } 1095 1096 auto errorHandler(ErrorHandler a) { 1097 actor.errorHandler = a; 1098 return this; 1099 } 1100 1101 auto downHandler_(DownHandler a) { 1102 actor.downHandler_ = a; 1103 return this; 1104 } 1105 1106 auto exitHandler_(ExitHandler a) { 1107 actor.exitHandler_ = a; 1108 return this; 1109 } 1110 1111 auto exceptionHandler_(ExceptionHandler a) { 1112 actor.exceptionHandler_ = a; 1113 return this; 1114 } 1115 1116 auto defaultHandler_(DefaultHandler a) { 1117 actor.defaultHandler_ = a; 1118 return this; 1119 } 1120 1121 auto set(BehaviorT)(BehaviorT behavior) 1122 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT) 1123 && !is(ReturnType!BehaviorT == void)) { 1124 auto act = makeRequest(behavior); 1125 actor.register(act.signature, act.request); 1126 return this; 1127 } 1128 1129 auto set(BehaviorT, CT)(BehaviorT behavior, CT c) 1130 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT) 1131 && !is(ReturnType!BehaviorT == void)) { 1132 auto act = makeRequest!(BehaviorT, CT)(behavior); 1133 // for now just use the GC to allocate the context on. 1134 // TODO: use an allocator. 1135 act.request.ctx = cast(void*) new CT(c); 1136 actor.register(act.signature, act.request); 1137 return this; 1138 } 1139 1140 auto set(BehaviorT)(BehaviorT behavior) 1141 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT) 1142 && is(ReturnType!BehaviorT == void)) { 1143 auto act = makeAction(behavior); 1144 actor.register(act.signature, act.action); 1145 return this; 1146 } 1147 1148 auto set(BehaviorT, CT)(BehaviorT behavior, CT c) 1149 if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT) 1150 && is(ReturnType!BehaviorT == void)) { 1151 auto act = makeAction!(BehaviorT, CT)(behavior); 1152 // for now just use the GC to allocate the context on. 1153 // TODO: use an allocator. 1154 act.action.ctx = cast(void*) new CT(c); 1155 actor.register(act.signature, act.action); 1156 return this; 1157 } 1158 } 1159 1160 package BuildActor build(Actor* a) @safe { 1161 return BuildActor(a); 1162 } 1163 1164 /// Implement an actor. 1165 Actor* impl(Behavior...)(Actor* self, Behavior behaviors) { 1166 import my.actor.msg : isCapture, Capture; 1167 1168 auto bactor = build(self); 1169 static foreach (const i; 0 .. Behavior.length) { 1170 { 1171 alias b = Behavior[i]; 1172 1173 static if (!isCapture!b) { 1174 static if (!(isFunction!(b) || isFunctionPointer!(b))) 1175 static assert(0, "behavior may only be functions, not delgates: " ~ b.stringof); 1176 1177 static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) { 1178 bactor.set(behaviors[i], behaviors[i + 1]); 1179 } else 1180 bactor.set(behaviors[i]); 1181 } 1182 } 1183 } 1184 1185 return bactor.finalize; 1186 } 1187 1188 @("build dynamic actor from functions") 1189 unittest { 1190 static void fn3(int s) @safe { 1191 } 1192 1193 static string fn4(int s) @safe { 1194 return "foo"; 1195 } 1196 1197 static Tuple!(int, string) fn5(const string s) @safe { 1198 return typeof(return)(42, "hej"); 1199 } 1200 1201 auto aa1 = Actor(makeAddress2); 1202 auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize; 1203 } 1204 1205 unittest { 1206 bool delayOk; 1207 static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe { 1208 *c.delayOk = true; 1209 } 1210 1211 bool delayShouldNeverHappen; 1212 static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe { 1213 *c.delayShouldNeverHappen = true; 1214 } 1215 1216 auto aa1 = Actor(makeAddress2); 1217 auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2, 1218 capture(&delayShouldNeverHappen)).finalize; 1219 delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo"); 1220 delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42); 1221 1222 assert(!actor.addressRef.get.empty!DelayedMsg); 1223 assert(actor.addressRef.get.empty!Msg); 1224 assert(actor.addressRef.get.empty!Reply); 1225 1226 actor.process(Clock.currTime); 1227 1228 assert(!actor.addressRef.get.empty!DelayedMsg); 1229 assert(actor.addressRef.get.empty!Msg); 1230 assert(actor.addressRef.get.empty!Reply); 1231 1232 actor.process(Clock.currTime); 1233 actor.process(Clock.currTime); 1234 1235 assert(actor.addressRef.get.empty!DelayedMsg); 1236 assert(actor.addressRef.get.empty!Msg); 1237 assert(actor.addressRef.get.empty!Reply); 1238 1239 assert(delayOk); 1240 assert(!delayShouldNeverHappen); 1241 } 1242 1243 @("shall process a request->then chain xyz") 1244 @system unittest { 1245 // checking capture is correctly setup/teardown by using captured rc. 1246 1247 auto rcReq = refCounted(42); 1248 bool calledOk; 1249 static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s, 1250 const string b) { 1251 assert(2 == ctx[1].refCount); 1252 if (s == "apa") 1253 *ctx.calledOk = true; 1254 return "foo"; 1255 } 1256 1257 auto rcReply = refCounted(42); 1258 bool calledReply; 1259 static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) { 1260 *ctx[0] = s == "foo"; 1261 assert(2 == ctx[1].refCount); 1262 } 1263 1264 auto aa1 = Actor(makeAddress2); 1265 auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize; 1266 1267 assert(2 == rcReq.refCount); 1268 assert(1 == rcReply.refCount); 1269 1270 actor.request(actor.address, infTimeout).send("apa", "foo") 1271 .capture(&calledReply, rcReply).then(&reply); 1272 assert(2 == rcReply.refCount); 1273 1274 assert(!actor.addr.get.empty!Msg); 1275 assert(actor.addr.get.empty!Reply); 1276 1277 actor.process(Clock.currTime); 1278 assert(actor.addr.get.empty!Msg); 1279 assert(actor.addr.get.empty!Reply); 1280 1281 assert(2 == rcReq.refCount); 1282 assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back"); 1283 1284 assert(calledOk); 1285 assert(calledReply); 1286 1287 actor.shutdown; 1288 while (actor.isAlive) 1289 actor.process(Clock.currTime); 1290 } 1291 1292 @("shall process a request->then chain using promises") 1293 unittest { 1294 static struct A { 1295 string v; 1296 } 1297 1298 static struct B { 1299 string v; 1300 } 1301 1302 int calledOk; 1303 auto fn1p = makePromise!string; 1304 static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted { 1305 if (a.v == "apa") 1306 (*c.calledOk)++; 1307 return typeof(return)(c.p); 1308 } 1309 1310 auto fn2p = makePromise!string; 1311 static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) { 1312 (*c.calledOk)++; 1313 return c.p; 1314 } 1315 1316 int calledReply; 1317 static void reply(ref Tuple!(int*) ctx, const string s) { 1318 if (s == "foo") 1319 *ctx[0] += 1; 1320 } 1321 1322 auto aa1 = Actor(makeAddress2); 1323 auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2, 1324 capture(&calledOk, fn2p)).finalize; 1325 1326 actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply); 1327 actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply); 1328 1329 actor.process(Clock.currTime); 1330 assert(calledOk == 1); // first request 1331 assert(calledReply == 0); 1332 1333 fn1p.deliver("foo"); 1334 1335 assert(calledReply == 0); 1336 1337 actor.process(Clock.currTime); 1338 assert(calledOk == 2); // second request triggered 1339 assert(calledReply == 1); 1340 1341 fn2p.deliver("foo"); 1342 actor.process(Clock.currTime); 1343 1344 assert(calledReply == 2); 1345 1346 actor.shutdown; 1347 while (actor.isAlive) { 1348 actor.process(Clock.currTime); 1349 } 1350 } 1351 1352 /// The timeout triggered. 1353 class ScopedActorException : Exception { 1354 this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow { 1355 super(null, file, line); 1356 error = err; 1357 } 1358 1359 ScopedActorError error; 1360 } 1361 1362 enum ScopedActorError : ubyte { 1363 none, 1364 // actor address is down 1365 down, 1366 // request timeout 1367 timeout, 1368 // the address where unable to process the received message 1369 unknownMsg, 1370 // some type of fatal error occured. 1371 fatal, 1372 } 1373 1374 /** Intended to be used in a local scope by a user. 1375 * 1376 * `ScopedActor` is not thread safe. 1377 */ 1378 struct ScopedActor { 1379 import my.actor.typed : underlyingAddress, underlyingWeakAddress; 1380 1381 private { 1382 static struct Data { 1383 Actor self; 1384 ScopedActorError errSt; 1385 1386 ~this() @safe { 1387 if (self.addr.empty) 1388 return; 1389 1390 () @trusted { 1391 self.downHandler = null; 1392 self.defaultHandler = toDelegate(&.defaultHandler); 1393 self.errorHandler = toDelegate(&defaultErrorHandler); 1394 }(); 1395 1396 self.shutdown; 1397 while (self.isAlive) { 1398 self.process(Clock.currTime); 1399 } 1400 } 1401 } 1402 1403 RefCounted!Data data; 1404 } 1405 1406 this(StrongAddress addr, string name) @safe { 1407 data = refCounted(Data(Actor(addr))); 1408 data.get.self.name = name; 1409 } 1410 1411 private void reset() @safe nothrow { 1412 data.get.errSt = ScopedActorError.none; 1413 } 1414 1415 SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout) 1416 if (isAddress!TAddress) { 1417 reset; 1418 auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout); 1419 return SRequestSend(rs, this); 1420 } 1421 1422 private static struct SRequestSend { 1423 RequestSend rs; 1424 ScopedActor self; 1425 1426 /// Copy constructor 1427 this(ref return typeof(this) rhs) @safe pure nothrow @nogc { 1428 rs = rhs.rs; 1429 self = rhs.self; 1430 } 1431 1432 @disable this(this); 1433 1434 SRequestSendThen send(Args...)(auto ref Args args) { 1435 return SRequestSendThen(.send(rs, args), self); 1436 } 1437 } 1438 1439 private static struct SRequestSendThen { 1440 RequestSendThen rs; 1441 ScopedActor self; 1442 uint backoff; 1443 1444 /// Copy constructor 1445 this(ref return typeof(this) rhs) { 1446 rs = rhs.rs; 1447 self = rhs.self; 1448 backoff = rhs.backoff; 1449 } 1450 1451 @disable this(this); 1452 1453 void dynIntervalSleep() @trusted { 1454 // +100 usecs "feels good", magic number. current OS and 1455 // implementation of message passing isn't that much faster than 1456 // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They 1457 // aren't expected to be used for "time critical" sections. 1458 Thread.sleep(backoff.dur!"usecs"); 1459 backoff = min(backoff + 100, 20000); 1460 } 1461 1462 private static struct ValueCapture { 1463 RefCounted!Data data; 1464 1465 void downHandler(ref Actor, DownMsg) @safe nothrow { 1466 data.get.errSt = ScopedActorError.down; 1467 } 1468 1469 void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow { 1470 if (msg.reason == SystemError.requestTimeout) 1471 data.get.errSt = ScopedActorError.timeout; 1472 else 1473 data.get.errSt = ScopedActorError.fatal; 1474 } 1475 1476 void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow { 1477 logAndDropHandler(a, msg); 1478 data.get.errSt = ScopedActorError.unknownMsg; 1479 } 1480 } 1481 1482 void then(T)(T handler, ErrorHandler onError = null) { 1483 scope (exit) 1484 demonitor(rs.rs.self, rs.rs.requestTo); 1485 monitor(rs.rs.self, rs.rs.requestTo); 1486 1487 auto callback = new ValueCapture(self.data); 1488 self.data.get.self.downHandler = &callback.downHandler; 1489 self.data.get.self.defaultHandler = &callback.unknownMsgHandler; 1490 self.data.get.self.errorHandler = &callback.errorHandler; 1491 1492 () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }(); 1493 1494 scope (exit) 1495 () @trusted { 1496 self.data.get.self.downHandler = null; 1497 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler); 1498 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler); 1499 }(); 1500 1501 auto requestTo = rs.rs.requestTo.lock; 1502 if (!requestTo) 1503 throw new ScopedActorException(ScopedActorError.down); 1504 1505 // TODO: this loop is stupid... should use a conditional variable 1506 // instead but that requires changing the mailbox. later 1507 do { 1508 rs.rs.self.process(Clock.currTime); 1509 // force the actor to be alive even though there are no behaviors. 1510 rs.rs.self.state_ = ActorState.waiting; 1511 1512 if (self.data.get.errSt == ScopedActorError.none) { 1513 dynIntervalSleep; 1514 } else { 1515 throw new ScopedActorException(self.data.get.errSt); 1516 } 1517 1518 } 1519 while (self.data.get.self.waitingForReply); 1520 } 1521 } 1522 } 1523 1524 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe { 1525 import std.format : format; 1526 1527 return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line)); 1528 } 1529 1530 @( 1531 "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed") 1532 unittest { 1533 import my.actor.system; 1534 1535 auto sys = makeSystem; 1536 1537 auto a0 = sys.spawn((Actor* self) { 1538 return impl(self, (ref CSelf!() ctx, int x) { 1539 Thread.sleep(50.dur!"msecs"); 1540 return 42; 1541 }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self), 1542 (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self)); 1543 }); 1544 1545 { 1546 auto self = scopedActor; 1547 bool excThrown; 1548 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1549 while (!excThrown && Clock.currTime < stopAt) { 1550 try { 1551 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {}); 1552 } catch (ScopedActorException e) { 1553 excThrown = e.error == ScopedActorError.timeout; 1554 } catch (Exception e) { 1555 logger.info(e.msg); 1556 } 1557 } 1558 assert(excThrown, "timeout did not trigger as expected"); 1559 } 1560 1561 { 1562 auto self = scopedActor; 1563 bool excThrown; 1564 auto stopAt = Clock.currTime + 3.dur!"seconds"; 1565 while (!excThrown && Clock.currTime < stopAt) { 1566 try { 1567 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) { 1568 }); 1569 } catch (ScopedActorException e) { 1570 excThrown = e.error == ScopedActorError.down; 1571 } catch (Exception e) { 1572 logger.info(e.msg); 1573 } 1574 } 1575 assert(excThrown, "detecting terminated actor did not trigger as expected"); 1576 } 1577 }