/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.mailbox;

import logger = std.experimental.logger;
import core.sync.mutex : Mutex;
import std.datetime : SysTime;
import std.variant : Variant;

import sumtype;

import my.actor.common;
import my.gc.refc;
public import my.actor.system_msg;

/// A message that only carries a payload.
struct MsgOneShot {
    Variant data;
}

/// A message that carries a payload together with a reply address and a
/// reply id.
struct MsgRequest {
    WeakAddress replyTo;
    ulong replyId;
    Variant data;
}

/// The two kinds of user messages that a `Msg` can hold.
alias MsgType = SumType!(MsgOneShot, MsgRequest);

/** A user message: a signature plus the typed message content.
 *
 * The postblit is disabled; copies go through the copy constructor.
 */
struct Msg {
    ulong signature;
    MsgType type;

    /// Copy constructor, copies both fields from `other`.
    this(ref return Msg other) @trusted {
        signature = other.signature;
        type = other.type;
    }

    @disable this(this);
}

/// All system messages that can be stored in the system message queue.
alias SystemMsg = SumType!(ErrorMsg, DownMsg, ExitMsg, SystemExitMsg,
        MonitorRequest, DemonitorRequest, LinkRequest, UnlinkRequest);

/** A reply: an id plus the payload.
 *
 * The postblit is disabled; copies go through the copy constructor.
 */
struct Reply {
    ulong id;
    Variant data;

    /// Copy constructor, copies both fields from `other`.
    this(ref return Reply other) {
        id = other.id;
        data = other.data;
    }

    @disable this(this);
}

/** A message paired with a point in time (`triggerAt`).
 *
 * The postblit is disabled. Only copying from a mutable instance is
 * supported.
 */
struct DelayedMsg {
    Msg msg;
    SysTime triggerAt;

    /// Copy constructor for mutable sources, copies both fields.
    this(ref return DelayedMsg other) @trusted {
        msg = other.msg;
        triggerAt = other.triggerAt;
    }

    /// Copying from a const source always fails with an assert.
    this(const ref return DelayedMsg other) inout @safe {
        assert(0, "not supported");
    }

    @disable this(this);
}

/** The mailbox of an actor.
 *
 * Holds four queues (user messages, system messages, delayed messages and
 * replies) which are all constructed with the same mutex. `put`, `pop`,
 * `empty`, `hasMessage` and `shutdown` run inside `synchronized (mtx)`.
 */
struct Address {
    private {
        // True while the actor that uses the address is active and
        // processing messages.
        bool open_;
        ulong id_;
        Mutex mtx;
    }

    package {
        Queue!Msg incoming;

        Queue!SystemMsg sysMsg;

        // Delayed messages for this actor that will be triggered in the
        // future.
        Queue!DelayedMsg delayed;

        // Incoming replies on requests.
        Queue!Reply replies;
    }

    invariant {
        assert(mtx !is null,
                "mutex must always be set or the address will fail on sporadic method calls");
    }

    /** Create an address whose queues all use `mtx`.
     *
     * Params:
     *  mtx = the mutex, must not be null.
     */
    private this(Mutex mtx) @safe
    in (mtx !is null) {
        this.mtx = mtx;

        // Lazy way of generating an ID. A mutex is a class and thus
        // allocated on the heap at a unique location, so the pointer value
        // is used as the ID.
        id_ = () @trusted { return cast(ulong) cast(void*) mtx; }();

        incoming = Queue!Msg(mtx);
        sysMsg = Queue!SystemMsg(mtx);
        delayed = Queue!DelayedMsg(mtx);
        replies = Queue!Reply(mtx);
    }

    @disable this(this);

    /** Mark the address as closed and tear down all four queues.
     *
     * Each queue's `teardown` is handed a callback that overwrites the
     * content of a queued item (presumably so queued payloads are let go of,
     * confirm against `Queue.teardown`).
     *
     * Any `Exception` thrown while doing this ends in `assert(0)`.
     */
    void shutdown() @safe nothrow {
        try {
            synchronized (mtx) {
                open_ = false;
                incoming.teardown((ref Msg m) { m.type = MsgType.init; });
                sysMsg.teardown((ref SystemMsg s) { s = SystemMsg.init; });
                delayed.teardown((ref DelayedMsg d) { d.msg.type = MsgType.init; });
                // NOTE(review): the assigned value is derived from the
                // Variant's `type` property and not from `Variant` itself;
                // kept exactly as written, confirm that this is intended.
                replies.teardown((ref Reply r) { r.data = r.data.type.init; });
            }
        } catch (Exception e) {
            assert(0, "this should never happen");
        }
    }

    /** Put `msg` in the queue that matches its type.
     *
     * Supported types are those convertible to `Msg`, `SystemMsg`,
     * `DelayedMsg` or `Reply`. Any other type is rejected at compile time.
     *
     * Returns: false if the address is not open, otherwise whatever the
     * queue's `put` returns.
     */
    package bool put(T)(T msg) {
        synchronized (mtx) {
            if (open_) {
                static if (is(T : Msg))
                    return incoming.put(msg);
                else static if (is(T : SystemMsg))
                    return sysMsg.put(msg);
                else static if (is(T : DelayedMsg))
                    return delayed.put(msg);
                else static if (is(T : Reply))
                    return replies.put(msg);
                else
                    static assert(0, "msg type not supported " ~ T.stringof);
            }

            return false;
        }
    }

    /** Pop from the queue selected by `T`.
     *
     * Supported types are those convertible to `Msg`, `SystemMsg`,
     * `DelayedMsg` or `Reply`. Any other type is rejected at compile time.
     *
     * Returns: the `.init` value of the queue's `PopReturnType` if the
     * address is not open, otherwise whatever the queue's `pop` returns.
     */
    package auto pop(T)() @safe {
        synchronized (mtx) {
            static if (is(T : Msg)) {
                if (!open_) {
                    return incoming.PopReturnType.init;
                } else {
                    return incoming.pop;
                }
            } else static if (is(T : SystemMsg)) {
                if (!open_) {
                    return sysMsg.PopReturnType.init;
                } else {
                    return sysMsg.pop;
                }
            } else static if (is(T : DelayedMsg)) {
                if (!open_) {
                    return delayed.PopReturnType.init;
                } else {
                    return delayed.pop;
                }
            } else static if (is(T : Reply)) {
                if (!open_) {
                    return replies.PopReturnType.init;
                } else {
                    return replies.pop;
                }
            } else {
                static assert(0, "msg type not supported " ~ T.stringof);
            }
        }
    }

    /** Check if the queue selected by `T` is empty.
     *
     * Supported types are those convertible to `Msg`, `SystemMsg`,
     * `DelayedMsg` or `Reply`. Any other type is rejected at compile time.
     *
     * Returns: true if the address is not open, otherwise whatever the
     * queue's `empty` returns.
     */
    package bool empty(T)() @safe {
        synchronized (mtx) {
            if (open_) {
                static if (is(T : Msg))
                    return incoming.empty;
                else static if (is(T : SystemMsg))
                    return sysMsg.empty;
                else static if (is(T : DelayedMsg))
                    return delayed.empty;
                else static if (is(T : Reply))
                    return replies.empty;
                else
                    static assert(0, "msg type not supported " ~ T.stringof);
            }

            return true;
        }
    }

    /** Check if any of the four queues contains something.
     *
     * Unlike `empty` the open flag is not consulted.
     *
     * Returns: true if at least one queue is non-empty. False if all are
     * empty or if an `Exception` was caught while checking.
     */
    package bool hasMessage() @safe pure nothrow const @nogc {
        try {
            synchronized (mtx) {
                return !incoming.empty || !sysMsg.empty || !delayed.empty || !replies.empty;
            }
        } catch (Exception e) {
            // Deliberately ignored, falls through to `return false`.
        }
        return false;
    }

    /// Mark the address as open.
    ///
    /// NOTE(review): the flag is written without taking `mtx` while the
    /// readers in `put`, `pop` and `empty` hold it; confirm that callers
    /// make this safe.
    package void setOpen() @safe pure nothrow @nogc {
        open_ = true;
    }

    /// Mark the address as closed.
    ///
    /// NOTE(review): the flag is written without taking `mtx`, see `setOpen`.
    package void setClosed() @safe pure nothrow @nogc {
        open_ = false;
    }
}

/** A handle to an `Address` that has to be converted to a `StrongAddress`
 * via `lock` before the address can be reached.
 */
struct WeakAddress {
    private Address* addr;

    /// Returns: a `StrongAddress` wrapping the same pointer, which may be
    /// null.
    StrongAddress lock() @safe nothrow @nogc {
        return StrongAddress(addr);
    }

    /// Returns: true if the wrapped pointer is not null.
    T opCast(T : bool)() @safe nothrow const @nogc {
        return addr !is null;
    }

    /// Returns: true if the wrapped pointer is null.
    bool empty() @safe nothrow const @nogc {
        return !addr;
    }

    /// Take over the pointer of `rhs`.
    void opAssign(WeakAddress rhs) @safe nothrow @nogc {
        addr = rhs.addr;
    }

    /// Returns: the pointer value as the hash.
    size_t toHash() @safe pure nothrow const @nogc scope {
        return cast(size_t) addr;
    }

    /// Set the wrapped pointer to null.
    void release() @safe nothrow @nogc {
        addr = null;
    }
}

/** Messages can be sent to a strong address.
 */
struct StrongAddress {
    package {
        Address* addr;
    }

    private this(Address* addr) @safe nothrow @nogc {
        this.addr = addr;
    }

    /// Set the wrapped pointer to null.
    void release() @safe nothrow @nogc {
        addr = null;
    }

    /// Returns: the pointer value as the id.
    ulong id() @safe pure nothrow const @nogc {
        return cast(ulong) addr;
    }

    /// Returns: the pointer value as the hash.
    size_t toHash() @safe pure nothrow const @nogc scope {
        return cast(size_t) addr;
    }

    /// Take over the pointer of `rhs`.
    void opAssign(StrongAddress rhs) @safe nothrow @nogc {
        addr = rhs.addr;
    }

    /// Returns: true if the wrapped pointer is not null.
    T opCast(T : bool)() @safe nothrow const @nogc {
        return addr !is null;
    }

    /// Returns: true if the wrapped pointer is null.
    bool empty() @safe pure nothrow const @nogc {
        return !addr;
    }

    /// Returns: a `WeakAddress` wrapping the same pointer.
    WeakAddress weakRef() @safe nothrow {
        return WeakAddress(addr);
    }

    /// Returns: the wrapped pointer, which may be null.
    package Address* get() @safe pure nothrow @nogc return {
        return addr;
    }
}

/// Allocate a new `Address` with its own, new mutex and return a
/// `StrongAddress` to it.
StrongAddress makeAddress2() @safe {
    auto mtx = new Mutex;
    auto addr = new Address(mtx);
    return StrongAddress(addr);
}