1 /**
2 Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module my.actor.msg;
7 
8 import std.meta : staticMap, AliasSeq;
9 import std.traits : Unqual, Parameters, isFunction, isFunctionPointer;
10 import std.typecons : Tuple, tuple;
11 import std.variant : Variant;
12 
13 public import std.datetime : SysTime, Duration, dur;
14 
15 import my.actor.mailbox;
16 import my.actor.common : ExitReason, makeSignature, SystemError;
17 import my.actor.actor : Actor, makeAction, makeRequest, makeReply, makePromise,
18     ErrorHandler, Promise, RequestResult;
19 import my.actor.system_msg;
20 import my.actor.typed : isTypedAddress, isTypedActor, isTypedActorImpl,
21     typeCheckMsg, ParamsToTuple, ReturnToTupleOrVoid,
22     underlyingActor, underlyingAddress, underlyingTypedAddress, underlyingWeakAddress;
23 
24 SysTime infTimeout() @safe pure nothrow {
25     return SysTime.max;
26 }
27 
28 SysTime timeout(Duration d) @safe nothrow {
29     import std.datetime : Clock;
30 
31     return Clock.currTime + d;
32 }
33 
34 /// Code looks better if it says delay when using delayedSend.
35 alias delay = timeout;
36 
37 enum isActor(T) = is(T == Actor*) || isTypedActor!T || isTypedActorImpl!T;
38 enum isAddress(T) = is(T == WeakAddress) || is(T == StrongAddress) || isTypedAddress!T;
39 enum isDynamicAddress(T) = is(T == WeakAddress) || is(T == StrongAddress);
40 
41 /** Link the lifetime of `self` to the actor using `sendTo`.
42  *
43  * An `ExitMsg` is sent to `self` if `sendTo` is terminated and vice versa.
44  *
45  * `ExitMsg` triggers `exitHandler`.
46  */
47 void linkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
48         if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
49             || isAddress!AddressT1)) {
50     import my.actor.mailbox : LinkRequest;
51 
52     auto self_ = underlyingAddress(self);
53     auto addr = underlyingAddress(sendTo);
54 
55     if (self_.empty || addr.empty)
56         return;
57 
58     sendSystemMsg(self_, LinkRequest(addr.weakRef));
59     sendSystemMsg(addr, LinkRequest(self_.weakRef));
60 }
61 
62 /// Remove the link between `self` and the actor using `sendTo`.
63 void unlinkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
64         if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
65             || isAddress!AddressT1)) {
66     import my.actor.mailbox : UnlinkRequest;
67 
68     auto self_ = underlyingAddress(self);
69     auto addr = underlyingAddress(sendTo);
70 
71     // do NOT check if the addresses exist becuase it doesn't matter. Just
72     // remove the link.
73 
74     sendSystemMsg(self_, UnlinkRequest(addr.weakRef));
75     sendSystemMsg(addr, UnlinkRequest(self_.weakRef));
76 }
77 
78 /** Actor `self` will receive a `DownMsg` when `sendTo` shutdown.
79  *
80  * `DownMsg` triggers `downHandler`.
81  */
82 void monitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
83         if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
84             || isAddress!AddressT1)) {
85     import my.actor.system_msg : MonitorRequest;
86 
87     if (auto self_ = underlyingAddress(self))
88         sendSystemMsg(sendTo, MonitorRequest(self_.weakRef));
89 }
90 
91 /// Remove `self` as a monitor of the actor using `sendTo`.
92 void demonitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
93         if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
94             || isAddress!AddressT1)) {
95     import my.actor.system_msg : MonitorRequest;
96 
97     if (auto self_ = underlyingAddress(self))
98         sendSystemMsg(sendTo, DemonitorRequest(self_.weakRef));
99 }
100 
101 // Only send the message if the system message queue is empty.
102 package void sendSystemMsgIfEmpty(AddressT, T)(AddressT sendTo, T msg) @safe
103         if (isAddress!AddressT)
104 in (!sendTo.empty, "cannot send to an empty address") {
105     auto addr = underlyingAddress(sendTo).get;
106     if (addr && addr.empty!SystemMsg)
107         addr.put(SystemMsg(msg));
108 }
109 
110 package void sendSystemMsg(AddressT, T)(AddressT sendTo, T msg) @safe
111         if (isAddress!AddressT)
112 in (!sendTo.empty, "cannot send to an empty address") {
113     if (auto addr = underlyingAddress(sendTo).get)
114         addr.put(SystemMsg(msg));
115 }
116 
117 /// Trigger the message in the future.
118 void delayedSend(AddressT, Args...)(AddressT sendTo, SysTime delayTo, auto ref Args args) @trusted
119         if (is(AddressT == WeakAddress) || is(AddressT == StrongAddress) || is(AddressT == Actor*)) {
120     alias UArgs = staticMap!(Unqual, Args);
121     if (auto addr = underlyingAddress(sendTo).get)
122         addr.put(DelayedMsg(Msg(makeSignature!UArgs,
123                 MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))), delayTo));
124 }
125 
126 void sendExit(WeakAddress sendTo, const ExitReason reason) @safe {
127     import my.actor.system_msg : SystemExitMsg;
128 
129     sendSystemMsg(sendTo, SystemExitMsg(reason));
130 }
131 
132 // TODO: add verification that args do not have interior pointers
133 void send(AddressT, Args...)(AddressT sendTo, auto ref Args args) @trusted
134         if (isDynamicAddress!AddressT || is(AddressT == Actor*)) {
135     alias UArgs = staticMap!(Unqual, Args);
136     if (auto addr = underlyingAddress(sendTo).get)
137         addr.put(Msg(makeSignature!UArgs, MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))));
138 }
139 
140 package struct RequestSend {
141     Actor* self;
142     WeakAddress requestTo;
143     SysTime timeout;
144     ulong replyId;
145 
146     /// Copy constructor
147     this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
148         self = rhs.self;
149         requestTo = rhs.requestTo;
150         timeout = rhs.timeout;
151         replyId = rhs.replyId;
152     }
153 
154     @disable this(this);
155 }
156 
157 package struct RequestSendThen {
158     RequestSend rs;
159     Msg msg;
160 
161     @disable this(this);
162 
163     /// Copy constructor
164     this(ref return typeof(this) rhs) {
165         rs = rhs.rs;
166         msg = rhs.msg;
167     }
168 }
169 
170 RequestSend request(ActorT)(ActorT self, WeakAddress requestTo, SysTime timeout)
171         if (is(ActorT == Actor*)) {
172     return RequestSend(self, requestTo, timeout, self.replyId);
173 }
174 
175 RequestSendThen send(Args...)(RequestSend r, auto ref Args args) {
176     alias UArgs = staticMap!(Unqual, Args);
177 
178     auto replyTo = r.self.addr.weakRef;
179 
180     // dfmt off
181     auto msg = () @trusted {
182         return Msg(
183         makeSignature!UArgs,
184         MsgType(MsgRequest(replyTo, r.replyId, Variant(Tuple!UArgs(args)))));
185     }();
186     // dfmt on
187 
188     return () @trusted { return RequestSendThen(r, msg); }();
189 }
190 
191 private struct ThenContext(Captures...) {
192     alias Ctx = Tuple!Captures;
193 
194     RequestSendThen r;
195     Ctx* ctx;
196 
197     void then(T)(T handler, ErrorHandler onError = null)
198             if (isFunction!T || isFunctionPointer!T) {
199         thenUnsafe!(T, Ctx)(r, handler, cast(void*) ctx, onError);
200         ctx = null;
201     }
202 }
203 
204 // allows delegates but the context for them may be corrupted by the GC if they
205 // are used in another thread thus use of `thenUnsafe` must ensure it is not
206 // escaped.
207 package void thenUnsafe(T, CtxT = void)(scope RequestSendThen r, T handler,
208         void* ctx, ErrorHandler onError = null) @trusted {
209     auto requestTo = r.rs.requestTo.lock.get;
210     if (!requestTo) {
211         if (onError)
212             onError(*r.rs.self, ErrorMsg(r.rs.requestTo, SystemError.requestReceiverDown));
213         return;
214     }
215 
216     // TODO: compiler bug? how can SysTime be inferred to being scoped?
217     SysTime timeout = () @trusted { return r.rs.timeout; }();
218 
219     // first register a handler for the message.
220     // this order ensure that there is always a handler that can receive the message.
221 
222     () @safe {
223         auto reply = makeReply!(T, CtxT)(handler);
224         reply.ctx = ctx;
225         r.rs.self.register(r.rs.replyId, timeout, reply, onError);
226     }();
227 
228     // then send it
229     requestTo.put(r.msg);
230 }
231 
232 void then(T, CtxT = void)(scope RequestSendThen r, T handler, ErrorHandler onError = null) @trusted
233         if (isFunction!T || isFunctionPointer!T) {
234     thenUnsafe!(T, CtxT)(r, handler, null, onError);
235 }
236 
237 void send(T, Args...)(T sendTo, auto ref Args args)
238         if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
239     send(underlyingAddress(sendTo), args);
240 }
241 
242 void delayedSend(T, Args...)(T sendTo, SysTime delayTo, auto ref Args args)
243         if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
244     delayedSend(underlyingAddress(sendTo), delayTo, args);
245 }
246 
247 private struct TypedRequestSend(TAddress) {
248     alias TypeAddress = TAddress;
249     RequestSend rs;
250 }
251 
252 TypedRequestSend!TAddress request(TActor, TAddress)(ref TActor self, TAddress sendTo,
253         SysTime timeout)
254         if (isActor!TActor && (isTypedActorImpl!TAddress || isTypedAddress!TAddress)) {
255     return typeof(return)(.request(underlyingActor(self), underlyingWeakAddress(sendTo), timeout));
256 }
257 
258 private struct TypedRequestSendThen(TAddress, Params_...) {
259     alias TypeAddress = TAddress;
260     alias Params = Params_;
261     RequestSendThen rs;
262 
263     /// Copy constructor
264     this(ref return scope typeof(this) rhs) {
265         rs = rhs.rs;
266     }
267 
268     @disable this(this);
269 }
270 
271 auto send(TR, Args...)(scope TR tr, auto ref Args args)
272         if (is(TR == TypedRequestSend!TAddress, TAddress)) {
273     return TypedRequestSendThen!(TR.TypeAddress, Args)(send(tr.rs, args));
274 }
275 
276 void then(TR, T, CtxT = void)(scope TR tr, T handler, ErrorHandler onError = null)
277         if ((isFunction!T || isFunctionPointer!T) && is(TR : TypedRequestSendThen!(TAddress,
278             Params), TAddress, Params...) && typeCheckMsg!(TAddress,
279             ParamsToTuple!(Parameters!T), Params)) {
280     then(tr.rs, handler, onError);
281 }
282 
283 private struct TypedThenContext(TR, Captures...) {
284     import my.actor.actor : checkRefForContext, checkMatchingCtx;
285 
286     alias Ctx = Tuple!Captures;
287 
288     TR r;
289     Ctx* ctx;
290 
291     void then(T)(T handler, ErrorHandler onError = null)
292             if ((isFunction!T || isFunctionPointer!T) && typeCheckMsg!(TR.TypeAddress,
293                 ParamsToTuple!(Parameters!T[1 .. $]), TR.Params)) {
294         // better error message for the user by checking in the body instead of
295         // the constraint because the constraint gagges the static assert
296         // messages.
297         checkMatchingCtx!(Parameters!T[0], Ctx);
298         checkRefForContext!handler;
299         .thenUnsafe!(T, Ctx)(r.rs, handler, cast(void*) ctx, onError);
300         ctx = null;
301     }
302 }
303 
304 alias Capture(T...) = Tuple!T;
305 enum isCapture(T) = is(T == Tuple!U, U);
306 enum isFirstParamCtx(Fn, CtxT) = is(Parameters!Fn[0] == CtxT);
307 
308 /// Convenient function for capturing the actor itself when spawning.
309 alias CSelf(T = Actor*) = Capture!(T, "self");
310 
311 Capture!T capture(T...)(auto ref T args)
312         if (!is(T[0] == RequestSendThen)
313             && !is(T[0] == TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
314     return Tuple!T(args);
315 }
316 
317 auto capture(Captures...)(RequestSendThen r, auto ref Captures captures) {
318     // TODO: how to read the identifiers from captures? Using
319     // ParameterIdentifierTuple didn't work.
320     auto ctx = new Tuple!Captures(captures);
321     return ThenContext!Captures(r, ctx);
322 }
323 
324 auto capture(TR, Captures...)(TR r, auto ref Captures captures)
325         if (is(TR : TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
326     auto ctx = new Tuple!Captures(captures);
327     return TypedThenContext!(TR, Captures)(r, ctx);
328 }