/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: MPL-2
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

This Source Code Form is subject to the terms of the Mozilla Public License,
v.2.0. If a copy of the MPL was not distributed with this file, You can obtain
one at http://mozilla.org/MPL/2.0/.
*/
module process;

import core.sys.posix.unistd : pid_t;
import core.thread : Thread;
import core.time : dur, Duration;
import logger = std.experimental.logger;
import std.algorithm : filter, count, joiner, map;
import std.array : appender, empty, array;
import std.exception : collectException;
import std.stdio : File, fileno, writeln;
static import std.process;

public import process.channel;

version (unittest) {
    import unit_threaded.assertions;
    import std.file : remove;
}

/** Wrap `p` so that it is disposed when the returned value goes out of scope.
 *
 * Params:
 *  p = any process-like value that has a `dispose` method.
 *
 * Returns: a `ScopeKill` holding `p`.
 */
auto scopeKill(T)(T p) {
    return ScopeKill!T(p);
}

/** Holder that calls `dispose` on the wrapped process in its destructor.
 *
 * The wrapped value is reachable either explicitly through the `process`
 * member or implicitly through `alias this`.
 */
struct ScopeKill(T) {
    T process;
    alias process this;

    ~this() {
        process.dispose();
    }
}

/** A child process together with channels for its stdin/stdout and stderr.
 *
 * The process moves through three states:
 *  - `running`: nothing has been done to terminate it,
 *  - `terminated`: `kill` has been called but the exit status is not yet
 *    collected,
 *  - `exitCode`: the exit status has been collected (or the process disposed).
 */
struct PipeProcess {
    import std.algorithm : among;

    private {
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.ProcessPipes process;
        Pipe pipe_;
        FileReadChannel stderr_;
        int status_;
        State st;
    }

    /// Takes over `process` and wraps its stdout/stdin and stderr in channels.
    this(std.process.ProcessPipes process) @safe {
        this.process = process;
        this.pipe_ = Pipe(this.process.stdout, this.process.stdin);
        this.stderr_ = FileReadChannel(this.process.stderr);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return RawPid(process.pid.osHandle);
    }

    /// Returns: the channel built from the process stdout and stdin.
    ref Pipe pipe() return scope nothrow @safe {
        return pipe_;
    }

    /// Returns: the channel built from the process stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /** Kill the process if it is still running, collect its exit status and
     * destroy the underlying pipes.
     *
     * Nothing but the state update happens when the exit status has already
     * been collected.
     */
    void dispose() @safe {
        if (st != State.exitCode) {
            // only a process that has not been signalled yet needs the kill
            if (st == State.running) {
                this.kill;
            }
            this.wait;
            .destroy(process);
        }

        st = State.exitCode;
    }

    /** Send SIGKILL to the process.
     *
     * Does nothing unless the process is in the running state. An exception
     * from the underlying kill is ignored and the state still becomes
     * terminated.
     */
    void kill() nothrow @trusted {
        import core.sys.posix.signal : SIGKILL;

        if (st != State.running) {
            return;
        }

        try {
            std.process.kill(process.pid, SIGKILL);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /** Blocking wait for the process to terminate.
     *
     * The status is collected at most once; later calls return the stored
     * value.
     *
     * Returns: the exit status.
     */
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process.pid);
        }

        st = State.exitCode;

        return status_;
    }

    /** Check whether the process has terminated.
     *
     * A running process is polled without blocking. A process that has been
     * killed but not yet waited for is waited for with the blocking `wait`.
     *
     * Returns: `true` if the process has terminated.
     */
    bool tryWait() @safe {
        if (st == State.running) {
            auto polled = std.process.tryWait(process.pid);
            if (polled.terminated) {
                status_ = polled.status;
                st = State.exitCode;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(process.pid);
            st = State.exitCode;
        }

        return st.among(State.terminated, State.exitCode) != 0;
    }

    /** The collected exit status.
     *
     * Returns: The exit status of the process.
     * Throws: `Exception` if the exit status has not been collected yet.
     */
    int status() @safe {
        if (st == State.exitCode) {
            return status_;
        }

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: `true` once the process has been killed or its status collected.
    bool terminated() @safe {
        return st == State.terminated || st == State.exitCode;
    }
}

/** Spawn a process from an argument vector and wrap it in a `PipeProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.pipeProcess`.
 */
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes);
}

/** Spawn a shell command and wrap it in a `PipeProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.pipeShell`.
 */
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes);
}

/** Moves the process to a separate process group and on exit kill it and all
 * its children.
195 */ 196 struct Sandbox(ProcessT) { 197 private { 198 ProcessT p; 199 } 200 201 this(ProcessT p) @safe { 202 import core.sys.posix.unistd : setpgid; 203 204 this.p = p; 205 setpgid(p.osHandle, 0); 206 } 207 208 RawPid osHandle() nothrow @safe { 209 return p.osHandle; 210 } 211 212 ref Pipe pipe() nothrow @safe { 213 return p.pipe; 214 } 215 216 ref FileReadChannel stderr() nothrow @safe { 217 return p.stderr; 218 } 219 220 void dispose() @safe { 221 // this also reaps the children thus cleaning up zombies 222 this.kill; 223 p.dispose; 224 } 225 226 void kill() nothrow @safe { 227 static import core.sys.posix.signal; 228 import core.sys.posix.sys.wait : waitpid, WNOHANG; 229 230 static RawPid[] update(RawPid[] pids) @trusted { 231 auto app = appender!(RawPid[])(); 232 233 foreach (p; pids) { 234 try { 235 app.put(getDeepChildren(p)); 236 } catch (Exception e) { 237 } 238 } 239 240 return app.data; 241 } 242 243 static void killChildren(RawPid[] children) @trusted { 244 foreach (const c; children) { 245 core.sys.posix.signal.kill(c, core.sys.posix.signal.SIGKILL); 246 } 247 } 248 249 p.kill; 250 auto children = update([p.osHandle]); 251 auto reapChildren = appender!(RawPid[])(); 252 // if there ever are processes that are spawned with root permissions 253 // or something happens that they can't be killed by "this" process 254 // tree. 
Thus limit the iterations to a reasonable number 255 for (int i = 0; !children.empty && i < 5; ++i) { 256 reapChildren.put(children); 257 killChildren(children); 258 children = update(children); 259 } 260 261 foreach (c; reapChildren.data) { 262 () @trusted { waitpid(c, null, WNOHANG); }(); 263 } 264 } 265 266 int wait() @safe { 267 return p.wait; 268 } 269 270 bool tryWait() @safe { 271 return p.tryWait; 272 } 273 274 int status() @safe { 275 return p.status; 276 } 277 278 bool terminated() @safe { 279 return p.terminated; 280 } 281 } 282 283 auto sandbox(T)(T p) @safe { 284 return Sandbox!T(p); 285 } 286 287 @("shall terminate a group of processes") 288 unittest { 289 import std.algorithm : count; 290 import std.datetime.stopwatch : StopWatch, AutoStart; 291 292 immutable scriptName = makeScript(`#!/bin/bash 293 sleep 10m & 294 sleep 10m & 295 sleep 10m 296 `); 297 scope (exit) 298 remove(scriptName); 299 300 auto p = pipeProcess([scriptName]).sandbox.scopeKill; 301 for (int i = 0; getDeepChildren(p.osHandle).count < 3; ++i) { 302 Thread.sleep(50.dur!"msecs"); 303 } 304 const preChildren = getDeepChildren(p.osHandle).count; 305 p.kill; 306 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 307 const postChildren = getDeepChildren(p.osHandle).count; 308 309 p.wait.shouldEqual(-9); 310 p.terminated.shouldBeTrue; 311 preChildren.shouldEqual(3); 312 postChildren.shouldEqual(0); 313 } 314 315 /** dispose the process after the timeout. 
316 */ 317 struct Timeout(ProcessT) { 318 import std.algorithm : among; 319 import std.datetime : Clock, Duration; 320 import core.thread; 321 import std.typecons : RefCounted, refCounted; 322 323 private { 324 enum Msg { 325 none, 326 stop, 327 status, 328 } 329 330 enum Reply { 331 none, 332 running, 333 normalDeath, 334 killedByTimeout, 335 } 336 337 static struct Payload { 338 ProcessT p; 339 Background background; 340 Reply backgroundReply; 341 } 342 343 RefCounted!Payload rc; 344 } 345 346 this(ProcessT p, Duration timeout) @trusted { 347 import std.algorithm : move; 348 349 rc = refCounted(Payload(move(p))); 350 rc.background = new Background(&rc.p, timeout); 351 rc.background.isDaemon = true; 352 rc.background.start; 353 } 354 355 private static class Background : Thread { 356 import core.sync.condition : Condition; 357 import core.sync.mutex : Mutex; 358 359 Duration timeout; 360 ProcessT* p; 361 Mutex mtx; 362 Msg[] msg; 363 Reply reply_; 364 365 this(ProcessT* p, Duration timeout) { 366 this.p = p; 367 this.timeout = timeout; 368 this.mtx = new Mutex(); 369 370 super(&run); 371 } 372 373 void run() { 374 checkProcess(p.osHandle, this.timeout, this); 375 } 376 377 void put(Msg msg) @trusted { 378 this.mtx.lock_nothrow(); 379 scope (exit) 380 this.mtx.unlock_nothrow(); 381 this.msg ~= msg; 382 } 383 384 Msg popMsg() @trusted nothrow { 385 this.mtx.lock_nothrow(); 386 scope (exit) 387 this.mtx.unlock_nothrow(); 388 if (msg.empty) 389 return Msg.none; 390 auto rval = msg[$ - 1]; 391 msg = msg[0 .. 
$ - 1]; 392 return rval; 393 } 394 395 void setReply(Reply reply_) @trusted { 396 { 397 this.mtx.lock_nothrow(); 398 scope (exit) 399 this.mtx.unlock_nothrow(); 400 this.reply_ = reply_; 401 } 402 } 403 404 Reply reply() @trusted nothrow { 405 this.mtx.lock_nothrow(); 406 scope (exit) 407 this.mtx.unlock_nothrow(); 408 return reply_; 409 } 410 411 void kill() @trusted nothrow { 412 this.mtx.lock_nothrow(); 413 scope (exit) 414 this.mtx.unlock_nothrow(); 415 p.kill; 416 } 417 } 418 419 private static void checkProcess(RawPid p, Duration timeout, Background bg) { 420 import core.sys.posix.signal : SIGKILL; 421 import std.algorithm : max, min; 422 import std.variant : Variant; 423 static import core.sys.posix.signal; 424 425 const stopAt = Clock.currTime + timeout; 426 // the purpose is to poll the process often "enough" that if it 427 // terminates early `Process` detects it fast enough. 1000 is chosen 428 // because it "feels good". the purpose 429 auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs"; 430 431 bool forceStop; 432 bool running = true; 433 while (running && Clock.currTime < stopAt) { 434 const msg = bg.popMsg; 435 436 final switch (msg) { 437 case Msg.none: 438 Thread.sleep(sleepInterval); 439 break; 440 case Msg.stop: 441 forceStop = true; 442 running = false; 443 break; 444 case Msg.status: 445 bg.setReply(Reply.running); 446 break; 447 } 448 449 if (core.sys.posix.signal.kill(p, 0) == -1) { 450 running = false; 451 } 452 } 453 454 if (!forceStop && Clock.currTime >= stopAt) { 455 bg.kill; 456 bg.setReply(Reply.killedByTimeout); 457 } else { 458 bg.setReply(Reply.normalDeath); 459 } 460 } 461 462 RawPid osHandle() nothrow @trusted { 463 return rc.p.osHandle; 464 } 465 466 ref Pipe pipe() nothrow @trusted { 467 return rc.p.pipe; 468 } 469 470 ref FileReadChannel stderr() nothrow @trusted { 471 return rc.p.stderr; 472 } 473 474 void dispose() @trusted { 475 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 476 
rc.background.put(Msg.stop); 477 rc.background.join; 478 rc.backgroundReply = rc.background.reply; 479 } 480 rc.p.dispose; 481 } 482 483 void kill() nothrow @trusted { 484 rc.background.kill; 485 } 486 487 int wait() @trusted { 488 while (!this.tryWait) { 489 Thread.sleep(20.dur!"msecs"); 490 } 491 return rc.p.wait; 492 } 493 494 bool tryWait() @trusted { 495 return rc.p.tryWait; 496 } 497 498 int status() @trusted { 499 return rc.p.status; 500 } 501 502 bool terminated() @trusted { 503 return rc.p.terminated; 504 } 505 506 bool timeoutTriggered() @trusted { 507 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 508 rc.background.put(Msg.status); 509 rc.backgroundReply = rc.background.reply; 510 } 511 return rc.backgroundReply == Reply.killedByTimeout; 512 } 513 } 514 515 auto timeout(T)(T p, Duration timeout_) @trusted { 516 return Timeout!T(p, timeout_); 517 } 518 519 /// Returns when the process has pending data. 520 void waitForPendingData(ProcessT)(Process p) { 521 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 522 Thread.sleep(20.dur!"msecs"); 523 } 524 } 525 526 @("shall kill the process after the timeout") 527 unittest { 528 import std.datetime.stopwatch : StopWatch, AutoStart; 529 530 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").scopeKill; 531 auto sw = StopWatch(AutoStart.yes); 532 p.wait; 533 sw.stop; 534 535 sw.peek.shouldBeGreaterThan(100.dur!"msecs"); 536 sw.peek.shouldBeSmallerThan(500.dur!"msecs"); 537 p.wait.shouldEqual(-9); 538 p.terminated.shouldBeTrue; 539 p.status.shouldEqual(-9); 540 p.timeoutTriggered.shouldBeTrue; 541 } 542 543 struct RawPid { 544 pid_t value; 545 alias value this; 546 } 547 548 RawPid[] getShallowChildren(const int parentPid) { 549 import std.algorithm : filter, splitter; 550 import std.conv : to; 551 import std.file : exists; 552 import std.path : buildPath; 553 554 const pidPath = buildPath("/proc", parentPid.to!string); 555 if (!exists(pidPath)) { 556 return null; 557 } 558 559 
auto children = appender!(RawPid[])(); 560 foreach (const p; File(buildPath(pidPath, "task", parentPid.to!string, "children")).readln.splitter(" ") 561 .filter!(a => !a.empty)) { 562 try { 563 children.put(p.to!pid_t.RawPid); 564 } catch (Exception e) { 565 logger.trace(e.msg).collectException; 566 } 567 } 568 569 return children.data; 570 } 571 572 /// Returns: a list of all processes with the leafs being at the back. 573 RawPid[] getDeepChildren(const int parentPid) { 574 import std.container : DList; 575 576 auto children = DList!(RawPid)(); 577 578 children.insert(getShallowChildren(parentPid)); 579 auto res = appender!(RawPid[])(); 580 581 while (!children.empty) { 582 const p = children.front; 583 res.put(p); 584 children.insertBack(getShallowChildren(p)); 585 children.removeFront; 586 } 587 588 return res.data; 589 } 590 591 struct DrainElement { 592 enum Type { 593 stdout, 594 stderr, 595 } 596 597 Type type; 598 const(ubyte)[] data; 599 600 /// Returns: iterates the data as an input range. 601 auto byUTF8() @safe pure nothrow const @nogc { 602 static import std.utf; 603 604 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 605 } 606 607 bool empty() @safe pure nothrow const @nogc { 608 return data.length == 0; 609 } 610 } 611 612 /** A range that drains a process stdout/stderr until it terminates. 613 * 614 * There may be `DrainElement` that are empty. 
615 */ 616 struct DrainRange(ProcessT) { 617 enum State { 618 start, 619 draining, 620 lastStdout, 621 lastStderr, 622 lastElement, 623 empty, 624 } 625 626 private { 627 ProcessT p; 628 DrainElement front_; 629 State st; 630 ubyte[] buf; 631 ubyte[] bufRead; 632 } 633 634 this(ProcessT p) { 635 this.p = p; 636 this.buf = new ubyte[4096]; 637 } 638 639 DrainElement front() @safe pure nothrow const @nogc { 640 assert(!empty, "Can't get front of an empty range"); 641 return front_; 642 } 643 644 void popFront() @safe { 645 assert(!empty, "Can't pop front of an empty range"); 646 647 bool isAnyPipeOpen() { 648 return p.pipe.hasData || p.stderr.hasData; 649 } 650 651 void readData() @safe { 652 if (p.stderr.hasData && p.stderr.hasPendingData) { 653 front_ = DrainElement(DrainElement.Type.stderr); 654 bufRead = p.stderr.read(buf); 655 } else if (p.pipe.hasData && p.pipe.hasPendingData) { 656 front_ = DrainElement(DrainElement.Type.stdout); 657 bufRead = p.pipe.read(buf); 658 } 659 } 660 661 void waitUntilData() @safe { 662 while (bufRead.empty && isAnyPipeOpen) { 663 import core.thread : Thread; 664 import core.time : dur; 665 666 readData(); 667 if (front_.data.empty) { 668 () @trusted { Thread.sleep(20.dur!"msecs"); }(); 669 } 670 } 671 front_.data = bufRead.dup; 672 } 673 674 front_ = DrainElement.init; 675 bufRead = null; 676 677 final switch (st) { 678 case State.start: 679 st = State.draining; 680 waitUntilData; 681 break; 682 case State.draining: 683 if (isAnyPipeOpen) { 684 waitUntilData(); 685 } else { 686 st = State.lastStdout; 687 } 688 break; 689 case State.lastStdout: 690 st = State.lastStderr; 691 readData(); 692 if (p.pipe.hasData) { 693 st = State.lastStdout; 694 } 695 break; 696 case State.lastStderr: 697 st = State.lastElement; 698 readData(); 699 if (p.stderr.hasData) { 700 st = State.lastStderr; 701 } 702 break; 703 case State.lastElement: 704 st = State.empty; 705 break; 706 case State.empty: 707 break; 708 } 709 } 710 711 bool empty() @safe pure 
nothrow const @nogc { 712 return st == State.empty; 713 } 714 } 715 716 /// Drain a process pipe until empty. 717 auto drain(T)(T p) { 718 return DrainRange!T(p); 719 } 720 721 /// Read the data from a ReadChannel by line. 722 struct DrainByLineCopyRange(ProcessT) { 723 private { 724 ProcessT process; 725 DrainRange!ProcessT range; 726 const(ubyte)[] buf; 727 const(char)[] line; 728 } 729 730 this(ProcessT p) @safe { 731 process = p; 732 range = p.drain; 733 } 734 735 string front() @trusted pure nothrow const @nogc { 736 import std.exception : assumeUnique; 737 738 assert(!empty, "Can't get front of an empty range"); 739 return line.assumeUnique; 740 } 741 742 void popFront() @safe { 743 assert(!empty, "Can't pop front of an empty range"); 744 import std.algorithm : countUntil; 745 import std.array : array; 746 static import std.utf; 747 748 void fillBuf() { 749 if (!range.empty) { 750 range.popFront; 751 } 752 if (!range.empty) { 753 buf ~= range.front.data; 754 } 755 } 756 757 size_t idx; 758 do { 759 fillBuf(); 760 idx = buf.countUntil('\n'); 761 } 762 while (!range.empty && idx == -1); 763 764 const(ubyte)[] tmp; 765 if (buf.empty) { 766 // do nothing 767 } else if (idx == -1) { 768 tmp = buf; 769 buf = null; 770 } else { 771 idx = () { 772 if (idx < buf.length) { 773 return idx + 1; 774 } 775 return idx; 776 }(); 777 tmp = buf[0 .. idx]; 778 buf = buf[idx .. $]; 779 } 780 781 if (!tmp.empty && tmp[$ - 1] == '\n') { 782 tmp = tmp[0 .. 
$ - 1]; 783 } 784 785 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 786 } 787 788 bool empty() @safe pure nothrow const @nogc { 789 return range.empty && buf.empty && line.empty; 790 } 791 } 792 793 @("shall drain the process output by line") 794 unittest { 795 import std.algorithm : filter, count, joiner, map; 796 import std.array : array; 797 798 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).scopeKill; 799 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 800 801 res.length.shouldEqual(4); 802 res.joiner.count.shouldBeGreaterThan(30); 803 p.wait.shouldEqual(0); 804 p.terminated.shouldBeTrue; 805 } 806 807 auto drainByLineCopy(T)(T p) @safe { 808 return DrainByLineCopyRange!T(p); 809 } 810 811 /// Drain the process output until it is done executing. 812 auto drainToNull(T)(T p) @safe { 813 foreach (l; p.drain) { 814 } 815 return p; 816 } 817 818 /// Drain the output from the process into an output range. 819 auto drain(ProcessT, T)(ProcessT p, ref T range) { 820 foreach (l; p.drain) { 821 range.put(l); 822 } 823 return p; 824 } 825 826 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 827 unittest { 828 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).scopeKill; 829 auto res = p.process.drain.array; 830 831 // this is just a sanity check. 
It has to be kind a high because there is 832 // some wiggleroom allowed 833 res.count.shouldBeSmallerThan(50); 834 835 res.filter!(a => a.type == DrainElement.Type.stdout) 836 .map!(a => a.data) 837 .joiner 838 .count 839 .shouldEqual(30); 840 res.filter!(a => a.type == DrainElement.Type.stderr).count.shouldBeGreaterThan(0); 841 p.wait.shouldEqual(0); 842 p.terminated.shouldBeTrue; 843 } 844 845 @("shall kill the process tree when the timeout is reached") 846 unittest { 847 immutable script = makeScript(`#!/bin/bash 848 sleep 10m 849 `); 850 scope (exit) 851 remove(script); 852 853 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").scopeKill; 854 for (int i = 0; getDeepChildren(p.osHandle).count < 1; ++i) { 855 Thread.sleep(50.dur!"msecs"); 856 } 857 const preChildren = getDeepChildren(p.osHandle).count; 858 const res = p.process.drain.array; 859 const postChildren = getDeepChildren(p.osHandle).count; 860 861 p.wait.shouldEqual(-9); 862 p.terminated.shouldBeTrue; 863 preChildren.shouldEqual(1); 864 postChildren.shouldEqual(0); 865 } 866 867 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 868 import core.sys.posix.sys.stat; 869 import std.file : getAttributes, setAttributes, thisExePath; 870 import std.stdio : File; 871 import std.path : baseName; 872 import std.conv : to; 873 874 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 875 876 File(fname, "w").writeln(script); 877 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 878 return fname; 879 }