1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc;
7 
8 import core.thread : Thread;
9 import core.time : dur, Duration;
10 import logger = std.experimental.logger;
11 import std.algorithm : filter, count, joiner, map;
12 import std.array : appender, empty, array;
13 import std.exception : collectException;
14 import std.stdio : File, fileno, writeln;
15 import std.typecons : Flag, Yes;
16 static import std.process;
17 static import std.stdio;
18 
19 import my.gc.refc;
20 
21 public import proc.channel;
22 public import proc.pid;
23 
24 version (unittest) {
25     import std.file : remove;
26 }
27 
28 /** Manage a process by reference counting so that it is terminated when the it
29  * stops being used such as the instance going out of scope.
30  */
31 auto rcKill(T)(T p) {
32     return refCounted(ScopeKill!T(p));
33 }
34 
35 // backward compatibility.
36 alias scopeKill = rcKill;
37 
38 struct ScopeKill(T) {
39     T process;
40     alias process this;
41     private bool hasProcess;
42 
43     this(T process) @safe {
44         this.process = process;
45         this.hasProcess = true;
46     }
47 
48     ~this() @safe {
49         if (hasProcess)
50             process.dispose();
51     }
52 }
53 
54 /// Async process wrapper for a std.process SpawnProcess
55 struct SpawnProcess {
56     import std.algorithm : among;
57 
58     private {
59         enum State {
60             running,
61             terminated,
62             exitCode
63         }
64 
65         std.process.Pid process;
66         RawPid pid;
67         int status_;
68         State st;
69     }
70 
71     this(std.process.Pid process) @safe {
72         this.process = process;
73         this.pid = process.osHandle.RawPid;
74     }
75 
76     ~this() @safe {
77     }
78 
79     /// Returns: The raw OS handle for the process ID.
80     RawPid osHandle() nothrow @safe {
81         return pid;
82     }
83 
84     /// Kill and cleanup the process.
85     void dispose() @safe {
86         final switch (st) {
87         case State.running:
88             this.kill;
89             this.wait;
90             break;
91         case State.terminated:
92             this.wait;
93             break;
94         case State.exitCode:
95             break;
96         }
97 
98         st = State.exitCode;
99     }
100 
101     /// Kill the process.
102     void kill() nothrow @trusted {
103         import core.sys.posix.signal : SIGKILL;
104 
105         final switch (st) {
106         case State.running:
107             break;
108         case State.terminated:
109             goto case;
110         case State.exitCode:
111             return;
112         }
113 
114         try {
115             std.process.kill(process, SIGKILL);
116         } catch (Exception e) {
117         }
118 
119         st = State.terminated;
120     }
121 
122     /// Blocking wait for the process to terminated.
123     /// Returns: the exit status.
124     int wait() @safe {
125         final switch (st) {
126         case State.running:
127             status_ = std.process.wait(process);
128             break;
129         case State.terminated:
130             status_ = std.process.wait(process);
131             break;
132         case State.exitCode:
133             break;
134         }
135 
136         st = State.exitCode;
137 
138         return status_;
139     }
140 
141     /// Non-blocking wait for the process termination.
142     /// Returns: `true` if the process has terminated.
143     bool tryWait() @safe {
144         final switch (st) {
145         case State.running:
146             auto s = std.process.tryWait(process);
147             if (s.terminated) {
148                 st = State.exitCode;
149                 status_ = s.status;
150             }
151             break;
152         case State.terminated:
153             status_ = std.process.wait(process);
154             st = State.exitCode;
155             break;
156         case State.exitCode:
157             break;
158         }
159 
160         return st.among(State.terminated, State.exitCode) != 0;
161     }
162 
163     /// Returns: The exit status of the process.
164     int status() @safe {
165         if (st != State.exitCode) {
166             throw new Exception(
167                     "Process has not terminated and wait/tryWait been called to collect the exit status");
168         }
169         return status_;
170     }
171 
172     /// Returns: If the process has terminated.
173     bool terminated() @safe {
174         return st.among(State.terminated, State.exitCode) != 0;
175     }
176 }
177 
178 /// Async process that do not block on read from stdin/stderr.
179 struct PipeProcess {
180     import std.algorithm : among;
181 
182     private {
183         enum State {
184             running,
185             terminated,
186             exitCode
187         }
188 
189         std.process.ProcessPipes process;
190         Pipe pipe_;
191         FileReadChannel stderr_;
192         int status_;
193         State st;
194         RawPid pid;
195     }
196 
197     this(std.process.ProcessPipes process) @safe {
198         this.process = process;
199         this.pipe_ = Pipe(this.process.stdout, this.process.stdin);
200         this.stderr_ = FileReadChannel(this.process.stderr);
201         this.pid = process.pid.osHandle.RawPid;
202     }
203 
204     /// Returns: The raw OS handle for the process ID.
205     RawPid osHandle() nothrow @safe {
206         return this.pid;
207     }
208 
209     /// Access to stdin and stdout.
210     ref Pipe pipe() return scope nothrow @safe {
211         return pipe_;
212     }
213 
214     /// Access stderr.
215     ref FileReadChannel stderr() return scope nothrow @safe {
216         return stderr_;
217     }
218 
219     /// Kill and cleanup the process.
220     void dispose() @safe {
221         final switch (st) {
222         case State.running:
223             this.kill;
224             this.wait;
225             .destroy(process);
226             break;
227         case State.terminated:
228             this.wait;
229             .destroy(process);
230             break;
231         case State.exitCode:
232             break;
233         }
234 
235         st = State.exitCode;
236     }
237 
238     /// Kill the process.
239     void kill() nothrow @trusted {
240         import core.sys.posix.signal : SIGKILL;
241 
242         final switch (st) {
243         case State.running:
244             break;
245         case State.terminated:
246             return;
247         case State.exitCode:
248             return;
249         }
250 
251         try {
252             std.process.kill(process.pid, SIGKILL);
253         } catch (Exception e) {
254         }
255 
256         st = State.terminated;
257     }
258 
259     /// Blocking wait for the process to terminated.
260     /// Returns: the exit status.
261     int wait() @safe {
262         final switch (st) {
263         case State.running:
264             status_ = std.process.wait(process.pid);
265             break;
266         case State.terminated:
267             status_ = std.process.wait(process.pid);
268             break;
269         case State.exitCode:
270             break;
271         }
272 
273         st = State.exitCode;
274 
275         return status_;
276     }
277 
278     /// Non-blocking wait for the process termination.
279     /// Returns: `true` if the process has terminated.
280     bool tryWait() @safe {
281         final switch (st) {
282         case State.running:
283             auto s = std.process.tryWait(process.pid);
284             if (s.terminated) {
285                 st = State.exitCode;
286                 status_ = s.status;
287             }
288             break;
289         case State.terminated:
290             status_ = std.process.wait(process.pid);
291             st = State.exitCode;
292             break;
293         case State.exitCode:
294             break;
295         }
296 
297         return st.among(State.terminated, State.exitCode) != 0;
298     }
299 
300     /// Returns: The exit status of the process.
301     int status() @safe {
302         if (st != State.exitCode) {
303             throw new Exception(
304                     "Process has not terminated and wait/tryWait been called to collect the exit status");
305         }
306         return status_;
307     }
308 
309     /// Returns: If the process has terminated.
310     bool terminated() @safe {
311         return st.among(State.terminated, State.exitCode) != 0;
312     }
313 }
314 
315 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
316         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
317         const string[string] env = null, std.process.Config config = std.process.Config.none,
318         scope const char[] workDir = null) {
319     return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr,
320             env, config, workDir));
321 }
322 
323 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
324         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
325     return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin,
326             std.stdio.stdout, std.stdio.stderr, env, config, workDir));
327 }
328 
329 SpawnProcess spawnProcess(scope const(char)[] program,
330         File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
331         File stderr = std.stdio.stderr, const string[string] env = null,
332         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
333     return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin,
334             stdout, stderr, env, config, workDir));
335 }
336 
337 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
338         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
339         scope const string[string] env = null, std.process.Config config = std.process.Config.none,
340         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
341     return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr,
342             env, config, workDir, shellPath));
343 }
344 
345 /// ditto
346 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
347         std.process.Config config = std.process.Config.none,
348         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
349     return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath));
350 }
351 
352 PipeProcess pipeProcess(scope const(char[])[] args,
353         std.process.Redirect redirect = std.process.Redirect.all,
354         const string[string] env = null, std.process.Config config = std.process.Config.none,
355         scope const(char)[] workDir = null) @safe {
356     return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir));
357 }
358 
359 PipeProcess pipeShell(scope const(char)[] command,
360         std.process.Redirect redirect = std.process.Redirect.all,
361         const string[string] env = null, std.process.Config config = std.process.Config.none,
362         scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
363     return PipeProcess(std.process.pipeShell(command, redirect, env, config, workDir, shellPath));
364 }
365 
366 /** Moves the process to a separate process group and on exit kill it and all
367  * its children.
368  */
369 @safe struct Sandbox(ProcessT) {
370     private {
371         ProcessT p;
372         RawPid pid;
373     }
374 
375     this(ProcessT p) @safe {
376         import core.sys.posix.unistd : setpgid;
377 
378         this.p = p;
379         this.pid = p.osHandle;
380         setpgid(pid, 0);
381     }
382 
383     RawPid osHandle() nothrow @safe {
384         return pid;
385     }
386 
387     static if (__traits(hasMember, ProcessT, "pipe")) {
388         ref Pipe pipe() nothrow @safe {
389             return p.pipe;
390         }
391     }
392 
393     static if (__traits(hasMember, ProcessT, "stderr")) {
394         ref FileReadChannel stderr() nothrow @safe {
395             return p.stderr;
396         }
397     }
398 
399     void dispose() @safe {
400         // this also reaps the children thus cleaning up zombies
401         this.kill;
402         p.dispose;
403     }
404 
405     void kill() nothrow @safe {
406         // must first retrieve the submap because after the process is killed
407         // its children may have changed.
408         auto pmap = makePidMap.getSubMap(pid);
409 
410         p.kill;
411 
412         // only kill and reap the children
413         pmap.remove(pid);
414         proc.pid.kill(pmap, Yes.onlyCurrentUser).reap;
415     }
416 
417     int wait() @safe {
418         return p.wait;
419     }
420 
421     bool tryWait() @safe {
422         return p.tryWait;
423     }
424 
425     int status() @safe {
426         return p.status;
427     }
428 
429     bool terminated() @safe {
430         return p.terminated;
431     }
432 }
433 
434 auto sandbox(T)(T p) @safe {
435     return Sandbox!T(p);
436 }
437 
438 @("shall terminate a group of processes")
439 unittest {
440     import std.datetime.stopwatch : StopWatch, AutoStart;
441 
442     immutable scriptName = makeScript(`#!/bin/bash
443 sleep 10m &
444 sleep 10m &
445 sleep 10m
446 `);
447     scope (exit)
448         remove(scriptName);
449 
450     auto p = pipeProcess([scriptName]).sandbox.rcKill;
451     waitUntilChildren(p.osHandle, 3);
452     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
453     p.kill;
454     Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
455     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
456 
457     assert(p.wait == -9);
458     assert(p.terminated);
459     assert(preChildren == 3);
460     assert(postChildren == 0);
461 }
462 
463 /** dispose the process after the timeout.
464  */
465 @safe struct Timeout(ProcessT) {
466     import std.algorithm : among;
467     import std.datetime : Clock, Duration;
468     import core.thread;
469 
470     private {
471         enum Msg {
472             none,
473             stop,
474             status,
475         }
476 
477         enum Reply {
478             none,
479             running,
480             normalDeath,
481             killedByTimeout,
482         }
483 
484         static struct Payload {
485             ProcessT p;
486             RawPid pid;
487             Background background;
488             Reply backgroundReply;
489         }
490 
491         RefCounted!Payload rc;
492     }
493 
494     this(ProcessT p, Duration timeout) @trusted {
495         import std.algorithm : move;
496 
497         auto pid = p.osHandle;
498         rc = refCounted(Payload(move(p), pid));
499         rc.background = new Background(&rc.p, timeout);
500         rc.background.isDaemon = true;
501         rc.background.start;
502     }
503 
504     ~this() @trusted {
505         rc.release;
506     }
507 
508     private static class Background : Thread {
509         import core.sync.condition : Condition;
510         import core.sync.mutex : Mutex;
511 
512         Duration timeout;
513         ProcessT* p;
514         Mutex mtx;
515         Msg[] msg;
516         Reply reply_;
517         RawPid pid;
518 
519         this(ProcessT* p, Duration timeout) {
520             this.p = p;
521             this.timeout = timeout;
522             this.mtx = new Mutex();
523             this.pid = p.osHandle;
524 
525             super(&run);
526         }
527 
528         void run() {
529             checkProcess(this.pid, this.timeout, this);
530         }
531 
532         void put(Msg msg) @trusted nothrow {
533             this.mtx.lock_nothrow();
534             scope (exit)
535                 this.mtx.unlock_nothrow();
536             this.msg ~= msg;
537         }
538 
539         Msg popMsg() @trusted nothrow {
540             this.mtx.lock_nothrow();
541             scope (exit)
542                 this.mtx.unlock_nothrow();
543             if (msg.empty)
544                 return Msg.none;
545             auto rval = msg[$ - 1];
546             msg = msg[0 .. $ - 1];
547             return rval;
548         }
549 
550         void setReply(Reply reply_) @trusted nothrow {
551             this.mtx.lock_nothrow();
552             scope (exit)
553                 this.mtx.unlock_nothrow();
554             this.reply_ = reply_;
555         }
556 
557         Reply reply() @trusted nothrow {
558             this.mtx.lock_nothrow();
559             scope (exit)
560                 this.mtx.unlock_nothrow();
561             return reply_;
562         }
563 
564         void kill() @trusted nothrow {
565             this.mtx.lock_nothrow();
566             scope (exit)
567                 this.mtx.unlock_nothrow();
568             p.kill;
569         }
570     }
571 
572     private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
573         import core.sys.posix.signal : SIGKILL;
574         import std.algorithm : max, min;
575         import std.variant : Variant;
576         static import core.sys.posix.signal;
577 
578         const stopAt = Clock.currTime + timeout;
579         // the purpose is to poll the process often "enough" that if it
580         // terminates early `Process` detects it fast enough. 1000 is chosen
581         // because it "feels good". the purpose
582         auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs";
583 
584         bool forceStop;
585         bool running = true;
586         while (running && Clock.currTime < stopAt) {
587             const msg = bg.popMsg;
588 
589             final switch (msg) {
590             case Msg.none:
591                 () @trusted { Thread.sleep(sleepInterval); }();
592                 break;
593             case Msg.stop:
594                 forceStop = true;
595                 running = false;
596                 break;
597             case Msg.status:
598                 bg.setReply(Reply.running);
599                 break;
600             }
601 
602             () @trusted {
603                 if (core.sys.posix.signal.kill(p, 0) == -1) {
604                     running = false;
605                 }
606             }();
607         }
608 
609         // may be children alive thus must ensure that the whole process tree
610         // is killed if this is a sandbox with a timeout.
611         bg.kill;
612 
613         if (!forceStop && Clock.currTime >= stopAt) {
614             bg.setReply(Reply.killedByTimeout);
615         } else {
616             bg.setReply(Reply.normalDeath);
617         }
618     }
619 
620     RawPid osHandle() nothrow @trusted {
621         return rc.pid;
622     }
623 
624     ref Pipe pipe() nothrow @trusted {
625         return rc.p.pipe;
626     }
627 
628     ref FileReadChannel stderr() nothrow @trusted {
629         return rc.p.stderr;
630     }
631 
632     void dispose() @trusted {
633         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
634             rc.background.put(Msg.stop);
635             rc.background.join;
636             rc.backgroundReply = rc.background.reply;
637         }
638         rc.p.dispose;
639     }
640 
641     void kill() nothrow @trusted {
642         rc.background.kill;
643     }
644 
645     int wait() @trusted {
646         while (!this.tryWait) {
647             Thread.sleep(20.dur!"msecs");
648         }
649         return rc.p.wait;
650     }
651 
652     bool tryWait() @trusted {
653         return rc.p.tryWait;
654     }
655 
656     int status() @trusted {
657         return rc.p.status;
658     }
659 
660     bool terminated() @trusted {
661         return rc.p.terminated;
662     }
663 
664     bool timeoutTriggered() @trusted {
665         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
666             rc.background.put(Msg.status);
667             rc.backgroundReply = rc.background.reply;
668         }
669         return rc.backgroundReply == Reply.killedByTimeout;
670     }
671 }
672 
673 auto timeout(T)(T p, Duration timeout_) @trusted {
674     return Timeout!T(p, timeout_);
675 }
676 
677 /// Returns when the process has pending data.
678 void waitForPendingData(ProcessT)(Process p) {
679     while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) {
680         Thread.sleep(20.dur!"msecs");
681     }
682 }
683 
684 @("shall kill the process after the timeout")
685 unittest {
686     import std.datetime.stopwatch : StopWatch, AutoStart;
687 
688     auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
689     auto sw = StopWatch(AutoStart.yes);
690     p.wait;
691     sw.stop;
692 
693     assert(sw.peek >= 100.dur!"msecs");
694     assert(sw.peek <= 500.dur!"msecs");
695     assert(p.wait == -9);
696     assert(p.terminated);
697     assert(p.status == -9);
698     assert(p.timeoutTriggered);
699 }
700 
701 struct DrainElement {
702     enum Type {
703         stdout,
704         stderr,
705     }
706 
707     Type type;
708     const(ubyte)[] data;
709 
710     /// Returns: iterates the data as an input range.
711     auto byUTF8() @safe pure nothrow const @nogc {
712         static import std.utf;
713 
714         return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
715     }
716 
717     bool empty() @safe pure nothrow const @nogc {
718         return data.length == 0;
719     }
720 }
721 
722 /** A range that drains a process stdout/stderr until it terminates.
723  *
724  * There may be `DrainElement` that are empty.
725  */
726 struct DrainRange(ProcessT) {
727     private {
728         enum State {
729             start,
730             draining,
731             lastStdout,
732             lastStderr,
733             lastElement,
734             empty,
735         }
736 
737         ProcessT p;
738         DrainElement front_;
739         State st;
740         ubyte[] buf;
741     }
742 
743     this(ProcessT p) {
744         this.p = p;
745         this.buf = new ubyte[4096];
746     }
747 
748     DrainElement front() @safe pure nothrow const @nogc {
749         assert(!empty, "Can't get front of an empty range");
750         return front_;
751     }
752 
753     void popFront() @safe {
754         assert(!empty, "Can't pop front of an empty range");
755 
756         static bool isAnyPipeOpen(ref ProcessT p) {
757             return p.pipe.isOpen || p.stderr.isOpen;
758         }
759 
760         DrainElement readData(ref ProcessT p) @safe {
761             if (p.stderr.hasPendingData) {
762                 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf));
763             } else if (p.pipe.hasPendingData) {
764                 return DrainElement(DrainElement.Type.stdout, p.pipe.read(buf));
765             }
766             return DrainElement.init;
767         }
768 
769         DrainElement waitUntilData() @safe {
770             import std.datetime : Clock;
771 
772             // may livelock if the process never terminates and never writes to
773             // the terminal. timeout ensure that it sooner or later is break
774             // the loop. This is important if the drain is part of a timeout
775             // wrapping.
776 
777             const timeout = 100.dur!"msecs";
778             const stopAt = Clock.currTime + timeout;
779             const sleepFor = timeout / 20;
780             const useSleep = Clock.currTime + sleepFor;
781             bool running = true;
782             while (running) {
783                 const now = Clock.currTime;
784 
785                 running = (now < stopAt) && isAnyPipeOpen(p);
786 
787                 auto bufRead = readData(p);
788 
789                 if (!bufRead.empty) {
790                     return DrainElement(bufRead.type, bufRead.data.dup);
791                 } else if (running && now > useSleep && bufRead.empty) {
792                     import core.thread : Thread;
793 
794                     () @trusted { Thread.sleep(sleepFor); }();
795                 }
796             }
797 
798             return DrainElement.init;
799         }
800 
801         front_ = DrainElement.init;
802 
803         final switch (st) {
804         case State.start:
805             st = State.draining;
806             front_ = waitUntilData;
807             break;
808         case State.draining:
809             if (p.terminated) {
810                 st = State.lastStdout;
811             } else if (isAnyPipeOpen(p)) {
812                 front_ = waitUntilData();
813             } else {
814                 st = State.lastStdout;
815             }
816             break;
817         case State.lastStdout:
818             if (p.pipe.hasPendingData) {
819                 front_ = DrainElement(DrainElement.Type.stdout, p.pipe.read(buf).dup);
820             } else {
821                 st = State.lastStderr;
822             }
823             break;
824         case State.lastStderr:
825             if (p.stderr.hasPendingData) {
826                 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
827             } else {
828                 st = State.lastElement;
829             }
830             break;
831         case State.lastElement:
832             st = State.empty;
833             break;
834         case State.empty:
835             break;
836         }
837     }
838 
839     bool empty() @safe pure nothrow const @nogc {
840         return st == State.empty;
841     }
842 }
843 
844 /// Drain a process pipe until empty.
845 auto drain(T)(T p) {
846     return DrainRange!T(p);
847 }
848 
849 /// Read the data from a ReadChannel by line.
850 struct DrainByLineCopyRange(ProcessT) {
851     private {
852         enum State {
853             start,
854             draining,
855             lastLine,
856             lastBuf,
857             empty,
858         }
859 
860         ProcessT process;
861         DrainRange!ProcessT range;
862         State st;
863         const(ubyte)[] buf;
864         const(char)[] line;
865     }
866 
867     this(ProcessT p) {
868         process = p;
869         range = p.drain;
870     }
871 
872     string front() @trusted pure nothrow const @nogc {
873         import std.exception : assumeUnique;
874 
875         assert(!empty, "Can't get front of an empty range");
876         return line.assumeUnique;
877     }
878 
879     void popFront() @safe {
880         assert(!empty, "Can't pop front of an empty range");
881         import std.algorithm : countUntil;
882         import std.array : array;
883         static import std.utf;
884 
885         const(ubyte)[] updateBuf(size_t idx) {
886             const(ubyte)[] tmp;
887             if (buf.empty) {
888                 // do nothing
889             } else if (idx == -1) {
890                 tmp = buf;
891                 buf = null;
892             } else {
893                 idx = () {
894                     if (idx < buf.length) {
895                         return idx + 1;
896                     }
897                     return idx;
898                 }();
899                 tmp = buf[0 .. idx];
900                 buf = buf[idx .. $];
901             }
902 
903             if (!tmp.empty && tmp[$ - 1] == '\n') {
904                 tmp = tmp[0 .. $ - 1];
905             }
906             return tmp;
907         }
908 
909         void drainLine() {
910             void fillBuf() {
911                 if (!range.empty) {
912                     range.popFront;
913                 }
914                 if (!range.empty) {
915                     buf ~= range.front.data;
916                 }
917             }
918 
919             size_t idx;
920             () {
921                 int cnt;
922                 do {
923                     fillBuf();
924                     idx = buf.countUntil('\n');
925                     // 2 is a magic number which mean that it at most wait 2x timeout for data
926                 }
927                 while (!range.empty && idx == -1 && cnt++ < 2);
928             }();
929 
930             if (idx != -1) {
931                 auto tmp = updateBuf(idx);
932                 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
933             }
934         }
935 
936         bool lastLine() {
937             size_t idx = buf.countUntil('\n');
938             if (idx == -1)
939                 return true;
940 
941             auto tmp = updateBuf(idx);
942             line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
943             return false;
944         }
945 
946         line = null;
947         final switch (st) {
948         case State.start:
949             drainLine;
950             st = State.draining;
951             break;
952         case State.draining:
953             drainLine;
954             if (range.empty)
955                 st = State.lastLine;
956             break;
957         case State.lastLine:
958             if (lastLine)
959                 st = State.lastBuf;
960             break;
961         case State.lastBuf:
962             line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
963             st = State.empty;
964             break;
965         case State.empty:
966             break;
967         }
968     }
969 
970     bool empty() @safe pure nothrow const @nogc {
971         return st == State.empty;
972     }
973 }
974 
975 @("shall drain the process output by line")
976 unittest {
977     import std.algorithm : filter, joiner, map;
978     import std.array : array;
979 
980     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
981     auto res = p.process.drainByLineCopy.filter!"!a.empty".array;
982 
983     assert(res.length == 3);
984     assert(res.joiner.count >= 30);
985     assert(p.wait == 0);
986     assert(p.terminated);
987 }
988 
989 auto drainByLineCopy(T)(T p) {
990     return DrainByLineCopyRange!T(p);
991 }
992 
993 /// Drain the process output until it is done executing.
994 auto drainToNull(T)(T p) {
995     foreach (l; p.drain()) {
996     }
997     return p;
998 }
999 
1000 /// Drain the output from the process into an output range.
1001 auto drain(ProcessT, T)(ProcessT p, ref T range) {
1002     foreach (l; p.drain()) {
1003         range.put(l);
1004     }
1005     return p;
1006 }
1007 
1008 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
1009 unittest {
1010     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
1011     auto res = p.drain.array;
1012 
1013     // this is just a sanity check. It has to be kind a high because there is
1014     // some wiggleroom allowed
1015     assert(res.count <= 50);
1016 
1017     assert(res.filter!(a => a.type == DrainElement.Type.stdout)
1018             .map!(a => a.data)
1019             .joiner
1020             .count == 30);
1021     assert(res.filter!(a => a.type == DrainElement.Type.stderr).count == 0);
1022     assert(p.wait == 0);
1023     assert(p.terminated);
1024 }
1025 
1026 @("shall kill the process tree when the timeout is reached")
1027 unittest {
1028     immutable script = makeScript(`#!/bin/bash
1029 sleep 10m
1030 `);
1031     scope (exit)
1032         remove(script);
1033 
1034     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
1035     waitUntilChildren(p.osHandle, 1);
1036     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1037     const res = p.process.drain.array;
1038     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1039 
1040     assert(p.wait == -9);
1041     assert(p.terminated);
1042     assert(preChildren == 1);
1043     assert(postChildren == 0);
1044 }
1045 
1046 string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
1047     import core.sys.posix.sys.stat;
1048     import std.file : getAttributes, setAttributes, thisExePath;
1049     import std.stdio : File;
1050     import std.path : baseName;
1051     import std.conv : to;
1052 
1053     immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh";
1054 
1055     File(fname, "w").writeln(script);
1056     setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH);
1057     return fname;
1058 }
1059 
1060 /// Wait for p to have num children or fail after 10s.
1061 void waitUntilChildren(RawPid p, int num) {
1062     import std.datetime : Clock;
1063 
1064     const failAt = Clock.currTime + 10.dur!"seconds";
1065     do {
1066         Thread.sleep(50.dur!"msecs");
1067         if (Clock.currTime > failAt)
1068             break;
1069     }
1070     while (makePidMap.getSubMap(p).remove(p).length < num);
1071 }