1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc.process;
7 
8 import core.sys.posix.signal : SIGKILL;
9 import core.thread : Thread;
10 import core.time : dur, Duration;
11 import logger = std.experimental.logger;
12 import std.algorithm : filter, count, joiner, map;
13 import std.array : appender, empty, array;
14 import std.exception : collectException;
15 import std.stdio : File, fileno, writeln;
16 import std.typecons : Flag, Yes;
17 static import std.process;
18 static import std.stdio;
19 
20 import my.gc.refc;
21 import my.from_;
22 import my.path;
23 import my.named_type;
24 
25 public import proc.channel;
26 public import proc.pid;
27 
28 version (unittest) {
29     import std.file : remove;
30 }
31 
/** Manage a process by reference counting so that it is terminated when it
 * stops being used, such as when the last instance goes out of scope.
 *
 * Params:
 *  p = the process to manage
 *  signal = stored in the returned guard, defaults to `SIGKILL`
 *
 * Returns: a `ScopeKill` wrapping `p`.
 */
auto rcKill(T)(T p, int signal = SIGKILL) {
    auto guard = ScopeKill!T(p, signal);
    return guard;
}
38 
39 // backward compatibility.
40 alias scopeKill = rcKill;
41 
/// Reference counted owner of a process. The process is disposed when the
/// shared payload is destroyed.
struct ScopeKill(T) {
    private {
        static struct Payload {
            T process;
            int signal = SIGKILL;
            // false for a default initialized payload, which owns nothing.
            bool hasProcess;

            ~this() @safe {
                if (!hasProcess)
                    return;
                process.dispose();
            }
        }

        RefCounted!Payload payload_;
    }

    this(T process, int signal) @safe {
        // the payload is passed on as a temporary on purpose; a named local
        // would be destroyed at the end of the constructor and thereby
        // dispose the process.
        payload_ = refCounted(Payload(process, signal, true));
    }

    /// Returns: the managed process.
    ref T process() {
        return payload_.get.process;
    }

    alias process this;
}
67 
/// Async process wrapper for a process started with `std.process.spawnProcess`.
struct SpawnProcess {
    import core.sys.posix.signal : SIGKILL;
    import std.algorithm : among;

    private {
        enum State {
            // no signal sent by this wrapper and no exit status collected.
            running,
            // a signal has been sent via `kill`, exit status not collected.
            terminated,
            // the exit status is stored in `status_`.
            exitCode
        }

        std.process.Pid process;
        RawPid pid;
        int status_;
        State st;
    }

    this(std.process.Pid process) @safe {
        this.process = process;
        this.pid = process.osHandle.RawPid;
    }

    ~this() @safe {
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    /// Kill and cleanup the process.
    void dispose() @safe {
        // only a process that has not been signalled yet needs a kill.
        if (st == State.running)
            this.kill();
        // collect the exit status unless it is already known.
        if (st != State.exitCode)
            this.wait();

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Does nothing unless the process is in the running state. Errors from
     * sending the signal are ignored.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(process, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination while it is running.
    /// After `kill` has been called it blocks until the exit status is collected.
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            auto res = std.process.tryWait(process);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(process);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st != State.running;
    }
}
194 
/// Async process that does not block on read from stdout/stderr.
struct PipeProcess {
    import std.algorithm : among;
    import core.sys.posix.signal : SIGKILL;

    private {
        enum State {
            // no signal sent by this wrapper and no exit status collected.
            running,
            // a signal has been sent via `kill`, exit status not collected.
            terminated,
            // the exit status is stored in `status_`.
            exitCode
        }

        std.process.ProcessPipes process;
        std.process.Pid pid;

        FileReadChannel stderr_;
        FileReadChannel stdout_;
        FileWriteChannel stdin_;
        int status_;
        State st;
    }

    /// Wrap an already running process and the files connected to it.
    this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
        this.pid = pid;

        this.stdin_ = FileWriteChannel(stdin);
        this.stdout_ = FileReadChannel(stdout);
        this.stderr_ = FileReadChannel(stderr);
    }

    /// Wrap the pipes of a process. Only the streams selected by `r` get a channel.
    this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
        this.process = process;
        this.pid = process.pid;

        if (r & std.process.Redirect.stdin) {
            this.stdin_ = FileWriteChannel(this.process.stdin);
        }
        if (r & std.process.Redirect.stdout) {
            this.stdout_ = FileReadChannel(this.process.stdout);
        }
        if (r & std.process.Redirect.stderr) {
            this.stderr_ = FileReadChannel(this.process.stderr);
        }
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid.osHandle.RawPid;
    }

    /// Access to stdin.
    ref FileWriteChannel stdin() return scope nothrow @safe {
        return stdin_;
    }

    /// Access to stdout.
    ref FileReadChannel stdout() return scope nothrow @safe {
        return stdout_;
    }

    /// Access to stderr.
    ref FileReadChannel stderr() return scope nothrow @safe {
        return stderr_;
    }

    /// Kill and cleanup the process.
    void dispose() @safe {
        if (st != State.exitCode) {
            // only a process that has not been signalled yet needs a kill.
            if (st == State.running)
                this.kill();
            this.wait();
            .destroy(process);
        }

        st = State.exitCode;
    }

    /** Send `signal` to the process.
     *
     * Does nothing unless the process is in the running state. Errors from
     * sending the signal are ignored.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        if (st != State.running)
            return;

        try {
            std.process.kill(pid, signal);
        } catch (Exception e) {
        }

        st = State.terminated;
    }

    /// Blocking wait for the process to terminate.
    /// Returns: the exit status.
    int wait() @safe {
        if (st != State.exitCode) {
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return status_;
    }

    /// Non-blocking wait for the process termination while it is running.
    /// After `kill` has been called it blocks until the exit status is collected.
    /// Returns: `true` if the process has terminated.
    bool tryWait() @safe {
        if (st == State.running) {
            auto res = std.process.tryWait(pid);
            if (res.terminated) {
                st = State.exitCode;
                status_ = res.status;
            }
        } else if (st == State.terminated) {
            status_ = std.process.wait(pid);
            st = State.exitCode;
        }

        return st != State.running;
    }

    /// Returns: The exit status of the process.
    /// Throws: `Exception` if the exit status has not been collected yet.
    int status() @safe {
        if (st == State.exitCode)
            return status_;

        throw new Exception(
                "Process has not terminated and wait/tryWait been called to collect the exit status");
    }

    /// Returns: If the process has terminated.
    bool terminated() @safe {
        return st != State.running;
    }
}
357 
/// Spawn `args` as a child process. All arguments are forwarded to
/// `std.process.spawnProcess` and the resulting pid is wrapped.
SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const char[] workDir = null) {
    auto child = std.process.spawnProcess(args, stdin, stdout, stderr, env, config, workDir);
    return SpawnProcess(child);
}
365 
/// Spawn `args` as a child process that uses the standard streams of the
/// current process. The arguments are forwarded to `std.process.spawnProcess`.
SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    auto child = std.process.spawnProcess(args, std.stdio.stdin,
            std.stdio.stdout, std.stdio.stderr, env, config, workDir);
    return SpawnProcess(child);
}
371 
/// Spawn a single `program`, without arguments, as a child process.
SpawnProcess spawnProcess(scope const(char)[] program,
        File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
        File stderr = std.stdio.stderr, const string[string] env = null,
        std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
    // the program is passed on as a one element argument list.
    auto child = std.process.spawnProcess((&program)[0 .. 1], stdin, stdout,
            stderr, env, config, workDir);
    return SpawnProcess(child);
}
379 
/// Run `command` via a shell. All arguments are forwarded to
/// `std.process.spawnShell` and the resulting pid is wrapped.
SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
        File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
        scope const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, stdin, stdout, stderr, env,
            config, workDir, shellPath);
    return SpawnProcess(child);
}
387 
/// Run `command` via a shell without specifying the streams. The arguments are
/// forwarded to the matching overload of `std.process.spawnShell`.
SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
        std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
    auto child = std.process.spawnShell(command, env, config, workDir, shellPath);
    return SpawnProcess(child);
}
394 
/// Spawn `args` as a child process with the streams selected by `redirect`
/// connected to pipes. The arguments are forwarded to `std.process.pipeProcess`.
PipeProcess pipeProcess(scope const(char[])[] args,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null) @safe {
    auto pipes = std.process.pipeProcess(args, redirect, env, config, workDir);
    return PipeProcess(pipes, redirect);
}
401 
/// Run `command` via a shell with the streams selected by `redirect` connected
/// to pipes. The arguments are forwarded to `std.process.pipeShell`.
PipeProcess pipeShell(scope const(char)[] command,
        std.process.Redirect redirect = std.process.Redirect.all,
        const string[string] env = null, std.process.Config config = std.process.Config.none,
        scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
    auto pipes = std.process.pipeShell(command, redirect, env, config, workDir, shellPath);
    return PipeProcess(pipes, redirect);
}
409 
/** Moves the process to a separate process group and on exit kills it and all
 * its children.
 *
 * All other operations are forwarded to the wrapped process. The
 * stdin/stdout/stderr accessors only exist when the wrapped type has them.
 */
@safe struct Sandbox(ProcessT) {
    import core.sys.posix.signal : SIGKILL;

    private {
        // the wrapped process.
        ProcessT p;
        // cached OS handle of `p`.
        RawPid pid;
    }

    /// Takes over `p` and requests a process group of its own for it.
    this(ProcessT p) @safe {
        import core.sys.posix.unistd : setpgid;

        this.p = p;
        this.pid = p.osHandle;
        // a pgid of 0 means "use the pid of the process as the group id".
        // NOTE(review): the return value is not checked.
        setpgid(pid, 0);
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @safe {
        return pid;
    }

    static if (__traits(hasMember, ProcessT, "stdin")) {
        /// Access to stdin of the wrapped process.
        ref FileWriteChannel stdin() nothrow @safe {
            return p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        /// Access to stdout of the wrapped process.
        ref FileReadChannel stdout() nothrow @safe {
            return p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        /// Access to stderr of the wrapped process.
        ref FileReadChannel stderr() nothrow @safe {
            return p.stderr;
        }
    }

    /// Kill the process and its children with the default signal, then
    /// dispose the wrapped process.
    void dispose() @safe {
        // this also reaps the children thus cleaning up zombies
        this.kill;
        p.dispose;
    }

    /** Send `signal` to the process and to the processes found in its sub map.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @safe {
        // must first retrieve the submap because after the process is killed
        // its children may have changed.
        auto pmap = makePidMap.getSubMap(pid);

        p.kill(signal);

        // only kill and reap the children
        // (the process itself was signalled by `p.kill` above).
        pmap.remove(pid);
        proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap;
    }

    /// Forwards to `wait` of the wrapped process.
    int wait() @safe {
        return p.wait;
    }

    /// Forwards to `tryWait` of the wrapped process.
    bool tryWait() @safe {
        return p.tryWait;
    }

    /// Forwards to `status` of the wrapped process.
    int status() @safe {
        return p.status;
    }

    /// Forwards to `terminated` of the wrapped process.
    bool terminated() @safe {
        return p.terminated;
    }
}
490 
/// Wrap `p` in a `Sandbox`.
auto sandbox(T)(T p) @safe {
    auto boxed = Sandbox!T(p);
    return boxed;
}
494 
@("shall terminate a group of processes")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    // the script starts two sleeps in the background and one in the
    // foreground, i.e. three child processes.
    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto p = pipeProcess([scriptName]).sandbox.rcKill;
    waitUntilChildren(p.osHandle, 3);
    // number of processes below the script, the script itself excluded.
    const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    p.kill;
    Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
    const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    // the default signal is SIGKILL; a process terminated by a signal is
    // reported by std.process as the negated signal number.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(preChildren == 3);
    assert(postChildren == 0);
}
519 
/** Kill the wrapped process when the timeout expires.
 *
 * A background thread polls the process and sends it a signal when the
 * timeout is reached, when the process is no longer found or when it is told
 * to stop. The state is shared between copies via a reference counted payload.
 */
@safe struct Timeout(ProcessT) {
    import core.sys.posix.signal : SIGKILL;
    import core.thread;
    import std.algorithm : among;
    import std.datetime : Clock, Duration;

    private {
        // messages sent to the background thread.
        enum Msg {
            none,
            stop,
            status,
        }

        // the state reported by the background thread.
        enum Reply {
            none,
            running,
            normalDeath,
            killedByTimeout,
        }

        static struct Payload {
            ProcessT p;
            RawPid pid;
            Background background;
            // the last reply fetched from `background`.
            Reply backgroundReply;
        }

        RefCounted!Payload rc;
    }

    /** Take over `p` and start the background thread that enforces the timeout.
     *
     * Params:
     *  p = the process to supervise
     *  timeout = how long the process is allowed to run
     */
    this(ProcessT p, Duration timeout) @trusted {
        import std.algorithm : move;

        auto pid = p.osHandle;
        rc = refCounted(Payload(move(p), pid));
        // the thread gets a pointer to the process stored in the payload.
        rc.get.background = new Background(&rc.get.p, timeout);
        rc.get.background.isDaemon = true;
        rc.get.background.start;
    }

    ~this() @trusted {
        rc.release;
    }

    // The thread that supervises the process. Access to its mutable fields
    // goes through methods that hold `mtx`.
    private static class Background : Thread {
        import core.sync.condition : Condition;
        import core.sync.mutex : Mutex;

        Duration timeout;
        ProcessT* p;
        Mutex mtx;
        // pending messages, see `put` and `popMsg`.
        Msg[] msg;
        Reply reply_;
        RawPid pid;
        // the signal used by `kill`.
        int signal = SIGKILL;

        this(ProcessT* p, Duration timeout) {
            this.p = p;
            this.timeout = timeout;
            this.mtx = new Mutex();
            this.pid = p.osHandle;

            super(&run);
        }

        void run() {
            checkProcess(this.pid, this.timeout, this);
        }

        /// Queue a message for the thread.
        void put(Msg msg) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.msg ~= msg;
        }

        /// Returns: the most recently queued message or `Msg.none` if there is none.
        Msg popMsg() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            if (msg.empty)
                return Msg.none;
            // NOTE(review): takes from the end, i.e. last in first out.
            auto rval = msg[$ - 1];
            msg = msg[0 .. $ - 1];
            return rval;
        }

        void setReply(Reply reply_) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.reply_ = reply_;
        }

        Reply reply() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            return reply_;
        }

        void setSignal(int signal) @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            this.signal = signal;
        }

        /// Send the configured signal to the process.
        void kill() @trusted nothrow {
            this.mtx.lock_nothrow();
            scope (exit)
                this.mtx.unlock_nothrow();
            p.kill(signal);
        }
    }

    // Body of the background thread. Polls until the timeout expires, a stop
    // message arrives or the process can no longer be signalled. It then
    // kills the process and publishes why the loop ended.
    private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
        import std.algorithm : max, min;
        import std.variant : Variant;
        static import core.sys.posix.signal;

        const stopAt = Clock.currTime + timeout;
        // the purpose is to poll the process often "enough" that an early
        // termination is detected fast enough. 1000 is chosen because it
        // "feels good". The interval ends up in the range 1-50 ms.
        auto sleepInterval = min(50, max(1, timeout.total!"msecs" / 1000)).dur!"msecs";

        bool forceStop;
        bool running = true;
        while (running && Clock.currTime < stopAt) {
            const msg = bg.popMsg;

            final switch (msg) {
            case Msg.none:
                () @trusted { Thread.sleep(sleepInterval); }();
                break;
            case Msg.stop:
                forceStop = true;
                running = false;
                break;
            case Msg.status:
                bg.setReply(Reply.running);
                break;
            }

            // signal 0 sends nothing, it only checks if the process can be
            // signalled.
            () @trusted {
                if (core.sys.posix.signal.kill(p, 0) == -1) {
                    running = false;
                }
            }();
        }

        // may be children alive thus must ensure that the whole process tree
        // is killed if this is a sandbox with a timeout.
        bg.kill();

        if (!forceStop && Clock.currTime >= stopAt) {
            bg.setReply(Reply.killedByTimeout);
        } else {
            bg.setReply(Reply.normalDeath);
        }
    }

    /// Returns: The raw OS handle for the process ID.
    RawPid osHandle() nothrow @trusted {
        return rc.get.pid;
    }

    static if (__traits(hasMember, ProcessT, "stdin")) {
        /// Access to stdin of the wrapped process.
        ref FileWriteChannel stdin() nothrow @safe {
            return rc.get.p.stdin;
        }
    }

    static if (__traits(hasMember, ProcessT, "stdout")) {
        /// Access to stdout of the wrapped process.
        ref FileReadChannel stdout() nothrow @safe {
            return rc.get.p.stdout;
        }
    }

    static if (__traits(hasMember, ProcessT, "stderr")) {
        /// Access to stderr of the wrapped process.
        ref FileReadChannel stderr() nothrow @trusted {
            return rc.get.p.stderr;
        }
    }

    /// Stop the background thread, if no final reply has been fetched yet,
    /// and dispose the wrapped process.
    void dispose() @trusted {
        if (rc.get.backgroundReply.among(Reply.none, Reply.running)) {
            rc.get.background.put(Msg.stop);
            // the thread sets its final reply before it exits.
            rc.get.background.join;
            rc.get.backgroundReply = rc.get.background.reply;
        }
        rc.get.p.dispose;
    }

    /** Send `signal` to the process.
     *
     * The signal is also stored and thus used when the background thread
     * later kills the process.
     *
     * Params:
     *  signal = a signal from `core.sys.posix.signal`
     */
    void kill(int signal = SIGKILL) nothrow @trusted {
        rc.get.background.setSignal(signal);
        rc.get.background.kill();
    }

    /// Wait for the process to terminate by polling `tryWait` every 20 ms.
    /// Returns: the exit status.
    int wait() @trusted {
        while (!this.tryWait) {
            Thread.sleep(20.dur!"msecs");
        }
        return rc.get.p.wait;
    }

    /// Forwards to `tryWait` of the wrapped process.
    bool tryWait() @trusted {
        return rc.get.p.tryWait;
    }

    /// Forwards to `status` of the wrapped process.
    int status() @trusted {
        return rc.get.p.status;
    }

    /// Forwards to `terminated` of the wrapped process.
    bool terminated() @trusted {
        return rc.get.p.terminated;
    }

    /// Returns: `true` if the background thread reported that the timeout
    /// expired.
    bool timeoutTriggered() @trusted {
        if (rc.get.backgroundReply.among(Reply.none, Reply.running)) {
            rc.get.background.put(Msg.status);
            // NOTE(review): the reply is read directly after the request is
            // queued, without waiting for the thread to process it, so this
            // may be the previous reply.
            rc.get.backgroundReply = rc.get.background.reply;
        }
        return rc.get.backgroundReply == Reply.killedByTimeout;
    }
}
753 
/// Wrap `p` in a `Timeout` that expires after `timeout_`.
auto timeout(T)(T p, Duration timeout_) @trusted {
    auto guarded = Timeout!T(p, timeout_);
    return guarded;
}
757 
/** Block until the process has pending data on stdout or stderr.
 *
 * The channels are polled every 20 ms. There is no upper limit on the wait.
 *
 * Params:
 *  p = a process with `stdout` and `stderr` members that have a
 *      `hasPendingData` property
 */
void waitForPendingData(ProcessT)(ProcessT p) {
    // keep waiting only while *neither* channel has something to read.
    while (!p.stdout.hasPendingData && !p.stderr.hasPendingData) {
        Thread.sleep(20.dur!"msecs");
    }
}
764 
@("shall kill the process after the timeout")
unittest {
    import std.datetime.stopwatch : StopWatch, AutoStart;

    // the process would run for a minute but the timeout is 100 ms.
    auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
    auto sw = StopWatch(AutoStart.yes);
    p.wait;
    sw.stop;

    // killed no earlier than the timeout and reasonably soon after it.
    assert(sw.peek >= 100.dur!"msecs");
    assert(sw.peek <= 500.dur!"msecs");
    // the default signal is SIGKILL; a process terminated by a signal is
    // reported by std.process as the negated signal number.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(p.status == -9);
    assert(p.timeoutTriggered);
}
781 
/// A chunk of data read from a process together with the stream it came from.
struct DrainElement {
    /// The stream the data was read from.
    enum Type {
        stdout,
        stderr,
    }

    Type type;
    const(ubyte)[] data;

    /// Returns: iterates the data as an input range.
    auto byUTF8() @safe pure nothrow const @nogc {
        static import std.utf;

        const(char)[] text = cast(const(char)[]) data;
        return std.utf.byUTF!(const(char))(text);
    }

    /// Returns: `true` if the element carries no data.
    bool empty() @safe pure nothrow const @nogc {
        return !data.length;
    }
}
802 
/** A range that drains a process stdout/stderr until it terminates.
 *
 * There may be `DrainElement` that are empty. The first element is always
 * empty because data is only read in `popFront`.
 */
struct DrainRange(ProcessT) {
    private {
        enum State {
            // nothing has been read yet.
            start,
            // reading while the process is alive and a pipe is open.
            draining,
            // flush what is left on stdout.
            lastStdout,
            // flush what is left on stderr.
            lastStderr,
            // one more, empty, element before the range ends.
            lastElement,
            empty,
        }

        ProcessT p;
        DrainElement front_;
        State st;
        // read buffer that is reused for every read.
        ubyte[] buf;
    }

    this(ProcessT p) {
        this.p = p;
        this.buf = new ubyte[4096];
    }

    /// Returns: the element produced by the latest `popFront`.
    DrainElement front() @safe pure nothrow const @nogc {
        assert(!empty, "Can't get front of an empty range");
        return front_;
    }

    /// Advance the state machine and, depending on the state, read more data.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");

        static bool isAnyPipeOpen(ref ProcessT p) {
            return p.stdout.isOpen || p.stderr.isOpen;
        }

        // Read from one channel into `buf`. stderr is checked before stdout.
        // Returns an empty element if neither has pending data.
        DrainElement readData(ref ProcessT p) @safe {
            if (p.stderr.hasPendingData) {
                return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf));
            } else if (p.stdout.hasPendingData) {
                return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf));
            }
            return DrainElement.init;
        }

        // Poll for data for at most 100 ms. Returns an empty element if
        // nothing arrived or if both pipes are closed.
        DrainElement waitUntilData() @safe {
            import std.datetime : Clock;

            // may livelock if the process never terminates and never writes
            // to the terminal. The timeout ensures that the loop is left
            // sooner or later. This is important if the drain is part of a
            // timeout wrapping.

            const timeout = 100.dur!"msecs";
            const stopAt = Clock.currTime + timeout;
            const sleepFor = timeout / 20;
            // polls without sleeping until this point in time is passed.
            const useSleep = Clock.currTime + sleepFor;
            bool running = true;
            while (running) {
                const now = Clock.currTime;

                running = (now < stopAt) && isAnyPipeOpen(p);

                // one read is attempted even when `running` just became false.
                auto bufRead = readData(p);

                if (!bufRead.empty) {
                    // copied; presumably the data refers to `buf`, which is
                    // reused by the next read.
                    return DrainElement(bufRead.type, bufRead.data.dup);
                } else if (running && now > useSleep && bufRead.empty) {
                    import core.thread : Thread;

                    () @trusted { Thread.sleep(sleepFor); }();
                }
            }

            return DrainElement.init;
        }

        // an empty element is the result unless a state below reads data.
        front_ = DrainElement.init;

        final switch (st) {
        case State.start:
            st = State.draining;
            front_ = waitUntilData;
            break;
        case State.draining:
            // `terminated` is whatever the process wrapper reports.
            if (p.terminated) {
                st = State.lastStdout;
            } else if (isAnyPipeOpen(p)) {
                front_ = waitUntilData();
            } else {
                st = State.lastStdout;
            }
            break;
        case State.lastStdout:
            // stays in this state until stdout has no more pending data.
            if (p.stdout.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup);
            } else {
                st = State.lastStderr;
            }
            break;
        case State.lastStderr:
            // stays in this state until stderr has no more pending data.
            if (p.stderr.hasPendingData) {
                front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
            } else {
                st = State.lastElement;
            }
            break;
        case State.lastElement:
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the final state has been reached.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
924 
/// Create a range that drains the stdout/stderr of the process `p`.
auto drain(T)(T p) {
    auto drained = DrainRange!T(p);
    return drained;
}
929 
/** Read the output of a process by line.
 *
 * The data comes from a `DrainRange`, thus stdout and stderr end up in the
 * same line buffer. Lines are split on '\n', which is not part of the line.
 * There may be lines that are empty because no complete line was available.
 */
struct DrainByLineCopyRange(ProcessT) {
    private {
        enum State {
            // nothing has been read yet.
            start,
            // reading from `range`.
            draining,
            // `range` is empty, emit the complete lines left in `buf`.
            lastLine,
            // no newline is left in `buf`.
            lastBuf,
            empty,
        }

        ProcessT process;
        DrainRange!ProcessT range;
        State st;
        // data received but not yet returned as a line.
        const(ubyte)[] buf;
        // the current line, see `front`.
        const(char)[] line;
    }

    this(ProcessT p) {
        process = p;
        range = p.drain;
    }

    /// Returns: the line produced by the latest `popFront`, may be empty.
    string front() @trusted pure nothrow const @nogc {
        import std.exception : assumeUnique;

        assert(!empty, "Can't get front of an empty range");
        return line.assumeUnique;
    }

    /// Try to produce the next line.
    void popFront() @safe {
        assert(!empty, "Can't pop front of an empty range");
        import std.algorithm : countUntil;
        import std.array : array;
        static import std.utf;

        // Cut the data up to and including index `idx` out of `buf` and
        // return it without a trailing newline. An `idx` of -1 takes
        // everything that is left.
        const(ubyte)[] updateBuf(size_t idx) {
            const(ubyte)[] tmp;
            if (buf.empty) {
                // do nothing
            } else if (idx == -1) {
                tmp = buf;
                buf = null;
            } else {
                // step past the newline so it is removed from `buf`.
                idx = () {
                    if (idx < buf.length) {
                        return idx + 1;
                    }
                    return idx;
                }();
                tmp = buf[0 .. idx];
                buf = buf[idx .. $];
            }

            if (!tmp.empty && tmp[$ - 1] == '\n') {
                tmp = tmp[0 .. $ - 1];
            }
            return tmp;
        }

        // Read from `range` until `buf` contains a newline and then move that
        // line to `line`. `line` is left untouched if no newline was found.
        void drainLine() {
            // advance `range` and append its new element, if any, to `buf`.
            void fillBuf() {
                if (!range.empty) {
                    range.popFront;
                }
                if (!range.empty) {
                    buf ~= range.front.data;
                }
            }

            // `countUntil` returns -1 if there is no match. It is stored in a
            // size_t, the comparisons against -1 below still match it.
            size_t idx;
            () {
                int cnt;
                do {
                    fillBuf();
                    idx = buf.countUntil('\n');
                    // 2 is a magic number which bounds the wait for data;
                    // the loop body runs at most three times.
                }
                while (!range.empty && idx == -1 && cnt++ < 2);
            }();

            if (idx != -1) {
                auto tmp = updateBuf(idx);
                line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            }
        }

        // Move the next complete line from `buf` to `line`.
        // Returns true if `buf` contains no newline.
        bool lastLine() {
            size_t idx = buf.countUntil('\n');
            if (idx == -1)
                return true;

            auto tmp = updateBuf(idx);
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
            return false;
        }

        // an empty line is the result unless a state below produces one.
        line = null;
        final switch (st) {
        case State.start:
            drainLine;
            st = State.draining;
            break;
        case State.draining:
            drainLine;
            if (range.empty)
                st = State.lastLine;
            break;
        case State.lastLine:
            if (lastLine)
                st = State.lastBuf;
            break;
        case State.lastBuf:
            // NOTE(review): the range becomes empty in the same step as
            // `line` is set, so `front` can not be called to read it. It
            // looks like data after the last newline is never returned -
            // confirm if that is intended.
            line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
            st = State.empty;
            break;
        case State.empty:
            break;
        }
    }

    /// Returns: `true` when the final state has been reached.
    bool empty() @safe pure nothrow const @nogc {
        return st == State.empty;
    }
}
1055 
@("shall drain the process output by line")
unittest {
    import std.algorithm : filter, joiner, map;
    import std.array : array;

    // Run dd and collect everything it prints, line by line, dropping the
    // empty lines.
    auto child = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
    auto lines = child.process.drainByLineCopy.filter!"!a.empty".array;

    // Three non-empty lines with at least 30 characters in total.
    assert(lines.length == 3);
    assert(lines.joiner.count >= 30);

    // The process finished successfully.
    assert(child.wait == 0);
    assert(child.terminated);
}
1069 
/** Wrap the process `p` in a `DrainByLineCopyRange`.
 *
 * Params:
 *  p = the process to wrap
 *
 * Returns: a `DrainByLineCopyRange!T` constructed from `p`.
 */
auto drainByLineCopy(T)(T p) {
    alias LineRange = DrainByLineCopyRange!T;
    return LineRange(p);
}
1073 
/** Drain the process output until it is done executing, discarding everything.
 *
 * Params:
 *  p = the process to drain
 *
 * Returns: `p`, so that calls can be chained.
 */
auto drainToNull(T)(T p) {
    // Stepping through the drain is what consumes the output; every element is
    // fetched and then thrown away.
    for (auto output = p.drain(); !output.empty; output.popFront()) {
        auto discarded = output.front;
    }
    return p;
}
1080 
/** Drain the output from the process into an output range.
 *
 * Every element produced by `p.drain()` is written to `range` with the free
 * function `std.range.primitives.put`. That accepts everything `range.put(l)`
 * accepted and additionally any other kind of output range, such as a delegate
 * or an array slice.
 *
 * Params:
 *  p = the process to drain
 *  range = output range that receives each drained element
 *
 * Returns: `p`, so that calls can be chained.
 */
auto drain(ProcessT, T)(ProcessT p, ref T range) {
    import std.range.primitives : put;

    foreach (l; p.drain()) {
        // Free function form on purpose: calling `range.put(l)` directly only
        // works for types that have a member named put.
        put(range, l);
    }
    return p;
}
1088 
@("shall drain the output of a process while it is running with a separation of stdout and stderr")
unittest {
    auto child = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
    auto elements = child.drain.array;

    // Sanity check only. The upper limit is generous because the number of
    // chunks the output arrives in is allowed to vary.
    const chunks = elements.count;
    assert(chunks > 1 && chunks <= 50);

    // Only what arrived on stdout counts towards the 30 bytes.
    const stdoutBytes = elements.filter!(e => e.type == DrainElement.Type.stdout)
        .map!(e => e.data)
        .joiner
        .count;
    assert(stdoutBytes == 30);

    assert(child.wait == 0);
    assert(child.terminated);
}
1105 
@("shall kill the process tree when the timeout is reached")
unittest {
    // A script that starts a child (`sleep`) which runs far longer than the
    // timeout used below.
    immutable script = makeScript(`#!/bin/bash
sleep 10m
`);
    scope (exit)
        remove(script);

    auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
    // Give the script time to start its child before counting.
    waitUntilChildren(p.osHandle, 1);
    // Number of processes below p (p itself excluded) before the drain ...
    const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
    // ... the drain runs until the process is done, which here is expected to
    // happen because of the 1 second timeout ...
    const res = p.process.drain.array;
    // ... and the number of processes below p afterwards.
    const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;

    // NOTE(review): -9 is presumably the negated SIGKILL signal number - confirm.
    assert(p.wait == -9);
    assert(p.terminated);
    assert(preChildren == 1);
    assert(postChildren == 0);
}
1125 
/** Write `script` to a file and make it executable.
 *
 * The file name is built from the path of the running executable, the base
 * name of `file` and `line`. With the default arguments that is the location
 * of the caller, so different call sites use different files.
 *
 * Params:
 *  script = content of the script; a newline is appended when it is written
 *  file = used for the file name, only the base name is taken
 *  line = used for the file name
 *
 * Returns: the path of the created script. It is not removed by this function.
 */
string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
    import core.sys.posix.sys.stat;
    import std.file : getAttributes, setAttributes, thisExePath;
    import std.stdio : File;
    import std.path : baseName;
    import std.conv : to;

    // Execute permission for user, group and others.
    const execBits = S_IXUSR | S_IXGRP | S_IXOTH;

    immutable fname = thisExePath ~ "_" ~ baseName(file) ~ to!string(line) ~ ".sh";

    {
        // Scoped so the file is closed before the attributes are changed.
        auto fout = File(fname, "w");
        fout.writeln(script);
    }

    const current = getAttributes(fname);
    setAttributes(fname, current | execBits);
    return fname;
}
1139 
/** Wait for `p` to have at least `num` children, giving up after 10 seconds.
 *
 * The process map is polled every 50 milliseconds. The function returns
 * silently in both cases; a caller that needs to know whether the children
 * showed up has to check for itself.
 *
 * Params:
 *  p = the process whose children are counted (`p` itself is not counted)
 *  num = the number of children to wait for
 */
void waitUntilChildren(RawPid p, int num) {
    import core.time : MonoTime;

    // The monotonic clock is used for the deadline so that an adjustment of
    // the system time can neither stretch nor cut the timeout short.
    const failAt = MonoTime.currTime + 10.dur!"seconds";
    do {
        Thread.sleep(50.dur!"msecs");
        if (MonoTime.currTime > failAt)
            break;
    }
    while (makePidMap.getSubMap(p).remove(p).length < num);
}
1152 
/// A memory address, a `ulong` wrapped in a distinct named type.
alias Address = NamedType!(ulong, Tag!"Address", ulong.init, TagStringable);
/** One address range of a mapping together with its permissions.
 *
 * `libs` fills it in from the `<begin>-<end> <permissions>` part of a line in
 * `/proc/<pid>/maps`.
 */
struct AddressMap {
    /// First value of the `<begin>-<end>` pair.
    Address begin;
    /// Second value of the `<begin>-<end>` pair.
    Address end;
    /// The permission field, stored as the text that was read.
    NamedType!(string, Tag!"Permission", string.init, TagStringable) perm;
}
1159 
/** Read the file backed memory mappings of a process from `/proc/<pid>/maps`.
 *
 * Only lines that contain a `/` are used. Everything from the first `/` to the
 * end of the line is taken as the path and the text before it is parsed as
 * `<begin>-<end> <permissions>` with hexadecimal addresses.
 *
 * Lines that can't be parsed are skipped. If reading the maps file fails the
 * error is trace logged and what has been collected so far is returned.
 *
 * Params:
 *  pid = the process to inspect
 *
 * Returns: An assoc array mapping the path of each shared library (or other
 * file) loaded by the process to the address ranges it is loaded at in the
 * process address space.
 */
AddressMap[][AbsolutePath] libs(RawPid pid) nothrow {
    import std.stdio : File;
    import std.format : formattedRead, format;
    import std.algorithm : countUntil;
    import std.typecons : tuple;
    import std.string : strip;

    typeof(return) rval;

    try {
        // Pair each line with the index of its first '/', dropping the lines
        // that have none.
        foreach (l; File(format!"/proc/%s/maps"(pid)).byLine
                .map!(a => tuple(a, a.countUntil('/')))
                .filter!(a => a[1] != -1)) {
            try {
                // byLine reuses its buffer, thus everything that is kept is
                // duplicated.
                auto p = AbsolutePath(l[0][l[1] .. $].idup);
                auto m = l[0][0 .. l[1]].strip;
                ulong begin;
                ulong end;
                char[] perm;
                if (formattedRead!"%x-%x %s "(m, begin, end, perm) != 3) {
                    continue;
                }

                auto amap = AddressMap(Address(begin), Address(end),
                        typeof(AddressMap.init.perm)(perm.idup));
                if (auto v = p in rval) {
                    *v ~= amap;
                } else {
                    rval[p] = [amap];
                }
            } catch (Exception e) {
                // Best effort: a line that can't be parsed is skipped.
            }
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return rval;
}
1202 
@("shall read the libraries the process is using")
unittest {
    // A bash script that keeps three `sleep` processes running: two in the
    // background and one in the foreground.
    immutable scriptName = makeScript(`#!/bin/bash
sleep 10m &
sleep 10m &
sleep 10m
`);
    scope (exit)
        remove(scriptName);

    auto p = pipeProcess([scriptName]).sandbox.rcKill;
    // Wait for the three children so the script is known to be up and running
    // before its mappings are read.
    waitUntilChildren(p.osHandle, 3);
    auto res = libs(p.osHandle);
    // The mappings have been read; the process is not needed any more.
    p.kill;

    assert(res.length != 0);
    // The interpreter named in the shebang must be among the mapped files.
    assert(AbsolutePath("/bin/bash") in res);
}
1221 
/** Parse the output from running `ldd` on a binary.
 *
 * A line contributes an entry when it contains a path (something with a `/`)
 * that is preceded by whitespace and followed by a hexadecimal address in
 * parentheses. All other lines, and lines whose address can't be converted,
 * are silently skipped.
 *
 * Params:
 *  input = an input range of lines which is the output from ldd
 *
 * Example:
 * ---
 * writeln(parseLddOutput([
 *     "    linux-vdso.so.1 =>  (0x00007fffbf5fe000)",
 *     "    libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)",
 *     "    libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)",
 *     "    libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)",
 *     "    /lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)",
 * ]));
 * ---
 *
 * Returns: a dictionary of {path: address} for each library that was found in
 * the input.
 */
Address[AbsolutePath] parseLddOutput(T)(T input) if (std_.range.isInputRange!T) {
    import std.regex : regex, matchFirst;
    import std.format : formattedRead;

    // whitespace, a path, whitespace, "(0x<address>)"
    const reLinux = regex(`\s(?P<lib>\S?/\S+)\s+\((?P<addr>0x.+)\)`);
    typeof(return) libraries;

    foreach (entry; input) {
        auto hit = matchFirst(entry, reLinux);
        if (!hit.empty && hit.length >= 3) {
            try {
                ulong loadAddr;
                formattedRead!"0x%x"(hit["addr"], loadAddr);
                libraries[AbsolutePath(hit["lib"])] = Address(loadAddr);
            } catch (Exception) {
                // an entry that can't be converted is left out
            }
        }
    }

    return libraries;
}
1264 
@("shall parse the output of ldd")
unittest {
    auto lddLines = [
        "linux-vdso.so.1 =>  (0x00007fffbf5fe000)",
        "libtinfo.so.5 => /lib/x86_64-linux-gnu/libtinfo.so.5 (0x00007fe28117f000)",
        "libdl.so.2 => /lib/x86_64-linux-gnu/libdl.so.2 (0x00007fe280f7b000)",
        "libc.so.6 => /lib/x86_64-linux-gnu/libc.so.6 (0x00007fe280bb4000)",
        "/lib64/ld-linux-x86-64.so.2 (0x00007fe2813dd000)"
    ];
    auto parsed = parseLddOutput(lddLines);

    // Only the three "name => /path (address)" lines produce an entry.
    assert(parsed.length == 3);
    // The address is the hexadecimal value from the libtinfo line.
    assert(parsed[AbsolutePath("/lib/x86_64-linux-gnu/libtinfo.so.5")].get == 0x00007fe28117f000);
}