1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc.process;
7 
8 import core.sys.posix.signal : SIGKILL;
9 import core.thread : Thread;
10 import core.time : dur, Duration;
11 import logger = std.experimental.logger;
12 import std.algorithm : filter, count, joiner, map;
13 import std.array : appender, empty, array;
14 import std.exception : collectException;
15 import std.stdio : File, fileno, writeln;
16 import std.typecons : Flag, Yes;
17 static import std.process;
18 static import std.stdio;
19 
20 import my.gc.refc;
21 import my.from_;
22 import my.path;
23 import my.named_type;
24 
25 public import proc.channel;
26 public import proc.pid;
27 
28 version (unittest) {
29     import std.file : remove;
30 }
31 
/** Wrap a process in a reference counted `ScopeKill` so that it is disposed
 * when it stops being used, such as when the last instance goes out of scope.
 *
 * Params:
 *  p = the process to manage
 *  signal = forwarded to the `ScopeKill` constructor
 *
 * Returns: `refCounted` applied to a `ScopeKill!T` that holds `p`.
 */
auto rcKill(T)(T p, int signal = SIGKILL) {
    // keep the ScopeKill as a temporary: a named local would run its
    // destructor, and thereby dispose the process, when this function returns.
    return ScopeKill!T(p, signal).refCounted;
}
38 
// Backward compatibility: `scopeKill` is the older name for `rcKill`; both
// names resolve to the same template.
alias scopeKill = rcKill;
41 
/** Disposes the wrapped process when the instance is destroyed.
 *
 * The wrapped process is reachable through `alias this`. A default
 * initialized instance owns no process and its destructor does nothing.
 *
 * Note that the type is copyable; the destructor of every copy that was
 * created through the constructor disposes the process.
 */
struct ScopeKill(T) {
    T process;
    alias process this;

    private int signal = SIGKILL;
    private bool hasProcess;

    /**
     * Params:
     *  process = the process to dispose on destruction
     *  signal = the signal to send to the process on destruction
     */
    this(T process, int signal) @safe {
        this.process = process;
        this.signal = signal;
        this.hasProcess = true;
    }

    ~this() @safe {
        if (!hasProcess)
            return;

        // The requested signal used to be stored but never sent. Deliver it
        // before disposing. SIGKILL is skipped because the process types in
        // this module already send it from `dispose`, which keeps the default
        // path unchanged.
        static if (__traits(hasMember, T, "kill")) {
            if (signal != SIGKILL)
                process.kill(signal);
        }

        process.dispose();
    }
}
60 
/** Asynchronous wrapper around a `std.process.Pid`.
 *
 * The wrapper tracks three states: the process is `running`, a signal has been
 * sent to it (`terminated`), or its exit status has been collected
 * (`exitCode`).
 */
struct SpawnProcess {
    import core.sys.posix.signal : SIGKILL;

    private {
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.Pid process;
        RawPid pid;
        int status_;
        State st;
    }

    /// Params: process = the process to wrap.
    this(std.process.Pid process) @safe {
        this.process = process;
        this.pid = process.osHandle.RawPid;
    }

    ~this() @safe {
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    /// Kill the process if it is still running and collect its exit status.
    void dispose() @safe {
        if (st == State.running)
            this.kill;

        // anything but `exitCode` still has an exit status to collect.
        if (st != State.exitCode)
            this.wait;

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Nothing is sent unless the state is `running`. Errors from the
     * underlying `std.process.kill` are ignored.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(process, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return status_;
    }

    /** Check whether the process has terminated.
     *
     * While the state is `running` this is a non-blocking check. After `kill`
     * has been called the exit status is collected with a blocking wait.
     *
     * Returns: `true` if the process has terminated.
     */
    bool tryWait() @safe {
        if (st == State.running) {
            auto res = std.process.tryWait(process);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: `true` once a signal has been sent via `kill` or the exit
    /// status has been collected.
    bool terminated() @safe {
        return st != State.running;
    }
}
187 
/** Asynchronous process with channels for its redirected stdin, stdout and
 * stderr.
 *
 * The wrapper tracks three states: the process is `running`, a signal has been
 * sent to it (`terminated`), or its exit status has been collected
 * (`exitCode`).
 */
struct PipeProcess {
    import core.sys.posix.signal : SIGKILL;

    private {
        enum State {
            running,
            terminated,
            exitCode
        }

        std.process.ProcessPipes process;
        std.process.Pid pid;

        FileReadChannel stderr_;
        FileReadChannel stdout_;
        FileWriteChannel stdin_;
        int status_;
        State st;
    }

    /** Wrap an already running process and the files connected to it.
     *
     * Params:
     *  pid = the process
     *  stdin = file that is wrapped in the write channel
     *  stdout = file that is wrapped in the stdout read channel
     *  stderr = file that is wrapped in the stderr read channel
     */
    this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
        this.pid = pid;

        this.stdin_ = FileWriteChannel(stdin);
        this.stdout_ = FileReadChannel(stdout);
        this.stderr_ = FileReadChannel(stderr);
    }

    /** Wrap the pipes of a process.
     *
     * Params:
     *  process = the process and its pipes
     *  r = a channel is only created for the streams whose flag is set
     */
    this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
        alias Redirect = std.process.Redirect;

        this.process = process;
        this.pid = process.pid;

        if ((r & Redirect.stdin) != 0)
            stdin_ = FileWriteChannel(this.process.stdin);

        if ((r & Redirect.stdout) != 0)
            stdout_ = FileReadChannel(this.process.stdout);

        if ((r & Redirect.stderr) != 0)
            stderr_ = FileReadChannel(this.process.stderr);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid.osHandle.RawPid;
    }

    /// Access to stdin.
    ref FileWriteChannel stdin() return scope nothrow @safe {
        return stdin_;
    }

    /// Access to stdout.
    ref FileReadChannel stdout() return scope nothrow @safe {
        return stdout_;
    }

    /// Access to stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /// Kill the process if it is still running, collect its exit status and
    /// destroy the pipes.
    void dispose() @safe {
        if (st == State.running)
            this.kill;

        // anything but `exitCode` still has an exit status to collect.
        if (st != State.exitCode) {
            this.wait;
            .destroy(process);
        }

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Nothing is sent unless the state is `running`. Errors from the
     * underlying `std.process.kill` are ignored.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(pid, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return status_;
    }

    /** Check whether the process has terminated.
     *
     * While the state is `running` this is a non-blocking check. After `kill`
     * has been called the exit status is collected with a blocking wait.
     *
     * Returns: `true` if the process has terminated.
     */
    bool tryWait() @safe {
        if (st == State.running) {
            auto res = std.process.tryWait(pid);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: `true` once a signal has been sent via `kill` or the exit
    /// status has been collected.
    bool terminated() @safe {
        return st != State.running;
    }
}
350 
/** Spawn a process via `std.process.spawnProcess` and wrap it in a
 * `SpawnProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.spawnProcess`.
 */
SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const char[] workDir = null) {
    auto child = std.process.spawnProcess(args, stdin, stdout, stderr, env, config, workDir);
    return SpawnProcess(child);
}
358 
/** Spawn a process that uses the standard streams of the current process and
 * wrap it in a `SpawnProcess`.
 *
 * `std.stdio.stdin`, `std.stdio.stdout` and `std.stdio.stderr` are passed as
 * the streams; the remaining parameters are forwarded unchanged to
 * `std.process.spawnProcess`.
 */
SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    auto child = std.process.spawnProcess(args, std.stdio.stdin, std.stdio.stdout,
            std.stdio.stderr, env, config, workDir);
    return SpawnProcess(child);
}
364 
/** Spawn `program` without any arguments and wrap it in a `SpawnProcess`.
 *
 * `program` is passed as a one element argument list; the remaining
 * parameters are forwarded unchanged to `std.process.spawnProcess`.
 */
SpawnProcess spawnProcess(scope const(char)[] program,
        File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
        File stderr = std.stdio.stderr, const string[string] env = null,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    // view the single program name as a one element slice, without allocating.
    auto child = std.process.spawnProcess((&program)[0 .. 1], stdin, stdout,
            stderr, env, config, workDir);
    return SpawnProcess(child);
}
372 
/** Run `command` via `std.process.spawnShell` and wrap the process in a
 * `SpawnProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.spawnShell`.
 */
SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        scope const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, stdin, stdout, stderr, env,
            config, workDir, shellPath);
    return SpawnProcess(child);
}
380 
/** Run `command` via the `std.process.spawnShell` overload that takes no
 * stream arguments and wrap the process in a `SpawnProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.spawnShell`.
 */
SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
        std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, env, config, workDir, shellPath);
    return SpawnProcess(child);
}
387 
/** Start a process via `std.process.pipeProcess` and wrap it in a
 * `PipeProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.pipeProcess`.
 * `redirect` is also passed to the `PipeProcess` constructor.
 */
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes, redirect);
}
394 
/** Run `command` via `std.process.pipeShell` and wrap the process in a
 * `PipeProcess`.
 *
 * All parameters are forwarded unchanged to `std.process.pipeShell`.
 * `redirect` is also passed to the `PipeProcess` constructor.
 */
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes, redirect);
}
402 
/** Wraps a process so that killing or disposing it also kills its children.
 *
 * On construction `setpgid` is called to move the process to a process group
 * of its own; the result of that call is not checked.
 */
@safe struct Sandbox(ProcessT) {
    import core.sys.posix.signal : SIGKILL;

    private {
        ProcessT p;
        RawPid pid;
    }

    /// Params: p = the process to wrap.
    this(ProcessT p) @safe {
        import core.sys.posix.unistd : setpgid;

        this.p = p;
        this.pid = p.osHandle;
        setpgid(pid, 0);
    }

    /// Returns: The raw OS handle of the wrapped process.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    // The stream accessors only exist when the wrapped type has them.

    static if (__traits(hasMember, ProcessT, "stdin")) {
        /// Access to stdin of the wrapped process.
        ref FileWriteChannel stdin() nothrow @safe {
            return p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        /// Access to stdout of the wrapped process.
        ref FileReadChannel stdout() nothrow @safe {
            return p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        /// Access to stderr of the wrapped process.
        ref FileReadChannel stderr() nothrow @safe {
            return p.stderr;
        }
    }

    /// Kill the process and its children, then dispose the wrapped process.
    void dispose() @safe {
        // the kill also reaps the children thus cleaning up zombies
        this.kill();
        p.dispose();
    }

    /** Send `signal` to the process and to its children.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @safe {
        // the process tree has to be captured before the process is
        // signalled; afterwards its children may have changed.
        auto descendants = makePidMap.getSubMap(pid);

        p.kill(signal);

        // the wrapped process has been signalled above, what is left to kill
        // and reap are the children.
        descendants.remove(pid);
        proc.pid.kill(descendants, Yes.onlyCurrentUser, signal).reap;
    }

    /// Forwards to `wait` of the wrapped process.
    int wait() @safe {
        return p.wait();
    }

    /// Forwards to `tryWait` of the wrapped process.
    bool tryWait() @safe {
        return p.tryWait();
    }

    /// Forwards to `status` of the wrapped process.
    int status() @safe {
        return p.status();
    }

    /// Forwards to `terminated` of the wrapped process.
    bool terminated() @safe {
        return p.terminated();
    }
}
483 
/// Returns: `p` wrapped in a `Sandbox`.
auto sandbox(T)(T p) @safe {
    alias Wrapped = Sandbox!T;
    return Wrapped(p);
}
487 
// Verifies that killing a sandboxed process also removes its children.
@("shall terminate a group of processes")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    // the script starts two `sleep` in the background and one in the
    // foreground, i.e. three children of the script.
    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto p = pipeProcess([scriptName]).sandbox.rcKill;
    // NOTE(review): `waitUntilChildren` is defined outside this view;
    // presumably it blocks until the process has three children.
    waitUntilChildren(p.osHandle, 3);
    // number of processes in the subtree, not counting the script itself.
    const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    p.kill;
    Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
    const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    // -9: presumably the negated signal number of the default SIGKILL, as
    // reported by `std.process.wait` -- TODO confirm.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(preChildren == 3);
    assert(postChildren == 0);
}
512 
/** Kill the process when the timeout has passed.
 *
 * A background thread polls the process. When the timeout has passed, a stop
 * is requested or the process is found to be gone, the thread sends the
 * configured signal (default SIGKILL) through `kill` of the wrapped process.
 *
 * The wrapped process and the thread live in a reference counted payload which
 * is shared by all copies of the instance.
 */
@safe struct Timeout(ProcessT) {
    import core.sys.posix.signal : SIGKILL;
    import core.thread;
    import std.algorithm : among;
    import std.datetime : Clock, Duration;

    private {
        /// Messages sent to the background thread.
        enum Msg {
            none,
            stop,
            status,
        }

        /// Replies from the background thread.
        enum Reply {
            none,
            running,
            normalDeath,
            killedByTimeout,
        }

        static struct Payload {
            ProcessT p;
            RawPid pid;
            Background background;
            // last reply read from `background`, see `dispose` and
            // `timeoutTriggered`.
            Reply backgroundReply;
        }

        RefCounted!Payload rc;
    }

    /**
     * Params:
     *  p = the process to supervise, it is moved into the payload
     *  timeout = how long the process is allowed to run
     */
    this(ProcessT p, Duration timeout) @trusted {
        import std.algorithm : move;

        // read the handle before `p` is moved.
        auto pid = p.osHandle;
        rc = refCounted(Payload(move(p), pid));
        // the thread operates on the process stored in the payload.
        rc.background = new Background(&rc.p, timeout);
        rc.background.isDaemon = true;
        rc.background.start;
    }

    ~this() @trusted {
        rc.release;
    }

    /// Thread that runs `checkProcess`. All state shared with the owner is
    /// accessed through the member functions that lock `mtx`.
    private static class Background : Thread {
        import core.sync.condition : Condition;
        import core.sync.mutex : Mutex;

        Duration timeout;
        ProcessT* p;
        Mutex mtx;
        Msg[] msg;
        Reply reply_;
        RawPid pid;
        int signal = SIGKILL;

        this(ProcessT* p, Duration timeout) {
            this.p = p;
            this.timeout = timeout;
            this.mtx = new Mutex();
            this.pid = p.osHandle;

            super(&run);
        }

        void run() {
            checkProcess(this.pid, this.timeout, this);
        }

        /// Queue a message for the thread.
        void put(Msg msg) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.msg ~= msg;
        }

        /// Returns: the most recently queued message, which is also removed,
        /// or `Msg.none` if no message is queued.
        Msg popMsg() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            if (msg.empty)
                return Msg.none;
            auto rval = msg[$ - 1];
            msg = msg[0 .. $ - 1];
            return rval;
        }

        void setReply(Reply reply_) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.reply_ = reply_;
        }

        Reply reply() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            return reply_;
        }

        /// Set the signal that `kill` sends.
        void setSignal(int signal) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.signal = signal;
        }

        /// Send the configured signal to the process.
        void kill() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            p.kill(signal);
        }
    }

    /** Body of the background thread.
     *
     * Polls until the timeout has passed, `Msg.stop` is received or signalling
     * the process fails. Afterwards the process is always killed via `bg` and
     * the final reply is stored in `bg`.
     *
     * Params:
     *  p = the process to poll
     *  timeout = how long to poll
     *  bg = the thread object, used for the messages, the reply and the kill
     */
    private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
        import std.algorithm : max, min;
        import std.variant : Variant;
        static import core.sys.posix.signal;

        const stopAt = Clock.currTime + timeout;
        // the purpose is to poll the process often "enough" that if it
        // terminates early `Process` detects it fast enough. 1000 is chosen
        // because it "feels good". The interval is a 1/1000 of the timeout
        // but never below 1 ms or above 50 ms.
        auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs";

        bool forceStop;
        bool running = true;
        while (running && Clock.currTime < stopAt) {
            const msg = bg.popMsg;

            final switch (msg) {
            case Msg.none:
                // nothing to do, sleep until the next poll.
                () @trusted { Thread.sleep(sleepInterval); }();
                break;
            case Msg.stop:
                forceStop = true;
                running = false;
                break;
            case Msg.status:
                bg.setReply(Reply.running);
                break;
            }

            // signal 0 is used as a probe; a failure is treated as the
            // process not running anymore.
            () @trusted {
                if (core.sys.posix.signal.kill(p, 0) == -1) {
                    running = false;
                }
            }();
        }

        // may be children alive thus must ensure that the whole process tree
        // is killed if this is a sandbox with a timeout.
        bg.kill();

        // only a loop that ended because the deadline passed counts as a
        // timeout; a requested stop never does.
        if (!forceStop && Clock.currTime >= stopAt) {
            bg.setReply(Reply.killedByTimeout);
        } else {
            bg.setReply(Reply.normalDeath);
        }
    }

    /// Returns: The raw OS handle of the wrapped process.
    RawPid osHandle() nothrow @trusted {
        return rc.pid;
    }

    // The stream accessors only exist when the wrapped type has them.

    static if (__traits(hasMember, ProcessT, "stdin")) {
        ref FileWriteChannel stdin() nothrow @safe {
            return rc.p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        ref FileReadChannel stdout() nothrow @safe {
            return rc.p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        ref FileReadChannel stderr() nothrow @trusted {
            return rc.p.stderr;
        }
    }

    /// Stop the background thread, if it has not delivered a final reply, and
    /// dispose the wrapped process.
    void dispose() @trusted {
        if (rc.backgroundReply.among(Reply.none, Reply.running)) {
            rc.background.put(Msg.stop);
            // the final reply is available when the thread has finished.
            rc.background.join;
            rc.backgroundReply = rc.background.reply;
        }
        rc.p.dispose;
    }

    /** Send `signal` to the process.
     *
     * The signal is also stored as the one the background thread sends when
     * it kills the process.
     *
     * Param:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        rc.background.setSignal(signal);
        rc.background.kill();
    }

    /// Wait for the process to terminate by polling `tryWait` every 20 ms.
    /// Returns: the exit status.
    int wait() @trusted {
        while (!this.tryWait) {
            Thread.sleep(20.dur!"msecs");
        }
        return rc.p.wait;
    }

    /// Forwards to `tryWait` of the wrapped process.
    bool tryWait() @trusted {
        return rc.p.tryWait;
    }

    /// Forwards to `status` of the wrapped process.
    int status() @trusted {
        return rc.p.status;
    }

    /// Forwards to `terminated` of the wrapped process.
    bool terminated() @trusted {
        return rc.p.terminated;
    }

    /// Returns: `true` if the background thread has reported that it killed
    /// the process because the timeout passed.
    bool timeoutTriggered() @trusted {
        if (rc.backgroundReply.among(Reply.none, Reply.running)) {
            // the reply is read right away, without waiting for the thread to
            // process the message.
            rc.background.put(Msg.status);
            rc.backgroundReply = rc.background.reply;
        }
        return rc.backgroundReply == Reply.killedByTimeout;
    }
}
746 
/// Returns: `p` wrapped in a `Timeout` with the timeout `timeout_`.
auto timeout(T)(T p, Duration timeout_) @trusted {
    alias Wrapped = Timeout!T;
    return Wrapped(p, timeout_);
}
750 
/** Block until the process has pending data on stdout or stderr.
 *
 * Both channels are polled with a 20 ms sleep in between. The function does
 * not return if neither channel ever reports pending data.
 *
 * Params:
 *  p = a process with `stdout` and `stderr` channels
 */
void waitForPendingData(ProcessT)(ProcessT p) {
    // The parameter was declared with the undeclared type `Process` and the
    // loop read the non-existent member `pipe`, so the template could never be
    // instantiated. Data on either channel now ends the wait.
    while (!p.stdout.hasPendingData && !p.stderr.hasPendingData) {
        Thread.sleep(20.dur!"msecs");
    }
}
757 
// Verifies that a process that runs longer than the timeout is killed and that
// the kill is reported as caused by the timeout.
@("shall kill the process after the timeout")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    // `sleep 1m` runs far longer than the 100 ms timeout.
    auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
    auto sw = StopWatch(AutoStart.yes);
    p.wait;
    sw.stop;

    // the wait shall last at least the timeout; the upper bound leaves room
    // for the polling intervals.
    assert(sw.peek >= 100.dur!"msecs");
    assert(sw.peek <= 500.dur!"msecs");
    // -9: presumably the negated signal number of the default SIGKILL, as
    // reported by `std.process.wait` -- TODO confirm.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(p.status == -9);
    assert(p.timeoutTriggered);
}
774 
/// A chunk of data read from a process together with the stream it came from.
struct DrainElement {
    /// The stream the data was read from.
    enum Type {
        stdout,
        stderr,
    }

    Type type;
    const(ubyte)[] data;

    /// Returns: an input range over the data reinterpreted as UTF-8 text.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        const(char)[] text = cast(const(char)[]) data;
        return std.utf.byUTF!(const(char))(text);
    }

    /// Returns: `true` if the element carries no data.
    bool empty() @safe pure nothrow const @nogc {
        return !data.length;
    }
}
795 
/** Input range over the data a process writes to stdout/stderr.
 *
 * The range drains the process until it has terminated or both pipes are
 * closed, and then empties what is left in stdout followed by stderr.
 *
 * Some of the produced `DrainElement` may be empty, among them the first one.
 */
struct DrainRange(ProcessT) {
    private {
        enum State {
            start,
            draining,
            lastStdout,
            lastStderr,
            lastElement,
            empty,
        }

        ProcessT p;
        DrainElement front_;
        State st;
        ubyte[] buf;
    }

    /// Params: p = the process to drain.
    this(ProcessT p) {
        this.p = p;
        // read buffer that is reused for every read.
        this.buf = new ubyte[](4096);
    }

    /// Returns: the current element.
    DrainElement front() @safe pure nothrow const @nogc {
        assert(!empty, "Can't get front of an empty range");
        return front_;
    }

    /// Advance the range, which may block for up to roughly 100 ms while
    /// waiting for data.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");

        // true as long as there is a pipe that data may arrive on.
        static bool pipesOpen(ref ProcessT src) {
            if (src.stdout.isOpen)
                return true;
            return src.stderr.isOpen;
        }

        // Read what is pending, stderr is checked before stdout. The data of
        // the returned element is a slice of `buf`.
        DrainElement fetch(ref ProcessT src) @safe {
            if (src.stderr.hasPendingData)
                return DrainElement(DrainElement.Type.stderr, src.stderr.read(buf));
            if (src.stdout.hasPendingData)
                return DrainElement(DrainElement.Type.stdout, src.stdout.read(buf));
            return DrainElement.init;
        }

        // Poll for data. An empty element is returned if no data arrived
        // before the deadline or before both pipes were closed.
        DrainElement poll() @safe {
            import std.datetime : Clock;

            // without a deadline this may livelock if the process never
            // terminates and never writes anything. The deadline ensures that
            // the loop is left sooner or later, which is important if the
            // drain is part of a timeout wrapping.

            const maxWait = 100.dur!"msecs";
            const deadline = Clock.currTime + maxWait;
            const nap = maxWait / 20;
            // spin without sleeping until this point in time.
            const napAfter = Clock.currTime + nap;

            for (bool keepGoing = true; keepGoing;) {
                const now = Clock.currTime;

                keepGoing = (now < deadline) && pipesOpen(p);

                auto chunk = fetch(p);

                // copy the data because `buf` is overwritten by the next read.
                if (!chunk.empty)
                    return DrainElement(chunk.type, chunk.data.dup);

                if (keepGoing && now > napAfter) {
                    import core.thread : Thread;

                    () @trusted { Thread.sleep(nap); }();
                }
            }

            return DrainElement.init;
        }

        front_ = DrainElement.init;

        final switch (st) {
        case State.start:
            st = State.draining;
            front_ = poll();
            break;
        case State.draining:
            if (!p.terminated && pipesOpen(p))
                front_ = poll();
            else
                st = State.lastStdout;
            break;
        case State.lastStdout:
            if (!p.stdout.hasPendingData) {
                st = State.lastStderr;
                break;
            }
            front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup);
            break;
        case State.lastStderr:
            if (!p.stderr.hasPendingData) {
                st = State.lastElement;
                break;
            }
            front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
            break;
        case State.lastElement:
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when everything has been drained.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
917 
/// Returns: a `DrainRange` that drains the pipes of `p` until empty.
auto drain(T)(T p) {
    alias Range = DrainRange!T;
    return Range(p);
}
922 
/** Read the output of a process by line.
 *
 * The data is taken from a `DrainRange`, thus stdout and stderr end up in the
 * same buffer. Each line is a copy without the trailing newline.
 *
 * There may be lines that are empty because no complete line was available
 * when the range was advanced.
 */
struct DrainByLineCopyRange(ProcessT) {
    private {
        enum State {
            start,
            draining,
            lastLine,
            lastBuf,
            empty,
        }

        ProcessT process;
        DrainRange!ProcessT range;
        State st;
        // data that has been drained but not yet returned as a line.
        const(ubyte)[] buf;
        // the current front.
        const(char)[] line;
    }

    /// Params: p = the process to drain.
    this(ProcessT p) {
        process = p;
        range = p.drain;
    }

    /// Returns: the current line.
    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        return line.assumeUnique;
    }

    /// Advance to the next line.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        // Remove the data up to and including index `idx` from `buf` and
        // return it without a trailing newline. An `idx` of -1 takes all of
        // `buf`.
        const(ubyte)[] updateBuf(size_t idx) {
            const(ubyte)[] tmp;
            if (buf.empty) {
                // do nothing
            } else if (idx == -1) {
                tmp = buf;
                buf = null;
            } else {
                // step past the newline at `idx` so it is removed from `buf`.
                idx = () {
                    if (idx < buf.length) {
                        return idx + 1;
                    }
                    return idx;
                }();
                tmp = buf[0 .. idx];
                buf = buf[idx .. $];
            }

            if (!tmp.empty && tmp[$ - 1] == '\n') {
                tmp = tmp[0 .. $ - 1];
            }
            return tmp;
        }

        // Drain data into `buf` until it contains a newline and, if it does,
        // set `line` to the first line.
        void drainLine() {
            // advance the underlying range and append its front to `buf`.
            void fillBuf() {
                if (!range.empty) {
                    range.popFront;
                }
                if (!range.empty) {
                    buf ~= range.front.data;
                }
            }

            size_t idx;
            () {
                int cnt;
                do {
                    fillBuf();
                    idx = buf.countUntil('\n');
                    // 2 is a magic number which bounds the number of retries,
                    // and thus how long it waits for data, when no newline
                    // has been found.
                }
                while (!range.empty && idx == -1 && cnt++ < 2);
            }();

            // without a newline `line` is left as is and the data stays in
            // `buf`.
            if (idx != -1) {
                auto tmp = updateBuf(idx);
                line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            }
        }

        // Set `line` to the next complete line that is left in `buf`.
        // Returns: true if `buf` contains no more newline.
        bool lastLine() {
            size_t idx = buf.countUntil('\n');
            if (idx == -1)
                return true;

            auto tmp = updateBuf(idx);
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            return false;
        }

        line = null;
        final switch (st) {
        case State.start:
            drainLine;
            st = State.draining;
            break;
        case State.draining:
            drainLine;
            // the process is drained, what is left is the data in `buf`.
            if (range.empty)
                st = State.lastLine;
            break;
        case State.lastLine:
            if (lastLine)
                st = State.lastBuf;
            break;
        case State.lastBuf:
            // the remaining data which has no terminating newline.
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when all lines have been returned.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
1048 
@("shall drain the process output by line")
unittest {
    import std.algorithm : filter, joiner, map;
    import std.array : array;

    // dd copies 3 blocks of 10 bytes each from /dev/zero.
    auto dd = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;

    // Collect every non-empty line produced while draining the process.
    auto lines = dd.process.drainByLineCopy.filter!(a => !a.empty).array;

    assert(lines.length == 3);

    const totalChars = lines.joiner.count;
    assert(totalChars >= 30);

    assert(dd.wait == 0);
    assert(dd.terminated);
}
1062 
/// Wrap the process `p` in a `DrainByLineCopyRange` so its output can be iterated line by line.
auto drainByLineCopy(T)(T p) {
    alias LineRange = DrainByLineCopyRange!T;
    return LineRange(p);
}
1066 
/** Consume and discard everything produced by `p.drain()`.
 *
 * Returns: `p`, so the call can be used in a chain.
 */
auto drainToNull(T)(T p) {
    // Every element is fetched and thrown away; only the iteration matters.
    foreach (_; p.drain()) {
    }

    return p;
}
1073 
/** Forward every element produced by `p.drain()` to an output range.
 *
 * Params:
 *  p = the process to drain
 *  range = sink that receives each drained element via `put`
 *
 * Returns: `p`, so the call can be used in a chain.
 */
auto drain(ProcessT, T)(ProcessT p, ref T range) {
    foreach (element; p.drain()) {
        range.put(element);
    }

    return p;
}
1081 
@("shall drain the output of a process while it is running with a separation of stdout and stderr")
unittest {
    // dd copies 3 blocks of 10 bytes each from /dev/urandom.
    auto dd = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
    auto elements = dd.drain.array;

    // Sanity check only: the upper bound is deliberately generous because
    // the number of drained chunks is allowed some wiggle room.
    const chunks = elements.count;
    assert(chunks > 1 && chunks <= 50);

    // Only the data that arrived on stdout is counted.
    const stdoutBytes = elements.filter!(a => a.type == DrainElement.Type.stdout)
        .map!(a => a.data)
        .joiner
        .count;
    assert(stdoutBytes == 30);

    assert(dd.wait == 0);
    assert(dd.terminated);
}
1098 
@("shall kill the process tree when the timeout is reached")
unittest {
    immutable script = makeScript(`#!/bin/bash
sleep 10m
`);
    scope (exit)
        remove(script);

    auto proc = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
    waitUntilChildren(proc.osHandle, 1);

    // Count the children before and after draining; draining runs until the
    // process is done, which here means the timeout has fired.
    const childrenBefore = makePidMap.getSubMap(proc.osHandle).remove(proc.osHandle).length;
    const drained = proc.process.drain.array;
    const childrenAfter = makePidMap.getSubMap(proc.osHandle).remove(proc.osHandle).length;

    assert(proc.wait == -9);
    assert(proc.terminated);
    assert(childrenBefore == 1);
    assert(childrenAfter == 0);
}
1118 
/** Write `script` to a file next to the running executable and mark it executable.
 *
 * The file name is built from the path of the current executable, the base
 * name of `file` and `line`, with a ".sh" suffix.
 *
 * Params:
 *  script = content written to the file, followed by a newline
 *  file = source file used to derive the name (defaults to the caller's file)
 *  line = source line used to derive the name (defaults to the caller's line)
 *
 * Returns: the path of the created script.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.stdio : File;
    import std.path : baseName;
    import std.conv : to;

    immutable fname = thisExePath ~ "_" ~ baseName(file) ~ to!string(line) ~ ".sh";

    {
        // The handle is closed at the end of this scope, before the
        // attributes are changed.
        auto output = File(fname, "w");
        output.writeln(script);
    }

    // Add execute permission for user, group and others.
    enum execBits = S_IXUSR | S_IXGRP | S_IXOTH;
    setAttributes(fname, getAttributes(fname) | execBits);

    return fname;
}
1132 
/** Poll until `p` has at least `num` child processes or about 10 seconds have passed.
 *
 * The child count is sampled every 50 ms. On timeout the function simply
 * returns; it does not throw, so callers must verify the child count
 * themselves if they depend on it.
 *
 * Params:
 *  p = the parent process to inspect
 *  num = the number of children to wait for
 */
void waitUntilChildren(RawPid p, int num) {
    import core.time : MonoTime;

    // Use the monotonic clock so that adjustments of the wall clock can
    // neither shorten nor prolong the timeout.
    const deadline = MonoTime.currTime + 10.dur!"seconds";
    do {
        Thread.sleep(50.dur!"msecs");
        if (MonoTime.currTime > deadline)
            break;
    }
    while (makePidMap.getSubMap(p).remove(p).length < num);
}
1145 
/// Strongly typed wrapper around a `ulong` address.
alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable);

/** One mapped address range together with its permission string.
 *
 * `libs` fills `begin`, `end` and `perm` from the fields it parses out of a
 * line in `/proc/<pid>/maps`.
 */
struct AddressMap {
    Address begin, end;
    NamedType!(string, Tag!"Permission", string.init, TagStringable) perm;
}
1152 
/** Map every file-backed memory mapping of a process to its address ranges.
 *
 * Reads `/proc/<pid>/maps`. For each line that contains a '/', the text from
 * the first '/' to the end of the line is used as the path, and the text
 * before it is parsed as `begin-end perm`. This covers the shared libraries
 * loaded by the process as well as any other mapped file.
 *
 * Parsing is best effort: lines that cannot be parsed are skipped, and if the
 * maps file cannot be read the error is trace-logged and whatever has been
 * collected so far is returned.
 *
 * Params:
 *  pid = the process to inspect
 *
 * Returns: an associative array from path to the address ranges it is mapped at.
 */
AddressMap[][AbsolutePath] libs(RawPid pid) nothrow {
    import std.stdio : File;
    import std.format : formattedRead, format;
    import std.algorithm : countUntil;
    import std.typecons : tuple;
    import std.string : strip;

    typeof(return) rval;

    try {
        // Pair each line with the index of its first '/' and drop the lines
        // that have no path.
        foreach (l; File(format!"/proc/%s/maps"(pid)).byLine
                .map!(a => tuple(a, a.countUntil('/')))
                .filter!(a => a[1] != -1)) {
            try {
                // byLine reuses its buffer, so everything that is kept must be copied.
                auto p = AbsolutePath(l[0][l[1] .. $].idup);
                auto m = l[0][0 .. l[1]].strip;
                ulong begin;
                ulong end;
                char[] perm;
                if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) {
                    continue;
                }

                auto amap = AddressMap(Address(begin), Address(end),
                        typeof(AddressMap.init.perm)(perm.idup));
                if (auto v = p in rval) {
                    *v ~= amap;
                } else {
                    rval[p] = [amap];
                }
            } catch (Exception e) {
                // Deliberately ignored: a malformed line must not abort the scan.
            }
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return rval;
}
1195 
@("shall read the libraries the process is using")
unittest {
    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto proc = pipeProcess([scriptName]).sandbox.rcKill;

    // Wait until all three `sleep` children of the script have been started.
    waitUntilChildren(proc.osHandle, 3);
    auto mappings = libs(proc.osHandle);
    proc.kill;

    assert(mappings.length != 0);

    const bash = AbsolutePath("/bin/bash");
    assert(bash in mappings);
}
1214 
/** Extract the library paths and load addresses from the output of `ldd`.
 *
 * Each line is matched against a pattern looking for a path followed by a
 * hexadecimal address in parentheses. Lines that do not match, or whose
 * address cannot be parsed, are skipped.
 *
 * Params:
 *  input = an input range of lines which is the output from ldd
 *
 * Example:
 * ---
 * writeln(parseLddOutput(`
 *     linux-vdso.so.1 =>  (0x00007fffbf5fe000)
 *     libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)
 *     libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)
 *     libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)
 *     /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)
 * `))
 * ---
 *
 * Returns: a dictionary of {path: address} for each library required by the
 * specified binary.
 */
Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) {
    import std.regex : regex, matchFirst;
    import std.format : formattedRead;

    const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`);
    typeof(return) libraries;

    foreach (row; input) {
        auto hit = matchFirst(row, reLinux);

        // A usable match has the whole match plus both named groups.
        const usable = !hit.empty && hit.length >= 3;
        if (usable) {
            try {
                ulong loadAddr;
                formattedRead!"0x%x"(hit["addr"], loadAddr);
                libraries[AbsolutePath(hit["lib"])] = Address(loadAddr);
            } catch (Exception e) {
                // Best effort: an entry that fails to parse is left out.
            }
        }
    }

    return libraries;
}
1257 
@("shall parse the output of ldd")
unittest {
    const lddLines = [
        "linux-vdso.so.1 =>  (0x00007fffbf5fe000)",
        "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)",
        "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)",
        "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)",
        "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)"
    ];
    auto parsed = parseLddOutput(lddLines);

    // Only the three "name => /path (addr)" lines are expected to be picked up.
    assert(parsed.length == 3);

    // 0x00007fe28117f000 == 140610805166080
    const tinfo = AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5");
    assert(parsed[tinfo].get == 0x00007fe28117f000);
}