1 /**
2  * IO related functions
3  */
4 
5 module unit_threaded.io;
6 
7 import unit_threaded.from;
8 
/**
 * Passes args to writeln on the writer of the currently running test,
 * but only while debug output is enabled; otherwise it does nothing.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;

    // nothing to do unless debug output has been switched on
    if(!isDebugOutputEnabled)
        return;

    TestCase.currentTest.getWriter.writeln(args);
}
17 
18 
// With debug output disabled, writelnUt must not add anything to the test's output:
// only the "PrintTest:" line is expected.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);

    // Output stub that accumulates everything it is sent
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}
55 
// With debug output enabled, the text passed to writelnUt must show up
// after the "PrintTest:" line.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import std.string: splitLines;

    enableDebugOutput;
    // put the global flag back so later tests start with debug output off
    scope(exit) enableDebugOutput(false);

    // Output stub that accumulates everything it is sent
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}
98 
/// Whether debug messages should be printed; one value shared by all threads.
private shared(bool) _debugOutput = false;
/// Whether ANSI escape codes were requested even when stdout is not a terminal.
private shared(bool) _forceEscCodes = false;
/// Per-thread decision on emitting escape codes (module-level variables are
/// thread-local in D); set by the thread-local static constructor below.
bool _useEscCodes;
/// Escape sequences, indexed by Colour: red, green, yellow, cancel.
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


/**
 * Thread-local constructor: on Posix, escape codes are used by the starting
 * thread when they were forced or when stdout is a terminal. On other
 * platforms _useEscCodes keeps its default of false.
 *
 * NOTE(review): a thread whose constructor has already run does not observe a
 * later forceEscCodes() call - confirm callers force the codes early enough.
 */
static this() {
    version (Posix) {
        import core.atomic: atomicLoad;
        import core.sys.posix.unistd: isatty;
        import std.stdio: stdout;
        _useEscCodes = atomicLoad(_forceEscCodes) || isatty(stdout.fileno()) != 0;
    }
}


/**
 * Turns debug output on (the default) or off for all threads.
 *
 * An atomic store is used instead of a bare `synchronized` statement: every
 * bare `synchronized` statement has its own mutex, so the previous blocks never
 * excluded the reader in isDebugOutputEnabled.
 *
 * Params:
 *     value = the new state of the flag
 */
package void enableDebugOutput(bool value = true) nothrow {
    import core.atomic: atomicStore;
    atomicStore(_debugOutput, value);
}

/**
 * Returns: true if debug output is currently enabled.
 */
package bool isDebugOutputEnabled() nothrow @trusted {
    import core.atomic: atomicLoad;
    return atomicLoad(_debugOutput);
}

/**
 * Requests ANSI escape codes even when stdout is not a terminal. The flag is
 * read by the thread-local static constructor of this module.
 */
package void forceEscCodes() nothrow {
    import core.atomic: atomicStore;
    atomicStore(_forceEscCodes, true);
}
131 
/**
 * Destination for text produced by this module's write functions.
 */
interface Output {
    /// Accepts one chunk of text.
    void send(in string output) @safe;
    /// Flush request; what it does is up to the implementation.
    void flush() @safe;
}
136 
// Colours understood by escCode. Each member's value is used as an index into
// _escCodes, so the order here has to match the order of that array.
private enum Colour {
    red,
    green,
    yellow,
    cancel, // not a colour: selects the code appended after coloured text
}
143 
/**
 * Surrounds msg with the escape code for colour C and the cancel code.
 * Both codes are empty strings when escCode yields nothing.
 */
private string colour(alias C)(in string msg) {
    const opening = escCode(C);
    const closing = escCode(Colour.cancel);
    return opening ~ msg ~ closing;
}

// one ready-made instantiation per colour
private alias red = colour!(Colour.red);
private alias green = colour!(Colour.green);
private alias yellow = colour!(Colour.yellow);
151 
/**
 * Returns the escape sequence for code, or an empty string when the
 * current thread is not using escape codes.
 */
private string escCode(in Colour code) @safe {
    if(!_useEscCodes)
        return "";

    return _escCodes[code];
}
158 
159 
/**
 * Converts args to a single string and sends it to output.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;

    const rendered = text(args);
    output.send(rendered);
}
167 
/**
 * Converts args to a single string, appends a newline and sends the
 * result to output.
 */
void writeln(T...)(Output output, auto ref T args) {
    import std.conv: text;

    output.send(text(args, "\n"));
}
174 
/**
 * Sends args plus a trailing newline to output, wrapped in the green
 * escape code and the cancel code (the newline sits inside the coloured span).
 * The codes are empty when escape codes are not in use.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;

    const line = text(args) ~ "\n";
    output.send(green(line));
}
183 
/**
 * Sends args plus a trailing newline to output, wrapped in the red
 * escape code and the cancel code (the newline sits inside the coloured span).
 * The codes are empty when escape codes are not in use.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    output.send(red(text(args, "\n")));
}
191 
/**
 * Sends args to output wrapped in the red escape code and the cancel code.
 * No newline is added. The codes are empty when escape codes are not in use.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;

    const coloured = red(text(args));
    output.send(coloured);
}
200 
/**
 * Sends args to output wrapped in the yellow escape code and the cancel code.
 * No newline is added. The codes are empty when escape codes are not in use.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;

    const coloured = yellow(text(args));
    output.send(coloured);
}
209 
/**
 * Output implementation backed by a thread running threadWriter on
 * stdout/stderr. When compiled with version unitUnthreaded no thread is
 * spawned and text goes straight to stdout.
 */
class WriterThread: Output {

    import std.concurrency: Tid;

    /**
     * Creates the single shared instance on first use and returns it.
     */
    static WriterThread get() @trusted {
        import std.concurrency: initOnce;
        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /// Forwards output to the writer thread, tagged with the calling thread's Tid.
    override void send(in string output) @safe {
        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            post(output);
        }
    }

    /// Sends a Flush message for the calling thread to the writer thread.
    override void flush() @safe {
        version(unitUnthreaded) {
            // no writer thread in this configuration, nothing to send
        } else {
            post(Flush());
        }
    }


private:

    // Spawns the writer thread and blocks until it answers with ThreadStarted.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid, receiveOnly, send;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }

    version(unitUnthreaded) {}
    else {
        // Sends message together with the caller's Tid to the writer thread.
        void post(M)(M message) @trusted {
            import std.concurrency: send, thisTid;
            _tid.send(message, thisTid);
        }
    }


    Tid _tid;
}
263 
264 
// Empty marker types exchanged as messages with the threadWriter thread.
private struct ThreadWait{};    // request: threadWriter answers with ThreadStarted
private struct ThreadFinish{};  // request: threadWriter leaves its receive loop
private struct ThreadStarted{}; // reply to ThreadWait
private struct ThreadEnded{};   // sent by threadWriter right before it returns
private struct Flush{};         // sent together with the sender's Tid
270 
// Name of the file that threadWriter opens in place of its output files
// when debug output is disabled.
version (Posix)
    enum nullFileName = "/dev/null";
else
    enum nullFileName = "NUL";
276 
277 
/**
 * Body of the writer thread. It receives text from other threads and prints it
 * to a saved copy of OUT so that the output of different threads is not interleaved.
 *
 * OUT and ERR are saved on entry and, unless debug output is enabled, replaced
 * by files opened on nullFileName. The saved values are put back on exit
 * (also when an exception escapes).
 *
 * Messages handled:
 *   (string, Tid) - text from the thread identified by the Tid
 *   (Flush, Tid)  - that thread has finished a batch of output
 *   ThreadWait    - answered with ThreadStarted
 *   ThreadFinish / OwnerTerminated - leave the loop
 *
 * Params:
 *     tid = thread that receives the ThreadStarted and ThreadEnded notifications
 */
private void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated, Tid;

    auto done = false;

    // keep the original files so they can be written to and restored later
    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    // the flag is sampled once, here; later changes do not affect this thread's redirection
    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    // always writes to the saved file, never to the (possibly redirected) OUT
    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        string currentOutput; // text received since the thread's last Flush
        string[] outputs;     // flushed batches waiting to be printed

        void store(in string msg) {
            currentOutput ~= msg;
        }

        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }
    ThreadOutput[Tid] outputs;

    // Tid.init means that no thread currently owns the output
    Tid currentTid;

    while (!done) {
        receive(
            (string msg, Tid originTid) {

                if(currentTid == currentTid.init) {
                    currentTid = originTid;

                    // it could be that this thread became the current thread but had output not yet printed
                    if(originTid in outputs) {
                        actuallyPrint(outputs[originTid].currentOutput);
                        outputs[originTid].currentOutput = "";
                    }
                }

                // the current thread prints immediately, everybody else is buffered
                if(currentTid == originTid)
                    actuallyPrint(msg);
                else {
                    if(originTid !in outputs) outputs[originTid] = typeof(outputs[originTid]).init;
                    outputs[originTid].store(msg);
                }
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f, Tid originTid) {

                // close the sender's pending batch, whoever the sender is
                if(originTid in outputs) outputs[originTid].flush;

                // a flush from a thread other than the current one prints nothing yet
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                // print every batch that has been flushed so far, for all threads
                foreach(tid, ref threadOutput; outputs) {
                    foreach(o; threadOutput.outputs)
                        actuallyPrint(o);
                    threadOutput.outputs = [];
                }

                // release ownership: the next thread to send text becomes the current one
                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}
375 
376 version(testing_unit_threaded) {
377     struct FakeFile {
378         string fileName;
379         string mode;
380         string output;
381         void flush() shared {}
382         void write(in string s) shared {
383             output ~= s.dup;
384         }
385         string[] lines() shared const @safe pure {
386             import std..string: splitLines;
387             return output.splitLines;
388         }
389     }
    // Fake stdout/stderr that the tests pass to threadWriter as OUT and ERR.
    shared FakeFile gOut;
    shared FakeFile gErr;
    // Puts both fake files back into their initial state: no output, and the
    // file name / mode values that the tests compare against.
    void resetFakeFiles() {
        synchronized {
            gOut = FakeFile("out", "mode");
            gErr = FakeFile("err", "mode");
        }
    }
398 
    // With debug output disabled, threadWriter replaces OUT and ERR by files
    // opened on nullFileName in "w" mode.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        // handshake: once ThreadStarted arrives the redirection has already happened
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
        gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }
416 
    // With debug output enabled, threadWriter leaves OUT and ERR untouched.
    unittest {
        import std.concurrency: spawn, send, thisTid, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(true);
        // put the global flag back for the tests that follow
        scope(exit) enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        // still the values set by resetFakeFiles
        gOut.shouldEqual(shared FakeFile("out", "mode"));
        gErr.shouldEqual(shared FakeFile("err", "mode"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }
435 
    // Text sent by a single thread ends up, in order, in the restored gOut
    // once the writer thread has finished.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        tid.send("foobar\n", thisTid);
        tid.send("toto\n", thisTid);
        gOut.output.shouldBeEmpty; // since it writes to the old gOut

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
                ]
            );
    }
461 
    // Two threads send text alternately. The test thread sends first and so owns
    // the output; the other thread's text is held back until the test thread
    // flushes, which keeps each thread's lines together.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        // first message: the test thread becomes the current thread of the writer
        writerTid.send("foobar\n", thisTid);
        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
                try {
                    // each step reports to the test thread and then waits for its go-ahead
                    writerTid.send("what about me?\n", thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send("seriously, what about me?\n", thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send(Flush(), thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send("final attempt\n", thisTid);
                    testTid.send(true);

                } catch(OwnerTerminated ex) {}
            },
            writerTid,
            thisTid);
        receiveOnly!bool; //wait for otherThread 1st message

        writerTid.send("toto\n", thisTid);
        otherTid.send(true); //tell otherThread to continue
        receiveOnly!bool; //wait for otherThread 2nd message

        writerTid.send("last one from me\n", thisTid);
        otherTid.send(true); // tell otherThread to continue
        receiveOnly!bool; // wait for otherThread to try and flush (won't work)

        writerTid.send(Flush(), thisTid); //finish with our output
        otherTid.send(true); //finish
        receiveOnly!bool; // wait for otherThread to finish

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        // the output should also be serialised despite
        // sending messages from two threads
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
                "last one from me",
                "what about me?",
                "seriously, what about me?",
                "final attempt",
            ]
        );
    }
527 
    // Text from a thread that is not the current one and never flushes
    // is not printed.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                testTid.send(true); // synchronize with test tid
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // at this point we've sent "foo\n" but not flushed,
        // and the other tid has sent "bar\n" without flushing

        writerTid.send(Flush(), thisTid);

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // "bar" was never flushed, so only "foo" is expected
        gOut.lines.shouldEqual(
            [
                "foo",
            ]
        );
    }
564 
    // Text flushed by a non-current thread is printed when the current thread
    // flushes; what that thread sent after its own flush stays unprinted.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                writerTid.send(Flush(), thisTid);
                writerTid.send("baz\n", thisTid);
                testTid.send(true); // synchronize with test tid
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // at this point we've sent "foo\n" but not flushed,
        // and the other tid has sent "bar\n", flushed, then sent "baz\n"

        writerTid.send(Flush(), thisTid);

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // "baz" came after the other thread's flush and was never flushed itself
        gOut.lines.shouldEqual(
            [
                "foo",
                "bar",
            ]
        );
    }
604 
    // A thread with buffered, unflushed text becomes the current thread when it
    // sends again after the previous owner has flushed: the buffered text is
    // printed first, followed by the new text.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                testTid.send(true); // synchronize with test tid
                receiveOnly!bool; // wait for test thread to flush and give up being the primary thread
                writerTid.send("baz\n", thisTid);
                writerTid.send(Flush(), thisTid);
                testTid.send(true);
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // at this point we've sent "foo\n" but not flushed,
        // and the other tid has sent "bar\n" without flushing

        writerTid.send(Flush(), thisTid);

        otherTid.send(true); // tell it to continue
        receiveOnly!bool;

        // now the other thread should be the main thread and prints out its partial output ("bar")
        // and what it sent afterwards in order

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        gOut.lines.shouldEqual(
            [
                "foo",
                "bar",
                "baz",
            ]
        );
    }
653 
    // Stress test: numThreads parallel iterations each send numMessages lines and
    // then a Flush. For every i, the lines of that iteration must appear in the
    // output as one uninterrupted, ordered run.
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import std.range: iota;
        import std.parallelism: parallel;
        import std.algorithm: map, canFind;
        import std.array: array;
        import std.conv: text;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        // the line that iteration i sends as its j-th message (without the newline)
        string textFor(int i, int j) {
            return text("i_", i, "_j_", j);
        }

        enum numThreads = 100;
        enum numMessages = 5;

        // thisTid is evaluated by whichever thread executes the iteration
        foreach(i; numThreads.iota.parallel) {
            foreach(j; 0 .. numMessages) {
                writerTid.send(textFor(i, j) ~ "\n", thisTid);
            }
            writerTid.send(Flush(), thisTid);
        }


        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        foreach(i; 0 .. numThreads) {
            const messages = numMessages.iota.map!(j => textFor(i, j)).array;
            // canFind with a range needle looks for the messages as a contiguous sub-sequence
            if(!gOut.lines.canFind(messages))
                throw new Exception(text("Could not find ", messages, " in:\n", gOut.lines));
        }
    }
693 }