1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
This is based on webfreak's
[fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the
API as it was because I needed to be able to watch multiple directories,
filter which files are watched and be robust against broken symlinks.
10 
Let's say you want to watch a directory for changes and also add every
directory that is created inside it to the set of watched paths.
13 
14 ---
15 auto fw = fileWatch();
16 fw.watchRecurse("my_dir");
17 while (true) {
    auto ev = fw.getEvents(5.dur!"seconds");
19     foreach (e; ev) {
20         e.match!(
21         (Event.Access x) => writeln(x),
22         (Event.Attribute x) => writeln(x),
23         (Event.CloseWrite x) => writeln(x),
24         (Event.CloseNoWrite x) => writeln(x),
25         (Event.Create x) { fw.watchRecurse(x.path); },
26         (Event.Delete x) => writeln(x),
27         (Event.DeleteSelf x) => writeln(x),
28         (Event.Modify x) => writeln(x),
29         (Event.MoveSelf x) => writeln(x),
30         (Event.Rename x) => writeln(x),
31         (Event.Open x) => writeln(x),
32         );
33     }
34 }
35 ---
36 */
37 module my.fswatch;
38 
39 import core.sys.linux.errno : errno;
40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC,
42     IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE,
43     IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO,
44     IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK;
45 import core.sys.linux.unistd : close, read;
46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL;
47 import core.thread : Thread;
48 import core.time : dur, Duration;
49 import logger = std.experimental.logger;
50 import std.array : appender, empty, array;
51 import std.conv : to;
52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append,
53     rename, remove, exists, SpanMode, mkdir, rmdir;
54 import std.path : buildPath;
55 import std.range : isInputRange;
56 import std.string : toStringz, fromStringz;
57 import std.exception : collectException;
58 
59 import sumtype;
60 
61 import my.path : AbsolutePath, Path;
62 import my.set;
63 
/** Namespace for the kinds of filesystem events that a `FileWatch` reports.
 *
 * Every nested struct is one alternative of `FileChangeEvent`. The
 * descriptions follow the wording of the inotify(7) manual page.
 */
struct Event {
    /// An overflow occurred. It is unknown which events actually triggered.
    static struct Overflow {
    }

    /// File was accessed (e.g., read(2), execve(2)).
    static struct Access {
        AbsolutePath path;
    }

    /** Metadata changed, for example permissions (e.g., chmod(2)), timestamps
     * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count
     * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)),
     * and user/group ID (e.g., chown(2)).
     */
    static struct Attribute {
        AbsolutePath path;
    }

    /// File opened for writing was closed.
    static struct CloseWrite {
        AbsolutePath path;
    }

    /// File or directory not opened for writing was closed.
    static struct CloseNoWrite {
        AbsolutePath path;
    }

    /** File/directory created in watched directory (e.g., open(2) O_CREAT,
     * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).
     */
    static struct Create {
        AbsolutePath path;
    }

    /// File/directory deleted from watched directory.
    static struct Delete {
        AbsolutePath path;
    }

    /** Watched file/directory was itself deleted. (This event also occurs if
     * an object is moved to another filesystem, since mv(1) in effect copies
     * the file to the other filesystem and then deletes it from the original
     * filesystem.) In addition, an IN_IGNORED event will subsequently be
     * generated for the watch descriptor.
     */
    static struct DeleteSelf {
        AbsolutePath path;
    }

    /// File was modified (e.g., write(2), truncate(2)).
    static struct Modify {
        AbsolutePath path;
    }

    /// Watched file/directory was itself moved.
    static struct MoveSelf {
        AbsolutePath path;
    }

    /// Occurs when a file or folder inside a watched folder is renamed.
    static struct Rename {
        /// Path before the rename.
        AbsolutePath from;
        /// Path after the rename.
        AbsolutePath to;
    }

    /// File or directory was opened.
    static struct Open {
        AbsolutePath path;
    }
}
136 
/// A single filesystem event: exactly one of the alternatives declared in `Event`.
alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite,
        Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf,
        Event.Modify, Event.MoveSelf, Event.Rename, Event.Open, Event.Overflow);
140 
/** Construct a `FileWatch` that owns a freshly created inotify instance.
 *
 * The inotify descriptor is requested with `IN_NONBLOCK | IN_CLOEXEC`.
 *
 * Throws: Exception when `inotify_init1` reports failure; the message carries
 * the value of `errno`.
 */
auto fileWatch() {
    const inotifyFd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
    if (inotifyFd != -1)
        return FileWatch(inotifyFd);

    throw new Exception(
            "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string);
}
150 
/** Event mask that listens for create/modify/removal of files and directories.
 *
 * This is the default mask used by `FileWatch.watch` and
 * `FileWatch.watchRecurse`. `IN_EXCL_UNLINK` is not an event but an
 * `inotify_add_watch` option; see inotify(7) for its exact semantics.
 */
enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY
    | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE;

/** Event mask that listens for access, attribute changes, opens and
 * read-only closes.
 *
 * Like `ContentEvents` it also carries the `IN_EXCL_UNLINK` option.
 */
enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK;
157 
/** Watches files and directories for changes via a Linux inotify instance.
 *
 * Instances are created with `fileWatch`. Paths are registered with `watch`
 * or `watchRecurse` and the events that have queued up are fetched with
 * `getEvents`.
 */
struct FileWatch {
    import std.functional : toDelegate;

    private {
        FdPoller poller;
        int fd;
        ubyte[1024 * 4] eventBuffer; // 4kb buffer for events
        struct FDInfo {
            int wd;
            // false once the watch has been removed after a DeleteSelf/MoveSelf.
            bool watched;
            Path path;

            this(this) {
            }
        }

        FDInfo[int] directoryMap; // map every watch descriptor to a directory
    }

    private this(int fd) {
        this.fd = fd;
        poller.put(FdPoll(fd), [PollEvent.in_]);
    }

    /// Removes all watches that are still active and closes the inotify descriptor.
    ~this() {
        if (fd) {
            foreach (fdinfo; directoryMap.byValue) {
                if (fdinfo.watched)
                    inotify_rm_watch(fd, fdinfo.wd);
            }
            close(fd);
        }
    }

    /** Add a path to watch for events.
     *
     * Params:
     *  path = path to watch
     *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
     *
     * Returns: true if the path was successfully added.
     */
    bool watch(Path path, uint events = ContentEvents) {
        const wd = inotify_add_watch(fd, path.toStringz, events);
        if (wd != -1) {
            const fc = fcntl(fd, F_SETFD, FD_CLOEXEC);
            if (fc != -1) {
                directoryMap[wd] = FDInfo(wd, true, path);
                return true;
            }
        }

        return false;
    }

    /// ditto
    bool watch(string p, uint events = ContentEvents) {
        // forward `events`; it used to be dropped so the default mask was
        // always used by this overload.
        return watch(Path(p), events);
    }

    // default predicate of `watchRecurse`: accept everything.
    private static bool allFiles(string p) {
        return true;
    }

    /** Recursively add the path and all its subdirectories and files to be watched.
     *
     * Params:
     *  pred = only those files and directories that `pred` returns true for are watched, by default every file/directory.
     *  root = directory to watch together with its content and subdirectories.
     *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
     *
     * Returns: paths that failed to be added.
     */
    AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents,
            bool delegate(string) pred = toDelegate(&allFiles)) {
        import std.algorithm : filter;
        import my.file : existsAnd;

        auto failed = appender!(AbsolutePath[])();

        if (!watch(root, events)) {
            failed.put(AbsolutePath(root));
        }

        // a plain file (or something that vanished) has nothing to descend into.
        if (!existsAnd!isDir(root)) {
            return failed.data;
        }

        // breadth first traversal; `visited` guards against processing the
        // same directory more than once.
        auto dirs = [AbsolutePath(root)];
        Set!AbsolutePath visited;
        while (!dirs.empty) {
            auto front = dirs[0];
            dirs = dirs[1 .. $];
            if (front in visited)
                continue;
            visited.add(front);

            try {
                foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) {
                    if (!watch(Path(p.name), events)) {
                        failed.put(AbsolutePath(p.name));
                    }
                    if (existsAnd!isDir(Path(p.name))) {
                        dirs ~= AbsolutePath(p.name);
                    }
                }
            } catch (Exception e) {
                // listing a directory may fail; report the directory as
                // failed and continue with the remaining ones.
                () @trusted { logger.trace(e); }();
                logger.trace(e.msg);
                failed.put(AbsolutePath(front));
            }
        }

        return failed.data;
    }

    /// ditto
    AbsolutePath[] watchRecurse(string root, uint events = ContentEvents,
            bool delegate(string) pred = toDelegate(&allFiles)) {
        return watchRecurse(Path(root), events, pred);
    }

    /** The events that have occurred since last query.
     *
     * At most one buffer (4kb) of events is read per call.
     *
     * Params:
     *  timeout = max time to wait for events.
     *
     * Returns: the events that have occurred to the watched paths.
     *
     * Throws: Exception if the inotify descriptor is reported as not open, or
     * if polling fails.
     */
    FileChangeEvent[] getEvents(Duration timeout = Duration.zero) {
        FileChangeEvent[] events;
        if (!fd)
            return events;

        auto res = poller.wait(timeout);

        if (res.empty) {
            return events;
        }

        if (res[0].status[PollStatus.nval]) {
            throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string);
        }

        if (!res[0].status[PollStatus.in_]) {
            // no events to read
            return events;
        }

        const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length);
        if (receivedBytes <= 0) {
            // read failed (e.g. EAGAIN on the non-blocking descriptor) or
            // returned nothing. The buffer only holds stale data so it must
            // not be parsed.
            return events;
        }
        const total = cast(size_t) receivedBytes;

        size_t i = 0;
        // IN_MOVED_FROM events waiting for their matching IN_MOVED_TO.
        AbsolutePath[uint] cookie;
        while (i + inotify_event.sizeof <= total) {
            auto info = cast(inotify_event*)(eventBuffer.ptr + i);
            // advance before anything else so every `continue` below moves on
            // to the next event instead of looping forever on the same one.
            i += inotify_event.sizeof + info.len;

            if (info.wd == -1) {
                events ~= FileChangeEvent(Event.Overflow.init);
            }
            if (info.wd !in directoryMap)
                continue;

            // `name` is only present when `len` is non-zero, i.e. for events
            // about an entry inside a watched directory. For events about
            // the watched path itself the bytes after the header belong to
            // the next event (or are stale) and must not be read.
            const string fileName = info.len == 0 ? null : info.name.ptr.fromStringz.idup;
            auto fname = AbsolutePath(buildPath(directoryMap[info.wd].path, fileName));

            // a pending move source that is not followed by its destination
            // left the watched tree; report it as deleted.
            if ((info.mask & IN_MOVED_TO) == 0) {
                if (auto v = info.cookie in cookie) {
                    events ~= FileChangeEvent(Event.Delete(*v));
                    cookie.remove(info.cookie);
                }
            }

            if ((info.mask & IN_ACCESS) != 0) {
                events ~= FileChangeEvent(Event.Access(fname));
            }

            if ((info.mask & IN_ATTRIB) != 0) {
                events ~= FileChangeEvent(Event.Attribute(fname));
            }

            if ((info.mask & IN_CLOSE_WRITE) != 0) {
                events ~= FileChangeEvent(Event.CloseWrite(fname));
            }

            if ((info.mask & IN_CLOSE_NOWRITE) != 0) {
                events ~= FileChangeEvent(Event.CloseNoWrite(fname));
            }

            if ((info.mask & IN_OPEN) != 0) {
                events ~= FileChangeEvent(Event.Open(fname));
            }

            if ((info.mask & IN_CREATE) != 0) {
                events ~= FileChangeEvent(Event.Create(fname));
            }

            if ((info.mask & IN_DELETE) != 0) {
                events ~= FileChangeEvent(Event.Delete(fname));
            }

            if ((info.mask & IN_DELETE_SELF) != 0) {
                // the event is about the watched path itself so report the
                // path stored in the mapping.
                events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath));
            }

            if ((info.mask & IN_MODIFY) != 0) {
                events ~= FileChangeEvent(Event.Modify(fname));
            }

            if ((info.mask & IN_MOVE_SELF) != 0) {
                // the event is about the watched path itself so report the
                // path stored in the mapping.
                events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath));
            }

            if ((info.mask & IN_MOVED_FROM) != 0) {
                cookie[info.cookie] = fname;
            }

            if ((info.mask & IN_MOVED_TO) != 0) {
                if (auto v = info.cookie in cookie) {
                    events ~= FileChangeEvent(Event.Rename(*v, fname));
                    cookie.remove(info.cookie);
                } else {
                    // moved in from somewhere that is not watched.
                    events ~= FileChangeEvent(Event.Create(fname));
                }
            }

            if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) {
                // the watched object is gone from this path; drop the watch
                // and remember that so the destructor does not remove it again.
                inotify_rm_watch(fd, info.wd);
                directoryMap[info.wd].watched = false;
            }
        }

        // move sources without a destination in this batch: the entries left
        // the watched tree.
        foreach (c; cookie.byValue) {
            events ~= FileChangeEvent(Event.Delete(AbsolutePath(c)));
        }

        return events;
    }
}
404 
///
unittest {
    import core.thread;

    // start from a clean slate and always clean up afterwards.
    if (exists("test"))
        rmdirRecurse("test");
    scope (exit) {
        if (exists("test"))
            rmdirRecurse("test");
    }

    auto watcher = fileWatch();

    mkdir("test");
    assert(watcher.watch("test"));

    // creating a file inside the watched directory is reported first as a Create.
    write("test/a.txt", "abc");
    auto ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test/a.txt"));
            return true;
        }));

    // appending to the file is reported as a Modify.
    append("test/a.txt", "def");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test/a.txt"));
            return true;
        }));

    // a rename inside the watched directory is reported as one Rename event.
    rename("test/a.txt", "test/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Rename x) {
            assert(x.from == AbsolutePath("test/a.txt"));
            assert(x.to == AbsolutePath("test/b.txt"));
            return true;
        }));

    // removing the file is reported as a Delete.
    remove("test/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test/b.txt"));
            return true;
        }));

    // removing the watched directory itself is reported as a DeleteSelf.
    rmdirRecurse("test");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length > 0);
    assert(ev[0].tryMatch!((Event.DeleteSelf x) {
            assert(x.path == AbsolutePath("test"));
            return true;
        }));
}
462 
///
unittest {
    import std.algorithm : canFind;

    // start from a clean slate and always clean up afterwards.
    if (exists("test2"))
        rmdirRecurse("test2");
    if (exists("test3"))
        rmdirRecurse("test3");
    scope (exit) {
        if (exists("test2"))
            rmdirRecurse("test2");
        if (exists("test3"))
            rmdirRecurse("test3");
    }

    auto watcher = fileWatch();
    mkdir("test2");
    assert(watcher.watchRecurse("test2").length == 0);

    // writing a new file produces Create, Modify and CloseWrite, in that order.
    write("test2/a.txt", "abc");
    auto ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));
    assert(ev[1].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));
    assert(ev[2].tryMatch!((Event.CloseWrite x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));

    // moving a file out of the watched tree is reported as a Delete.
    rename("test2/a.txt", "./testfile-a.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/a.txt"));
            return true;
        }));

    // moving a file into the watched tree is reported as a Create.
    rename("./testfile-a.txt", "test2/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/b.txt"));
            return true;
        }));

    remove("test2/b.txt");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/b.txt"));
            return true;
        }));

    // a directory that is created and removed before it is watched.
    mkdir("test2/mydir");
    rmdir("test2/mydir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 2);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/mydir"));
            return true;
        }));
    assert(ev[1].tryMatch!((Event.Delete x) {
            assert(x.path == AbsolutePath("test2/mydir"));
            return true;
        }));

    // test for creation, modification, removal of subdirectory
    mkdir("test2/subdir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/subdir"));
            // add the created directory to be watched
            watcher.watchRecurse(x.path);
            return true;
        }));

    write("test2/subdir/c.txt", "abc");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    assert(ev[0].tryMatch!((Event.Create x) {
            assert(x.path == AbsolutePath("test2/subdir/c.txt"));
            return true;
        }));

    write("test2/subdir/c.txt", "\nabc");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 2);
    assert(ev[0].tryMatch!((Event.Modify x) {
            assert(x.path == AbsolutePath("test2/subdir/c.txt"));
            return true;
        }));

    rmdirRecurse("test2/subdir");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 3);
    foreach (e; ev) {
        // check every received event, not just the first one.
        assert(e.tryMatch!((Event.Delete x) {
                assert(canFind([
                    AbsolutePath("test2/subdir/c.txt"),
                    AbsolutePath("test2/subdir")
                ], x.path));
                return true;
            }, (Event.DeleteSelf x) {
                assert(x.path == AbsolutePath("test2/subdir"));
                return true;
            }));
    }

    // removal of watched folder
    rmdirRecurse("test2");
    ev = watcher.getEvents(5.dur!"seconds");
    assert(ev.length == 1);
    assert(ev[0].tryMatch!((Event.DeleteSelf x) {
            assert(x.path == AbsolutePath("test2"));
            return true;
        }));
}
587 
/** One event reported by `Monitor`.
 *
 * A flattened form of `FileChangeEvent`: the kind of the event together with
 * one path.
 */
struct MonitorResult {
    /// The kind of event; one member per alternative of `FileChangeEvent`.
    enum Kind {
        Access,
        Attribute,
        CloseWrite,
        CloseNoWrite,
        Create,
        Delete,
        DeleteSelf,
        Modify,
        MoveSelf,
        Rename,
        Open,
        Overflow,
    }

    /// What happened.
    Kind kind;
    /// The affected path. `Monitor.wait` stores the destination for `Rename`
    /// and leaves it at its default value for `Overflow`.
    AbsolutePath path;
}
607 
/** Recursively monitors a set of roots for filesystem changes that
 * create/remove/modify files/directories.
 *
 * Paths created below a root are added to the watch as they appear. A root
 * that is deleted or moved is remembered and watched again once it exists
 * anew.
 */
struct Monitor {
    import std.array : appender;
    import std.file : isDir;
    import std.utf : UTFException;
    import my.filter : GlobFilter;
    import my.fswatch;

    private {
        Set!AbsolutePath roots;
        FileWatch fw;
        GlobFilter fileFilter;
        uint events;

        // roots that have disappeared and may come back later on. The user
        // expects them to trigger events when they do.
        Set!AbsolutePath monitorRoots;
    }

    /**
     * Params:
     *  roots = directories to recursively monitor
     *  fileFilter = decides which files are watched and reported
     *  events = inotify event mask to watch for
     */
    this(AbsolutePath[] roots, GlobFilter fileFilter, uint events = ContentEvents) {
        this.roots = toSet(roots);
        this.fileFilter = fileFilter;
        this.events = events;

        fw = fileWatch();

        auto unwatched = appender!(AbsolutePath[])();
        foreach (root; roots) {
            auto failures = fw.watchRecurse(root, events, a => isInteresting(fileFilter, a));
            unwatched.put(failures);
        }

        logger.trace(!unwatched.data.empty, "unable to watch ", unwatched.data);
    }

    /** Decide if `p` should be watched.
     *
     * Directories are always interesting; anything else must be matched by
     * `fileFilter`. An exception while deciding is logged and counts as "not
     * interesting".
     */
    static bool isInteresting(GlobFilter fileFilter, string p) nothrow {
        import my.file;

        try {
            const candidate = AbsolutePath(p);
            return existsAnd!isDir(candidate) || fileFilter.match(candidate);
        } catch (Exception e) {
            collectException(logger.trace(e.msg));
        }

        return false;
    }

    /** Wait up to `timeout` for an event to occur for the monitored `roots`.
     *
     * Params:
     *  timeout = how long to wait for the event
     *
     * Returns: the events whose path is matched by the file filter.
     */
    MonitorResult[] wait(Duration timeout) {
        import std.array : array;
        import std.algorithm : canFind, startsWith, filter;

        alias Kind = MonitorResult.Kind;

        auto found = appender!(MonitorResult[])();

        // watch `path` and everything interesting below it.
        void rewatch(AbsolutePath path) {
            fw.watchRecurse(path, events, a => isInteresting(fileFilter, a));
        }

        // a root that vanished is remembered so it can be watched again.
        void rememberIfRoot(AbsolutePath path) {
            if (path in roots) {
                monitorRoots.add(path);
            }
        }

        // roots that have re-appeared are watched again and announced with an
        // artificial Create.
        auto reappeared = appender!(AbsolutePath[])();
        foreach (root; monitorRoots.toRange.filter!(a => exists(a))) {
            rewatch(root);
            reappeared.put(root);
            found.put(MonitorResult(Kind.Create, root));
        }
        foreach (root; reappeared.data) {
            monitorRoots.remove(root);
        }

        // there is already something to report; only pick up what happens to
        // be queued together with the artificially created events.
        if (!found.data.empty) {
            timeout = Duration.zero;
        }

        try {
            foreach (ev; fw.getEvents(timeout)) {
                ev.match!((Event.Overflow x) {
                    found.put(MonitorResult(Kind.Overflow));
                }, (Event.Access x) {
                    found.put(MonitorResult(Kind.Access, x.path));
                }, (Event.Attribute x) {
                    found.put(MonitorResult(Kind.Attribute, x.path));
                }, (Event.CloseWrite x) {
                    found.put(MonitorResult(Kind.CloseWrite, x.path));
                }, (Event.CloseNoWrite x) {
                    found.put(MonitorResult(Kind.CloseNoWrite, x.path));
                }, (Event.Create x) {
                    found.put(MonitorResult(Kind.Create, x.path));
                    rewatch(x.path);
                }, (Event.Modify x) {
                    found.put(MonitorResult(Kind.Modify, x.path));
                }, (Event.MoveSelf x) {
                    found.put(MonitorResult(Kind.MoveSelf, x.path));
                    rewatch(x.path);
                    rememberIfRoot(x.path);
                }, (Event.Delete x) {
                    found.put(MonitorResult(Kind.Delete, x.path));
                }, (Event.DeleteSelf x) {
                    found.put(MonitorResult(Kind.DeleteSelf, x.path));
                    rememberIfRoot(x.path);
                }, (Event.Rename x) {
                    found.put(MonitorResult(Kind.Rename, x.to));
                }, (Event.Open x) {
                    found.put(MonitorResult(Kind.Open, x.path));
                },);
            }
        } catch (Exception e) {
            logger.trace(e.msg);
        }

        return found.data.filter!(r => fileFilter.match(r.path)).array;
    }

    /** Collects events from the monitored `roots` over a period.
     *
     * Params:
     *  collectTime = for how long to clear the queue
     */
    MonitorResult[] collect(Duration collectTime) {
        import std.algorithm : max, min;
        import std.datetime : Clock;

        auto gathered = appender!(MonitorResult[])();
        const deadline = Clock.currTime + collectTime;

        do {
            Duration slice = max(deadline - Clock.currTime, 1.dur!"msecs");
            if (!monitorRoots.empty) {
                // must use a hybrid approach of poll + inotify because a root
                // that is added again is only detected by polling.
                slice = min(10.dur!"msecs", slice);
            }

            gathered.put(wait(slice));
        }
        while (Clock.currTime < deadline);

        return gathered.data;
    }
}
767 
@("shall re-apply monitoring for a file that is removed")
unittest {
    import my.filter : GlobFilter;
    import my.test;

    auto ta = makeTestArea("re-apply monitoring");
    const testTxt = ta.inSandbox("test.txt").AbsolutePath;

    // the monitored root is a single file.
    write(testTxt, "abc");
    auto fw = Monitor([testTxt], GlobFilter(["*"], null));
    write(testTxt, "abcc");
    assert(!fw.wait(Duration.zero).empty);

    // removing the root shall be reported ...
    remove(testTxt);
    assert(!fw.wait(Duration.zero).empty);

    // ... and so shall re-creating it.
    write(testTxt, "abcc");
    assert(!fw.wait(Duration.zero).empty);
}
787 
/** A file descriptor to poll.
 *
 * A thin wrapper that gives the raw descriptor number a distinct type.
 */
struct FdPoll {
    /// The raw file descriptor.
    int value;
}
793 
/** Uses the linux poll syscall to wait for activity on the file descriptors.
 *
 * Descriptors are registered with `put`, unregistered with `remove` and
 * `wait` reports those that have activity.
 */
struct FdPoller {
    import std.algorithm : min, filter;

    private {
        pollfd[] fds;
        // storage for the result of `wait`; always as long as `fds`.
        PollResult[] results;
    }

    /** Register a file descriptor.
     *
     * Params:
     *  fd = descriptor to poll
     *  evs = the kinds of activity to poll for
     */
    void put(FdPoll fd, PollEvent[] evs) {
        import core.sys.posix.poll;

        pollfd pfd;
        pfd.fd = fd.value;
        foreach (e; evs) {
            final switch (e) with (PollEvent) {
            case in_:
                pfd.events |= POLLIN;
                break;
            case out_:
                pfd.events |= POLLOUT;
                break;
            }
        }
        fds ~= pfd;

        // they must be the same length or else `wait` will fail.
        results.length = fds.length;
    }

    /// Unregister every entry that refers to `fd`.
    void remove(FdPoll fd) {
        fds = fds.filter!(a => a.fd != fd.value).array;

        results.length = fds.length;
    }

    /** Wait for activity on the registered file descriptors.
     *
     * Params:
     *  timeout = max time to wait
     *
     * Returns: one entry per descriptor with activity. Empty if the timeout
     * expired or the call was interrupted by a signal. The returned slice
     * refers to storage that is reused by the next call.
     *
     * Throws: Exception if poll fails for any other reason than an interrupt.
     */
    PollResult[] wait(Duration timeout = Duration.zero) {
        import core.sys.posix.poll;
        import std.bitmanip : BitArray;

        // `fds.ptr` rather than `&fds[0]`: indexing is a range violation when
        // no descriptor is registered, while poll accepts zero descriptors.
        const code = poll(fds.ptr, fds.length, cast(int) min(int.max, timeout.total!"msecs"));

        if (code < 0) {
            import core.stdc.errno : errno, EINTR;

            if (errno == EINTR) {
                // poll was just interrupted. report "nothing happened" and
                // let the caller decide whether to wait again.
                return (PollResult[]).init;
            }

            throw new Exception("Failed to poll events. Error code " ~ errno.to!string);
        } else if (code == 0) {
            // timeout triggered
            return (PollResult[]).init;
        }

        size_t idx;
        foreach (a; fds.filter!(a => a.revents != 0)) {
            PollResult res;
            // the order of the bits must match the members of `PollStatus`.
            res.status = BitArray([
                    (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0,
                    (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0,
                    (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0,
                    ]);
            res.fd = FdPoll(a.fd);
            results[idx] = res;
            idx++;
        }

        return results[0 .. idx];
    }
}
866 
/// Type of event to poll for.
enum PollEvent {
    /// Data to read; `FdPoller.put` maps it to POLLIN.
    in_,
    /// Writing is possible; `FdPoller.put` maps it to POLLOUT.
    out_,
}
872 
/** What each bit in `PollResult.status` represents.
 *
 * The member order is the bit order that `FdPoller.wait` uses when it fills
 * in the status. The descriptions follow the poll(2) manual page.
 */
enum PollStatus {
    /// There is data to read.
    in_,
    /// Writing is now possible, though a write larger than the available
    /// space in a socket or pipe will still block (unless O_NONBLOCK is set).
    out_,
    /// There is some exceptional condition on the file descriptor. Possibilities include:
    /// *  There is out-of-band data on a TCP socket (see tcp(7)).
    /// *  A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)).
    /// *  A cgroup.events file has been modified (see cgroups(7)).
    pri,
    /// Error condition (only returned in revents; ignored in events). This bit
    /// is also set for a file descriptor referring to the write end of a pipe
    /// when the read end has been closed.
    error,
    /// Hang up (only returned in revents; ignored in events). Note that when
    /// reading from a channel such as a pipe or a stream socket, this event
    /// merely indicates that the peer closed its end of the channel.
    /// Subsequent reads from the channel will return 0 (end of file) only
    /// after all outstanding data in the channel has been consumed.
    hup,
    /// Invalid request: fd not open (only returned in revents; ignored in events).
    nval,
}
898 
/// A file descriptor that triggered together with what triggered it.
struct PollResult {
    import std.bitmanip : BitArray;

    /// One bit per member of `PollStatus`, indexed by the member's value.
    BitArray status;
    /// The descriptor the status belongs to.
    FdPoll fd;
}