/**
Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module proc.process;

import core.sys.posix.signal : SIGKILL;
import core.thread : Thread;
import core.time : dur, Duration;
import logger = std.experimental.logger;
import std.algorithm : filter, count, joiner, map;
import std.array : appender, empty, array;
import std.exception : collectException;
import std.stdio : File, fileno, writeln;
import std.typecons : Flag, Yes;
static import std.process;
static import std.stdio;

import my.gc.refc;
import my.from_;
import my.path;
import my.named_type;

public import proc.channel;
public import proc.pid;

version (unittest) {
    import std.file : remove;
}

/** Wrap a process in a reference counted guard so that it is terminated when
 * it stops being used, such as when the last instance goes out of scope.
 *
 * Params:
 *  p = the process to guard
 *  signal = signal stored in the guard, defaults to `SIGKILL`
 *
 * Returns: a `ScopeKill!T` that owns `p`.
 */
auto rcKill(T)(T p, int signal = SIGKILL) {
    alias Guard = ScopeKill!T;
    return Guard(p, signal);
}

// backward compatibility.
alias scopeKill = rcKill;

/** Guard that owns a process through a `RefCounted` payload.
 *
 * The payload destructor calls `dispose` on the wrapped process. The guard
 * forwards to the wrapped process via `alias this`.
 */
struct ScopeKill(T) {
    private {
        static struct Payload {
            T process;
            // NOTE(review): stored but not read anywhere inside this struct.
            int signal = SIGKILL;
            // false for a default constructed payload, which has nothing to dispose.
            bool hasProcess;

            ~this() @safe {
                if (!hasProcess)
                    return;
                process.dispose();
            }
        }

        RefCounted!Payload payload_;
    }

    alias process this;

    /// Returns: a reference to the wrapped process.
    ref T process() {
        return payload_.get.process;
    }

    this(T process, int signal) @safe {
        auto owned = Payload(process, signal, true);
        payload_ = refCounted(owned);
    }
}

/// Async process wrapper for a std.process SpawnProcess
struct SpawnProcess {
    import core.sys.posix.signal : SIGKILL;
    import std.algorithm : among;

    private {
        // running    -> nothing has been done to the process yet.
        // terminated -> a signal has been sent, the exit status is not collected.
        // exitCode   -> the exit status is stored in `status_`.
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.Pid process;
        RawPid pid;
        int status_;
        State st;
    }

    this(std.process.Pid process) @safe {
        this.process = process;
        this.pid = process.osHandle.RawPid;
    }

    ~this() @safe {
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    /// Kill the process if it is still running and collect its exit status.
    void dispose() @safe {
        // `kill` moves a running process to `terminated`, which the next
        // check then waits for.
        if (st == State.running)
            this.kill;
        if (st != State.exitCode)
            this.wait;

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Only a process in the `running` state is signalled. Errors from the
     * underlying `std.process.kill` are ignored.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(process, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination.
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            const res = std.process.tryWait(process);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            // a signal has already been sent, collect the exit status.
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st != State.running;
    }
}

/// Async process that do not block on read from stdin/stderr.
struct PipeProcess {
    import std.algorithm : among;
    import core.sys.posix.signal : SIGKILL;

    private {
        // running    -> nothing has been done to the process yet.
        // terminated -> a signal has been sent, the exit status is not collected.
        // exitCode   -> the exit status is stored in `status_`.
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.ProcessPipes process;
        std.process.Pid pid;

        FileReadChannel stderr_;
        FileReadChannel stdout_;
        FileWriteChannel stdin_;
        int status_;
        State st;
    }

    /// Wrap an already spawned process together with the files it communicates over.
    this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
        this.pid = pid;

        this.stdin_ = FileWriteChannel(stdin);
        this.stdout_ = FileReadChannel(stdout);
        this.stderr_ = FileReadChannel(stderr);
    }

    /// Wrap the pipes of a process. Only the streams selected by `r` get a channel.
    this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
        this.process = process;
        this.pid = process.pid;

        if (r & std.process.Redirect.stdin)
            stdin_ = FileWriteChannel(this.process.stdin);
        if (r & std.process.Redirect.stdout)
            stdout_ = FileReadChannel(this.process.stdout);
        if (r & std.process.Redirect.stderr)
            this.stderr_ = FileReadChannel(this.process.stderr);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid.osHandle.RawPid;
    }

    /// Access to stdin.
    ref FileWriteChannel stdin() return scope nothrow @safe {
        return stdin_;
    }

    /// Access to stdout.
    ref FileReadChannel stdout() return scope nothrow @safe {
        return stdout_;
    }

    /// Access stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /// Kill the process if it is still running, collect its exit status and
    /// destroy the pipes.
    void dispose() @safe {
        if (st != State.exitCode) {
            if (st == State.running)
                this.kill;
            this.wait;
            .destroy(process);
        }

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Only a process in the `running` state is signalled. Errors from the
     * underlying `std.process.kill` are ignored.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(pid, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination.
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            const res = std.process.tryWait(pid);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            // a signal has already been sent, collect the exit status.
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st != State.running;
    }
}

/// Spawn `args` via `std.process.spawnProcess` and wrap the result.
SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const char[] workDir = null) {
    auto child = std.process.spawnProcess(args, stdin, stdout, stderr, env, config, workDir);
    return SpawnProcess(child);
}

/// ditto
SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    auto child = std.process.spawnProcess(args, std.stdio.stdin,
            std.stdio.stdout, std.stdio.stderr, env, config, workDir);
    return SpawnProcess(child);
}

/// ditto
SpawnProcess spawnProcess(scope const(char)[] program,
        File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
        File stderr = std.stdio.stderr, const string[string] env = null,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    // view the single program name as an argument list with one element.
    auto child = std.process.spawnProcess((&program)[0 .. 1], stdin, stdout,
            stderr, env, config, workDir);
    return SpawnProcess(child);
}

/// Spawn `command` via `std.process.spawnShell` and wrap the result.
SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        scope const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, stdin, stdout, stderr, env,
            config, workDir, shellPath);
    return SpawnProcess(child);
}

/// ditto
SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
        std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, env, config, workDir, shellPath);
    return SpawnProcess(child);
}

/// Start `args` via `std.process.pipeProcess` and wrap the pipes selected by `redirect`.
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes, redirect);
}

/// Start `command` via `std.process.pipeShell` and wrap the pipes selected by `redirect`.
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes, redirect);
}

/** Moves the process to a separate process group and on exit kill it and all
 * its children.
412 */ 413 @safe struct Sandbox(ProcessT) { 414 import core.sys.posix.signal : SIGKILL; 415 416 private { 417 ProcessT p; 418 RawPid pid; 419 } 420 421 this(ProcessT p) @safe { 422 import core.sys.posix.unistd : setpgid; 423 424 this.p = p; 425 this.pid = p.osHandle; 426 setpgid(pid, 0); 427 } 428 429 RawPid osHandle() nothrow @safe { 430 return pid; 431 } 432 433 static if (__traits(hasMember, ProcessT, "stdin")) { 434 ref FileWriteChannel stdin() nothrow @safe { 435 return p.stdin; 436 } 437 } 438 439 static if (__traits(hasMember, ProcessT, "stdout")) { 440 ref FileReadChannel stdout() nothrow @safe { 441 return p.stdout; 442 } 443 } 444 445 static if (__traits(hasMember, ProcessT, "stderr")) { 446 ref FileReadChannel stderr() nothrow @safe { 447 return p.stderr; 448 } 449 } 450 451 void dispose() @safe { 452 // this also reaps the children thus cleaning up zombies 453 this.kill; 454 p.dispose; 455 } 456 457 /** Send `signal` to the process. 458 * 459 * Param: 460 * signal = a signal from `core.sys.posix.signal` 461 */ 462 void kill(int signal = SIGKILL) nothrow @safe { 463 // must first retrieve the submap because after the process is killed 464 // its children may have changed. 
465 auto pmap = makePidMap.getSubMap(pid); 466 467 p.kill(signal); 468 469 // only kill and reap the children 470 pmap.remove(pid); 471 proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap; 472 } 473 474 int wait() @safe { 475 return p.wait; 476 } 477 478 bool tryWait() @safe { 479 return p.tryWait; 480 } 481 482 int status() @safe { 483 return p.status; 484 } 485 486 bool terminated() @safe { 487 return p.terminated; 488 } 489 } 490 491 auto sandbox(T)(T p) @safe { 492 return Sandbox!T(p); 493 } 494 495 @("shall terminate a group of processes") 496 unittest { 497 import std.datetime.stopwatch : StopWatch, AutoStart; 498 499 immutable scriptName = makeScript(`#!/bin/bash 500 sleep 10m & 501 sleep 10m & 502 sleep 10m 503 `); 504 scope (exit) 505 remove(scriptName); 506 507 auto p = pipeProcess([scriptName]).sandbox.rcKill; 508 waitUntilChildren(p.osHandle, 3); 509 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 510 p.kill; 511 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 512 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 513 514 assert(p.wait == -9); 515 assert(p.terminated); 516 assert(preChildren == 3); 517 assert(postChildren == 0); 518 } 519 520 /** dispose the process after the timeout. 
521 */ 522 @safe struct Timeout(ProcessT) { 523 import core.sys.posix.signal : SIGKILL; 524 import core.thread; 525 import std.algorithm : among; 526 import std.datetime : Clock, Duration; 527 528 private { 529 enum Msg { 530 none, 531 stop, 532 status, 533 } 534 535 enum Reply { 536 none, 537 running, 538 normalDeath, 539 killedByTimeout, 540 } 541 542 static struct Payload { 543 ProcessT p; 544 RawPid pid; 545 Background background; 546 Reply backgroundReply; 547 } 548 549 RefCounted!Payload rc; 550 } 551 552 this(ProcessT p, Duration timeout) @trusted { 553 import std.algorithm : move; 554 555 auto pid = p.osHandle; 556 rc = refCounted(Payload(move(p), pid)); 557 rc.get.background = new Background(&rc.get.p, timeout); 558 rc.get.background.isDaemon = true; 559 rc.get.background.start; 560 } 561 562 ~this() @trusted { 563 rc.release; 564 } 565 566 private static class Background : Thread { 567 import core.sync.condition : Condition; 568 import core.sync.mutex : Mutex; 569 570 Duration timeout; 571 ProcessT* p; 572 Mutex mtx; 573 Msg[] msg; 574 Reply reply_; 575 RawPid pid; 576 int signal = SIGKILL; 577 578 this(ProcessT* p, Duration timeout) { 579 this.p = p; 580 this.timeout = timeout; 581 this.mtx = new Mutex(); 582 this.pid = p.osHandle; 583 584 super(&run); 585 } 586 587 void run() { 588 checkProcess(this.pid, this.timeout, this); 589 } 590 591 void put(Msg msg) @trusted nothrow { 592 this.mtx.lock_nothrow(); 593 scope (exit) 594 this.mtx.unlock_nothrow(); 595 this.msg ~= msg; 596 } 597 598 Msg popMsg() @trusted nothrow { 599 this.mtx.lock_nothrow(); 600 scope (exit) 601 this.mtx.unlock_nothrow(); 602 if (msg.empty) 603 return Msg.none; 604 auto rval = msg[$ - 1]; 605 msg = msg[0 .. 
$ - 1]; 606 return rval; 607 } 608 609 void setReply(Reply reply_) @trusted nothrow { 610 this.mtx.lock_nothrow(); 611 scope (exit) 612 this.mtx.unlock_nothrow(); 613 this.reply_ = reply_; 614 } 615 616 Reply reply() @trusted nothrow { 617 this.mtx.lock_nothrow(); 618 scope (exit) 619 this.mtx.unlock_nothrow(); 620 return reply_; 621 } 622 623 void setSignal(int signal) @trusted nothrow { 624 this.mtx.lock_nothrow(); 625 scope (exit) 626 this.mtx.unlock_nothrow(); 627 this.signal = signal; 628 } 629 630 void kill() @trusted nothrow { 631 this.mtx.lock_nothrow(); 632 scope (exit) 633 this.mtx.unlock_nothrow(); 634 p.kill(signal); 635 } 636 } 637 638 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 639 import std.algorithm : max, min; 640 import std.variant : Variant; 641 static import core.sys.posix.signal; 642 643 const stopAt = Clock.currTime + timeout; 644 // the purpose is to poll the process often "enough" that if it 645 // terminates early `Process` detects it fast enough. 1000 is chosen 646 // because it "feels good". the purpose 647 auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs"; 648 649 bool forceStop; 650 bool running = true; 651 while (running && Clock.currTime < stopAt) { 652 const msg = bg.popMsg; 653 654 final switch (msg) { 655 case Msg.none: 656 () @trusted { Thread.sleep(sleepInterval); }(); 657 break; 658 case Msg.stop: 659 forceStop = true; 660 running = false; 661 break; 662 case Msg.status: 663 bg.setReply(Reply.running); 664 break; 665 } 666 667 () @trusted { 668 if (core.sys.posix.signal.kill(p, 0) == -1) { 669 running = false; 670 } 671 }(); 672 } 673 674 // may be children alive thus must ensure that the whole process tree 675 // is killed if this is a sandbox with a timeout. 
676 bg.kill(); 677 678 if (!forceStop && Clock.currTime >= stopAt) { 679 bg.setReply(Reply.killedByTimeout); 680 } else { 681 bg.setReply(Reply.normalDeath); 682 } 683 } 684 685 RawPid osHandle() nothrow @trusted { 686 return rc.get.pid; 687 } 688 689 static if (__traits(hasMember, ProcessT, "stdin")) { 690 ref FileWriteChannel stdin() nothrow @safe { 691 return rc.get.p.stdin; 692 } 693 } 694 695 static if (__traits(hasMember, ProcessT, "stdout")) { 696 ref FileReadChannel stdout() nothrow @safe { 697 return rc.get.p.stdout; 698 } 699 } 700 701 static if (__traits(hasMember, ProcessT, "stderr")) { 702 ref FileReadChannel stderr() nothrow @trusted { 703 return rc.get.p.stderr; 704 } 705 } 706 707 void dispose() @trusted { 708 if (rc.get.backgroundReply.among(Reply.none, Reply.running)) { 709 rc.get.background.put(Msg.stop); 710 rc.get.background.join; 711 rc.get.backgroundReply = rc.get.background.reply; 712 } 713 rc.get.p.dispose; 714 } 715 716 /** Send `signal` to the process. 717 * 718 * Param: 719 * signal = a signal from `core.sys.posix.signal` 720 */ 721 void kill(int signal = SIGKILL) nothrow @trusted { 722 rc.get.background.setSignal(signal); 723 rc.get.background.kill(); 724 } 725 726 int wait() @trusted { 727 while (!this.tryWait) { 728 Thread.sleep(20.dur!"msecs"); 729 } 730 return rc.get.p.wait; 731 } 732 733 bool tryWait() @trusted { 734 return rc.get.p.tryWait; 735 } 736 737 int status() @trusted { 738 return rc.get.p.status; 739 } 740 741 bool terminated() @trusted { 742 return rc.get.p.terminated; 743 } 744 745 bool timeoutTriggered() @trusted { 746 if (rc.get.backgroundReply.among(Reply.none, Reply.running)) { 747 rc.get.background.put(Msg.status); 748 rc.get.backgroundReply = rc.get.background.reply; 749 } 750 return rc.get.backgroundReply == Reply.killedByTimeout; 751 } 752 } 753 754 auto timeout(T)(T p, Duration timeout_) @trusted { 755 return Timeout!T(p, timeout_); 756 } 757 758 /// Returns when the process has pending data. 
// The parameter is typed by the template parameter so that the function can
// be instantiated. It polls every 20 ms and returns as soon as either stdout
// or stderr of `p` has pending data.
void waitForPendingData(ProcessT)(ProcessT p) {
    while (!p.stdout.hasPendingData && !p.stderr.hasPendingData) {
        Thread.sleep(20.dur!"msecs");
    }
}

@("shall kill the process after the timeout")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
    auto sw = StopWatch(AutoStart.yes);
    p.wait;
    sw.stop;

    assert(sw.peek >= 100.dur!"msecs");
    assert(sw.peek <= 500.dur!"msecs");
    assert(p.wait == -9);
    assert(p.terminated);
    assert(p.status == -9);
    assert(p.timeoutTriggered);
}

/// A chunk of data read from a process together with the stream it came from.
struct DrainElement {
    enum Type {
        stdout,
        stderr,
    }

    /// The stream `data` was read from.
    Type type;
    /// The raw bytes that were read.
    const(ubyte)[] data;

    /// Returns: iterates the data as an input range.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
    }

    /// Returns: `true` if the element carries no data.
    bool empty() @safe pure nothrow const @nogc {
        return data.length == 0;
    }
}

/** A range that drains a process stdout/stderr until it terminates.
 *
 * There may be `DrainElement` that are empty.
806 */ 807 struct DrainRange(ProcessT) { 808 private { 809 enum State { 810 start, 811 draining, 812 lastStdout, 813 lastStderr, 814 lastElement, 815 empty, 816 } 817 818 ProcessT p; 819 DrainElement front_; 820 State st; 821 ubyte[] buf; 822 } 823 824 this(ProcessT p) { 825 this.p = p; 826 this.buf = new ubyte[4096]; 827 } 828 829 DrainElement front() @safe pure nothrow const @nogc { 830 assert(!empty, "Can't get front of an empty range"); 831 return front_; 832 } 833 834 void popFront() @safe { 835 assert(!empty, "Can't pop front of an empty range"); 836 837 static bool isAnyPipeOpen(ref ProcessT p) { 838 return p.stdout.isOpen || p.stderr.isOpen; 839 } 840 841 DrainElement readData(ref ProcessT p) @safe { 842 if (p.stderr.hasPendingData) { 843 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 844 } else if (p.stdout.hasPendingData) { 845 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf)); 846 } 847 return DrainElement.init; 848 } 849 850 DrainElement waitUntilData() @safe { 851 import std.datetime : Clock; 852 853 // may livelock if the process never terminates and never writes to 854 // the terminal. timeout ensure that it sooner or later is break 855 // the loop. This is important if the drain is part of a timeout 856 // wrapping. 
857 858 const timeout = 100.dur!"msecs"; 859 const stopAt = Clock.currTime + timeout; 860 const sleepFor = timeout / 20; 861 const useSleep = Clock.currTime + sleepFor; 862 bool running = true; 863 while (running) { 864 const now = Clock.currTime; 865 866 running = (now < stopAt) && isAnyPipeOpen(p); 867 868 auto bufRead = readData(p); 869 870 if (!bufRead.empty) { 871 return DrainElement(bufRead.type, bufRead.data.dup); 872 } else if (running && now > useSleep && bufRead.empty) { 873 import core.thread : Thread; 874 875 () @trusted { Thread.sleep(sleepFor); }(); 876 } 877 } 878 879 return DrainElement.init; 880 } 881 882 front_ = DrainElement.init; 883 884 final switch (st) { 885 case State.start: 886 st = State.draining; 887 front_ = waitUntilData; 888 break; 889 case State.draining: 890 if (p.terminated) { 891 st = State.lastStdout; 892 } else if (isAnyPipeOpen(p)) { 893 front_ = waitUntilData(); 894 } else { 895 st = State.lastStdout; 896 } 897 break; 898 case State.lastStdout: 899 if (p.stdout.hasPendingData) { 900 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup); 901 } else { 902 st = State.lastStderr; 903 } 904 break; 905 case State.lastStderr: 906 if (p.stderr.hasPendingData) { 907 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 908 } else { 909 st = State.lastElement; 910 } 911 break; 912 case State.lastElement: 913 st = State.empty; 914 break; 915 case State.empty: 916 break; 917 } 918 } 919 920 bool empty() @safe pure nothrow const @nogc { 921 return st == State.empty; 922 } 923 } 924 925 /// Drain a process pipe until empty. 926 auto drain(T)(T p) { 927 return DrainRange!T(p); 928 } 929 930 /// Read the data from a ReadChannel by line. 
struct DrainByLineCopyRange(ProcessT) {
    private {
        // start    -> nothing has been read yet.
        // draining -> read from the process for as long as the drain range has data.
        // lastLine -> split the lines that are left in `buf`.
        // lastBuf  -> what is left in `buf` after the last newline.
        // empty    -> the range is consumed.
        enum State {
            start,
            draining,
            lastLine,
            lastBuf,
            empty,
        }

        ProcessT process;
        DrainRange!ProcessT range;
        State st;
        // data that has been read but not yet been returned as a line.
        const(ubyte)[] buf;
        const(char)[] line;
    }

    this(ProcessT p) {
        process = p;
        range = p.drain;
    }

    /// Returns: the current line without the terminating newline. It may be empty.
    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        return line.assumeUnique;
    }

    /// Advance the state machine one step and try to produce the next line.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        // convert raw bytes to a freshly allocated character array.
        auto decode(const(ubyte)[] raw) {
            return std.utf.byUTF!(const(char))(cast(const(char)[]) raw).array;
        }

        // Cut the bytes up to and including `idx` from `buf`. An `idx` of -1
        // takes everything. A trailing newline is removed from the result.
        const(ubyte)[] updateBuf(size_t idx) {
            const(ubyte)[] chunk;
            if (!buf.empty) {
                if (idx == -1) {
                    chunk = buf;
                    buf = null;
                } else {
                    // step past the newline itself.
                    if (idx < buf.length)
                        idx += 1;
                    chunk = buf[0 .. idx];
                    buf = buf[idx .. $];
                }
            }

            if (!chunk.empty && chunk[$ - 1] == '\n')
                chunk = chunk[0 .. $ - 1];
            return chunk;
        }

        void drainLine() {
            void fillBuf() {
                if (!range.empty)
                    range.popFront;
                if (!range.empty)
                    buf ~= range.front.data;
            }

            size_t idx;
            int attempts;
            // 2 is a magic number which means that it at most waits 2x
            // timeout for data
            do {
                fillBuf();
                idx = buf.countUntil('\n');
            }
            while (!range.empty && idx == -1 && attempts++ < 2);

            if (idx != -1)
                line = decode(updateBuf(idx));
        }

        // Returns: true when there are no more newlines in `buf`.
        bool lastLine() {
            size_t idx = buf.countUntil('\n');
            if (idx == -1)
                return true;

            line = decode(updateBuf(idx));
            return false;
        }

        line = null;
        final switch (st) {
        case State.start:
            drainLine;
            st = State.draining;
            break;
        case State.draining:
            drainLine;
            if (range.empty)
                st = State.lastLine;
            break;
        case State.lastLine:
            if (lastLine)
                st = State.lastBuf;
            break;
        case State.lastBuf:
            // NOTE(review): the state becomes `empty` in the same step, so a
            // consumer that checks `empty` before `front` never sees this
            // trailing data - confirm that this is intended.
            line = decode(buf);
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the state machine has reached its final state.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}

@("shall drain the process output by line")
unittest {
    import std.algorithm : filter, joiner, map;
    import std.array : array;

    auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
    auto lines = p.process.drainByLineCopy.filter!"!a.empty".array;

    assert(lines.length == 3);
    assert(lines.joiner.count >= 30);
    assert(p.wait == 0);
    assert(p.terminated);
}

/// Read the output of `p` line by line, each line is a copy.
auto drainByLineCopy(T)(T p) {
    alias Range = DrainByLineCopyRange!T;
    return Range(p);
}

/// Drain the process output until it is
/// done executing.
auto drainToNull(T)(T p) {
    // the elements are read and thrown away.
    for (auto r = p.drain(); !r.empty; r.popFront) {
    }
    return p;
}

/// Drain the output from the process into an output range.
auto drain(ProcessT, T)(ProcessT p, ref T range) {
    for (auto r = p.drain(); !r.empty; r.popFront) {
        range.put(r.front);
    }
    return p;
}

@("shall drain the output of a process while it is running with a separation of stdout and stderr")
unittest {
    auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
    auto elements = p.drain.array;

    // this is just a sanity check. It has to be kind of high because there
    // is some wiggle room allowed
    assert(elements.count > 1 && elements.count <= 50);

    const stdoutBytes = elements.filter!(a => a.type == DrainElement.Type.stdout)
        .map!(a => a.data)
        .joiner
        .count;
    assert(stdoutBytes == 30);
    assert(p.wait == 0);
    assert(p.terminated);
}

@("shall kill the process tree when the timeout is reached")
unittest {
    immutable script = makeScript(`#!/bin/bash
sleep 10m
`);
    scope (exit)
        remove(script);

    auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
    waitUntilChildren(p.osHandle, 1);

    const childrenBefore = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    const res = p.process.drain.array;
    const childrenAfter = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    assert(p.wait == -9);
    assert(p.terminated);
    assert(childrenBefore == 1);
    assert(childrenAfter == 0);
}

/** Write `script` to an executable file placed next to the running binary.
 *
 * The file name is built from the path of the executable, the base name of
 * `file` and `line`.
 *
 * Returns: the path to the script.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.stdio : File;
    import std.path : baseName;
    import std.conv : to;

    immutable suffix = "_" ~ file.baseName ~ line.to!string ~ ".sh";
    immutable fname = thisExePath ~ suffix;

    // the temporary File is closed when the statement ends.
    File(fname, "w").writeln(script);

    // executable by user, group and others.
    const execBits = S_IXUSR | S_IXGRP | S_IXOTH;
    setAttributes(fname, getAttributes(fname) | execBits);
    return fname;
}

/// Wait for p to have num children or give up after 10s.
void waitUntilChildren(RawPid p, int num) {
    import std.datetime : Clock;

    const deadline = Clock.currTime + 10.dur!"seconds";
    while (true) {
        Thread.sleep(50.dur!"msecs");
        if (Clock.currTime > deadline)
            break;
        // the pid map also contains `p` itself thus it is removed before counting.
        if (makePidMap.getSubMap(p).remove(p).length >= num)
            break;
    }
}

alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable);
/// One mapping parsed from a line of `/proc/<pid>/maps`.
struct AddressMap {
    Address begin;
    Address end;
    NamedType!(string, Tag!"Permission", string.init, TagStringable) perm;
}

/** An assoc array mapping the path of each shared library loaded by
 * the process to the address it is loaded at in the process address space.
 *
 * Lines that can not be parsed are skipped. A failure to read the maps file
 * is logged at trace level and what has been collected so far is returned.
 */
AddressMap[][AbsolutePath] libs(RawPid pid) nothrow {
    import std.stdio : File;
    import std.format : formattedRead, format;
    import std.algorithm : countUntil;
    import std.typecons : tuple;
    import std.string : strip;

    typeof(return) rval;

    try {
        foreach (rawLine; File(format!"/proc/%s/maps"(pid)).byLine) {
            // only lines that contain a path are of interest.
            const slashAt = rawLine.countUntil('/');
            if (slashAt == -1)
                continue;

            try {
                auto path = AbsolutePath(rawLine[slashAt .. $].idup);
                // what is in front of the path, starts with "begin-end perm".
                auto header = rawLine[0 .. slashAt].strip;
                ulong begin;
                ulong end;
                char[] perm;
                if (formattedRead!"%x-%x %s "(header, begin, end, perm) != 3)
                    continue;

                auto amap = AddressMap(Address(begin), Address(end),
                        typeof(AddressMap.init.perm)(perm.idup));
                if (auto known = path in rval)
                    *known ~= amap;
                else
                    rval[path] = [amap];
            } catch (Exception e) {
            }
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return rval;
}

@("shall read the libraries the process is using")
unittest {
    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto p = pipeProcess([scriptName]).sandbox.rcKill;
    waitUntilChildren(p.osHandle, 3);
    auto loaded = libs(p.osHandle);
    p.kill;

    assert(loaded.length != 0);
    assert(AbsolutePath("/bin/bash") in loaded);
}

/** Parses the output from a run of 'ldd' on a binary.
 *
 * Params:
 *  input = an input range of lines which is the output from ldd
 *
 * Example
 * ---
 * writeln(parseLddOutput(`
 * linux-vdso.so.1 =>  (0x00007fffbf5fe000)
 * libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)
 * libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)
 * libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)
 * /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)
 * `))
 * ---
 *
 * Returns: a dictionary of {path: address} for each library required by the
 * specified binary.
1240 */ 1241 Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) { 1242 import std.regex : regex, matchFirst; 1243 import std.format : formattedRead; 1244 1245 typeof(return) rval; 1246 const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`); 1247 1248 foreach (l; input) { 1249 auto m = matchFirst(l, reLinux); 1250 if (m.empty || m.length < 3) { 1251 continue; 1252 } 1253 1254 try { 1255 ulong addr; 1256 formattedRead!"0x%x"(m["addr"], addr); 1257 rval[AbsolutePath(m["lib"])] = Address(addr); 1258 } catch (Exception e) { 1259 } 1260 } 1261 1262 return rval; 1263 } 1264 1265 @("shall parse the output of ldd") 1266 unittest { 1267 auto res = parseLddOutput([ 1268 "linux-vdso.so.1 => (0x00007fffbf5fe000)", 1269 "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)", 1270 "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)", 1271 "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)", 1272 "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)" 1273 ]); 1274 assert(res.length == 3); 1275 assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080); 1276 }