/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.msg;

import std.meta : staticMap, AliasSeq;
import std.traits : Unqual, Parameters, isFunction, isFunctionPointer;
import std.typecons : Tuple, tuple;
import std.variant : Variant;

public import std.datetime : SysTime, Duration, dur;

import my.actor.mailbox;
import my.actor.common : ExitReason, makeSignature, SystemError;
import my.actor.actor : Actor, makeAction, makeRequest, makeReply, makePromise,
    ErrorHandler, Promise, RequestResult;
import my.actor.system_msg;
import my.actor.typed : isTypedAddress, isTypedActor, isTypedActorImpl,
    typeCheckMsg, ParamsToTuple, ReturnToTupleOrVoid,
    underlyingActor, underlyingAddress, underlyingTypedAddress, underlyingWeakAddress;

/// A timeout that never expires in practice: the largest representable `SysTime`.
SysTime infTimeout() @safe pure nothrow {
    return SysTime.max;
}

/// Absolute point in time that lies `d` after the current clock time.
SysTime timeout(Duration d) @safe nothrow {
    import std.datetime : Clock;

    return Clock.currTime + d;
}

/// Code looks better if it says delay when using delayedSend.
alias delay = timeout;

/// True for an untyped actor pointer and for typed actors / typed actor implementations.
enum isActor(T) = is(T == Actor*) || isTypedActor!T || isTypedActorImpl!T;
/// True for weak, strong and typed addresses.
enum isAddress(T) = is(T == WeakAddress) || is(T == StrongAddress) || isTypedAddress!T;
/// True for the untyped address kinds only (weak or strong).
enum isDynamicAddress(T) = is(T == WeakAddress) || is(T == StrongAddress);

/** Link the lifetime of `self` to the actor using `sendTo`.
 *
 * An `ExitMsg` is sent to `self` if `sendTo` is terminated and vice versa.
 *
 * `ExitMsg` triggers `exitHandler`.
 *
 * Both arguments are resolved with `underlyingAddress`. Nothing is sent when
 * either resolved address is empty; otherwise each side receives a
 * `LinkRequest` that carries the other side's `weakRef`.
 *
 * Params:
 *  self = actor or address on one end of the link
 *  sendTo = actor or address on the other end of the link
 */
void linkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : LinkRequest;

    auto self_ = underlyingAddress(self);
    auto addr = underlyingAddress(sendTo);

    if (self_.empty || addr.empty)
        return;

    sendSystemMsg(self_, LinkRequest(addr.weakRef));
    sendSystemMsg(addr, LinkRequest(self_.weakRef));
}

/** Remove the link between `self` and the actor using `sendTo`.
 *
 * Each side is sent an `UnlinkRequest` that carries the other side's `weakRef`.
 *
 * Params:
 *  self = actor or address on one end of the link
 *  sendTo = actor or address on the other end of the link
 */
void unlinkTo(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.mailbox : UnlinkRequest;

    auto self_ = underlyingAddress(self);
    auto addr = underlyingAddress(sendTo);

    // do NOT check if the addresses exist because it doesn't matter. Just
    // remove the link.
    // NOTE(review): `sendSystemMsg` declares an `in` contract that rejects an
    // empty address; confirm that an empty address cannot reach this point in
    // builds where contracts are enabled.

    sendSystemMsg(self_, UnlinkRequest(addr.weakRef));
    sendSystemMsg(addr, UnlinkRequest(self_.weakRef));
}

/** Actor `self` will receive a `DownMsg` when `sendTo` shutdown.
 *
 * `DownMsg` triggers `downHandler`.
 *
 * `sendTo` is resolved with `underlyingAddress` before the request is sent,
 * the same way `linkTo` does it, so actor arguments accepted by the
 * constraint can be used as the target and not only addresses. Nothing is
 * sent when either `self` or `sendTo` resolves to an empty address.
 *
 * Params:
 *  self = the actor or address that wants to be notified
 *  sendTo = the actor or address to watch
 */
void monitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    import my.actor.system_msg : MonitorRequest;

    if (auto self_ = underlyingAddress(self)) {
        // `sendSystemMsg` only accepts address types, thus resolve first.
        auto addr = underlyingAddress(sendTo);
        if (!addr.empty)
            sendSystemMsg(addr, MonitorRequest(self_.weakRef));
    }
}

/// Remove `self` as a monitor of the actor using `sendTo`.
///
/// `sendTo` is resolved with `underlyingAddress` before the request is sent so
/// actor arguments accepted by the constraint can be used as the target and
/// not only addresses. Nothing is sent when either `self` or `sendTo` resolves
/// to an empty address.
void demonitor(AddressT0, AddressT1)(AddressT0 self, AddressT1 sendTo) @safe
        if ((isActor!AddressT0 || isAddress!AddressT0) && (isActor!AddressT1
            || isAddress!AddressT1)) {
    // `DemonitorRequest` is found through the module-level imports. The
    // previous function-local import only named `MonitorRequest`, which this
    // function never uses.
    if (auto self_ = underlyingAddress(self)) {
        // `sendSystemMsg` only accepts address types, thus resolve first.
        auto addr = underlyingAddress(sendTo);
        if (!addr.empty)
            sendSystemMsg(addr, DemonitorRequest(self_.weakRef));
    }
}

// Only send the message if the system message queue is empty.
//
// `msg` is wrapped in a `SystemMsg` before it is put in the receiver.
// The `in` contract rejects an empty `sendTo`.
package void sendSystemMsgIfEmpty(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT)
in (!sendTo.empty, "cannot send to an empty address") {
    auto addr = underlyingAddress(sendTo).get;
    // a null `addr` means there is nothing to deliver to.
    if (addr && addr.empty!SystemMsg)
        addr.put(SystemMsg(msg));
}

// Wrap `msg` in a `SystemMsg` and put it in the receiver that `sendTo`
// resolves to. Nothing is sent if the resolved receiver is null.
// The `in` contract rejects an empty `sendTo`.
package void sendSystemMsg(AddressT, T)(AddressT sendTo, T msg) @safe
        if (isAddress!AddressT)
in (!sendTo.empty, "cannot send to an empty address") {
    if (auto addr = underlyingAddress(sendTo).get)
        addr.put(SystemMsg(msg));
}

/// Trigger the message in the future.
///
/// The arguments are packed, with their qualifiers stripped, into a one-shot
/// message which is put in the receiver as a `DelayedMsg` together with
/// `delayTo`. Nothing is sent if resolving `sendTo` yields null.
///
/// Params:
///  sendTo = receiver of the message
///  delayTo = point in time attached to the delayed message
///  args = payload of the message
void delayedSend(AddressT, Args...)(AddressT sendTo, SysTime delayTo, auto ref Args args) @trusted
        if (isDynamicAddress!AddressT || is(AddressT == Actor*)) {
    alias UArgs = staticMap!(Unqual, Args);
    if (auto addr = underlyingAddress(sendTo).get)
        addr.put(DelayedMsg(Msg(makeSignature!UArgs,
                MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))), delayTo));
}

/// Send a `SystemExitMsg` carrying `reason` to `sendTo` as a system message.
void sendExit(WeakAddress sendTo, const ExitReason reason) @safe {
    import my.actor.system_msg : SystemExitMsg;

    sendSystemMsg(sendTo, SystemExitMsg(reason));
}

// TODO: add verification that args do not have interior pointers
/** Send a one-shot message to an untyped address or actor.
 *
 * The arguments are packed, with their qualifiers stripped, into a tuple that
 * is put in the receiver. Nothing is sent if resolving `sendTo` yields null.
 *
 * Params:
 *  sendTo = receiver of the message
 *  args = payload of the message
 */
void send(AddressT, Args...)(AddressT sendTo, auto ref Args args) @trusted
        if (isDynamicAddress!AddressT || is(AddressT == Actor*)) {
    alias UArgs = staticMap!(Unqual, Args);
    if (auto addr = underlyingAddress(sendTo).get)
        addr.put(Msg(makeSignature!UArgs, MsgType(MsgOneShot(Variant(Tuple!UArgs(args))))));
}

/// First stage of a request: who asks, who is asked, the timeout and the id
/// under which the reply is expected.
package struct RequestSend {
    /// The actor that makes the request.
    Actor* self;
    /// The address the request is sent to.
    WeakAddress requestTo;
    /// Timeout passed on when the reply handler is registered.
    SysTime timeout;
    /// Id used to register the reply handler and to tag the request.
    ulong replyId;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) @safe pure nothrow @nogc {
        self = rhs.self;
        requestTo = rhs.requestTo;
        timeout = rhs.timeout;
        replyId = rhs.replyId;
    }

    @disable this(this);
}

/// Second stage of a request: the request parameters together with the
/// message that has been built but not yet sent.
package struct RequestSendThen {
    RequestSend rs;
    Msg msg;

    @disable this(this);

    /// Copy constructor
    this(ref return typeof(this) rhs) {
        rs = rhs.rs;
        msg = rhs.msg;
    }
}

/** Start a request from `self` to `requestTo`.
 *
 * The returned value records `self.replyId` as it is at the time of the call.
 *
 * Params:
 *  self = the actor that makes the request
 *  requestTo = the address that is to receive the request
 *  timeout = timeout for the reply
 */
RequestSend request(ActorT)(ActorT self, WeakAddress requestTo, SysTime timeout)
        if (is(ActorT == Actor*)) {
    return RequestSend(self, requestTo, timeout, self.replyId);
}

/** Build the request message for `r` from `args`.
 *
 * The message is a `MsgRequest` that carries a weak reference to the
 * requesting actor's address, the reply id from `r` and the arguments packed
 * into a tuple with their qualifiers stripped. The message is only built
 * here; it is put in the receiver by `thenUnsafe`.
 */
RequestSendThen send(Args...)(RequestSend r, auto ref Args args) {
    alias UArgs = staticMap!(Unqual, Args);

    auto replyTo = r.self.addr.weakRef;

    // dfmt off
    auto msg = () @trusted {
        return Msg(
            makeSignature!UArgs,
            MsgType(MsgRequest(replyTo, r.replyId, Variant(Tuple!UArgs(args)))));
    }();
    // dfmt on

    return () @trusted { return RequestSendThen(r, msg); }();
}

/// A pending request paired with a heap allocated tuple of captured values
/// which is handed to the reply handler as its context.
private struct ThenContext(Captures...) {
    alias Ctx = Tuple!Captures;

    RequestSendThen r;
    Ctx* ctx;

    /// Register `handler` for the reply and send the request.
    void then(T)(T handler, ErrorHandler onError = null)
            if (isFunction!T || isFunctionPointer!T) {
        thenUnsafe!(T, Ctx)(r, handler, cast(void*) ctx, onError);
        // the context pointer has been passed on, drop the local reference.
        ctx = null;
    }
}

// allows delegates but the context for them may be corrupted by the GC if they
// are used in another thread thus use of `thenUnsafe` must ensure it is not
// escaped.
//
// If the receiver of the request can not be locked then `onError`, when
// given, is called with `SystemError.requestReceiverDown` and nothing is
// registered or sent.
package void thenUnsafe(T, CtxT = void)(scope RequestSendThen r, T handler,
        void* ctx, ErrorHandler onError = null) @trusted {
    auto requestTo = r.rs.requestTo.lock.get;
    if (!requestTo) {
        if (onError)
            onError(*r.rs.self, ErrorMsg(r.rs.requestTo, SystemError.requestReceiverDown));
        return;
    }

    // TODO: compiler bug? how can SysTime be inferred to being scoped?
    SysTime timeout = () @trusted { return r.rs.timeout; }();

    // first register a handler for the message.
    // this order ensure that there is always a handler that can receive the message.

    () @safe {
        auto reply = makeReply!(T, CtxT)(handler);
        reply.ctx = ctx;
        r.rs.self.register(r.rs.replyId, timeout, reply, onError);
    }();

    // then send it
    requestTo.put(r.msg);
}

/// Register `handler` for the reply to `r`, without any context, and send the
/// request. `onError` is forwarded unchanged.
void then(T, CtxT = void)(scope RequestSendThen r, T handler, ErrorHandler onError = null) @trusted
        if (isFunction!T || isFunctionPointer!T) {
    thenUnsafe!(T, CtxT)(r, handler, null, onError);
}

/// Send a one-shot message to a typed receiver. The constraint checks the
/// argument types with `typeCheckMsg` before the message is forwarded to the
/// underlying address.
void send(T, Args...)(T sendTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    send(underlyingAddress(sendTo), args);
}

/// Delayed send to a typed receiver. The constraint checks the argument types
/// with `typeCheckMsg` before the message is forwarded to the underlying
/// address.
void delayedSend(T, Args...)(T sendTo, SysTime delayTo, auto ref Args args)
        if ((isTypedAddress!T || isTypedActorImpl!T) && typeCheckMsg!(T, void, Args)) {
    delayedSend(underlyingAddress(sendTo), delayTo, args);
}

/// A `RequestSend` that remembers the type of the typed receiver.
private struct TypedRequestSend(TAddress) {
    alias TypeAddress = TAddress;
    RequestSend rs;
}

/// Start a request to a typed receiver. Forwards to the untyped `request`
/// with the underlying actor and the underlying weak address.
TypedRequestSend!TAddress request(TActor, TAddress)(ref TActor self, TAddress sendTo,
        SysTime timeout)
        if (isActor!TActor && (isTypedActorImpl!TAddress || isTypedAddress!TAddress)) {
    return typeof(return)(.request(underlyingActor(self), underlyingWeakAddress(sendTo), timeout));
}

/// A `RequestSendThen` that remembers the type of the typed receiver and the
/// types of the parameters that the request was built from.
private struct TypedRequestSendThen(TAddress, Params_...) {
    alias TypeAddress = TAddress;
    alias Params = Params_;
    RequestSendThen rs;

    /// Copy constructor
    this(ref return scope typeof(this) rhs) {
        rs = rhs.rs;
    }

    @disable this(this);
}

/// Build the request message for a typed request. The argument types are
/// recorded in the type of the returned value.
auto send(TR, Args...)(scope TR tr, auto ref Args args)
        if (is(TR == TypedRequestSend!TAddress, TAddress)) {
    return TypedRequestSendThen!(TR.TypeAddress, Args)(send(tr.rs, args));
}

/// Register `handler` for the reply to a typed request and send the request.
/// The constraint checks the handler's parameters against the typed receiver
/// with `typeCheckMsg`.
void then(TR, T, CtxT = void)(scope TR tr, T handler, ErrorHandler onError = null)
        if ((isFunction!T || isFunctionPointer!T) && is(TR : TypedRequestSendThen!(TAddress,
            Params), TAddress, Params...) && typeCheckMsg!(TAddress,
            ParamsToTuple!(Parameters!T), Params)) {
    then(tr.rs, handler, onError);
}

/// A pending typed request paired with a heap allocated tuple of captured
/// values which is handed to the reply handler as its context.
private struct TypedThenContext(TR, Captures...) {
    import my.actor.actor : checkRefForContext, checkMatchingCtx;

    alias Ctx = Tuple!Captures;

    TR r;
    Ctx* ctx;

    /// Register `handler` for the reply and send the request. The first
    /// parameter of `handler` is the context; the remaining parameters are
    /// the ones checked against the typed receiver.
    void then(T)(T handler, ErrorHandler onError = null)
            if ((isFunction!T || isFunctionPointer!T) && typeCheckMsg!(TR.TypeAddress,
                ParamsToTuple!(Parameters!T[1 .. $]), TR.Params)) {
        // better error message for the user by checking in the body instead of
        // the constraint because the constraint gags the static assert
        // messages.
        checkMatchingCtx!(Parameters!T[0], Ctx);
        checkRefForContext!handler;
        .thenUnsafe!(T, Ctx)(r.rs, handler, cast(void*) ctx, onError);
        // the context pointer has been passed on, drop the local reference.
        ctx = null;
    }
}

/// A capture is a tuple of the captured values.
alias Capture(T...) = Tuple!T;
// NOTE(review): `U` is a single type parameter, so this presumably only
// matches one-field tuples such as `Tuple!int` and not e.g. `CSelf!()`;
// confirm whether `U...` was intended.
enum isCapture(T) = is(T == Tuple!U, U);
/// True if the first parameter of `Fn` is exactly `CtxT`.
enum isFirstParamCtx(Fn, CtxT) = is(Parameters!Fn[0] == CtxT);

/// Convenience alias for capturing the actor itself when spawning.
alias CSelf(T = Actor*) = Capture!(T, "self");

/// Pack `args` into a capture. Not used when the first argument is a pending
/// request; the overloads below handle that.
Capture!T capture(T...)(auto ref T args)
        if (!is(T[0] == RequestSendThen)
            && !is(T[0] == TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    return Tuple!T(args);
}

/// Attach captured values to a pending request. The values are copied into a
/// tuple allocated with `new`.
auto capture(Captures...)(RequestSendThen r, auto ref Captures captures) {
    // TODO: how to read the identifiers from captures? Using
    // ParameterIdentifierTuple didn't work.
    auto ctx = new Tuple!Captures(captures);
    return ThenContext!Captures(r, ctx);
}

/// Attach captured values to a pending typed request. The values are copied
/// into a tuple allocated with `new`.
auto capture(TR, Captures...)(TR r, auto ref Captures captures)
        if (is(TR : TypedRequestSendThen!(TAddress, Params), TAddress, Params...)) {
    auto ctx = new Tuple!Captures(captures);
    return TypedThenContext!(TR, Captures)(r, ctx);
}