/**
 * IO related functions
 */

module unit_threaded.io;

import unit_threaded.from;

/**
 * Write if debug output was enabled.
 *
 * The arguments are written, followed by a newline, to the writer of the
 * currently running test case. Nothing happens when debug output is disabled.
 */
void writelnUt(T...)(auto ref T args) {
    import unit_threaded.testcase: TestCase;
    if(isDebugOutputEnabled)
        TestCase.currentTest.getWriter.writeln(args);
}


// With debug output disabled, writelnUt must not add anything to the output.
unittest {
    import unit_threaded.testcase: TestCase;
    import unit_threaded.should;
    import std.string: splitLines;

    enableDebugOutput(false);
    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
        ]
    );
}

// With debug output enabled, writelnUt output shows up after the test's path.
unittest {
    import unit_threaded.should;
    import unit_threaded.testcase: TestCase;
    import unit_threaded.reflection: TestData;
    import unit_threaded.factory: createTestCase;
    import std.traits: fullyQualifiedName;
    import std.string: splitLines;

    enableDebugOutput;
    scope(exit) enableDebugOutput(false);

    class TestOutput: Output {
        string output;
        override void send(in string output) {
            this.output ~= output;
        }

        override void flush() {}
    }

    class PrintTest: TestCase {
        override void test() {
            writelnUt("foo", "bar");
        }
        override string getPath() @safe pure nothrow const {
            return "PrintTest";
        }
    }

    auto test = new PrintTest;
    auto writer = new TestOutput;
    test.setOutput(writer);
    test();

    writer.output.splitLines.shouldEqual(
        [
            "PrintTest:",
            "foobar",
        ]
    );
}

private shared(bool) _debugOutput = false; ///print debug msgs?
private shared(bool) _forceEscCodes = false; ///use ANSI escape codes anyway?
/// Whether escCode returns real escape codes. Set by this module's `static this()`.
bool _useEscCodes;
/// ANSI escape sequences, indexed by the members of Colour (red, green, yellow, cancel).
enum _escCodes = ["\033[31;1m", "\033[32;1m", "\033[33;1m", "\033[0;;m"];


// Decide whether to use escape codes: on Posix, when forced or when stdout is a terminal.
// NOTE(review): on non-Posix platforms nothing in this module ever sets
// _useEscCodes, so forceEscCodes appears to have no effect there - confirm intended.
static this() {
    version (Posix) {
        import std.stdio: stdout;
        import core.sys.posix.unistd: isatty;
        _useEscCodes = _forceEscCodes || isatty(stdout.fileno()) != 0;
    }
}


/// Turns debug output on (the default) or off.
package void enableDebugOutput(bool value = true) nothrow {
    synchronized {
        _debugOutput = value;
    }
}

/// Returns: whether debug output is currently enabled.
package bool isDebugOutputEnabled() nothrow @trusted {
    synchronized {
        return _debugOutput;
    }
}

/// Requests escape codes even if stdout is not a terminal.
/// The flag is only read by this module's `static this()`.
package void forceEscCodes() nothrow {
    synchronized {
        _forceEscCodes = true;
    }
}

/// Sink that the write* functions in this module send their text to.
interface Output {
    /// Accepts a piece of output text.
    void send(in string output) @safe;
    /// Signals the end of a group of `send` calls.
    void flush() @safe;
}

// The member order must match the order of the entries in _escCodes.
private enum Colour {
    red,
    green,
    yellow,
    cancel,
}

// Wraps msg between the escape code for colour C and the cancel code.
private string colour(alias C)(in string msg) {
    return escCode(C) ~ msg ~ escCode(Colour.cancel);
}

private alias green = colour!(Colour.green);
private alias red = colour!(Colour.red);
private alias yellow = colour!(Colour.yellow);

/**
 * Returns the escape code for `code`, or the empty string
 * when escape codes are not in use.
 */
private string escCode(in Colour code) @safe {
    return _useEscCodes ? _escCodes[code] : "";
}


/**
 * Converts the args to text and sends the result to `output`.
 */
void write(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(text(args));
}

/**
 * Same as `write`, but appends a newline.
 */
void writeln(T...)(Output output, auto ref T args) {
    write(output, args, "\n");
}

/**
 * Sends the args to `output` in green (when escape codes are in use)
 * and appends a newline.
 */
void writelnGreen(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(green(text(args) ~ "\n"));
}

/**
 * Sends the args to `output` in red (when escape codes are in use)
 * and appends a newline.
 */
void writelnRed(T...)(Output output, auto ref T args) {
    writeRed(output, args, "\n");
}

/**
 * Sends the args to `output` in red (when escape codes are in use).
 * No newline is appended.
 */
void writeRed(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(red(text(args)));
}

/**
 * Sends the args to `output` in yellow (when escape codes are in use).
 * No newline is appended.
 */
void writeYellow(T...)(Output output, auto ref T args) {
    import std.conv: text;
    output.send(yellow(text(args)));
}

/**
 * Thread to output to stdout
 */
class WriterThread: Output {

    import std.concurrency: Tid;


    /**
     * Returns a reference to the only instance of this class.
     */
    static WriterThread get() @trusted {
        import std.concurrency: initOnce;
        static __gshared WriterThread instance;
        return initOnce!instance(new WriterThread);
    }

    /**
     * Forwards `output`, tagged with the calling thread's Tid, to the writer
     * thread. With version unitUnthreaded it is written directly instead.
     */
    override void send(in string output) @safe {

        version(unitUnthreaded) {
            import std.stdio: write;
            write(output);
        } else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(output, thisTid); }();
        }
    }

    /**
     * Tells the writer thread that the calling thread has finished a group of
     * output. Does nothing with version unitUnthreaded.
     */
    override void flush() @safe {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: send, thisTid;
            () @trusted { _tid.send(Flush(), thisTid); }();
        }
    }


private:

    // Spawns the writer thread and waits until it reports that it has started.
    this() {
        version(unitUnthreaded) {}
        else {
            import std.concurrency: spawn, thisTid, receiveOnly, send;
            import std.stdio: stdout, stderr;
            _tid = spawn(&threadWriter!(stdout, stderr), thisTid);
            _tid.send(ThreadWait());
            receiveOnly!ThreadStarted;
        }
    }


    Tid _tid;
}


// Messages exchanged with the writer thread.
private struct ThreadWait{}    // request: reply with ThreadStarted
private struct ThreadFinish{}  // request: leave the receive loop
private struct ThreadStarted{} // reply to ThreadWait
private struct ThreadEnded{}   // sent by the writer thread just before it returns
private struct Flush{}         // a thread has finished a group of output

// Name of the file that OUT/ERR are redirected to when debug output is disabled.
version (Posix) {
    enum nullFileName = "/dev/null";
} else {
    enum nullFileName = "NUL";
}


/**
 * Body of the writer thread.
 *
 * OUT and ERR are saved on entry. If debug output is disabled they are
 * replaced with files opened on `nullFileName`; everything this function
 * prints goes to the saved OUT. Messages are handled until ThreadFinish or
 * OwnerTerminated is received, then OUT and ERR are restored and ThreadEnded
 * is sent to `tid`.
 *
 * Params:
 *   tid = the thread that receives the ThreadStarted and ThreadEnded replies
 */
private void threadWriter(alias OUT, alias ERR)(from!"std.concurrency".Tid tid)
{
    import std.concurrency: receive, send, OwnerTerminated, Tid;

    auto done = false;

    auto saveStdout = OUT;
    auto saveStderr = ERR;

    void restore() {
        saveStdout.flush();
        OUT = saveStdout;
        ERR = saveStderr;
    }

    scope (failure) restore;

    if (!isDebugOutputEnabled()) {
        OUT = typeof(OUT)(nullFileName, "w");
        ERR = typeof(ERR)(nullFileName, "w");
    }

    void actuallyPrint(in string msg) {
        if(msg.length) saveStdout.write(msg);
    }

    // the first thread to send output becomes the current
    // until that thread sends a Flush message no other thread
    // can print to stdout, so we store their outputs in the meanwhile
    static struct ThreadOutput {
        string currentOutput; // text received since this thread's last Flush
        string[] outputs;     // flushed groups that are waiting to be printed

        void store(in string msg) {
            currentOutput ~= msg;
        }

        void flush() {
            outputs ~= currentOutput;
            currentOutput = "";
        }
    }
    ThreadOutput[Tid] outputs;

    Tid currentTid; // Tid.init means that no thread is the current one

    while (!done) {
        receive(
            (string msg, Tid originTid) {

                if(currentTid == currentTid.init) {
                    currentTid = originTid;

                    // it could be that this thread became the current thread but had output not yet printed
                    if(originTid in outputs) {
                        actuallyPrint(outputs[originTid].currentOutput);
                        outputs[originTid].currentOutput = "";
                    }
                }

                if(currentTid == originTid)
                    actuallyPrint(msg);
                else {
                    if(originTid !in outputs) outputs[originTid] = typeof(outputs[originTid]).init;
                    outputs[originTid].store(msg);
                }
            },
            (ThreadWait w) {
                tid.send(ThreadStarted());
            },
            (ThreadFinish f) {
                done = true;
            },
            (Flush f, Tid originTid) {

                if(originTid in outputs) outputs[originTid].flush;

                // a flush from a thread that isn't the current one only
                // queues its output; it gets printed when the current thread flushes
                if(currentTid != currentTid.init && currentTid != originTid)
                    return;

                foreach(tid, ref threadOutput; outputs) {
                    foreach(o; threadOutput.outputs)
                        actuallyPrint(o);
                    threadOutput.outputs = [];
                }

                currentTid = currentTid.init;
            },
            (OwnerTerminated trm) {
                done = true;
            }
        );
    }

    restore;
    tid.send(ThreadEnded());
}

version(testing_unit_threaded) {
    // Stand-in for a file that records what is written to it, used to test threadWriter.
    struct FakeFile {
        string fileName;
        string mode;
        string output;
        void flush() shared {}
        void write(in string s) shared {
            output ~= s.dup;
        }
        string[] lines() shared const @safe pure {
            import std.string: splitLines;
            return output.splitLines;
        }
    }
    shared FakeFile gOut;
    shared FakeFile gErr;
    // Puts gOut and gErr back into a known, empty state.
    void resetFakeFiles() {
        synchronized {
            gOut = FakeFile("out", "mode");
            gErr = FakeFile("err", "mode");
        }
    }

    // debug output disabled: OUT and ERR are redirected to the null file
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile(nullFileName, "w"));
        gErr.shouldEqual(shared FakeFile(nullFileName, "w"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    // debug output enabled: OUT and ERR are left alone
    unittest {
        import std.concurrency: spawn, send, thisTid, receiveOnly;
        import unit_threaded.should;

        enableDebugOutput(true);
        scope(exit) enableDebugOutput(false);
        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        gOut.shouldEqual(shared FakeFile("out", "mode"));
        gErr.shouldEqual(shared FakeFile("err", "mode"));

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import unit_threaded.should;

        resetFakeFiles;

        auto tid = spawn(&threadWriter!(gOut, gErr), thisTid);
        tid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        tid.send("foobar\n", thisTid);
        tid.send("toto\n", thisTid);
        gOut.output.shouldBeEmpty; // since it writes to the old gOut

        tid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
            ]
        );
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foobar\n", thisTid);
        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                import std.concurrency: send, receiveOnly, OwnerTerminated, thisTid;
                try {
                    writerTid.send("what about me?\n", thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send("seriously, what about me?\n", thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send(Flush(), thisTid);
                    testTid.send(true);
                    receiveOnly!bool;

                    writerTid.send("final attempt\n", thisTid);
                    testTid.send(true);

                } catch(OwnerTerminated ex) {}
            },
            writerTid,
            thisTid);
        receiveOnly!bool; //wait for otherThread 1st message

        writerTid.send("toto\n", thisTid);
        otherTid.send(true); //tell otherThread to continue
        receiveOnly!bool; //wait for otherThread 2nd message

        writerTid.send("last one from me\n", thisTid);
        otherTid.send(true); // tell otherThread to continue
        receiveOnly!bool; // wait for otherThread to try and flush (won't work)

        writerTid.send(Flush(), thisTid); //finish with our output
        otherTid.send(true); //finish
        receiveOnly!bool; // wait for otherThread to finish

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        // gOut is restored so the output should be here
        // the output should also be serialised despite
        // sending messages from two threads
        gOut.lines.shouldEqual(
            [
                "foobar",
                "toto",
                "last one from me",
                "what about me?",
                "seriously, what about me?",
                "final attempt",
            ]
        );
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                testTid.send(true); // synchronize with test tid
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // from now on, we've sent "foo\n" but not flushed
        // and the other tid has sent "bar\n" but not flushed either

        writerTid.send(Flush(), thisTid);

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        gOut.lines.shouldEqual(
            [
                "foo",
            ]
        );
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                writerTid.send(Flush(), thisTid);
                writerTid.send("baz\n", thisTid);
                testTid.send(true); // synchronize with test tid
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // from now on, we've sent "foo\n" but not flushed
        // and the other tid has sent "bar\n", flushed, then "baz\n"

        writerTid.send(Flush(), thisTid);

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        gOut.lines.shouldEqual(
            [
                "foo",
                "bar",
            ]
        );
    }

    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly, Tid;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        writerTid.send("foo\n", thisTid);

        auto otherTid = spawn(
            (Tid writerTid, Tid testTid) {
                writerTid.send("bar\n", thisTid);
                testTid.send(true); // synchronize with test tid
                receiveOnly!bool; // wait for test thread to flush and give up being the primary thread
                writerTid.send("baz\n", thisTid);
                writerTid.send(Flush(), thisTid);
                testTid.send(true);
            },
            writerTid,
            thisTid);

        receiveOnly!bool; //wait for spawned thread to do its thing

        // from now on, we've sent "foo\n" but not flushed
        // and the other tid has sent "bar\n" but not flushed either

        writerTid.send(Flush(), thisTid);

        otherTid.send(true); // tell it to continue
        receiveOnly!bool;

        // now the other thread should be the main thread and prints out its partial output ("bar")
        // and what it sent afterwards in order

        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        gOut.lines.shouldEqual(
            [
                "foo",
                "bar",
                "baz",
            ]
        );
    }

    // many threads sending at once: each thread's messages must stay together and in order
    unittest {
        import std.concurrency: spawn, thisTid, send, receiveOnly;
        import std.range: iota;
        import std.parallelism: parallel;
        import std.algorithm: map, canFind;
        import std.array: array;
        import std.conv: text;
        import unit_threaded.should;

        resetFakeFiles;

        auto writerTid = spawn(&threadWriter!(gOut, gErr), thisTid);
        writerTid.send(ThreadWait());
        receiveOnly!ThreadStarted;

        string textFor(int i, int j) {
            return text("i_", i, "_j_", j);
        }

        enum numThreads = 100;
        enum numMessages = 5;

        foreach(i; numThreads.iota.parallel) {
            foreach(j; 0 .. numMessages) {
                writerTid.send(textFor(i, j) ~ "\n", thisTid);
            }
            writerTid.send(Flush(), thisTid);
        }


        writerTid.send(ThreadFinish());
        receiveOnly!ThreadEnded;

        foreach(i; 0 .. numThreads) {
            const messages = numMessages.iota.map!(j => textFor(i, j)).array;
            if(!gOut.lines.canFind(messages))
                throw new Exception(text("Could not find ", messages, " in:\n", gOut.lines));
        }
    }
}