1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: MPL-2
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 This Source Code Form is subject to the terms of the Mozilla Public License,
7 v.2.0. If a copy of the MPL was not distributed with this file, You can obtain
8 one at http://mozilla.org/MPL/2.0/.
9 */
10 module process;
11 
12 import core.sys.posix.unistd : pid_t;
13 import core.thread : Thread;
14 import core.time : dur, Duration;
15 import logger = std.experimental.logger;
16 import std.algorithm : filter, count, joiner, map;
17 import std.array : appender, empty, array;
18 import std.exception : collectException;
19 import std.stdio : File, fileno, writeln;
20 static import std.process;
21 
22 public import process.channel;
23 
24 version (unittest) {
25     import unit_threaded.assertions;
26     import std.file : remove;
27 }
28 
/** Wrap `p` so that `p.dispose` is called when the wrapper goes out of scope.
 *
 * Returns: a `ScopeKill!T` owning `p`.
 */
auto scopeKill(T)(T p) {
    return ScopeKill!(T)(p);
}
33 
/// Scope guard that disposes the wrapped process on destruction.
struct ScopeKill(T) {
    /// The wrapped process; its members are reachable through `alias this`.
    T process;
    alias process this;

    /// Calls `dispose` on the wrapped process.
    ~this() {
        this.process.dispose;
    }
}
42 
/** Async process that does not block on reads from stdout/stderr.
 *
 * The lifecycle is tracked by a small state machine:
 *  - `running`: spawned and not known to have terminated.
 *  - `terminated`: `kill` has sent SIGKILL but the exit status is not collected.
 *  - `exitCode`: the exit status has been collected by `wait`/`tryWait`.
 */
struct PipeProcess {
    import std.algorithm : among;

    private {
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.ProcessPipes process;
        Pipe pipe_;
        FileReadChannel stderr_;
        int status_;
        State st;
    }

    /// Wrap the pipes of an already spawned process.
    this(std.process.ProcessPipes process) @safe {
        this.process = process;
        this.pipe_ = Pipe(this.process.stdout, this.process.stdin);
        this.stderr_ = FileReadChannel(this.process.stderr);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return process.pid.osHandle.RawPid;
    }

    /// Access to stdin and stdout.
    ref Pipe pipe() return scope nothrow @safe {
        return pipe_;
    }

    /// Access stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /** Kill and cleanup the process.
     *
     * A running process is killed, waited for and its pipes destroyed. Once
     * the exit status has been collected this is a no-op, so calling it more
     * than once is fine.
     */
    void dispose() @safe {
        final switch (st) {
        case State.running:
            this.kill;
            this.wait;
            .destroy(process);
            break;
        case State.terminated:
            this.wait;
            .destroy(process);
            break;
        case State.exitCode:
            break;
        }

        st = State.exitCode;
    }

    /** Kill the process with SIGKILL.
     *
     * Only acts while the process is `running`. Errors reported by the OS are
     * deliberately ignored (best effort).
     */
    void kill() nothrow @trusted {
        import core.sys.posix.signal : SIGKILL;

        final switch (st) {
        case State.running:
            break;
        case State.terminated:
            return;
        case State.exitCode:
            return;
        }

        try {
            std.process.kill(process.pid, SIGKILL);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        final switch (st) {
        case State.running:
            status_ = std.process.wait(process.pid);
            break;
        case State.terminated:
            status_ = std.process.wait(process.pid);
            break;
        case State.exitCode:
            break;
        }

        st = State.exitCode;

        return status_;
    }

    /** Non-blocking wait for the process termination.
     *
     * When the process has already been killed (`terminated`) this does a
     * blocking `wait` to collect the status of the killed process.
     *
     * Returns: `true` if the process has terminated.
     */
    bool tryWait() @safe {
        final switch (st) {
        case State.running:
            auto s = std.process.tryWait(process.pid);
            if (s.terminated) {
                st = State.exitCode;
                status_ = s.status;
            }
            break;
        case State.terminated:
            status_ = std.process.wait(process.pid);
            st = State.exitCode;
            break;
        case State.exitCode:
            break;
        }

        return st.among(State.terminated, State.exitCode) != 0;
    }

    /** Returns: The exit status of the process.
     *
     * Throws: `Exception` if the status has not been collected yet by
     * `wait`/`tryWait`.
     */
    int status() @safe {
        if (st != State.exitCode) {
            throw new Exception(
                    "Process has not terminated or wait/tryWait has not been called to collect the exit status");
        }
        return status_;
    }

    /// Returns: If the process is known to have terminated, i.e. it has been
    /// killed or its exit status has been collected.
    bool terminated() @safe {
        return st.among(State.terminated, State.exitCode) != 0;
    }
}
178 
/** Spawn `args` with its standard streams connected to pipes.
 *
 * All parameters are forwarded unchanged to `std.process.pipeProcess`.
 *
 * Returns: a `PipeProcess` wrapping the spawned process.
 */
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto spawned = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(spawned);
}
185 
/** Run `command` through a shell with its standard streams connected to pipes.
 *
 * All parameters are forwarded unchanged to `std.process.pipeShell`.
 *
 * Returns: a `PipeProcess` wrapping the spawned shell.
 */
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto spawned = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(spawned);
}
192 
/** Moves the process to a separate process group and on exit kill it and all
 * its children.
 *
 * The descendants are found by walking `/proc` (see `getDeepChildren`), not
 * via the process group.
 */
struct Sandbox(ProcessT) {
    private {
        ProcessT p;
    }

    this(ProcessT p) @safe {
        import core.sys.posix.unistd : setpgid;

        this.p = p;
        // NOTE(review): the result of setpgid is ignored; it may fail, e.g.
        // if the child has already exec'd.
        setpgid(p.osHandle, 0);
    }

    /// Returns: The raw OS handle of the wrapped process.
    RawPid osHandle() nothrow @safe {
        return p.osHandle;
    }

    /// Access to stdin and stdout of the wrapped process.
    ref Pipe pipe() nothrow @safe {
        return p.pipe;
    }

    /// Access stderr of the wrapped process.
    ref FileReadChannel stderr() nothrow @safe {
        return p.stderr;
    }

    /// Kill the whole process tree, then dispose the wrapped process.
    void dispose() @safe {
        this.kill;
        p.dispose;
    }

    /** Kill the wrapped process and all its descendants with SIGKILL.
     *
     * The descendants are snapshotted *before* the root is killed. When the
     * root dies its children are re-parented and are no longer listed under
     * the root's `/proc` entry, so looking them up afterwards is racy and can
     * let them escape.
     */
    void kill() nothrow @safe {
        static import core.sys.posix.signal;
        import core.sys.posix.sys.wait : waitpid, WNOHANG;

        // Returns: all descendants of `pids`; lookup errors are ignored.
        static RawPid[] update(RawPid[] pids) @trusted {
            auto app = appender!(RawPid[])();

            foreach (p; pids) {
                try {
                    app.put(getDeepChildren(p));
                } catch (Exception e) {
                }
            }

            return app.data;
        }

        // Send SIGKILL to every pid in `children`.
        static void killChildren(RawPid[] children) @trusted {
            foreach (const c; children) {
                core.sys.posix.signal.kill(c, core.sys.posix.signal.SIGKILL);
            }
        }

        // snapshot the tree while the root is still alive and owns it
        auto children = update([p.osHandle]);
        p.kill;

        auto reapChildren = appender!(RawPid[])();
        // if there ever are processes that are spawned with root permissions
        // or something happens that they can't be killed by "this" process
        // tree. Thus limit the iterations to a reasonable number
        for (int i = 0; !children.empty && i < 5; ++i) {
            reapChildren.put(children);
            killChildren(children);
            children = update(children);
        }

        // NOTE(review): waitpid only succeeds for direct children of this
        // process; for deeper descendants this is presumably a no-op.
        foreach (c; reapChildren.data) {
            () @trusted { waitpid(c, null, WNOHANG); }();
        }
    }

    /// Blocking wait for the wrapped process. Returns: the exit status.
    int wait() @safe {
        return p.wait;
    }

    /// Non-blocking wait. Returns: `true` if the wrapped process terminated.
    bool tryWait() @safe {
        return p.tryWait;
    }

    /// Returns: the exit status of the wrapped process.
    int status() @safe {
        return p.status;
    }

    /// Returns: if the wrapped process has terminated.
    bool terminated() @safe {
        return p.terminated;
    }
}
282 
/// Returns: `p` wrapped in a `Sandbox` that kills its whole process tree on dispose.
auto sandbox(T)(T p) @safe {
    return Sandbox!(T)(p);
}
286 
287 @("shall terminate a group of processes")
288 unittest {
289     import std.algorithm : count;
290     import std.datetime.stopwatch : StopWatch, AutoStart;
291 
292     immutable scriptName = makeScript(`#!/bin/bash
293 sleep 10m &
294 sleep 10m &
295 sleep 10m
296 `);
297     scope (exit)
298         remove(scriptName);
299 
300     auto p = pipeProcess([scriptName]).sandbox.scopeKill;
301     for (int i = 0; getDeepChildren(p.osHandle).count < 3; ++i) {
302         Thread.sleep(50.dur!"msecs");
303     }
304     const preChildren = getDeepChildren(p.osHandle).count;
305     p.kill;
306     Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
307     const postChildren = getDeepChildren(p.osHandle).count;
308 
309     p.wait.shouldEqual(-9);
310     p.terminated.shouldBeTrue;
311     preChildren.shouldEqual(3);
312     postChildren.shouldEqual(0);
313 }
314 
/** Dispose the process after the timeout.
 *
 * A background daemon thread polls the process and kills it if it is still
 * alive when the timeout expires. Copies of a `Timeout` share the process and
 * the thread through a `RefCounted` payload.
 */
struct Timeout(ProcessT) {
    import std.algorithm : among;
    import std.datetime : Clock, Duration;
    import core.thread;
    import std.typecons : RefCounted, refCounted;

    private {
        /// Requests sent from the owner to the background thread.
        enum Msg {
            none,
            stop,
            status,
        }

        /// What the background thread reports back.
        enum Reply {
            none,
            running,
            normalDeath,
            killedByTimeout,
        }

        static struct Payload {
            ProcessT p;
            Background background;
            /// Cached reply; not accessed by the background thread.
            Reply backgroundReply;
        }

        RefCounted!Payload rc;
    }

    this(ProcessT p, Duration timeout) @trusted {
        import std.algorithm : move;

        rc = refCounted(Payload(move(p)));
        // NOTE(review): the thread keeps a raw pointer into the shared payload.
        // `dispose` joins the thread; presumably it must run before the last
        // copy of this struct is destroyed.
        rc.background = new Background(&rc.p, timeout);
        rc.background.isDaemon = true;
        rc.background.start;
    }

    /// Polling thread. The mutable shared state is protected by `mtx`.
    private static class Background : Thread {
        import core.sync.condition : Condition;
        import core.sync.mutex : Mutex;

        Duration timeout;
        ProcessT* p;
        Mutex mtx;
        /// Pending messages; `popMsg` consumes the most recent one first.
        Msg[] msg;
        Reply reply_;

        this(ProcessT* p, Duration timeout) {
            this.p = p;
            this.timeout = timeout;
            this.mtx = new Mutex();

            super(&run);
        }

        void run() {
            checkProcess(p.osHandle, this.timeout, this);
        }

        /// Queue a message for the thread.
        void put(Msg msg) @trusted {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.msg ~= msg;
        }

        /// Returns: the most recently queued message or `Msg.none`.
        Msg popMsg() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            if (msg.empty)
                return Msg.none;
            auto rval = msg[$ - 1];
            msg = msg[0 .. $ - 1];
            return rval;
        }

        void setReply(Reply reply_) @trusted {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.reply_ = reply_;
        }

        Reply reply() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            return reply_;
        }

        /// Kill the process while holding `mtx`.
        void kill() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            p.kill;
        }

        /** Kill the process and record `Reply.killedByTimeout` under one lock.
         *
         * The process is only killed while the lock is held, so anyone that
         * observes the death and then calls `reply` blocks until the reason
         * has been recorded.
         */
        void killByTimeout() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            p.kill;
            this.reply_ = Reply.killedByTimeout;
        }
    }

    /// Body of the background thread: poll `p` until it is gone, a `Msg.stop`
    /// arrives, or the timeout expires (in which case it is killed).
    private static void checkProcess(RawPid p, Duration timeout, Background bg) {
        import std.algorithm : max, min;
        static import core.sys.posix.signal;

        const stopAt = Clock.currTime + timeout;
        // Poll at 1/1000 of the timeout, clamped to [20, 500] ms, so an early
        // termination is detected fast without busy looping.
        auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs";

        bool running = true;
        while (running && Clock.currTime < stopAt) {
            const msg = bg.popMsg;

            final switch (msg) {
            case Msg.none:
                Thread.sleep(sleepInterval);
                break;
            case Msg.stop:
                running = false;
                break;
            case Msg.status:
                bg.setReply(Reply.running);
                break;
            }

            // signal 0 only checks that the pid exists.
            // NOTE(review): a terminated but not yet reaped (zombie) process
            // still exists, thus it is treated as running here.
            if (core.sys.posix.signal.kill(p, 0) == -1) {
                running = false;
            }
        }

        // `running` is only still true when the loop ended because the
        // deadline passed while the process was alive. Checking the clock
        // instead would report a process that died right at the deadline as
        // killed by the timeout.
        if (running) {
            bg.killByTimeout;
        } else {
            bg.setReply(Reply.normalDeath);
        }
    }

    /// Returns: The raw OS handle of the wrapped process.
    RawPid osHandle() nothrow @trusted {
        return rc.p.osHandle;
    }

    /// Access to stdin and stdout of the wrapped process.
    ref Pipe pipe() nothrow @trusted {
        return rc.p.pipe;
    }

    /// Access stderr of the wrapped process.
    ref FileReadChannel stderr() nothrow @trusted {
        return rc.p.stderr;
    }

    /// Stop and join the background thread (if still active), then dispose
    /// the wrapped process.
    void dispose() @trusted {
        if (rc.backgroundReply.among(Reply.none, Reply.running)) {
            rc.background.put(Msg.stop);
            rc.background.join;
            rc.backgroundReply = rc.background.reply;
        }
        rc.p.dispose;
    }

    /// Kill the wrapped process (serialized with the background thread).
    void kill() nothrow @trusted {
        rc.background.kill;
    }

    /// Poll every 20 ms until the process has terminated.
    /// Returns: the exit status.
    int wait() @trusted {
        while (!this.tryWait) {
            Thread.sleep(20.dur!"msecs");
        }
        return rc.p.wait;
    }

    /// Non-blocking wait. Returns: `true` if the process has terminated.
    bool tryWait() @trusted {
        return rc.p.tryWait;
    }

    /// Returns: the exit status of the wrapped process.
    int status() @trusted {
        return rc.p.status;
    }

    /// Returns: if the wrapped process has terminated.
    bool terminated() @trusted {
        return rc.p.terminated;
    }

    /// Returns: `true` if the background thread has reported that it killed
    /// the process because the timeout expired.
    bool timeoutTriggered() @trusted {
        if (rc.backgroundReply.among(Reply.none, Reply.running)) {
            rc.background.put(Msg.status);
            rc.backgroundReply = rc.background.reply;
        }
        return rc.backgroundReply == Reply.killedByTimeout;
    }
}
514 
/// Returns: `p` wrapped in a `Timeout` that kills it after `timeout_`.
auto timeout(T)(T p, Duration timeout_) @trusted {
    return Timeout!(T)(p, timeout_);
}
518 
/** Block until the process has pending data on stdout or stderr.
 *
 * Polls every 20 ms. It also returns when both stdout and stderr report
 * `hasData == false`, i.e. when neither pipe is open any more (the same test
 * `DrainRange` uses). Otherwise no data could ever arrive and this would
 * block forever.
 */
void waitForPendingData(ProcessT)(ProcessT p) {
    while (!p.pipe.hasPendingData && !p.stderr.hasPendingData) {
        if (!p.pipe.hasData && !p.stderr.hasData) {
            return;
        }
        Thread.sleep(20.dur!"msecs");
    }
}
525 
526 @("shall kill the process after the timeout")
527 unittest {
528     import std.datetime.stopwatch : StopWatch, AutoStart;
529 
530     auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").scopeKill;
531     auto sw = StopWatch(AutoStart.yes);
532     p.wait;
533     sw.stop;
534 
535     sw.peek.shouldBeGreaterThan(100.dur!"msecs");
536     sw.peek.shouldBeSmallerThan(500.dur!"msecs");
537     p.wait.shouldEqual(-9);
538     p.terminated.shouldBeTrue;
539     p.status.shouldEqual(-9);
540     p.timeoutTriggered.shouldBeTrue;
541 }
542 
/// An OS process id that implicitly converts to `pid_t`.
struct RawPid {
    /// The wrapped process id.
    pid_t value;

    alias value this;
}
547 
/** Returns: the direct children of `parentPid`, as listed by the kernel in
 * `/proc/<pid>/task/<pid>/children`.
 *
 * Returns `null` when the process does not exist or its children list can't
 * be read. The latter happens e.g. when the process exits between the
 * existence check and the read. Unparsable entries are skipped and traced.
 */
RawPid[] getShallowChildren(const int parentPid) {
    import std.algorithm : splitter;
    import std.conv : to;
    import std.file : exists;
    import std.path : buildPath;

    const pidPath = buildPath("/proc", parentPid.to!string);
    if (!exists(pidPath)) {
        return null;
    }

    string content;
    try {
        // the process may terminate after the `exists` check (TOCTOU)
        content = File(buildPath(pidPath, "task", parentPid.to!string, "children")).readln;
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
        return null;
    }

    auto children = appender!(RawPid[])();
    // whitespace splitting tolerates the trailing separator and newlines
    foreach (const p; content.splitter) {
        try {
            children.put(p.to!pid_t.RawPid);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    return children.data;
}
571 
/** Returns: all descendants of `parentPid`, breadth first, so that the leafs
 * end up at the back.
 */
RawPid[] getDeepChildren(const int parentPid) {
    // `found` doubles as the work queue: entries at index >= `next` have not
    // had their own children looked up yet.
    RawPid[] found = getShallowChildren(parentPid);
    for (size_t next = 0; next < found.length; ++next) {
        found ~= getShallowChildren(found[next]);
    }
    return found;
}
590 
/// A chunk of output read from either stdout or stderr of a process.
struct DrainElement {
    /// The stream the chunk was read from.
    enum Type {
        stdout,
        stderr,
    }

    Type type;
    const(ubyte)[] data;

    /// Returns: the chunk as an input range of UTF-8 code units.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        return std.utf.byUTF!(const(char))(cast(const(char)[]) this.data);
    }

    /// Returns: `true` when the chunk holds no bytes.
    bool empty() @safe pure nothrow const @nogc {
        return !this.data.length;
    }
}
611 
/** A range that drains a process stdout/stderr until it terminates.
 *
 * There may be `DrainElement` that are empty.
 */
struct DrainRange(ProcessT) {
    /// Progress of the draining.
    enum State {
        start,
        draining,
        lastStdout,
        lastStderr,
        lastElement,
        empty,
    }

    private {
        ProcessT p;
        DrainElement front_;
        State st;
        /// Scratch buffer that the channels read into; reused between reads.
        ubyte[] buf;
        /// The part of `buf` filled by the latest read.
        ubyte[] bufRead;
    }

    this(ProcessT p) {
        this.p = p;
        this.buf = new ubyte[4096];
    }

    /// Returns: the current chunk, which may be empty.
    DrainElement front() @safe pure nothrow const @nogc {
        assert(!empty, "Can't get front of an empty range");
        return front_;
    }

    /// Advance to the next chunk. While draining this polls (sleeping 20 ms
    /// between attempts) until data is read or no pipe is open anymore.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");

        // a pipe is treated as open as long as `hasData` is true
        bool isAnyPipeOpen() {
            return p.pipe.hasData || p.stderr.hasData;
        }

        // Read one chunk, preferring stderr. Sets `front_.type` and `bufRead`.
        void readData() @safe {
            if (p.stderr.hasData && p.stderr.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stderr);
                bufRead = p.stderr.read(buf);
            } else if (p.pipe.hasData && p.pipe.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stdout);
                bufRead = p.pipe.read(buf);
            }
        }

        // Poll until a chunk is read or all pipes are closed, then publish a
        // copy of it (`buf` is overwritten by the next read).
        void waitUntilData() @safe {
            import core.thread : Thread;
            import core.time : dur;

            while (bufRead.empty && isAnyPipeOpen) {
                readData();
                // only back off when the read produced nothing
                if (bufRead.empty) {
                    () @trusted { Thread.sleep(20.dur!"msecs"); }();
                }
            }
            front_.data = bufRead.dup;
        }

        front_ = DrainElement.init;
        bufRead = null;

        final switch (st) {
        case State.start:
            st = State.draining;
            waitUntilData;
            break;
        case State.draining:
            if (isAnyPipeOpen) {
                waitUntilData();
            } else {
                st = State.lastStdout;
            }
            break;
        case State.lastStdout:
            st = State.lastStderr;
            readData();
            // publish whatever was read, otherwise the chunk is lost
            front_.data = bufRead.dup;
            if (p.pipe.hasData) {
                st = State.lastStdout;
            }
            break;
        case State.lastStderr:
            st = State.lastElement;
            readData();
            front_.data = bufRead.dup;
            if (p.stderr.hasData) {
                st = State.lastStderr;
            }
            break;
        case State.lastElement:
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
715 
/// Returns: a `DrainRange` yielding the stdout/stderr chunks of `p` until it
/// is done.
auto drain(T)(T p) {
    return DrainRange!(T)(p);
}
720 
/** Read the output of a process by line.
 *
 * stdout and stderr are appended to the same buffer and split on `\n`; the
 * newline is stripped from each line. Every line is a freshly allocated copy.
 * Before the first `popFront`, `front` is an empty line.
 */
struct DrainByLineCopyRange(ProcessT) {
    private {
        ProcessT process;
        DrainRange!ProcessT range;
        /// Bytes read but not yet split into lines.
        const(ubyte)[] buf;
        const(char)[] line;
    }

    this(ProcessT p) @safe {
        process = p;
        range = p.drain;
    }

    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        // `line` is always a new array created by `popFront`, thus unique
        return line.assumeUnique;
    }

    void popFront() @safe {
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        assert(!empty, "Can't pop front of an empty range");

        // Advance the drain range and append its next chunk to `buf`.
        void fillBuf() {
            if (!range.empty) {
                range.popFront;
            }
            if (!range.empty) {
                buf ~= range.front.data;
            }
        }

        // countUntil returns -1 when there is no newline; keep it signed
        ptrdiff_t idx;
        do {
            fillBuf();
            idx = buf.countUntil('\n');
        }
        while (!range.empty && idx < 0);

        const(ubyte)[] tmp;
        if (buf.empty) {
            // nothing left to split
        } else if (idx < 0) {
            // no newline and the process is done; the rest is the last line
            tmp = buf;
            buf = null;
        } else {
            // include the newline in the slice; it is stripped below
            tmp = buf[0 .. idx + 1];
            buf = buf[idx + 1 .. $];
        }

        if (!tmp.empty && tmp[$ - 1] == '\n') {
            tmp = tmp[0 .. $ - 1];
        }

        line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
    }

    bool empty() @safe pure nothrow const @nogc {
        return range.empty && buf.empty && line.empty;
    }
}
792 
793 @("shall drain the process output by line")
794 unittest {
795     import std.algorithm : filter, count, joiner, map;
796     import std.array : array;
797 
798     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).scopeKill;
799     auto res = p.process.drainByLineCopy.filter!"!a.empty".array;
800 
801     res.length.shouldEqual(4);
802     res.joiner.count.shouldBeGreaterThan(30);
803     p.wait.shouldEqual(0);
804     p.terminated.shouldBeTrue;
805 }
806 
/// Returns: a range over the lines written by `p` to stdout and stderr.
auto drainByLineCopy(T)(T p) @safe {
    return DrainByLineCopyRange!(T)(p);
}
810 
/** Drain and discard the process output until it is done executing.
 *
 * Returns: `p`.
 */
auto drainToNull(T)(T p) @safe {
    for (auto chunks = p.drain; !chunks.empty; chunks.popFront) {
    }
    return p;
}
817 
/** Drain the output from the process into an output range.
 *
 * Every `DrainElement` (possibly empty) is `put` into `range`.
 *
 * Returns: `p`.
 */
auto drain(ProcessT, T)(ProcessT p, ref T range) {
    for (auto chunks = p.drain; !chunks.empty; chunks.popFront) {
        range.put(chunks.front);
    }
    return p;
}
825 
826 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
827 unittest {
828     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).scopeKill;
829     auto res = p.process.drain.array;
830 
831     // this is just a sanity check. It has to be kind a high because there is
832     // some wiggleroom allowed
833     res.count.shouldBeSmallerThan(50);
834 
835     res.filter!(a => a.type == DrainElement.Type.stdout)
836         .map!(a => a.data)
837         .joiner
838         .count
839         .shouldEqual(30);
840     res.filter!(a => a.type == DrainElement.Type.stderr).count.shouldBeGreaterThan(0);
841     p.wait.shouldEqual(0);
842     p.terminated.shouldBeTrue;
843 }
844 
845 @("shall kill the process tree when the timeout is reached")
846 unittest {
847     immutable script = makeScript(`#!/bin/bash
848 sleep 10m
849 `);
850     scope (exit)
851         remove(script);
852 
853     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").scopeKill;
854     for (int i = 0; getDeepChildren(p.osHandle).count < 1; ++i) {
855         Thread.sleep(50.dur!"msecs");
856     }
857     const preChildren = getDeepChildren(p.osHandle).count;
858     const res = p.process.drain.array;
859     const postChildren = getDeepChildren(p.osHandle).count;
860 
861     p.wait.shouldEqual(-9);
862     p.terminated.shouldBeTrue;
863     preChildren.shouldEqual(1);
864     postChildren.shouldEqual(0);
865 }
866 
/** Write `script` to a new executable file next to the running binary.
 *
 * The name is built from the binary path and the caller's `file`/`line`, so
 * each call site gets its own file. A newline is appended to `script`.
 *
 * Returns: the path of the written script.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.conv : to;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.path : baseName;
    import std.stdio : File;

    immutable path = thisExePath ~ "_" ~ baseName(file) ~ to!string(line) ~ ".sh";

    {
        auto fh = File(path, "w");
        fh.writeln(script);
    }

    // make it executable for user, group and others
    enum execBits = S_IXUSR | S_IXGRP | S_IXOTH;
    setAttributes(path, getAttributes(path) | execBits);
    return path;
}