1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module proc.process; 7 8 import core.sys.posix.signal : SIGKILL; 9 import core.thread : Thread; 10 import core.time : dur, Duration; 11 import logger = std.experimental.logger; 12 import std.algorithm : filter, count, joiner, map; 13 import std.array : appender, empty, array; 14 import std.exception : collectException; 15 import std.stdio : File, fileno, writeln; 16 import std.typecons : Flag, Yes; 17 static import std.process; 18 static import std.stdio; 19 20 import my.gc.refc; 21 import my.from_; 22 import my.path; 23 import my.named_type; 24 25 public import proc.channel; 26 public import proc.pid; 27 28 version (unittest) { 29 import std.file : remove; 30 } 31 32 /** Manage a process by reference counting so that it is terminated when the it 33 * stops being used such as the instance going out of scope. 34 */ 35 auto rcKill(T)(T p, int signal = SIGKILL) { 36 return refCounted(ScopeKill!T(p, signal)); 37 } 38 39 // backward compatibility. 40 alias scopeKill = rcKill; 41 42 struct ScopeKill(T) { 43 T process; 44 alias process this; 45 46 private int signal = SIGKILL; 47 private bool hasProcess; 48 49 this(T process, int signal) @safe { 50 this.process = process; 51 this.signal = signal; 52 this.hasProcess = true; 53 } 54 55 ~this() @safe { 56 if (hasProcess) 57 process.dispose(); 58 } 59 } 60 61 /// Async process wrapper for a std.process SpawnProcess 62 struct SpawnProcess { 63 import core.sys.posix.signal : SIGKILL; 64 import std.algorithm : among; 65 66 private { 67 enum State { 68 running, 69 terminated, 70 exitCode 71 } 72 73 std.process.Pid process; 74 RawPid pid; 75 int status_; 76 State st; 77 } 78 79 this(std.process.Pid process) @safe { 80 this.process = process; 81 this.pid = process.osHandle.RawPid; 82 } 83 84 ~this() @safe { 85 } 86 87 /// Returns: The raw OS handle for the process ID. 88 RawPid osHandle() nothrow @safe { 89 return pid; 90 } 91 92 /// Kill and cleanup the process. 93 void dispose() @safe { 94 final switch (st) { 95 case State.running: 96 this.kill; 97 this.wait; 98 break; 99 case State.terminated: 100 this.wait; 101 break; 102 case State.exitCode: 103 break; 104 } 105 106 st = State.exitCode; 107 } 108 109 /** Send `signal` to the process. 110 * 111 * Param: 112 * signal = a signal from `core.sys.posix.signal` 113 */ 114 void kill(int signal = SIGKILL) nothrow @trusted { 115 final switch (st) { 116 case State.running: 117 break; 118 case State.terminated: 119 goto case; 120 case State.exitCode: 121 return; 122 } 123 124 try { 125 std.process.kill(process, signal); 126 } catch (Exception e) { 127 } 128 129 st = State.terminated; 130 } 131 132 /// Blocking wait for the process to terminated. 133 /// Returns: the exit status. 134 int wait() @safe { 135 final switch (st) { 136 case State.running: 137 status_ = std.process.wait(process); 138 break; 139 case State.terminated: 140 status_ = std.process.wait(process); 141 break; 142 case State.exitCode: 143 break; 144 } 145 146 st = State.exitCode; 147 148 return status_; 149 } 150 151 /// Non-blocking wait for the process termination. 152 /// Returns: `true` if the process has terminated. 153 bool tryWait() @safe { 154 final switch (st) { 155 case State.running: 156 auto s = std.process.tryWait(process); 157 if (s.terminated) { 158 st = State.exitCode; 159 status_ = s.status; 160 } 161 break; 162 case State.terminated: 163 status_ = std.process.wait(process); 164 st = State.exitCode; 165 break; 166 case State.exitCode: 167 break; 168 } 169 170 return st.among(State.terminated, State.exitCode) != 0; 171 } 172 173 /// Returns: The exit status of the process. 174 int status() @safe { 175 if (st != State.exitCode) { 176 throw new Exception( 177 "Process has not terminated and wait/tryWait been called to collect the exit status"); 178 } 179 return status_; 180 } 181 182 /// Returns: If the process has terminated. 183 bool terminated() @safe { 184 return st.among(State.terminated, State.exitCode) != 0; 185 } 186 } 187 188 /// Async process that do not block on read from stdin/stderr. 189 struct PipeProcess { 190 import std.algorithm : among; 191 import core.sys.posix.signal : SIGKILL; 192 193 private { 194 enum State { 195 running, 196 terminated, 197 exitCode 198 } 199 200 std.process.ProcessPipes process; 201 std.process.Pid pid; 202 203 FileReadChannel stderr_; 204 FileReadChannel stdout_; 205 FileWriteChannel stdin_; 206 int status_; 207 State st; 208 } 209 210 this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe { 211 this.pid = pid; 212 213 this.stdin_ = FileWriteChannel(stdin); 214 this.stdout_ = FileReadChannel(stdout); 215 this.stderr_ = FileReadChannel(stderr); 216 } 217 218 this(std.process.ProcessPipes process, std.process.Redirect r) @safe { 219 this.process = process; 220 this.pid = process.pid; 221 222 if (r & std.process.Redirect.stdin) { 223 stdin_ = FileWriteChannel(this.process.stdin); 224 } 225 if (r & std.process.Redirect.stdout) { 226 stdout_ = FileReadChannel(this.process.stdout); 227 } 228 if (r & std.process.Redirect.stderr) { 229 this.stderr_ = FileReadChannel(this.process.stderr); 230 } 231 } 232 233 /// Returns: The raw OS handle for the process ID. 234 RawPid osHandle() nothrow @safe { 235 return pid.osHandle.RawPid; 236 } 237 238 /// Access to stdout. 239 ref FileWriteChannel stdin() return scope nothrow @safe { 240 return stdin_; 241 } 242 243 /// Access to stdout. 244 ref FileReadChannel stdout() return scope nothrow @safe { 245 return stdout_; 246 } 247 248 /// Access stderr. 249 ref FileReadChannel stderr() return scope nothrow @safe { 250 return stderr_; 251 } 252 253 /// Kill and cleanup the process. 254 void dispose() @safe { 255 final switch (st) { 256 case State.running: 257 this.kill; 258 this.wait; 259 .destroy(process); 260 break; 261 case State.terminated: 262 this.wait; 263 .destroy(process); 264 break; 265 case State.exitCode: 266 break; 267 } 268 269 st = State.exitCode; 270 } 271 272 /** Send `signal` to the process. 273 * 274 * Param: 275 * signal = a signal from `core.sys.posix.signal` 276 */ 277 void kill(int signal = SIGKILL) nothrow @trusted { 278 final switch (st) { 279 case State.running: 280 break; 281 case State.terminated: 282 return; 283 case State.exitCode: 284 return; 285 } 286 287 try { 288 std.process.kill(pid, signal); 289 } catch (Exception e) { 290 } 291 292 st = State.terminated; 293 } 294 295 /// Blocking wait for the process to terminated. 296 /// Returns: the exit status. 297 int wait() @safe { 298 final switch (st) { 299 case State.running: 300 status_ = std.process.wait(pid); 301 break; 302 case State.terminated: 303 status_ = std.process.wait(pid); 304 break; 305 case State.exitCode: 306 break; 307 } 308 309 st = State.exitCode; 310 311 return status_; 312 } 313 314 /// Non-blocking wait for the process termination. 315 /// Returns: `true` if the process has terminated. 316 bool tryWait() @safe { 317 final switch (st) { 318 case State.running: 319 auto s = std.process.tryWait(pid); 320 if (s.terminated) { 321 st = State.exitCode; 322 status_ = s.status; 323 } 324 break; 325 case State.terminated: 326 status_ = std.process.wait(pid); 327 st = State.exitCode; 328 break; 329 case State.exitCode: 330 break; 331 } 332 333 return st.among(State.terminated, State.exitCode) != 0; 334 } 335 336 /// Returns: The exit status of the process. 337 int status() @safe { 338 if (st != State.exitCode) { 339 throw new Exception( 340 "Process has not terminated and wait/tryWait been called to collect the exit status"); 341 } 342 return status_; 343 } 344 345 /// Returns: If the process has terminated. 346 bool terminated() @safe { 347 return st.among(State.terminated, State.exitCode) != 0; 348 } 349 } 350 351 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin, 352 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 353 const string[string] env = null, std.process.Config config = std.process.Config.none, 354 scope const char[] workDir = null) { 355 return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr, 356 env, config, workDir)); 357 } 358 359 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env, 360 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 361 return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin, 362 std.stdio.stdout, std.stdio.stderr, env, config, workDir)); 363 } 364 365 SpawnProcess spawnProcess(scope const(char)[] program, 366 File stdin = std.stdio.stdin, File stdout = std.stdio.stdout, 367 File stderr = std.stdio.stderr, const string[string] env = null, 368 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 369 return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin, 370 stdout, stderr, env, config, workDir)); 371 } 372 373 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin, 374 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 375 scope const string[string] env = null, std.process.Config config = std.process.Config.none, 376 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 377 return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr, 378 env, config, workDir, shellPath)); 379 } 380 381 /// ditto 382 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env, 383 std.process.Config config = std.process.Config.none, 384 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 385 return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath)); 386 } 387 388 PipeProcess pipeProcess(scope const(char[])[] args, 389 std.process.Redirect redirect = std.process.Redirect.all, 390 const string[string] env = null, std.process.Config config = std.process.Config.none, 391 scope const(char)[] workDir = null) @safe { 392 return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect); 393 } 394 395 PipeProcess pipeShell(scope const(char)[] command, 396 std.process.Redirect redirect = std.process.Redirect.all, 397 const string[string] env = null, std.process.Config config = std.process.Config.none, 398 scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe { 399 return PipeProcess(std.process.pipeShell(command, redirect, env, config, 400 workDir, shellPath), redirect); 401 } 402 403 /** Moves the process to a separate process group and on exit kill it and all 404 * its children. 405 */ 406 @safe struct Sandbox(ProcessT) { 407 import core.sys.posix.signal : SIGKILL; 408 409 private { 410 ProcessT p; 411 RawPid pid; 412 } 413 414 this(ProcessT p) @safe { 415 import core.sys.posix.unistd : setpgid; 416 417 this.p = p; 418 this.pid = p.osHandle; 419 setpgid(pid, 0); 420 } 421 422 RawPid osHandle() nothrow @safe { 423 return pid; 424 } 425 426 static if (__traits(hasMember, ProcessT, "stdin")) { 427 ref FileWriteChannel stdin() nothrow @safe { 428 return p.stdin; 429 } 430 } 431 432 static if (__traits(hasMember, ProcessT, "stdout")) { 433 ref FileReadChannel stdout() nothrow @safe { 434 return p.stdout; 435 } 436 } 437 438 static if (__traits(hasMember, ProcessT, "stderr")) { 439 ref FileReadChannel stderr() nothrow @safe { 440 return p.stderr; 441 } 442 } 443 444 void dispose() @safe { 445 // this also reaps the children thus cleaning up zombies 446 this.kill; 447 p.dispose; 448 } 449 450 /** Send `signal` to the process. 451 * 452 * Param: 453 * signal = a signal from `core.sys.posix.signal` 454 */ 455 void kill(int signal = SIGKILL) nothrow @safe { 456 // must first retrieve the submap because after the process is killed 457 // its children may have changed. 458 auto pmap = makePidMap.getSubMap(pid); 459 460 p.kill(signal); 461 462 // only kill and reap the children 463 pmap.remove(pid); 464 proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap; 465 } 466 467 int wait() @safe { 468 return p.wait; 469 } 470 471 bool tryWait() @safe { 472 return p.tryWait; 473 } 474 475 int status() @safe { 476 return p.status; 477 } 478 479 bool terminated() @safe { 480 return p.terminated; 481 } 482 } 483 484 auto sandbox(T)(T p) @safe { 485 return Sandbox!T(p); 486 } 487 488 @("shall terminate a group of processes") 489 unittest { 490 import std.datetime.stopwatch : StopWatch, AutoStart; 491 492 immutable scriptName = makeScript(`#!/bin/bash 493 sleep 10m & 494 sleep 10m & 495 sleep 10m 496 `); 497 scope (exit) 498 remove(scriptName); 499 500 auto p = pipeProcess([scriptName]).sandbox.rcKill; 501 waitUntilChildren(p.osHandle, 3); 502 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 503 p.kill; 504 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 505 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 506 507 assert(p.wait == -9); 508 assert(p.terminated); 509 assert(preChildren == 3); 510 assert(postChildren == 0); 511 } 512 513 /** dispose the process after the timeout. 514 */ 515 @safe struct Timeout(ProcessT) { 516 import core.sys.posix.signal : SIGKILL; 517 import core.thread; 518 import std.algorithm : among; 519 import std.datetime : Clock, Duration; 520 521 private { 522 enum Msg { 523 none, 524 stop, 525 status, 526 } 527 528 enum Reply { 529 none, 530 running, 531 normalDeath, 532 killedByTimeout, 533 } 534 535 static struct Payload { 536 ProcessT p; 537 RawPid pid; 538 Background background; 539 Reply backgroundReply; 540 } 541 542 RefCounted!Payload rc; 543 } 544 545 this(ProcessT p, Duration timeout) @trusted { 546 import std.algorithm : move; 547 548 auto pid = p.osHandle; 549 rc = refCounted(Payload(move(p), pid)); 550 rc.background = new Background(&rc.p, timeout); 551 rc.background.isDaemon = true; 552 rc.background.start; 553 } 554 555 ~this() @trusted { 556 rc.release; 557 } 558 559 private static class Background : Thread { 560 import core.sync.condition : Condition; 561 import core.sync.mutex : Mutex; 562 563 Duration timeout; 564 ProcessT* p; 565 Mutex mtx; 566 Msg[] msg; 567 Reply reply_; 568 RawPid pid; 569 int signal = SIGKILL; 570 571 this(ProcessT* p, Duration timeout) { 572 this.p = p; 573 this.timeout = timeout; 574 this.mtx = new Mutex(); 575 this.pid = p.osHandle; 576 577 super(&run); 578 } 579 580 void run() { 581 checkProcess(this.pid, this.timeout, this); 582 } 583 584 void put(Msg msg) @trusted nothrow { 585 this.mtx.lock_nothrow(); 586 scope (exit) 587 this.mtx.unlock_nothrow(); 588 this.msg ~= msg; 589 } 590 591 Msg popMsg() @trusted nothrow { 592 this.mtx.lock_nothrow(); 593 scope (exit) 594 this.mtx.unlock_nothrow(); 595 if (msg.empty) 596 return Msg.none; 597 auto rval = msg[$ - 1]; 598 msg = msg[0 .. $ - 1]; 599 return rval; 600 } 601 602 void setReply(Reply reply_) @trusted nothrow { 603 this.mtx.lock_nothrow(); 604 scope (exit) 605 this.mtx.unlock_nothrow(); 606 this.reply_ = reply_; 607 } 608 609 Reply reply() @trusted nothrow { 610 this.mtx.lock_nothrow(); 611 scope (exit) 612 this.mtx.unlock_nothrow(); 613 return reply_; 614 } 615 616 void setSignal(int signal) @trusted nothrow { 617 this.mtx.lock_nothrow(); 618 scope (exit) 619 this.mtx.unlock_nothrow(); 620 this.signal = signal; 621 } 622 623 void kill() @trusted nothrow { 624 this.mtx.lock_nothrow(); 625 scope (exit) 626 this.mtx.unlock_nothrow(); 627 p.kill(signal); 628 } 629 } 630 631 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 632 import std.algorithm : max, min; 633 import std.variant : Variant; 634 static import core.sys.posix.signal; 635 636 const stopAt = Clock.currTime + timeout; 637 // the purpose is to poll the process often "enough" that if it 638 // terminates early `Process` detects it fast enough. 1000 is chosen 639 // because it "feels good". the purpose 640 auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs"; 641 642 bool forceStop; 643 bool running = true; 644 while (running && Clock.currTime < stopAt) { 645 const msg = bg.popMsg; 646 647 final switch (msg) { 648 case Msg.none: 649 () @trusted { Thread.sleep(sleepInterval); }(); 650 break; 651 case Msg.stop: 652 forceStop = true; 653 running = false; 654 break; 655 case Msg.status: 656 bg.setReply(Reply.running); 657 break; 658 } 659 660 () @trusted { 661 if (core.sys.posix.signal.kill(p, 0) == -1) { 662 running = false; 663 } 664 }(); 665 } 666 667 // may be children alive thus must ensure that the whole process tree 668 // is killed if this is a sandbox with a timeout. 669 bg.kill(); 670 671 if (!forceStop && Clock.currTime >= stopAt) { 672 bg.setReply(Reply.killedByTimeout); 673 } else { 674 bg.setReply(Reply.normalDeath); 675 } 676 } 677 678 RawPid osHandle() nothrow @trusted { 679 return rc.pid; 680 } 681 682 static if (__traits(hasMember, ProcessT, "stdin")) { 683 ref FileWriteChannel stdin() nothrow @safe { 684 return rc.p.stdin; 685 } 686 } 687 688 static if (__traits(hasMember, ProcessT, "stdout")) { 689 ref FileReadChannel stdout() nothrow @safe { 690 return rc.p.stdout; 691 } 692 } 693 694 static if (__traits(hasMember, ProcessT, "stderr")) { 695 ref FileReadChannel stderr() nothrow @trusted { 696 return rc.p.stderr; 697 } 698 } 699 700 void dispose() @trusted { 701 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 702 rc.background.put(Msg.stop); 703 rc.background.join; 704 rc.backgroundReply = rc.background.reply; 705 } 706 rc.p.dispose; 707 } 708 709 /** Send `signal` to the process. 710 * 711 * Param: 712 * signal = a signal from `core.sys.posix.signal` 713 */ 714 void kill(int signal = SIGKILL) nothrow @trusted { 715 rc.background.setSignal(signal); 716 rc.background.kill(); 717 } 718 719 int wait() @trusted { 720 while (!this.tryWait) { 721 Thread.sleep(20.dur!"msecs"); 722 } 723 return rc.p.wait; 724 } 725 726 bool tryWait() @trusted { 727 return rc.p.tryWait; 728 } 729 730 int status() @trusted { 731 return rc.p.status; 732 } 733 734 bool terminated() @trusted { 735 return rc.p.terminated; 736 } 737 738 bool timeoutTriggered() @trusted { 739 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 740 rc.background.put(Msg.status); 741 rc.backgroundReply = rc.background.reply; 742 } 743 return rc.backgroundReply == Reply.killedByTimeout; 744 } 745 } 746 747 auto timeout(T)(T p, Duration timeout_) @trusted { 748 return Timeout!T(p, timeout_); 749 } 750 751 /// Returns when the process has pending data. 752 void waitForPendingData(ProcessT)(Process p) { 753 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 754 Thread.sleep(20.dur!"msecs"); 755 } 756 } 757 758 @("shall kill the process after the timeout") 759 unittest { 760 import std.datetime.stopwatch : StopWatch, AutoStart; 761 762 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill; 763 auto sw = StopWatch(AutoStart.yes); 764 p.wait; 765 sw.stop; 766 767 assert(sw.peek >= 100.dur!"msecs"); 768 assert(sw.peek <= 500.dur!"msecs"); 769 assert(p.wait == -9); 770 assert(p.terminated); 771 assert(p.status == -9); 772 assert(p.timeoutTriggered); 773 } 774 775 struct DrainElement { 776 enum Type { 777 stdout, 778 stderr, 779 } 780 781 Type type; 782 const(ubyte)[] data; 783 784 /// Returns: iterates the data as an input range. 785 auto byUTF8() @safe pure nothrow const @nogc { 786 static import std.utf; 787 788 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 789 } 790 791 bool empty() @safe pure nothrow const @nogc { 792 return data.length == 0; 793 } 794 } 795 796 /** A range that drains a process stdout/stderr until it terminates. 797 * 798 * There may be `DrainElement` that are empty. 799 */ 800 struct DrainRange(ProcessT) { 801 private { 802 enum State { 803 start, 804 draining, 805 lastStdout, 806 lastStderr, 807 lastElement, 808 empty, 809 } 810 811 ProcessT p; 812 DrainElement front_; 813 State st; 814 ubyte[] buf; 815 } 816 817 this(ProcessT p) { 818 this.p = p; 819 this.buf = new ubyte[4096]; 820 } 821 822 DrainElement front() @safe pure nothrow const @nogc { 823 assert(!empty, "Can't get front of an empty range"); 824 return front_; 825 } 826 827 void popFront() @safe { 828 assert(!empty, "Can't pop front of an empty range"); 829 830 static bool isAnyPipeOpen(ref ProcessT p) { 831 return p.stdout.isOpen || p.stderr.isOpen; 832 } 833 834 DrainElement readData(ref ProcessT p) @safe { 835 if (p.stderr.hasPendingData) { 836 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 837 } else if (p.stdout.hasPendingData) { 838 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf)); 839 } 840 return DrainElement.init; 841 } 842 843 DrainElement waitUntilData() @safe { 844 import std.datetime : Clock; 845 846 // may livelock if the process never terminates and never writes to 847 // the terminal. timeout ensure that it sooner or later is break 848 // the loop. This is important if the drain is part of a timeout 849 // wrapping. 850 851 const timeout = 100.dur!"msecs"; 852 const stopAt = Clock.currTime + timeout; 853 const sleepFor = timeout / 20; 854 const useSleep = Clock.currTime + sleepFor; 855 bool running = true; 856 while (running) { 857 const now = Clock.currTime; 858 859 running = (now < stopAt) && isAnyPipeOpen(p); 860 861 auto bufRead = readData(p); 862 863 if (!bufRead.empty) { 864 return DrainElement(bufRead.type, bufRead.data.dup); 865 } else if (running && now > useSleep && bufRead.empty) { 866 import core.thread : Thread; 867 868 () @trusted { Thread.sleep(sleepFor); }(); 869 } 870 } 871 872 return DrainElement.init; 873 } 874 875 front_ = DrainElement.init; 876 877 final switch (st) { 878 case State.start: 879 st = State.draining; 880 front_ = waitUntilData; 881 break; 882 case State.draining: 883 if (p.terminated) { 884 st = State.lastStdout; 885 } else if (isAnyPipeOpen(p)) { 886 front_ = waitUntilData(); 887 } else { 888 st = State.lastStdout; 889 } 890 break; 891 case State.lastStdout: 892 if (p.stdout.hasPendingData) { 893 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup); 894 } else { 895 st = State.lastStderr; 896 } 897 break; 898 case State.lastStderr: 899 if (p.stderr.hasPendingData) { 900 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 901 } else { 902 st = State.lastElement; 903 } 904 break; 905 case State.lastElement: 906 st = State.empty; 907 break; 908 case State.empty: 909 break; 910 } 911 } 912 913 bool empty() @safe pure nothrow const @nogc { 914 return st == State.empty; 915 } 916 } 917 918 /// Drain a process pipe until empty. 919 auto drain(T)(T p) { 920 return DrainRange!T(p); 921 } 922 923 /// Read the data from a ReadChannel by line. 924 struct DrainByLineCopyRange(ProcessT) { 925 private { 926 enum State { 927 start, 928 draining, 929 lastLine, 930 lastBuf, 931 empty, 932 } 933 934 ProcessT process; 935 DrainRange!ProcessT range; 936 State st; 937 const(ubyte)[] buf; 938 const(char)[] line; 939 } 940 941 this(ProcessT p) { 942 process = p; 943 range = p.drain; 944 } 945 946 string front() @trusted pure nothrow const @nogc { 947 import std.exception : assumeUnique; 948 949 assert(!empty, "Can't get front of an empty range"); 950 return line.assumeUnique; 951 } 952 953 void popFront() @safe { 954 assert(!empty, "Can't pop front of an empty range"); 955 import std.algorithm : countUntil; 956 import std.array : array; 957 static import std.utf; 958 959 const(ubyte)[] updateBuf(size_t idx) { 960 const(ubyte)[] tmp; 961 if (buf.empty) { 962 // do nothing 963 } else if (idx == -1) { 964 tmp = buf; 965 buf = null; 966 } else { 967 idx = () { 968 if (idx < buf.length) { 969 return idx + 1; 970 } 971 return idx; 972 }(); 973 tmp = buf[0 .. idx]; 974 buf = buf[idx .. $]; 975 } 976 977 if (!tmp.empty && tmp[$ - 1] == '\n') { 978 tmp = tmp[0 .. $ - 1]; 979 } 980 return tmp; 981 } 982 983 void drainLine() { 984 void fillBuf() { 985 if (!range.empty) { 986 range.popFront; 987 } 988 if (!range.empty) { 989 buf ~= range.front.data; 990 } 991 } 992 993 size_t idx; 994 () { 995 int cnt; 996 do { 997 fillBuf(); 998 idx = buf.countUntil('\n'); 999 // 2 is a magic number which mean that it at most wait 2x timeout for data 1000 } 1001 while (!range.empty && idx == -1 && cnt++ < 2); 1002 }(); 1003 1004 if (idx != -1) { 1005 auto tmp = updateBuf(idx); 1006 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1007 } 1008 } 1009 1010 bool lastLine() { 1011 size_t idx = buf.countUntil('\n'); 1012 if (idx == -1) 1013 return true; 1014 1015 auto tmp = updateBuf(idx); 1016 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1017 return false; 1018 } 1019 1020 line = null; 1021 final switch (st) { 1022 case State.start: 1023 drainLine; 1024 st = State.draining; 1025 break; 1026 case State.draining: 1027 drainLine; 1028 if (range.empty) 1029 st = State.lastLine; 1030 break; 1031 case State.lastLine: 1032 if (lastLine) 1033 st = State.lastBuf; 1034 break; 1035 case State.lastBuf: 1036 line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array; 1037 st = State.empty; 1038 break; 1039 case State.empty: 1040 break; 1041 } 1042 } 1043 1044 bool empty() @safe pure nothrow const @nogc { 1045 return st == State.empty; 1046 } 1047 } 1048 1049 @("shall drain the process output by line") 1050 unittest { 1051 import std.algorithm : filter, joiner, map; 1052 import std.array : array; 1053 1054 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill; 1055 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 1056 1057 assert(res.length == 3); 1058 assert(res.joiner.count >= 30); 1059 assert(p.wait == 0); 1060 assert(p.terminated); 1061 } 1062 1063 auto drainByLineCopy(T)(T p) { 1064 return DrainByLineCopyRange!T(p); 1065 } 1066 1067 /// Drain the process output until it is done executing. 1068 auto drainToNull(T)(T p) { 1069 foreach (l; p.drain()) { 1070 } 1071 return p; 1072 } 1073 1074 /// Drain the output from the process into an output range. 1075 auto drain(ProcessT, T)(ProcessT p, ref T range) { 1076 foreach (l; p.drain()) { 1077 range.put(l); 1078 } 1079 return p; 1080 } 1081 1082 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 1083 unittest { 1084 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill; 1085 auto res = p.drain.array; 1086 1087 // this is just a sanity check. It has to be kind a high because there is 1088 // some wiggleroom allowed 1089 assert(res.count > 1 && res.count <= 50); 1090 1091 assert(res.filter!(a => a.type == DrainElement.Type.stdout) 1092 .map!(a => a.data) 1093 .joiner 1094 .count == 30); 1095 assert(p.wait == 0); 1096 assert(p.terminated); 1097 } 1098 1099 @("shall kill the process tree when the timeout is reached") 1100 unittest { 1101 immutable script = makeScript(`#!/bin/bash 1102 sleep 10m 1103 `); 1104 scope (exit) 1105 remove(script); 1106 1107 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill; 1108 waitUntilChildren(p.osHandle, 1); 1109 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1110 const res = p.process.drain.array; 1111 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1112 1113 assert(p.wait == -9); 1114 assert(p.terminated); 1115 assert(preChildren == 1); 1116 assert(postChildren == 0); 1117 } 1118 1119 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 1120 import core.sys.posix.sys.stat; 1121 import std.file : getAttributes, setAttributes, thisExePath; 1122 import std.stdio : File; 1123 import std.path : baseName; 1124 import std.conv : to; 1125 1126 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 1127 1128 File(fname, "w").writeln(script); 1129 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 1130 return fname; 1131 } 1132 1133 /// Wait for p to have num children or fail after 10s. 1134 void waitUntilChildren(RawPid p, int num) { 1135 import std.datetime : Clock; 1136 1137 const failAt = Clock.currTime + 10.dur!"seconds"; 1138 do { 1139 Thread.sleep(50.dur!"msecs"); 1140 if (Clock.currTime > failAt) 1141 break; 1142 } 1143 while (makePidMap.getSubMap(p).remove(p).length < num); 1144 } 1145 1146 alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable); 1147 struct AddressMap { 1148 Address begin; 1149 Address end; 1150 NamedType!(string, Tag!"Permission", string.init, TagStringable) perm; 1151 } 1152 1153 /** An assoc array mapping the path of each shared library loaded by 1154 * the process to the address it is loaded at in the process address space. 1155 */ 1156 AddressMap[][AbsolutePath] libs(RawPid pid) nothrow { 1157 import std.stdio : File; 1158 import std.format : formattedRead, format; 1159 import std.algorithm : countUntil; 1160 import std.typecons : tuple; 1161 import std..string : strip; 1162 1163 typeof(return) rval; 1164 1165 try { 1166 foreach (l; File(format!"/proc/%s/maps"(pid)).byLine 1167 .map!(a => tuple(a, a.countUntil('/'))) 1168 .filter!(a => a[1] != -1)) { 1169 try { 1170 auto p = AbsolutePath(l[0][l[1] .. $].idup); 1171 auto m = l[0][0 .. l[1]].strip; 1172 ulong begin; 1173 ulong end; 1174 char[] perm; 1175 if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) { 1176 continue; 1177 } 1178 1179 auto amap = AddressMap(Address(begin), Address(end), 1180 typeof(AddressMap.init.perm)(perm.idup)); 1181 if (auto v = p in rval) { 1182 *v ~= amap; 1183 } else { 1184 rval[p] = [amap]; 1185 } 1186 } catch (Exception e) { 1187 } 1188 } 1189 } catch (Exception e) { 1190 logger.trace(e.msg).collectException; 1191 } 1192 1193 return rval; 1194 } 1195 1196 @("shall read the libraries the process is using") 1197 unittest { 1198 immutable scriptName = makeScript(`#!/bin/bash 1199 sleep 10m & 1200 sleep 10m & 1201 sleep 10m 1202 `); 1203 scope (exit) 1204 remove(scriptName); 1205 1206 auto p = pipeProcess([scriptName]).sandbox.rcKill; 1207 waitUntilChildren(p.osHandle, 3); 1208 auto res = libs(p.osHandle); 1209 p.kill; 1210 1211 assert(res.length != 0); 1212 assert(AbsolutePath("/bin/bash") in res); 1213 } 1214 1215 /** Parses the output from a run of 'ldd' on a binary. 1216 * 1217 * Params: 1218 * input = an input range of lines which is the output from ldd 1219 * 1220 * Example 1221 * --- 1222 * writeln(parseLddOutput(` 1223 * linux-vdso.so.1 => (0x00007fffbf5fe000) 1224 * libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000) 1225 * libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000) 1226 * libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000) 1227 * /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000) 1228 * `)) 1229 * --- 1230 * 1231 * Returns: a dictionary of {path: address} for each library required by the 1232 * specified binary. 1233 */ 1234 Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) { 1235 import std.regex : regex, matchFirst; 1236 import std.format : formattedRead; 1237 1238 typeof(return) rval; 1239 const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`); 1240 1241 foreach (l; input) { 1242 auto m = matchFirst(l, reLinux); 1243 if (m.empty || m.length < 3) { 1244 continue; 1245 } 1246 1247 try { 1248 ulong addr; 1249 formattedRead!"0x%x"(m["addr"], addr); 1250 rval[AbsolutePath(m["lib"])] = Address(addr); 1251 } catch (Exception e) { 1252 } 1253 } 1254 1255 return rval; 1256 } 1257 1258 @("shall parse the output of ldd") 1259 unittest { 1260 auto res = parseLddOutput([ 1261 "linux-vdso.so.1 => (0x00007fffbf5fe000)", 1262 "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)", 1263 "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)", 1264 "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)", 1265 "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)" 1266 ]); 1267 assert(res.length == 3); 1268 assert(res[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 140610805166080); 1269 }