1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.actor;
7 
8 import std.stdio : writeln, writefln;
9 
10 import core.thread : Thread;
11 import logger = std.experimental.logger;
12 import std.algorithm : schwartzSort, max, min, among;
13 import std.array : empty;
14 import std.datetime : SysTime, Clock, dur;
15 import std.exception : collectException;
16 import std.functional : toDelegate;
17 import std.meta : staticMap;
18 import std.traits : Parameters, Unqual, ReturnType, isFunctionPointer, isFunction;
19 import std.typecons : Tuple, tuple;
20 import std.variant : Variant;
21 
22 import my.actor.common : ExitReason, SystemError, makeSignature;
23 import my.actor.mailbox;
24 import my.actor.msg;
25 import my.actor.system : System;
26 import my.actor.typed : isTypedAddress, isTypedActorImpl;
27 import my.gc.refc;
28 import sumtype;
29 
30 private struct PromiseData {
31     WeakAddress replyTo;
32     ulong replyId;
33 
34     /// Copy constructor
35     this(ref return scope typeof(this) rhs) @safe nothrow @nogc {
36         replyTo = rhs.replyTo;
37         replyId = rhs.replyId;
38     }
39 
40     @disable this(this);
41 }
42 
43 // deliver can only be called one time.
44 struct Promise(T) {
45     package {
46         RefCounted!PromiseData data;
47     }
48 
49     void deliver(T reply) {
50         auto tmp = reply;
51         deliver(reply);
52     }
53 
54     /** Deliver the message `reply`.
55      *
56      * A promise can only be delivered once.
57      */
58     void deliver(ref T reply) @trusted
59     in (!data.empty, "promise must be initialized") {
60         if (data.empty)
61             return;
62         scope (exit)
63             data.release;
64 
65         // TODO: should probably call delivering actor with an ErrorMsg if replyTo is closed.
66         if (auto replyTo = data.get.replyTo.lock.get) {
67             enum wrapInTuple = !is(T : Tuple!U, U);
68             static if (wrapInTuple)
69                 replyTo.put(Reply(data.get.replyId, Variant(tuple(reply))));
70             else
71                 replyTo.put(Reply(data.get.replyId, Variant(reply)));
72         }
73     }
74 
75     void opAssign(Promise!T rhs) {
76         data = rhs.data;
77     }
78 
79     /// True if the promise is not initialized.
80     bool empty() {
81         return data.empty || data.get.replyId == 0;
82     }
83 
84     /// Clear the promise.
85     void clear() {
86         data.release;
87     }
88 }
89 
90 auto makePromise(T)() {
91     return Promise!T(refCounted(PromiseData.init));
92 }
93 
94 struct RequestResult(T) {
95     this(T v) {
96         value = typeof(value)(v);
97     }
98 
99     this(ErrorMsg v) {
100         value = typeof(value)(v);
101     }
102 
103     this(Promise!T v) {
104         value = typeof(value)(v);
105     }
106 
107     SumType!(T, ErrorMsg, Promise!T) value;
108 }
109 
110 private alias MsgHandler = void delegate(void* ctx, ref Variant msg) @safe;
111 private alias RequestHandler = void delegate(void* ctx, ref Variant msg,
112         ulong replyId, WeakAddress replyTo) @safe;
113 private alias ReplyHandler = void delegate(void* ctx, ref Variant msg) @safe;
114 
115 alias DefaultHandler = void delegate(ref Actor self, ref Variant msg) @safe nothrow;
116 
117 /** Actors send error messages to others by returning an error (see Errors)
118  * from a message handler. Similar to exit messages, error messages usually
119  * cause the receiving actor to terminate, unless a custom handler was
120  * installed. The default handler is used as fallback if request is used
121  * without error handler.
122  */
123 alias ErrorHandler = void delegate(ref Actor self, ErrorMsg) @safe nothrow;
124 
125 /** Bidirectional monitoring with a strong lifetime coupling is established by
126  * calling a `LinkRequest` to an address. This will cause the runtime to send
127  * an `ExitMsg` if either this or other dies. Per default, actors terminate
128  * after receiving an `ExitMsg` unless the exit reason is exit_reason::normal.
129  * This mechanism propagates failure states in an actor system. Linked actors
130  * form a sub system in which an error causes all actors to fail collectively.
131  */
132 alias ExitHandler = void delegate(ref Actor self, ExitMsg msg) @safe nothrow;
133 
134 /// An exception has been thrown while processing a message.
135 alias ExceptionHandler = void delegate(ref Actor self, Exception e) @safe nothrow;
136 
137 /** Actors can monitor the lifetime of other actors by sending a `MonitorRequest`
138  * to an address. This will cause the runtime system to send a `DownMsg` for
139  * other if it dies.
140  *
141  * Actors drop down messages unless they provide a custom handler.
142  */
143 alias DownHandler = void delegate(ref Actor self, DownMsg msg) @safe nothrow;
144 
145 void defaultHandler(ref Actor self, ref Variant msg) @safe nothrow {
146 }
147 
148 /// Write the name of the actor and the message type to the console.
149 void logAndDropHandler(ref Actor self, ref Variant msg) @trusted nothrow {
150     import std.stdio : writeln;
151 
152     try {
153         writeln("UNKNOWN message sent to actor ", self.name);
154         writeln(msg.toString);
155     } catch (Exception e) {
156     }
157 }
158 
159 void defaultErrorHandler(ref Actor self, ErrorMsg msg) @safe nothrow {
160     self.lastError = msg.reason;
161     self.shutdown;
162 }
163 
164 void defaultExitHandler(ref Actor self, ExitMsg msg) @safe nothrow {
165     self.lastError = msg.reason;
166     self.forceShutdown;
167 }
168 
169 void defaultExceptionHandler(ref Actor self, Exception e) @safe nothrow {
170     self.lastError = SystemError.runtimeError;
171     // TODO: should log?
172     self.forceShutdown;
173 }
174 
175 // Write the name of the actor and the exception to stdout.
176 void logExceptionHandler(ref Actor self, Exception e) @safe nothrow {
177     import std.stdio : writeln;
178 
179     self.lastError = SystemError.runtimeError;
180 
181     try {
182         writeln("EXCEPTION thrown by actor ", self.name);
183         writeln(e.msg);
184         writeln("TERMINATING");
185     } catch (Exception e) {
186     }
187 
188     self.forceShutdown;
189 }
190 
191 /// Timeout for an outstanding request.
192 struct ReplyHandlerTimeout {
193     ulong id;
194     SysTime timeout;
195 }
196 
197 package enum ActorState {
198     /// waiting to be started.
199     waiting,
200     /// active and processing messages.
201     active,
202     /// wait for all awaited responses to finish
203     shutdown,
204     /// discard also the awaite responses, just shutdown fast
205     forceShutdown,
206     /// in process of shutting down
207     finishShutdown,
208     /// stopped.
209     stopped,
210 }
211 
212 private struct AwaitReponse {
213     Closure!(ReplyHandler, void*) behavior;
214     ErrorHandler onError;
215 }
216 
217 struct Actor {
218     import std.container.rbtree : RedBlackTree, redBlackTree;
219 
220     package StrongAddress addr;
221     // visible in the package for logging purpose.
222     package ActorState state_ = ActorState.stopped;
223 
224     private {
225         // TODO: rename to behavior.
226         Closure!(MsgHandler, void*)[ulong] incoming;
227         Closure!(RequestHandler, void*)[ulong] reqBehavior;
228 
229         // callbacks for awaited responses key:ed on their id.
230         AwaitReponse[ulong] awaitedResponses;
231         ReplyHandlerTimeout[] replyTimeouts;
232 
233         // important that it start at 1 because then zero is known to not be initialized.
234         ulong nextReplyId = 1;
235 
236         /// Delayed messages ordered by their trigger time.
237         RedBlackTree!(DelayedMsg*, "a.triggerAt < b.triggerAt", true) delayed;
238 
239         /// Used during shutdown to signal monitors and links why this actor is terminating.
240         SystemError lastError;
241 
242         /// monitoring the actor lifetime.
243         WeakAddress[size_t] monitors;
244 
245         /// strong, bidirectional link of the actors lifetime.
246         WeakAddress[size_t] links;
247 
248         // Number of messages that has been processed.
249         ulong messages_;
250 
251         /// System the actor belongs to.
252         System* homeSystem_;
253 
254         string name_;
255 
256         ErrorHandler errorHandler_;
257 
258         /// callback when a link goes down.
259         DownHandler downHandler_;
260 
261         ExitHandler exitHandler_;
262 
263         ExceptionHandler exceptionHandler_;
264 
265         DefaultHandler defaultHandler_;
266     }
267 
268     invariant () {
269         if (addr && !state_.among(ActorState.waiting, ActorState.shutdown)) {
270             assert(errorHandler_);
271             assert(exitHandler_);
272             assert(exceptionHandler_);
273             assert(defaultHandler_);
274         }
275     }
276 
277     this(StrongAddress a) @trusted
278     in (!a.empty, "address is empty") {
279         state_ = ActorState.waiting;
280 
281         addr = a;
282         addr.get.setOpen;
283         delayed = new typeof(delayed);
284 
285         errorHandler_ = toDelegate(&defaultErrorHandler);
286         downHandler_ = null;
287         exitHandler_ = toDelegate(&defaultExitHandler);
288         exceptionHandler_ = toDelegate(&defaultExceptionHandler);
289         defaultHandler_ = toDelegate(&.defaultHandler);
290     }
291 
292     WeakAddress address() @safe {
293         return addr.weakRef;
294     }
295 
296     package ref StrongAddress addressRef() return @safe pure nothrow @nogc {
297         return addr;
298     }
299 
300     ref System homeSystem() @safe pure nothrow @nogc {
301         return *homeSystem_;
302     }
303 
304     /** Clean shutdown of the actor
305      *
306      * Stopping incoming messages from triggering new behavior and finish all
307      * awaited respones.
308      */
309     void shutdown() @safe nothrow {
310         if (state_.among(ActorState.waiting, ActorState.active))
311             state_ = ActorState.shutdown;
312     }
313 
314     /** Force an immediate shutdown.
315      *
316      * Stopping incoming messages from triggering new behavior and finish all
317      * awaited respones.
318      */
319     void forceShutdown() @safe nothrow {
320         if (state_.among(ActorState.waiting, ActorState.active, ActorState.shutdown))
321             state_ = ActorState.forceShutdown;
322     }
323 
324     ulong id() @safe pure nothrow const @nogc {
325         return addr.id;
326     }
327 
328     /// Returns: the name of the actor.
329     string name() @safe pure nothrow const @nogc {
330         return name_;
331     }
332 
333     // dfmt off
334 
335     /// Set name name of the actor.
336     void name(string n) @safe pure nothrow @nogc {
337         this.name_ = n;
338     }
339 
340     void errorHandler(ErrorHandler v) @safe pure nothrow @nogc {
341         errorHandler_ = v;
342     }
343 
344     void downHandler(DownHandler v) @safe pure nothrow @nogc {
345         downHandler_ = v;
346     }
347 
348     void exitHandler(ExitHandler v) @safe pure nothrow @nogc {
349         exitHandler_ = v;
350     }
351 
352     void exceptionHandler(ExceptionHandler v) @safe pure nothrow @nogc {
353         exceptionHandler_ = v;
354     }
355 
356     void defaultHandler(DefaultHandler v) @safe pure nothrow @nogc {
357         defaultHandler_ = v;
358     }
359 
360     // dfmt on
361 
362 package:
363     bool hasMessage() @safe pure nothrow @nogc {
364         return addr && addr.get.hasMessage;
365     }
366 
367     /// How long until a delayed message or a timeout fires.
368     Duration nextTimeout(const SysTime now, const Duration default_) @safe {
369         return min(delayed.empty ? default_ : (delayed.front.triggerAt - now),
370                 replyTimeouts.empty ? default_ : (replyTimeouts[0].timeout - now));
371     }
372 
373     bool waitingForReply() @safe pure nothrow const @nogc {
374         return !awaitedResponses.empty;
375     }
376 
377     /// Number of messages that has been processed.
378     ulong messages() @safe pure nothrow const @nogc {
379         return messages_;
380     }
381 
382     void setHomeSystem(System* sys) @safe pure nothrow @nogc {
383         homeSystem_ = sys;
384     }
385 
386     void cleanupBehavior() @trusted nothrow {
387         foreach (ref a; incoming.byValue) {
388             try {
389                 a.free;
390             } catch (Exception e) {
391                 // TODO: call exceptionHandler?
392             }
393         }
394         incoming = null;
395         foreach (ref a; reqBehavior.byValue) {
396             try {
397                 a.free;
398             } catch (Exception e) {
399             }
400         }
401         reqBehavior = null;
402     }
403 
404     void cleanupAwait() @trusted nothrow {
405         foreach (ref a; awaitedResponses.byValue) {
406             try {
407                 a.behavior.free;
408             } catch (Exception e) {
409             }
410         }
411         awaitedResponses = null;
412     }
413 
414     void cleanupDelayed() @trusted nothrow {
415         foreach (const _; 0 .. delayed.length) {
416             try {
417                 delayed.front.msg = Msg.init;
418                 delayed.removeFront;
419             } catch (Exception e) {
420             }
421         }
422         .destroy(delayed);
423     }
424 
425     bool isAlive() @safe pure nothrow const @nogc {
426         final switch (state_) {
427         case ActorState.waiting:
428             goto case;
429         case ActorState.active:
430             goto case;
431         case ActorState.shutdown:
432             goto case;
433         case ActorState.forceShutdown:
434             goto case;
435         case ActorState.finishShutdown:
436             return true;
437         case ActorState.stopped:
438             return false;
439         }
440     }
441 
442     /// Accepting messages.
443     bool isAccepting() @safe pure nothrow const @nogc {
444         final switch (state_) {
445         case ActorState.waiting:
446             goto case;
447         case ActorState.active:
448             goto case;
449         case ActorState.shutdown:
450             return true;
451         case ActorState.forceShutdown:
452             goto case;
453         case ActorState.finishShutdown:
454             goto case;
455         case ActorState.stopped:
456             return false;
457         }
458     }
459 
460     ulong replyId() @safe {
461         return nextReplyId++;
462     }
463 
464     void process(const SysTime now) @safe nothrow {
465         import core.memory : GC;
466 
467         assert(!GC.inFinalizer);
468 
469         messages_ = 0;
470 
471         void tick() {
472             // philosophy of the order is that a timeout should only trigger if it
473             // is really required thus it is checked last. This order then mean
474             // that a request may have triggered a timeout but because
475             // `processReply` is called before `checkReplyTimeout` it is *ignored*.
476             // Thus "better to accept even if it is timeout rather than fail".
477             //
478             // NOTE: the assumption that a message that has timed out should be
479             // processed turned out to be... wrong. It is annoying that
480             // sometimes a timeout message triggers even though it shouldn't,
481             // because it is now too old to be useful!
482             // Thus the order is changed to first check for timeout, then process.
483             try {
484                 processSystemMsg();
485                 checkReplyTimeout(now);
486                 processDelayed(now);
487                 processIncoming();
488                 processReply();
489             } catch (Exception e) {
490                 exceptionHandler_(this, e);
491             }
492         }
493 
494         assert(state_ == ActorState.stopped || addr, "no address");
495 
496         final switch (state_) {
497         case ActorState.waiting:
498             state_ = ActorState.active;
499             tick;
500             // the state can be changed before the actor have executed.
501             break;
502         case ActorState.active:
503             tick;
504             // self terminate if the actor has no behavior.
505             if (incoming.empty && awaitedResponses.empty && reqBehavior.empty)
506                 state_ = ActorState.forceShutdown;
507             break;
508         case ActorState.shutdown:
509             tick;
510             if (awaitedResponses.empty)
511                 state_ = ActorState.finishShutdown;
512             cleanupBehavior;
513             break;
514         case ActorState.forceShutdown:
515             state_ = ActorState.finishShutdown;
516             cleanupBehavior;
517             addr.get.setClosed;
518             break;
519         case ActorState.finishShutdown:
520             state_ = ActorState.stopped;
521 
522             sendToMonitors(DownMsg(addr.weakRef, lastError));
523 
524             sendToLinks(ExitMsg(addr.weakRef, lastError));
525 
526             replyTimeouts = null;
527             cleanupDelayed;
528             cleanupAwait;
529 
530             // must be last because sendToLinks and sendToMonitors uses addr.
531             addr.get.shutdown();
532             addr.release;
533             break;
534         case ActorState.stopped:
535             break;
536         }
537     }
538 
539     void sendToMonitors(DownMsg msg) @safe nothrow {
540         foreach (ref a; monitors.byValue) {
541             try {
542                 if (auto rc = a.lock.get)
543                     rc.put(SystemMsg(msg));
544                 a.release;
545             } catch (Exception e) {
546             }
547         }
548 
549         monitors = null;
550     }
551 
552     void sendToLinks(ExitMsg msg) @safe nothrow {
553         foreach (ref a; links.byValue) {
554             try {
555                 if (auto rc = a.lock.get)
556                     rc.put(SystemMsg(msg));
557                 a.release;
558             } catch (Exception e) {
559             }
560         }
561 
562         links = null;
563     }
564 
565     void checkReplyTimeout(const SysTime now) @safe {
566         if (replyTimeouts.empty)
567             return;
568 
569         size_t removeTo;
570         foreach (const i; 0 .. replyTimeouts.length) {
571             if (now > replyTimeouts[i].timeout) {
572                 const id = replyTimeouts[i].id;
573                 if (auto v = id in awaitedResponses) {
574                     messages_++;
575                     v.onError(this, ErrorMsg(addr.weakRef, SystemError.requestTimeout));
576                     try {
577                         () @trusted { v.behavior.free; }();
578                     } catch (Exception e) {
579                     }
580                     awaitedResponses.remove(id);
581                 }
582                 removeTo = i + 1;
583             } else {
584                 break;
585             }
586         }
587 
588         if (removeTo >= replyTimeouts.length) {
589             replyTimeouts = null;
590         } else if (removeTo != 0) {
591             replyTimeouts = replyTimeouts[removeTo .. $];
592         }
593     }
594 
595     void processIncoming() @safe {
596         if (addr.get.empty!Msg)
597             return;
598         messages_++;
599 
600         auto front = addr.get.pop!Msg;
601         scope (exit)
602             .destroy(front);
603 
604         void doSend(ref MsgOneShot msg) {
605             if (auto v = front.get.signature in incoming) {
606                 (*v)(msg.data);
607             } else {
608                 defaultHandler_(this, msg.data);
609             }
610         }
611 
612         void doRequest(ref MsgRequest msg) @trusted {
613             if (auto v = front.get.signature in reqBehavior) {
614                 (*v)(msg.data, msg.replyId, msg.replyTo);
615             } else {
616                 defaultHandler_(this, msg.data);
617             }
618         }
619 
620         front.get.type.match!((ref MsgOneShot a) { doSend(a); }, (ref MsgRequest a) {
621             doRequest(a);
622         });
623     }
624 
625     /** All system messages are handled.
626      *
627      * Assuming:
628      *  * they are not heavy to process
629      *  * very important that if there are any they should be handled as soon as possible
630      *  * ignoring the case when there is a "storm" of system messages which
631      *    "could" overload the actor system and lead to a crash. I classify this,
632      *    for now, as intentional, malicious coding by the developer themself.
633      *    External inputs that could trigger such a behavior should be controlled
634      *    and limited. Other types of input such as a developer trying to break
635      *    the actor system is out of scope.
636      */
637     void processSystemMsg() @safe {
638         //() @trusted {
639         //logger.infof("run %X", cast(void*) &this);
640         //}();
641         while (!addr.get.empty!SystemMsg) {
642             messages_++;
643             //logger.infof("%X %s %s", addr.toHash, state_, messages_);
644             auto front = addr.get.pop!SystemMsg;
645             scope (exit)
646                 .destroy(front);
647 
648             front.get.match!((ref DownMsg a) {
649                 if (downHandler_)
650                     downHandler_(this, a);
651             }, (ref MonitorRequest a) { monitors[a.addr.toHash] = a.addr; }, (ref DemonitorRequest a) {
652                 if (auto v = a.addr.toHash in monitors)
653                     v.release;
654                 monitors.remove(a.addr.toHash);
655             }, (ref LinkRequest a) { links[a.addr.toHash] = a.addr; }, (ref UnlinkRequest a) {
656                 if (auto v = a.addr.toHash in links)
657                     v.release;
658                 links.remove(a.addr.toHash);
659             }, (ref ErrorMsg a) { errorHandler_(this, a); }, (ref ExitMsg a) {
660                 exitHandler_(this, a);
661             }, (ref SystemExitMsg a) {
662                 final switch (a.reason) {
663                 case ExitReason.normal:
664                     break;
665                 case ExitReason.unhandledException:
666                     exitHandler_(this, ExitMsg.init);
667                     break;
668                 case ExitReason.unknown:
669                     exitHandler_(this, ExitMsg.init);
670                     break;
671                 case ExitReason.userShutdown:
672                     exitHandler_(this, ExitMsg.init);
673                     break;
674                 case ExitReason.kill:
675                     exitHandler_(this, ExitMsg.init);
676                     // the user do NOT have an option here
677                     forceShutdown;
678                     break;
679                 }
680             });
681         }
682     }
683 
684     void processReply() @safe {
685         if (addr.get.empty!Reply)
686             return;
687         messages_++;
688 
689         auto front = addr.get.pop!Reply;
690         scope (exit)
691             .destroy(front);
692 
693         if (auto v = front.get.id in awaitedResponses) {
694             // TODO: reduce the lookups on front.id
695             v.behavior(front.get.data);
696             try {
697                 () @trusted { v.behavior.free; }();
698             } catch (Exception e) {
699             }
700             awaitedResponses.remove(front.get.id);
701             removeReplyTimeout(front.get.id);
702         } else {
703             // TODO: should probably be SystemError.unexpectedResponse?
704             //defaultHandler_(this, front.get.data);
705             // TODO: ignoring responses without a handler for now. It results
706             // in spam when the timeout triggers.
707         }
708     }
709 
710     void processDelayed(const SysTime now) @trusted {
711         if (!addr.get.empty!DelayedMsg) {
712             // count as a message because handling them are "expensive".
713             // Ignoring the case that the message right away is moved to the
714             // incoming queue. This lead to "double accounting" but ohh well.
715             // Don't use delayedSend when you should have used send.
716             messages_++;
717             delayed.insert(addr.get.pop!DelayedMsg.unsafeMove);
718         } else if (delayed.empty) {
719             return;
720         }
721 
722         foreach (const i; 0 .. delayed.length) {
723             if (now > delayed.front.triggerAt) {
724                 addr.get.put(delayed.front.msg);
725                 delayed.removeFront;
726             } else {
727                 break;
728             }
729         }
730     }
731 
732     private void removeReplyTimeout(ulong id) @safe nothrow {
733         import std.algorithm : remove;
734 
735         foreach (const i; 0 .. replyTimeouts.length) {
736             if (replyTimeouts[i].id == id) {
737                 remove(replyTimeouts, i);
738                 break;
739             }
740         }
741     }
742 
743     void register(ulong signature, Closure!(MsgHandler, void*) handler) @trusted {
744         if (!isAccepting)
745             return;
746 
747         if (auto v = signature in incoming) {
748             try {
749                 v.free;
750             } catch (Exception e) {
751             }
752         }
753         incoming[signature] = handler;
754     }
755 
756     void register(ulong signature, Closure!(RequestHandler, void*) handler) @trusted {
757         if (!isAccepting)
758             return;
759 
760         if (auto v = signature in reqBehavior) {
761             try {
762                 v.free;
763             } catch (Exception e) {
764             }
765         }
766         reqBehavior[signature] = handler;
767     }
768 
769     void register(ulong replyId, SysTime timeout, Closure!(ReplyHandler,
770             void*) reply, ErrorHandler onError) @safe {
771         if (!isAccepting)
772             return;
773 
774         awaitedResponses[replyId] = AwaitReponse(reply, onError is null ? errorHandler_ : onError);
775         replyTimeouts ~= ReplyHandlerTimeout(replyId, timeout);
776         schwartzSort!(a => a.timeout, (a, b) => a < b)(replyTimeouts);
777     }
778 }
779 
780 struct Closure(Fn, CtxT) {
781     alias FreeFn = void function(CtxT);
782 
783     Fn fn;
784     CtxT ctx;
785     FreeFn cleanup;
786 
787     this(Fn fn) {
788         this.fn = fn;
789     }
790 
791     this(Fn fn, CtxT* ctx, FreeFn cleanup) {
792         this.fn = fn;
793         this.ctx = ctx;
794         this.cleanup = cleanup;
795     }
796 
797     void opCall(Args...)(auto ref Args args) {
798         assert(fn !is null);
799         fn(ctx, args);
800     }
801 
802     void free() {
803         // will crash, on purpuse, if there is a ctx and no cleanup registered.
804         // maybe a bad idea? dunno... lets see
805         if (ctx)
806             cleanup(ctx);
807         ctx = CtxT.init;
808     }
809 }
810 
811 @("shall register a behavior to be called when msg received matching signature")
812 unittest {
813     auto addr = makeAddress2;
814     auto actor = Actor(addr);
815 
816     bool processedIncoming;
817     void fn(void* ctx, ref Variant msg) {
818         processedIncoming = true;
819     }
820 
821     actor.register(1, Closure!(MsgHandler, void*)(&fn));
822     addr.get.put(Msg(1, MsgType(MsgOneShot(Variant(42)))));
823 
824     actor.process(Clock.currTime);
825 
826     assert(processedIncoming);
827 }
828 
829 private void cleanupCtx(CtxT)(void* ctx)
830         if (is(CtxT == Tuple!T, T) || is(CtxT == void)) {
831     import std.traits;
832     import my.actor.typed;
833 
834     static if (!is(CtxT == void)) {
835         // trust that any use of this also pass on the correct context type.
836         auto userCtx = () @trusted { return cast(CtxT*) ctx; }();
837         // release the context such as if it holds a rc object.
838         alias Types = CtxT.Types;
839 
840         static foreach (const i; 0 .. CtxT.Types.length) {
841             {
842                 alias T = CtxT.Types[i];
843                 alias UT = Unqual!T;
844                 static if (!is(T == UT)) {
845                     static assert(!is(UT : WeakAddress),
846                             "WeakAddress must NEVER be const or immutable");
847                     static assert(!is(UT : TypedAddress!M, M...),
848                             "WeakAddress must NEVER be const or immutable: " ~ T.stringof);
849                 }
850                 // TODO: add a -version actor_ctx_diagnostic that prints when it is unable to deinit?
851 
852                 static if (is(UT == T)) {
853                     .destroy((*userCtx)[i]);
854                 }
855             }
856         }
857     }
858 }
859 
860 @("shall default initialize when possible, skipping const/immutable")
861 unittest {
862     {
863         auto x = tuple(cast(const) 42, 43);
864         alias T = typeof(x);
865         cleanupCtx!T(cast(void*)&x);
866         assert(x[0] == 42); // can't assign to const
867         assert(x[1] == 0);
868     }
869 
870     {
871         import my.path : Path;
872 
873         auto x = tuple(Path.init, cast(const) Path("foo"));
874         alias T = typeof(x);
875         cleanupCtx!T(cast(void*)&x);
876         assert(x[0] == Path.init);
877         assert(x[1] == Path("foo"));
878     }
879 }
880 
881 package struct Action {
882     Closure!(MsgHandler, void*) action;
883     ulong signature;
884 }
885 
886 /// An behavior for an actor when it receive a message of `signature`.
887 package auto makeAction(T, CtxT = void)(T handler) @safe
888         if (isFunction!T || isFunctionPointer!T) {
889     static if (is(CtxT == void))
890         alias Params = Parameters!T;
891     else {
892         alias CtxParam = Parameters!T[0];
893         alias Params = Parameters!T[1 .. $];
894         checkMatchingCtx!(CtxParam, CtxT);
895         checkRefForContext!handler;
896     }
897 
898     alias HArgs = staticMap!(Unqual, Params);
899 
900     void fn(void* ctx, ref Variant msg) @trusted {
901         static if (is(CtxT == void)) {
902             handler(msg.get!(Tuple!HArgs).expand);
903         } else {
904             auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
905             handler(*userCtx, msg.get!(Tuple!HArgs).expand);
906         }
907     }
908 
909     return Action(typeof(Action.action)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
910 }
911 
912 package Closure!(ReplyHandler, void*) makeReply(T, CtxT)(T handler) @safe {
913     static if (is(CtxT == void))
914         alias Params = Parameters!T;
915     else {
916         alias CtxParam = Parameters!T[0];
917         alias Params = Parameters!T[1 .. $];
918         checkMatchingCtx!(CtxParam, CtxT);
919         checkRefForContext!handler;
920     }
921 
922     alias HArgs = staticMap!(Unqual, Params);
923 
924     void fn(void* ctx, ref Variant msg) @trusted {
925         static if (is(CtxT == void)) {
926             handler(msg.get!(Tuple!HArgs).expand);
927         } else {
928             auto userCtx = cast(CtxParam*) cast(CtxT*) ctx;
929             handler(*userCtx, msg.get!(Tuple!HArgs).expand);
930         }
931     }
932 
933     return typeof(return)(&fn, null, &cleanupCtx!CtxT);
934 }
935 
936 package struct Request {
937     Closure!(RequestHandler, void*) request;
938     ulong signature;
939 }
940 
941 private string locToString(Loc...)() {
942     import std.conv : to;
943 
944     return Loc[0] ~ ":" ~ Loc[1].to!string ~ ":" ~ Loc[2].to!string;
945 }
946 
947 /// Check that the context parameter is `ref` otherwise issue a warning.
948 package void checkRefForContext(alias handler)() {
949     import std.traits : ParameterStorageClass, ParameterStorageClassTuple;
950 
951     alias CtxParam = ParameterStorageClassTuple!(typeof(handler))[0];
952 
953     static if (CtxParam != ParameterStorageClass.ref_) {
954         pragma(msg, "INFO: handler type is " ~ typeof(handler).stringof);
955         static assert(CtxParam == ParameterStorageClass.ref_,
956                 "The context must be `ref` to avoid unnecessary copying");
957     }
958 }
959 
960 package void checkMatchingCtx(CtxParam, CtxT)() {
961     static if (!is(CtxT == CtxParam)) {
962         static assert(__traits(compiles, { auto x = CtxParam(CtxT.init.expand); }),
963                 "mismatch between the context type " ~ CtxT.stringof
964                 ~ " and the first parameter " ~ CtxParam.stringof);
965     }
966 }
967 
968 package auto makeRequest(T, CtxT = void)(T handler) @safe {
969     static assert(!is(ReturnType!T == void), "handler returns void, not allowed");
970 
971     alias RType = ReturnType!T;
972     enum isReqResult = is(RType : RequestResult!ReqT, ReqT);
973     enum isPromise = is(RType : Promise!PromT, PromT);
974 
975     static if (is(CtxT == void))
976         alias Params = Parameters!T;
977     else {
978         alias CtxParam = Parameters!T[0];
979         alias Params = Parameters!T[1 .. $];
980         checkMatchingCtx!(CtxParam, CtxT);
981         checkRefForContext!handler;
982     }
983 
984     alias HArgs = staticMap!(Unqual, Params);
985 
986     void fn(void* rawCtx, ref Variant msg, ulong replyId, WeakAddress replyTo) @trusted {
987         static if (is(CtxT == void)) {
988             auto r = handler(msg.get!(Tuple!HArgs).expand);
989         } else {
990             auto ctx = cast(CtxParam*) cast(CtxT*) rawCtx;
991             auto r = handler(*ctx, msg.get!(Tuple!HArgs).expand);
992         }
993 
994         static if (isReqResult) {
995             r.value.match!((ErrorMsg a) { sendSystemMsg(replyTo, a); }, (Promise!ReqT a) {
996                 assert(!a.data.empty, "the promise MUST be constructed before it is returned");
997                 a.data.get.replyId = replyId;
998                 a.data.get.replyTo = replyTo;
999             }, (data) {
1000                 enum wrapInTuple = !is(typeof(data) : Tuple!U, U);
1001                 if (auto rc = replyTo.lock.get) {
1002                     static if (wrapInTuple)
1003                         rc.put(Reply(replyId, Variant(tuple(data))));
1004                     else
1005                         rc.put(Reply(replyId, Variant(data)));
1006                 }
1007             });
1008         } else static if (isPromise) {
1009             r.data.get.replyId = replyId;
1010             r.data.get.replyTo = replyTo;
1011         } else {
1012             // TODO: is this syntax for U one variable or variable. I want it to be variable.
1013             enum wrapInTuple = !is(RType : Tuple!U, U);
1014             if (auto rc = replyTo.lock.get) {
1015                 static if (wrapInTuple)
1016                     rc.put(Reply(replyId, Variant(tuple(r))));
1017                 else
1018                     rc.put(Reply(replyId, Variant(r)));
1019             }
1020         }
1021     }
1022 
1023     return Request(typeof(Request.request)(&fn, null, &cleanupCtx!CtxT), makeSignature!HArgs);
1024 }
1025 
1026 @("shall link two actors lifetime")
1027 unittest {
1028     int count;
1029     void countExits(ref Actor self, ExitMsg msg) @safe nothrow {
1030         count++;
1031         self.shutdown;
1032     }
1033 
1034     auto aa1 = Actor(makeAddress2);
1035     auto a1 = build(&aa1).set((int x) {}).exitHandler_(&countExits).finalize;
1036     auto aa2 = Actor(makeAddress2);
1037     auto a2 = build(&aa2).set((int x) {}).exitHandler_(&countExits).finalize;
1038 
1039     a1.linkTo(a2.address);
1040     a1.process(Clock.currTime);
1041     a2.process(Clock.currTime);
1042 
1043     assert(a1.isAlive);
1044     assert(a2.isAlive);
1045 
1046     sendExit(a1.address, ExitReason.userShutdown);
1047     foreach (_; 0 .. 5) {
1048         a1.process(Clock.currTime);
1049         a2.process(Clock.currTime);
1050     }
1051 
1052     assert(!a1.isAlive);
1053     assert(!a2.isAlive);
1054     assert(count == 2);
1055 }
1056 
1057 @("shall let one actor monitor the lifetime of the other one")
1058 unittest {
1059     int count;
1060     void downMsg(ref Actor self, DownMsg msg) @safe nothrow {
1061         count++;
1062     }
1063 
1064     auto aa1 = Actor(makeAddress2);
1065     auto a1 = build(&aa1).set((int x) {}).downHandler_(&downMsg).finalize;
1066     auto aa2 = Actor(makeAddress2);
1067     auto a2 = build(&aa2).set((int x) {}).finalize;
1068 
1069     a1.monitor(a2.address);
1070     a1.process(Clock.currTime);
1071     a2.process(Clock.currTime);
1072 
1073     assert(a1.isAlive);
1074     assert(a2.isAlive);
1075 
1076     sendExit(a2.address, ExitReason.userShutdown);
1077     foreach (_; 0 .. 5) {
1078         a1.process(Clock.currTime);
1079         a2.process(Clock.currTime);
1080     }
1081 
1082     assert(a1.isAlive);
1083     assert(!a2.isAlive);
1084     assert(count == 1);
1085 }
1086 
1087 private struct BuildActor {
1088     Actor* actor;
1089 
1090     Actor* finalize() @safe {
1091         auto rval = actor;
1092         actor = null;
1093         return rval;
1094     }
1095 
1096     auto errorHandler(ErrorHandler a) {
1097         actor.errorHandler = a;
1098         return this;
1099     }
1100 
1101     auto downHandler_(DownHandler a) {
1102         actor.downHandler_ = a;
1103         return this;
1104     }
1105 
1106     auto exitHandler_(ExitHandler a) {
1107         actor.exitHandler_ = a;
1108         return this;
1109     }
1110 
1111     auto exceptionHandler_(ExceptionHandler a) {
1112         actor.exceptionHandler_ = a;
1113         return this;
1114     }
1115 
1116     auto defaultHandler_(DefaultHandler a) {
1117         actor.defaultHandler_ = a;
1118         return this;
1119     }
1120 
1121     auto set(BehaviorT)(BehaviorT behavior)
1122             if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1123                 && !is(ReturnType!BehaviorT == void)) {
1124         auto act = makeRequest(behavior);
1125         actor.register(act.signature, act.request);
1126         return this;
1127     }
1128 
1129     auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
1130             if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1131                 && !is(ReturnType!BehaviorT == void)) {
1132         auto act = makeRequest!(BehaviorT, CT)(behavior);
1133         // for now just use the GC to allocate the context on.
1134         // TODO: use an allocator.
1135         act.request.ctx = cast(void*) new CT(c);
1136         actor.register(act.signature, act.request);
1137         return this;
1138     }
1139 
1140     auto set(BehaviorT)(BehaviorT behavior)
1141             if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1142                 && is(ReturnType!BehaviorT == void)) {
1143         auto act = makeAction(behavior);
1144         actor.register(act.signature, act.action);
1145         return this;
1146     }
1147 
1148     auto set(BehaviorT, CT)(BehaviorT behavior, CT c)
1149             if ((isFunction!BehaviorT || isFunctionPointer!BehaviorT)
1150                 && is(ReturnType!BehaviorT == void)) {
1151         auto act = makeAction!(BehaviorT, CT)(behavior);
1152         // for now just use the GC to allocate the context on.
1153         // TODO: use an allocator.
1154         act.action.ctx = cast(void*) new CT(c);
1155         actor.register(act.signature, act.action);
1156         return this;
1157     }
1158 }
1159 
1160 package BuildActor build(Actor* a) @safe {
1161     return BuildActor(a);
1162 }
1163 
1164 /// Implement an actor.
1165 Actor* impl(Behavior...)(Actor* self, Behavior behaviors) {
1166     import my.actor.msg : isCapture, Capture;
1167 
1168     auto bactor = build(self);
1169     static foreach (const i; 0 .. Behavior.length) {
1170         {
1171             alias b = Behavior[i];
1172 
1173             static if (!isCapture!b) {
1174                 static if (!(isFunction!(b) || isFunctionPointer!(b)))
1175                     static assert(0, "behavior may only be functions, not delgates: " ~ b.stringof);
1176 
1177                 static if (i + 1 < Behavior.length && isCapture!(Behavior[i + 1])) {
1178                     bactor.set(behaviors[i], behaviors[i + 1]);
1179                 } else
1180                     bactor.set(behaviors[i]);
1181             }
1182         }
1183     }
1184 
1185     return bactor.finalize;
1186 }
1187 
1188 @("build dynamic actor from functions")
1189 unittest {
1190     static void fn3(int s) @safe {
1191     }
1192 
1193     static string fn4(int s) @safe {
1194         return "foo";
1195     }
1196 
1197     static Tuple!(int, string) fn5(const string s) @safe {
1198         return typeof(return)(42, "hej");
1199     }
1200 
1201     auto aa1 = Actor(makeAddress2);
1202     auto a1 = build(&aa1).set(&fn3).set(&fn4).set(&fn5).finalize;
1203 }
1204 
1205 unittest {
1206     bool delayOk;
1207     static void fn1(ref Tuple!(bool*, "delayOk") c, const string s) @safe {
1208         *c.delayOk = true;
1209     }
1210 
1211     bool delayShouldNeverHappen;
1212     static void fn2(ref Tuple!(bool*, "delayShouldNeverHappen") c, int s) @safe {
1213         *c.delayShouldNeverHappen = true;
1214     }
1215 
1216     auto aa1 = Actor(makeAddress2);
1217     auto actor = build(&aa1).set(&fn1, capture(&delayOk)).set(&fn2,
1218             capture(&delayShouldNeverHappen)).finalize;
1219     delayedSend(actor.address, Clock.currTime - 1.dur!"seconds", "foo");
1220     delayedSend(actor.address, Clock.currTime + 1.dur!"hours", 42);
1221 
1222     assert(!actor.addressRef.get.empty!DelayedMsg);
1223     assert(actor.addressRef.get.empty!Msg);
1224     assert(actor.addressRef.get.empty!Reply);
1225 
1226     actor.process(Clock.currTime);
1227 
1228     assert(!actor.addressRef.get.empty!DelayedMsg);
1229     assert(actor.addressRef.get.empty!Msg);
1230     assert(actor.addressRef.get.empty!Reply);
1231 
1232     actor.process(Clock.currTime);
1233     actor.process(Clock.currTime);
1234 
1235     assert(actor.addressRef.get.empty!DelayedMsg);
1236     assert(actor.addressRef.get.empty!Msg);
1237     assert(actor.addressRef.get.empty!Reply);
1238 
1239     assert(delayOk);
1240     assert(!delayShouldNeverHappen);
1241 }
1242 
1243 @("shall process a request->then chain xyz")
1244 @system unittest {
1245     // checking capture is correctly setup/teardown by using captured rc.
1246 
1247     auto rcReq = refCounted(42);
1248     bool calledOk;
1249     static string fn(ref Tuple!(bool*, "calledOk", RefCounted!int) ctx, const string s,
1250             const string b) {
1251         assert(2 == ctx[1].refCount);
1252         if (s == "apa")
1253             *ctx.calledOk = true;
1254         return "foo";
1255     }
1256 
1257     auto rcReply = refCounted(42);
1258     bool calledReply;
1259     static void reply(ref Tuple!(bool*, RefCounted!int) ctx, const string s) {
1260         *ctx[0] = s == "foo";
1261         assert(2 == ctx[1].refCount);
1262     }
1263 
1264     auto aa1 = Actor(makeAddress2);
1265     auto actor = build(&aa1).set(&fn, capture(&calledOk, rcReq)).finalize;
1266 
1267     assert(2 == rcReq.refCount);
1268     assert(1 == rcReply.refCount);
1269 
1270     actor.request(actor.address, infTimeout).send("apa", "foo")
1271         .capture(&calledReply, rcReply).then(&reply);
1272     assert(2 == rcReply.refCount);
1273 
1274     assert(!actor.addr.get.empty!Msg);
1275     assert(actor.addr.get.empty!Reply);
1276 
1277     actor.process(Clock.currTime);
1278     assert(actor.addr.get.empty!Msg);
1279     assert(actor.addr.get.empty!Reply);
1280 
1281     assert(2 == rcReq.refCount);
1282     assert(1 == rcReply.refCount, "after the message is consumed the refcount should go back");
1283 
1284     assert(calledOk);
1285     assert(calledReply);
1286 
1287     actor.shutdown;
1288     while (actor.isAlive)
1289         actor.process(Clock.currTime);
1290 }
1291 
1292 @("shall process a request->then chain using promises")
1293 unittest {
1294     static struct A {
1295         string v;
1296     }
1297 
1298     static struct B {
1299         string v;
1300     }
1301 
1302     int calledOk;
1303     auto fn1p = makePromise!string;
1304     static RequestResult!string fn1(ref Capture!(int*, "calledOk", Promise!string, "p") c, A a) @trusted {
1305         if (a.v == "apa")
1306             (*c.calledOk)++;
1307         return typeof(return)(c.p);
1308     }
1309 
1310     auto fn2p = makePromise!string;
1311     static Promise!string fn2(ref Capture!(int*, "calledOk", Promise!string, "p") c, B a) {
1312         (*c.calledOk)++;
1313         return c.p;
1314     }
1315 
1316     int calledReply;
1317     static void reply(ref Tuple!(int*) ctx, const string s) {
1318         if (s == "foo")
1319             *ctx[0] += 1;
1320     }
1321 
1322     auto aa1 = Actor(makeAddress2);
1323     auto actor = build(&aa1).set(&fn1, capture(&calledOk, fn1p)).set(&fn2,
1324             capture(&calledOk, fn2p)).finalize;
1325 
1326     actor.request(actor.address, infTimeout).send(A("apa")).capture(&calledReply).then(&reply);
1327     actor.request(actor.address, infTimeout).send(B("apa")).capture(&calledReply).then(&reply);
1328 
1329     actor.process(Clock.currTime);
1330     assert(calledOk == 1); // first request
1331     assert(calledReply == 0);
1332 
1333     fn1p.deliver("foo");
1334 
1335     assert(calledReply == 0);
1336 
1337     actor.process(Clock.currTime);
1338     assert(calledOk == 2); // second request triggered
1339     assert(calledReply == 1);
1340 
1341     fn2p.deliver("foo");
1342     actor.process(Clock.currTime);
1343 
1344     assert(calledReply == 2);
1345 
1346     actor.shutdown;
1347     while (actor.isAlive) {
1348         actor.process(Clock.currTime);
1349     }
1350 }
1351 
1352 /// The timeout triggered.
1353 class ScopedActorException : Exception {
1354     this(ScopedActorError err, string file = __FILE__, int line = __LINE__) @safe pure nothrow {
1355         super(null, file, line);
1356         error = err;
1357     }
1358 
1359     ScopedActorError error;
1360 }
1361 
1362 enum ScopedActorError : ubyte {
1363     none,
1364     // actor address is down
1365     down,
1366     // request timeout
1367     timeout,
1368     // the address where unable to process the received message
1369     unknownMsg,
1370     // some type of fatal error occured.
1371     fatal,
1372 }
1373 
1374 /** Intended to be used in a local scope by a user.
1375  *
1376  * `ScopedActor` is not thread safe.
1377  */
1378 struct ScopedActor {
1379     import my.actor.typed : underlyingAddress, underlyingWeakAddress;
1380 
1381     private {
1382         static struct Data {
1383             Actor self;
1384             ScopedActorError errSt;
1385 
1386             ~this() @safe {
1387                 if (self.addr.empty)
1388                     return;
1389 
1390                 () @trusted {
1391                     self.downHandler = null;
1392                     self.defaultHandler = toDelegate(&.defaultHandler);
1393                     self.errorHandler = toDelegate(&defaultErrorHandler);
1394                 }();
1395 
1396                 self.shutdown;
1397                 while (self.isAlive) {
1398                     self.process(Clock.currTime);
1399                 }
1400             }
1401         }
1402 
1403         RefCounted!Data data;
1404     }
1405 
1406     this(StrongAddress addr, string name) @safe {
1407         data = refCounted(Data(Actor(addr)));
1408         data.get.self.name = name;
1409     }
1410 
1411     private void reset() @safe nothrow {
1412         data.get.errSt = ScopedActorError.none;
1413     }
1414 
1415     SRequestSend request(TAddress)(TAddress requestTo, SysTime timeout)
1416             if (isAddress!TAddress) {
1417         reset;
1418         auto rs = .request(&data.get.self, underlyingWeakAddress(requestTo), timeout);
1419         return SRequestSend(rs, this);
1420     }
1421 
1422     private static struct SRequestSend {
1423         RequestSend rs;
1424         ScopedActor self;
1425 
1426         /// Copy constructor
1427         this(ref return typeof(this) rhs) @safe pure nothrow @nogc {
1428             rs = rhs.rs;
1429             self = rhs.self;
1430         }
1431 
1432         @disable this(this);
1433 
1434         SRequestSendThen send(Args...)(auto ref Args args) {
1435             return SRequestSendThen(.send(rs, args), self);
1436         }
1437     }
1438 
1439     private static struct SRequestSendThen {
1440         RequestSendThen rs;
1441         ScopedActor self;
1442         uint backoff;
1443 
1444         /// Copy constructor
1445         this(ref return typeof(this) rhs) {
1446             rs = rhs.rs;
1447             self = rhs.self;
1448             backoff = rhs.backoff;
1449         }
1450 
1451         @disable this(this);
1452 
1453         void dynIntervalSleep() @trusted {
1454             // +100 usecs "feels good", magic number. current OS and
1455             // implementation of message passing isn't that much faster than
1456             // 100us. A bit slow behavior, ehum, for a scoped actor is OK. They
1457             // aren't expected to be used for "time critical" sections.
1458             Thread.sleep(backoff.dur!"usecs");
1459             backoff = min(backoff + 100, 20000);
1460         }
1461 
1462         private static struct ValueCapture {
1463             RefCounted!Data data;
1464 
1465             void downHandler(ref Actor, DownMsg) @safe nothrow {
1466                 data.get.errSt = ScopedActorError.down;
1467             }
1468 
1469             void errorHandler(ref Actor, ErrorMsg msg) @safe nothrow {
1470                 if (msg.reason == SystemError.requestTimeout)
1471                     data.get.errSt = ScopedActorError.timeout;
1472                 else
1473                     data.get.errSt = ScopedActorError.fatal;
1474             }
1475 
1476             void unknownMsgHandler(ref Actor a, ref Variant msg) @safe nothrow {
1477                 logAndDropHandler(a, msg);
1478                 data.get.errSt = ScopedActorError.unknownMsg;
1479             }
1480         }
1481 
1482         void then(T)(T handler, ErrorHandler onError = null) {
1483             scope (exit)
1484                 demonitor(rs.rs.self, rs.rs.requestTo);
1485             monitor(rs.rs.self, rs.rs.requestTo);
1486 
1487             auto callback = new ValueCapture(self.data);
1488             self.data.get.self.downHandler = &callback.downHandler;
1489             self.data.get.self.defaultHandler = &callback.unknownMsgHandler;
1490             self.data.get.self.errorHandler = &callback.errorHandler;
1491 
1492             () @trusted { .thenUnsafe!(T, void)(rs, handler, null, onError); }();
1493 
1494             scope (exit)
1495                 () @trusted {
1496                 self.data.get.self.downHandler = null;
1497                 self.data.get.self.defaultHandler = toDelegate(&.defaultHandler);
1498                 self.data.get.self.errorHandler = toDelegate(&defaultErrorHandler);
1499             }();
1500 
1501             auto requestTo = rs.rs.requestTo.lock;
1502             if (!requestTo)
1503                 throw new ScopedActorException(ScopedActorError.down);
1504 
1505             // TODO: this loop is stupid... should use a conditional variable
1506             // instead but that requires changing the mailbox. later
1507             do {
1508                 rs.rs.self.process(Clock.currTime);
1509                 // force the actor to be alive even though there are no behaviors.
1510                 rs.rs.self.state_ = ActorState.waiting;
1511 
1512                 if (self.data.get.errSt == ScopedActorError.none) {
1513                     dynIntervalSleep;
1514                 } else {
1515                     throw new ScopedActorException(self.data.get.errSt);
1516                 }
1517 
1518             }
1519             while (self.data.get.self.waitingForReply);
1520         }
1521     }
1522 }
1523 
1524 ScopedActor scopedActor(string file = __FILE__, uint line = __LINE__)() @safe {
1525     import std.format : format;
1526 
1527     return ScopedActor(makeAddress2, format!"ScopedActor.%s:%s"(file, line));
1528 }
1529 
1530 @(
1531         "scoped actor shall throw an exception if the actor that is sent a request terminates or is closed")
1532 unittest {
1533     import my.actor.system;
1534 
1535     auto sys = makeSystem;
1536 
1537     auto a0 = sys.spawn((Actor* self) {
1538         return impl(self, (ref CSelf!() ctx, int x) {
1539             Thread.sleep(50.dur!"msecs");
1540             return 42;
1541         }, capture(self), (ref CSelf!() ctx, double x) {}, capture(self),
1542             (ref CSelf!() ctx, string x) { ctx.self.shutdown; return 42; }, capture(self));
1543     });
1544 
1545     {
1546         auto self = scopedActor;
1547         bool excThrown;
1548         auto stopAt = Clock.currTime + 3.dur!"seconds";
1549         while (!excThrown && Clock.currTime < stopAt) {
1550             try {
1551                 self.request(a0, delay(1.dur!"nsecs")).send(42).then((int x) {});
1552             } catch (ScopedActorException e) {
1553                 excThrown = e.error == ScopedActorError.timeout;
1554             } catch (Exception e) {
1555                 logger.info(e.msg);
1556             }
1557         }
1558         assert(excThrown, "timeout did not trigger as expected");
1559     }
1560 
1561     {
1562         auto self = scopedActor;
1563         bool excThrown;
1564         auto stopAt = Clock.currTime + 3.dur!"seconds";
1565         while (!excThrown && Clock.currTime < stopAt) {
1566             try {
1567                 self.request(a0, delay(1.dur!"seconds")).send("hello").then((int x) {
1568                 });
1569             } catch (ScopedActorException e) {
1570                 excThrown = e.error == ScopedActorError.down;
1571             } catch (Exception e) {
1572                 logger.info(e.msg);
1573             }
1574         }
1575         assert(excThrown, "detecting terminated actor did not trigger as expected");
1576     }
1577 }