1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module proc; 7 8 import core.thread : Thread; 9 import core.time : dur, Duration; 10 import logger = std.experimental.logger; 11 import std.algorithm : filter, count, joiner, map; 12 import std.array : appender, empty, array; 13 import std.exception : collectException; 14 import std.stdio : File, fileno, writeln; 15 import std.typecons : Flag, Yes; 16 static import std.process; 17 static import std.stdio; 18 19 import my.gc.refc; 20 21 public import proc.channel; 22 public import proc.pid; 23 24 version (unittest) { 25 import std.file : remove; 26 } 27 28 /** Manage a process by reference counting so that it is terminated when the it 29 * stops being used such as the instance going out of scope. 30 */ 31 auto rcKill(T)(T p) { 32 return refCounted(ScopeKill!T(p)); 33 } 34 35 // backward compatibility. 36 alias scopeKill = rcKill; 37 38 struct ScopeKill(T) { 39 T process; 40 alias process this; 41 private bool hasProcess; 42 43 this(T process) @safe { 44 this.process = process; 45 this.hasProcess = true; 46 } 47 48 ~this() @safe { 49 if (hasProcess) 50 process.dispose(); 51 } 52 } 53 54 /// Async process wrapper for a std.process SpawnProcess 55 struct SpawnProcess { 56 import std.algorithm : among; 57 58 private { 59 enum State { 60 running, 61 terminated, 62 exitCode 63 } 64 65 std.process.Pid process; 66 RawPid pid; 67 int status_; 68 State st; 69 } 70 71 this(std.process.Pid process) @safe { 72 this.process = process; 73 this.pid = process.osHandle.RawPid; 74 } 75 76 ~this() @safe { 77 } 78 79 /// Returns: The raw OS handle for the process ID. 80 RawPid osHandle() nothrow @safe { 81 return pid; 82 } 83 84 /// Kill and cleanup the process. 85 void dispose() @safe { 86 final switch (st) { 87 case State.running: 88 this.kill; 89 this.wait; 90 break; 91 case State.terminated: 92 this.wait; 93 break; 94 case State.exitCode: 95 break; 96 } 97 98 st = State.exitCode; 99 } 100 101 /// Kill the process. 102 void kill() nothrow @trusted { 103 import core.sys.posix.signal : SIGKILL; 104 105 final switch (st) { 106 case State.running: 107 break; 108 case State.terminated: 109 goto case; 110 case State.exitCode: 111 return; 112 } 113 114 try { 115 std.process.kill(process, SIGKILL); 116 } catch (Exception e) { 117 } 118 119 st = State.terminated; 120 } 121 122 /// Blocking wait for the process to terminated. 123 /// Returns: the exit status. 124 int wait() @safe { 125 final switch (st) { 126 case State.running: 127 status_ = std.process.wait(process); 128 break; 129 case State.terminated: 130 status_ = std.process.wait(process); 131 break; 132 case State.exitCode: 133 break; 134 } 135 136 st = State.exitCode; 137 138 return status_; 139 } 140 141 /// Non-blocking wait for the process termination. 142 /// Returns: `true` if the process has terminated. 143 bool tryWait() @safe { 144 final switch (st) { 145 case State.running: 146 auto s = std.process.tryWait(process); 147 if (s.terminated) { 148 st = State.exitCode; 149 status_ = s.status; 150 } 151 break; 152 case State.terminated: 153 status_ = std.process.wait(process); 154 st = State.exitCode; 155 break; 156 case State.exitCode: 157 break; 158 } 159 160 return st.among(State.terminated, State.exitCode) != 0; 161 } 162 163 /// Returns: The exit status of the process. 164 int status() @safe { 165 if (st != State.exitCode) { 166 throw new Exception( 167 "Process has not terminated and wait/tryWait been called to collect the exit status"); 168 } 169 return status_; 170 } 171 172 /// Returns: If the process has terminated. 173 bool terminated() @safe { 174 return st.among(State.terminated, State.exitCode) != 0; 175 } 176 } 177 178 /// Async process that do not block on read from stdin/stderr. 179 struct PipeProcess { 180 import std.algorithm : among; 181 182 private { 183 enum State { 184 running, 185 terminated, 186 exitCode 187 } 188 189 std.process.ProcessPipes process; 190 Pipe pipe_; 191 FileReadChannel stderr_; 192 int status_; 193 State st; 194 RawPid pid; 195 } 196 197 this(std.process.ProcessPipes process) @safe { 198 this.process = process; 199 this.pipe_ = Pipe(this.process.stdout, this.process.stdin); 200 this.stderr_ = FileReadChannel(this.process.stderr); 201 this.pid = process.pid.osHandle.RawPid; 202 } 203 204 /// Returns: The raw OS handle for the process ID. 205 RawPid osHandle() nothrow @safe { 206 return this.pid; 207 } 208 209 /// Access to stdin and stdout. 210 ref Pipe pipe() return scope nothrow @safe { 211 return pipe_; 212 } 213 214 /// Access stderr. 215 ref FileReadChannel stderr() return scope nothrow @safe { 216 return stderr_; 217 } 218 219 /// Kill and cleanup the process. 220 void dispose() @safe { 221 final switch (st) { 222 case State.running: 223 this.kill; 224 this.wait; 225 .destroy(process); 226 break; 227 case State.terminated: 228 this.wait; 229 .destroy(process); 230 break; 231 case State.exitCode: 232 break; 233 } 234 235 st = State.exitCode; 236 } 237 238 /// Kill the process. 239 void kill() nothrow @trusted { 240 import core.sys.posix.signal : SIGKILL; 241 242 final switch (st) { 243 case State.running: 244 break; 245 case State.terminated: 246 return; 247 case State.exitCode: 248 return; 249 } 250 251 try { 252 std.process.kill(process.pid, SIGKILL); 253 } catch (Exception e) { 254 } 255 256 st = State.terminated; 257 } 258 259 /// Blocking wait for the process to terminated. 260 /// Returns: the exit status. 261 int wait() @safe { 262 final switch (st) { 263 case State.running: 264 status_ = std.process.wait(process.pid); 265 break; 266 case State.terminated: 267 status_ = std.process.wait(process.pid); 268 break; 269 case State.exitCode: 270 break; 271 } 272 273 st = State.exitCode; 274 275 return status_; 276 } 277 278 /// Non-blocking wait for the process termination. 279 /// Returns: `true` if the process has terminated. 280 bool tryWait() @safe { 281 final switch (st) { 282 case State.running: 283 auto s = std.process.tryWait(process.pid); 284 if (s.terminated) { 285 st = State.exitCode; 286 status_ = s.status; 287 } 288 break; 289 case State.terminated: 290 status_ = std.process.wait(process.pid); 291 st = State.exitCode; 292 break; 293 case State.exitCode: 294 break; 295 } 296 297 return st.among(State.terminated, State.exitCode) != 0; 298 } 299 300 /// Returns: The exit status of the process. 301 int status() @safe { 302 if (st != State.exitCode) { 303 throw new Exception( 304 "Process has not terminated and wait/tryWait been called to collect the exit status"); 305 } 306 return status_; 307 } 308 309 /// Returns: If the process has terminated. 310 bool terminated() @safe { 311 return st.among(State.terminated, State.exitCode) != 0; 312 } 313 } 314 315 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin, 316 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 317 const string[string] env = null, std.process.Config config = std.process.Config.none, 318 scope const char[] workDir = null) { 319 return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr, 320 env, config, workDir)); 321 } 322 323 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env, 324 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 325 return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin, 326 std.stdio.stdout, std.stdio.stderr, env, config, workDir)); 327 } 328 329 SpawnProcess spawnProcess(scope const(char)[] program, 330 File stdin = std.stdio.stdin, File stdout = std.stdio.stdout, 331 File stderr = std.stdio.stderr, const string[string] env = null, 332 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 333 return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin, 334 stdout, stderr, env, config, workDir)); 335 } 336 337 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin, 338 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 339 scope const string[string] env = null, std.process.Config config = std.process.Config.none, 340 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 341 return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr, 342 env, config, workDir, shellPath)); 343 } 344 345 /// ditto 346 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env, 347 std.process.Config config = std.process.Config.none, 348 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 349 return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath)); 350 } 351 352 PipeProcess pipeProcess(scope const(char[])[] args, 353 std.process.Redirect redirect = std.process.Redirect.all, 354 const string[string] env = null, std.process.Config config = std.process.Config.none, 355 scope const(char)[] workDir = null) @safe { 356 return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir)); 357 } 358 359 PipeProcess pipeShell(scope const(char)[] command, 360 std.process.Redirect redirect = std.process.Redirect.all, 361 const string[string] env = null, std.process.Config config = std.process.Config.none, 362 scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe { 363 return PipeProcess(std.process.pipeShell(command, redirect, env, config, workDir, shellPath)); 364 } 365 366 /** Moves the process to a separate process group and on exit kill it and all 367 * its children. 368 */ 369 @safe struct Sandbox(ProcessT) { 370 private { 371 ProcessT p; 372 RawPid pid; 373 } 374 375 this(ProcessT p) @safe { 376 import core.sys.posix.unistd : setpgid; 377 378 this.p = p; 379 this.pid = p.osHandle; 380 setpgid(pid, 0); 381 } 382 383 RawPid osHandle() nothrow @safe { 384 return pid; 385 } 386 387 static if (__traits(hasMember, ProcessT, "pipe")) { 388 ref Pipe pipe() nothrow @safe { 389 return p.pipe; 390 } 391 } 392 393 static if (__traits(hasMember, ProcessT, "stderr")) { 394 ref FileReadChannel stderr() nothrow @safe { 395 return p.stderr; 396 } 397 } 398 399 void dispose() @safe { 400 // this also reaps the children thus cleaning up zombies 401 this.kill; 402 p.dispose; 403 } 404 405 void kill() nothrow @safe { 406 // must first retrieve the submap because after the process is killed 407 // its children may have changed. 408 auto pmap = makePidMap.getSubMap(pid); 409 410 p.kill; 411 412 // only kill and reap the children 413 pmap.remove(pid); 414 proc.pid.kill(pmap, Yes.onlyCurrentUser).reap; 415 } 416 417 int wait() @safe { 418 return p.wait; 419 } 420 421 bool tryWait() @safe { 422 return p.tryWait; 423 } 424 425 int status() @safe { 426 return p.status; 427 } 428 429 bool terminated() @safe { 430 return p.terminated; 431 } 432 } 433 434 auto sandbox(T)(T p) @safe { 435 return Sandbox!T(p); 436 } 437 438 @("shall terminate a group of processes") 439 unittest { 440 import std.datetime.stopwatch : StopWatch, AutoStart; 441 442 immutable scriptName = makeScript(`#!/bin/bash 443 sleep 10m & 444 sleep 10m & 445 sleep 10m 446 `); 447 scope (exit) 448 remove(scriptName); 449 450 auto p = pipeProcess([scriptName]).sandbox.rcKill; 451 waitUntilChildren(p.osHandle, 3); 452 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 453 p.kill; 454 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 455 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 456 457 assert(p.wait == -9); 458 assert(p.terminated); 459 assert(preChildren == 3); 460 assert(postChildren == 0); 461 } 462 463 /** dispose the process after the timeout. 464 */ 465 @safe struct Timeout(ProcessT) { 466 import std.algorithm : among; 467 import std.datetime : Clock, Duration; 468 import core.thread; 469 470 private { 471 enum Msg { 472 none, 473 stop, 474 status, 475 } 476 477 enum Reply { 478 none, 479 running, 480 normalDeath, 481 killedByTimeout, 482 } 483 484 static struct Payload { 485 ProcessT p; 486 RawPid pid; 487 Background background; 488 Reply backgroundReply; 489 } 490 491 RefCounted!Payload rc; 492 } 493 494 this(ProcessT p, Duration timeout) @trusted { 495 import std.algorithm : move; 496 497 auto pid = p.osHandle; 498 rc = refCounted(Payload(move(p), pid)); 499 rc.background = new Background(&rc.p, timeout); 500 rc.background.isDaemon = true; 501 rc.background.start; 502 } 503 504 ~this() @trusted { 505 rc.release; 506 } 507 508 private static class Background : Thread { 509 import core.sync.condition : Condition; 510 import core.sync.mutex : Mutex; 511 512 Duration timeout; 513 ProcessT* p; 514 Mutex mtx; 515 Msg[] msg; 516 Reply reply_; 517 RawPid pid; 518 519 this(ProcessT* p, Duration timeout) { 520 this.p = p; 521 this.timeout = timeout; 522 this.mtx = new Mutex(); 523 this.pid = p.osHandle; 524 525 super(&run); 526 } 527 528 void run() { 529 checkProcess(this.pid, this.timeout, this); 530 } 531 532 void put(Msg msg) @trusted nothrow { 533 this.mtx.lock_nothrow(); 534 scope (exit) 535 this.mtx.unlock_nothrow(); 536 this.msg ~= msg; 537 } 538 539 Msg popMsg() @trusted nothrow { 540 this.mtx.lock_nothrow(); 541 scope (exit) 542 this.mtx.unlock_nothrow(); 543 if (msg.empty) 544 return Msg.none; 545 auto rval = msg[$ - 1]; 546 msg = msg[0 .. $ - 1]; 547 return rval; 548 } 549 550 void setReply(Reply reply_) @trusted nothrow { 551 this.mtx.lock_nothrow(); 552 scope (exit) 553 this.mtx.unlock_nothrow(); 554 this.reply_ = reply_; 555 } 556 557 Reply reply() @trusted nothrow { 558 this.mtx.lock_nothrow(); 559 scope (exit) 560 this.mtx.unlock_nothrow(); 561 return reply_; 562 } 563 564 void kill() @trusted nothrow { 565 this.mtx.lock_nothrow(); 566 scope (exit) 567 this.mtx.unlock_nothrow(); 568 p.kill; 569 } 570 } 571 572 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 573 import core.sys.posix.signal : SIGKILL; 574 import std.algorithm : max, min; 575 import std.variant : Variant; 576 static import core.sys.posix.signal; 577 578 const stopAt = Clock.currTime + timeout; 579 // the purpose is to poll the process often "enough" that if it 580 // terminates early `Process` detects it fast enough. 1000 is chosen 581 // because it "feels good". the purpose 582 auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs"; 583 584 bool forceStop; 585 bool running = true; 586 while (running && Clock.currTime < stopAt) { 587 const msg = bg.popMsg; 588 589 final switch (msg) { 590 case Msg.none: 591 () @trusted { Thread.sleep(sleepInterval); }(); 592 break; 593 case Msg.stop: 594 forceStop = true; 595 running = false; 596 break; 597 case Msg.status: 598 bg.setReply(Reply.running); 599 break; 600 } 601 602 () @trusted { 603 if (core.sys.posix.signal.kill(p, 0) == -1) { 604 running = false; 605 } 606 }(); 607 } 608 609 // may be children alive thus must ensure that the whole process tree 610 // is killed if this is a sandbox with a timeout. 611 bg.kill; 612 613 if (!forceStop && Clock.currTime >= stopAt) { 614 bg.setReply(Reply.killedByTimeout); 615 } else { 616 bg.setReply(Reply.normalDeath); 617 } 618 } 619 620 RawPid osHandle() nothrow @trusted { 621 return rc.pid; 622 } 623 624 ref Pipe pipe() nothrow @trusted { 625 return rc.p.pipe; 626 } 627 628 ref FileReadChannel stderr() nothrow @trusted { 629 return rc.p.stderr; 630 } 631 632 void dispose() @trusted { 633 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 634 rc.background.put(Msg.stop); 635 rc.background.join; 636 rc.backgroundReply = rc.background.reply; 637 } 638 rc.p.dispose; 639 } 640 641 void kill() nothrow @trusted { 642 rc.background.kill; 643 } 644 645 int wait() @trusted { 646 while (!this.tryWait) { 647 Thread.sleep(20.dur!"msecs"); 648 } 649 return rc.p.wait; 650 } 651 652 bool tryWait() @trusted { 653 return rc.p.tryWait; 654 } 655 656 int status() @trusted { 657 return rc.p.status; 658 } 659 660 bool terminated() @trusted { 661 return rc.p.terminated; 662 } 663 664 bool timeoutTriggered() @trusted { 665 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 666 rc.background.put(Msg.status); 667 rc.backgroundReply = rc.background.reply; 668 } 669 return rc.backgroundReply == Reply.killedByTimeout; 670 } 671 } 672 673 auto timeout(T)(T p, Duration timeout_) @trusted { 674 return Timeout!T(p, timeout_); 675 } 676 677 /// Returns when the process has pending data. 678 void waitForPendingData(ProcessT)(Process p) { 679 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 680 Thread.sleep(20.dur!"msecs"); 681 } 682 } 683 684 @("shall kill the process after the timeout") 685 unittest { 686 import std.datetime.stopwatch : StopWatch, AutoStart; 687 688 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill; 689 auto sw = StopWatch(AutoStart.yes); 690 p.wait; 691 sw.stop; 692 693 assert(sw.peek >= 100.dur!"msecs"); 694 assert(sw.peek <= 500.dur!"msecs"); 695 assert(p.wait == -9); 696 assert(p.terminated); 697 assert(p.status == -9); 698 assert(p.timeoutTriggered); 699 } 700 701 struct DrainElement { 702 enum Type { 703 stdout, 704 stderr, 705 } 706 707 Type type; 708 const(ubyte)[] data; 709 710 /// Returns: iterates the data as an input range. 711 auto byUTF8() @safe pure nothrow const @nogc { 712 static import std.utf; 713 714 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 715 } 716 717 bool empty() @safe pure nothrow const @nogc { 718 return data.length == 0; 719 } 720 } 721 722 /** A range that drains a process stdout/stderr until it terminates. 723 * 724 * There may be `DrainElement` that are empty. 725 */ 726 struct DrainRange(ProcessT) { 727 private { 728 enum State { 729 start, 730 draining, 731 lastStdout, 732 lastStderr, 733 lastElement, 734 empty, 735 } 736 737 ProcessT p; 738 DrainElement front_; 739 State st; 740 ubyte[] buf; 741 } 742 743 this(ProcessT p) { 744 this.p = p; 745 this.buf = new ubyte[4096]; 746 } 747 748 DrainElement front() @safe pure nothrow const @nogc { 749 assert(!empty, "Can't get front of an empty range"); 750 return front_; 751 } 752 753 void popFront() @safe { 754 assert(!empty, "Can't pop front of an empty range"); 755 756 static bool isAnyPipeOpen(ref ProcessT p) { 757 return p.pipe.isOpen || p.stderr.isOpen; 758 } 759 760 DrainElement readData(ref ProcessT p) @safe { 761 if (p.stderr.hasPendingData) { 762 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 763 } else if (p.pipe.hasPendingData) { 764 return DrainElement(DrainElement.Type.stdout, p.pipe.read(buf)); 765 } 766 return DrainElement.init; 767 } 768 769 DrainElement waitUntilData() @safe { 770 import std.datetime : Clock; 771 772 // may livelock if the process never terminates and never writes to 773 // the terminal. timeout ensure that it sooner or later is break 774 // the loop. This is important if the drain is part of a timeout 775 // wrapping. 776 777 const timeout = 100.dur!"msecs"; 778 const stopAt = Clock.currTime + timeout; 779 const sleepFor = timeout / 20; 780 const useSleep = Clock.currTime + sleepFor; 781 bool running = true; 782 while (running) { 783 const now = Clock.currTime; 784 785 running = (now < stopAt) && isAnyPipeOpen(p); 786 787 auto bufRead = readData(p); 788 789 if (!bufRead.empty) { 790 return DrainElement(bufRead.type, bufRead.data.dup); 791 } else if (running && now > useSleep && bufRead.empty) { 792 import core.thread : Thread; 793 794 () @trusted { Thread.sleep(sleepFor); }(); 795 } 796 } 797 798 return DrainElement.init; 799 } 800 801 front_ = DrainElement.init; 802 803 final switch (st) { 804 case State.start: 805 st = State.draining; 806 front_ = waitUntilData; 807 break; 808 case State.draining: 809 if (p.terminated) { 810 st = State.lastStdout; 811 } else if (isAnyPipeOpen(p)) { 812 front_ = waitUntilData(); 813 } else { 814 st = State.lastStdout; 815 } 816 break; 817 case State.lastStdout: 818 if (p.pipe.hasPendingData) { 819 front_ = DrainElement(DrainElement.Type.stdout, p.pipe.read(buf).dup); 820 } else { 821 st = State.lastStderr; 822 } 823 break; 824 case State.lastStderr: 825 if (p.stderr.hasPendingData) { 826 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 827 } else { 828 st = State.lastElement; 829 } 830 break; 831 case State.lastElement: 832 st = State.empty; 833 break; 834 case State.empty: 835 break; 836 } 837 } 838 839 bool empty() @safe pure nothrow const @nogc { 840 return st == State.empty; 841 } 842 } 843 844 /// Drain a process pipe until empty. 845 auto drain(T)(T p) { 846 return DrainRange!T(p); 847 } 848 849 /// Read the data from a ReadChannel by line. 850 struct DrainByLineCopyRange(ProcessT) { 851 private { 852 enum State { 853 start, 854 draining, 855 lastLine, 856 lastBuf, 857 empty, 858 } 859 860 ProcessT process; 861 DrainRange!ProcessT range; 862 State st; 863 const(ubyte)[] buf; 864 const(char)[] line; 865 } 866 867 this(ProcessT p) { 868 process = p; 869 range = p.drain; 870 } 871 872 string front() @trusted pure nothrow const @nogc { 873 import std.exception : assumeUnique; 874 875 assert(!empty, "Can't get front of an empty range"); 876 return line.assumeUnique; 877 } 878 879 void popFront() @safe { 880 assert(!empty, "Can't pop front of an empty range"); 881 import std.algorithm : countUntil; 882 import std.array : array; 883 static import std.utf; 884 885 const(ubyte)[] updateBuf(size_t idx) { 886 const(ubyte)[] tmp; 887 if (buf.empty) { 888 // do nothing 889 } else if (idx == -1) { 890 tmp = buf; 891 buf = null; 892 } else { 893 idx = () { 894 if (idx < buf.length) { 895 return idx + 1; 896 } 897 return idx; 898 }(); 899 tmp = buf[0 .. idx]; 900 buf = buf[idx .. $]; 901 } 902 903 if (!tmp.empty && tmp[$ - 1] == '\n') { 904 tmp = tmp[0 .. $ - 1]; 905 } 906 return tmp; 907 } 908 909 void drainLine() { 910 void fillBuf() { 911 if (!range.empty) { 912 range.popFront; 913 } 914 if (!range.empty) { 915 buf ~= range.front.data; 916 } 917 } 918 919 size_t idx; 920 () { 921 int cnt; 922 do { 923 fillBuf(); 924 idx = buf.countUntil('\n'); 925 // 2 is a magic number which mean that it at most wait 2x timeout for data 926 } 927 while (!range.empty && idx == -1 && cnt++ < 2); 928 }(); 929 930 if (idx != -1) { 931 auto tmp = updateBuf(idx); 932 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 933 } 934 } 935 936 bool lastLine() { 937 size_t idx = buf.countUntil('\n'); 938 if (idx == -1) 939 return true; 940 941 auto tmp = updateBuf(idx); 942 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 943 return false; 944 } 945 946 line = null; 947 final switch (st) { 948 case State.start: 949 drainLine; 950 st = State.draining; 951 break; 952 case State.draining: 953 drainLine; 954 if (range.empty) 955 st = State.lastLine; 956 break; 957 case State.lastLine: 958 if (lastLine) 959 st = State.lastBuf; 960 break; 961 case State.lastBuf: 962 line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array; 963 st = State.empty; 964 break; 965 case State.empty: 966 break; 967 } 968 } 969 970 bool empty() @safe pure nothrow const @nogc { 971 return st == State.empty; 972 } 973 } 974 975 @("shall drain the process output by line") 976 unittest { 977 import std.algorithm : filter, joiner, map; 978 import std.array : array; 979 980 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill; 981 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 982 983 assert(res.length == 3); 984 assert(res.joiner.count >= 30); 985 assert(p.wait == 0); 986 assert(p.terminated); 987 } 988 989 auto drainByLineCopy(T)(T p) { 990 return DrainByLineCopyRange!T(p); 991 } 992 993 /// Drain the process output until it is done executing. 994 auto drainToNull(T)(T p) { 995 foreach (l; p.drain()) { 996 } 997 return p; 998 } 999 1000 /// Drain the output from the process into an output range. 1001 auto drain(ProcessT, T)(ProcessT p, ref T range) { 1002 foreach (l; p.drain()) { 1003 range.put(l); 1004 } 1005 return p; 1006 } 1007 1008 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 1009 unittest { 1010 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill; 1011 auto res = p.drain.array; 1012 1013 // this is just a sanity check. It has to be kind a high because there is 1014 // some wiggleroom allowed 1015 assert(res.count <= 50); 1016 1017 assert(res.filter!(a => a.type == DrainElement.Type.stdout) 1018 .map!(a => a.data) 1019 .joiner 1020 .count == 30); 1021 assert(res.filter!(a => a.type == DrainElement.Type.stderr).count == 0); 1022 assert(p.wait == 0); 1023 assert(p.terminated); 1024 } 1025 1026 @("shall kill the process tree when the timeout is reached") 1027 unittest { 1028 immutable script = makeScript(`#!/bin/bash 1029 sleep 10m 1030 `); 1031 scope (exit) 1032 remove(script); 1033 1034 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill; 1035 waitUntilChildren(p.osHandle, 1); 1036 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1037 const res = p.process.drain.array; 1038 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1039 1040 assert(p.wait == -9); 1041 assert(p.terminated); 1042 assert(preChildren == 1); 1043 assert(postChildren == 0); 1044 } 1045 1046 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 1047 import core.sys.posix.sys.stat; 1048 import std.file : getAttributes, setAttributes, thisExePath; 1049 import std.stdio : File; 1050 import std.path : baseName; 1051 import std.conv : to; 1052 1053 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 1054 1055 File(fname, "w").writeln(script); 1056 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 1057 return fname; 1058 } 1059 1060 /// Wait for p to have num children or fail after 10s. 1061 void waitUntilChildren(RawPid p, int num) { 1062 import std.datetime : Clock; 1063 1064 const failAt = Clock.currTime + 10.dur!"seconds"; 1065 do { 1066 Thread.sleep(50.dur!"msecs"); 1067 if (Clock.currTime > failAt) 1068 break; 1069 } 1070 while (makePidMap.getSubMap(p).remove(p).length < num); 1071 }