/**
Copyright: Copyright (c) 2021, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module my.actor.common;

import core.sync.mutex : Mutex;

/** Multiple producer, "single" consumer thread safe queue.
 *
 * All operations are serialized on the `Mutex` handed to the constructor.
 *
 * `pop` returns an empty `Item` (its pointer is null) when the queue has no
 * element, which may happen if multiple consumers try to pop a value at the
 * same time. Calling `get` on such an item dereferences a null pointer, so a
 * consumer must only call `get` on an item it knows holds a value.
 *
 * Params:
 *  RawT = element type. If it is a pointer type `U*` the queue stores the
 *         pointers as given by the producer; otherwise each value passed to
 *         `put` is copied into a freshly allocated `T`.
 */
struct Queue(RawT) {
    import std.container.dlist : DList;

    static if (is(RawT == U*, U)) {
        alias T = U;
        enum AsPointer = true;
    } else {
        alias T = RawT;
        enum AsPointer = false;
    }

    /** Handle to a value that has been removed from the queue.
     *
     * When the handle is destroyed while it still refers to a value, that
     * value is overwritten with `T.init`. The memory itself is not freed by
     * the handle.
     */
    static struct Item(TT) {
        private TT ptr;

        ~this() @trusted {
            if (ptr !is null) {
                // Reset the payload so it does not keep anything alive.
                *ptr = T.init;
                ptr = null;
            }
        }

        /// Returns: a reference to the value. The item must not be empty.
        ref T get() {
            return *ptr;
        }

        // Move the value. It is now up to the user to ensure it is destructed.
        TT unsafeMove() {
            auto tmp = ptr;
            ptr = null;
            return tmp;
        }
    }

    private {
        Mutex mtx;
        DList!(T*) data;
        // false after `teardown`; `put` then rejects all values.
        bool open;
        // Tracked separately because DList has no length property.
        size_t length_;
    }

    invariant {
        assert(mtx !is null);
    }

    @disable this(this);

    /**
     * Params:
     *  mtx = mutex that guards every access to the queue. Must not be null.
     */
    this(Mutex mtx)
    in (mtx !is null) {
        this.mtx = mtx;
        this.open = true;
    }

    static if (AsPointer) {
        /** Append `a` to the queue.
         *
         * Returns: true if the value was enqueued, false if the queue has
         * been torn down.
         */
        bool put(T* a) @trusted {
            synchronized (mtx) {
                if (open) {
                    data.insertBack(a);
                    length_++;
                }
                return open;
            }
        }
    } else {
        /** Append a heap allocated copy of `a` to the queue.
         *
         * Returns: true if the value was enqueued, false if the queue has
         * been torn down.
         */
        bool put(T a) @trusted {
            synchronized (mtx) {
                if (open) {
                    auto obj = new T;
                    *obj = a;
                    data.insertBack(obj);
                    length_++;
                }
                return open;
            }
        }
    }

    alias PopReturnType = Item!(T*);

    /** Remove the value at the front of the queue.
     *
     * Returns: an item referring to the removed value, or an empty item
     * (`Item.init`) if the queue has no element.
     */
    Item!(T*) pop() @trusted scope {
        synchronized (mtx) {
            // The lock is already held so the container is inspected
            // directly instead of going through `empty`.
            if (!data.empty) {
                auto tmp = data.front;
                data.removeFront;
                length_--;
                return typeof(return)(tmp);
            }
        }

        return typeof(return).init;
    }

    /// Returns: true if the queue holds no element.
    bool empty() @trusted pure const @nogc {
        synchronized (mtx) {
            return data.empty;
        }
    }

    /// Returns: the number of elements in the queue.
    size_t length() @trusted pure const @nogc {
        synchronized (mtx) {
            return length_;
        }
    }

    /** Clear the queue and permanently shut it down by rejecting put messages.
     *
     * Params:
     *  deinit = called once for every element still in the queue before the
     *           queue is cleared. May be null, in which case the elements
     *           are dropped without a callback.
     */
    void teardown(void delegate(ref T) deinit) @trusted {
        synchronized (mtx) {
            // Guard against a null delegate; calling it would crash while
            // the lock is held and leave the queue open.
            if (deinit !is null) {
                foreach (ref a; data[])
                    deinit(*a);
            }
            open = false;
            data.clear;
            length_ = 0;
        }
    }
}

/** Errors that occur in the actor system.
 *
 * Attribution: C++ Actor Framework.
 * The framework is well developed and has gathered a lot of experience
 * throughout the years. The error enum is one of many indications of this
 * fact. The enum `Error` here is a copy of those suitable for a local actor
 * system.
 */
enum SystemError : ubyte {
    /// No error.
    none,
    /// Indicates that an actor dropped an unexpected message.
    unexpectedMessage,
    /// Indicates that a response message did not match the provided handler.
    unexpectedResponse,
    /// Indicates that the receiver of a request is no longer alive.
    requestReceiverDown,
    /// Indicates that a request message timed out.
    requestTimeout,
    /// An exception was thrown during message handling.
    runtimeError,
}

/** A special kind of error codes are exit reasons of actors. These errors are
 * usually fail states set by the actor system itself. The two exceptions are
 * `ExitReason.userShutdown` and `ExitReason.kill`. The former is used to
 * signal orderly, user-requested shutdown and can be used by programmers in
 * the same way. The latter terminates an actor unconditionally when used in
 * send_exit, even for actors that override the default handler (see Exit
 * Handler).
 */

/// This error category represents fail conditions for actors.
enum ExitReason : ubyte {
    /// Indicates that an actor finished execution without error.
    normal,
    /// Indicates that an actor died because of an unhandled exception.
    unhandledException,
    /// Indicates that the exit reason for this actor is unknown, i.e.,
    /// the actor has been terminated and no longer exists.
    unknown,
    /// Indicates that an actor was forced to shutdown by a user-generated event.
    userShutdown,
    /// Indicates that an actor was killed unconditionally.
    kill,
}

/** Compute a signature for an ordered list of types.
 *
 * Top-level type qualifiers are ignored (`const int` and `int` give the same
 * signature). The hashes are folded in order so that a permutation of the
 * same types, e.g. `(int, string)` and `(string, int)`, gives a different
 * signature. An empty list gives 0 and a single type gives the hash of that
 * type's `TypeInfo`.
 *
 * Returns: the combined hash of `Types`.
 */
ulong makeSignature(Types...)() @safe {
    import std.traits : Unqual;

    ulong rval;
    static foreach (T; Types) {
        // Multiply-then-add makes the result depend on the position of each
        // type; unsigned overflow wraps, which is intended.
        rval = rval * 31 + typeid(Unqual!T).toHash;
    }
    return rval;
}