1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 This is based on webfreak's
7 [fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the
8 API as it where because I needed to be able to watch multiple directories,
9 filter what files are to be watched and to be robust against broken symlinks.
10 
11 Lets say you want to watch a directory for changes and add all directories to
12 be watched too.
13 
14 ---
15 auto fw = fileWatch();
16 fw.watchRecurse("my_dir");
17 while (true) {
18     auto ev = fw.wait;
19     foreach (e; ev) {
20         e.match!(
21         (Event.Access x) => writeln(x),
22         (Event.Attribute x) => writeln(x),
23         (Event.CloseWrite x) => writeln(x),
24         (Event.CloseNoWrite x) => writeln(x),
25         (Event.Create x) { fw.watchRecurse(x.path); },
26         (Event.Delete x) => writeln(x),
27         (Event.DeleteSelf x) => writeln(x),
28         (Event.Modify x) => writeln(x),
29         (Event.MoveSelf x) => writeln(x),
30         (Event.Rename x) => writeln(x),
31         (Event.Open x) => writeln(x),
32         );
33     }
34 }
35 ---
36 */
37 module my.fswatch;
38 
39 import core.sys.linux.errno : errno;
40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1,
42     inotify_add_watch, inotify_event, IN_NONBLOCK, IN_ACCESS,
43     IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE, IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, IN_CREATE,
44     IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK;
45 import core.sys.linux.unistd : close, read;
46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL;
47 import core.thread : Thread;
48 import core.time : dur, Duration;
49 import logger = std.experimental.logger;
50 import std.array : appender, empty;
51 import std.conv : to;
52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append,
53     rename, remove, exists, SpanMode, mkdir, rmdir;
54 import std.path : buildPath;
55 import std.range : isInputRange;
56 import std.string : toStringz, fromStringz;
57 import std.exception : collectException;
58 
59 import sumtype;
60 
61 import my.path : AbsolutePath, Path;
62 
63 struct Event {
64     /// File was accessed (e.g., read(2), execve(2)).
65     static struct Access {
66         AbsolutePath path;
67         this(this) {
68         }
69     }
70 
71     /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps
72      * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count
73      * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)),
74      * and user/group ID (e.g., chown(2)).
75      */
76     static struct Attribute {
77         AbsolutePath path;
78         this(this) {
79         }
80     }
81 
82     /// File opened for writing was closed.
83     static struct CloseWrite {
84         AbsolutePath path;
85         this(this) {
86         }
87     }
88 
89     /// File or directory not opened for writing was closed.
90     static struct CloseNoWrite {
91         AbsolutePath path;
92         this(this) {
93         }
94     }
95 
96     /** File/directory created in watched directory (e.g., open(2) O_CREAT,
97      * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).
98      */
99     static struct Create {
100         AbsolutePath path;
101         this(this) {
102         }
103     }
104 
105     /// File/directory deleted from watched directory.
106     static struct Delete {
107         AbsolutePath path;
108         this(this) {
109         }
110     }
111 
112     /** Watched file/directory was itself deleted. (This event also occurs if
113      * an object is moved to another filesystem, since mv(1) in effect copies
114      * the file to the other filesystem and then deletes it from the original
115      * filesys‐ tem.)  In addition, an IN_IGNORED event will subsequently be
116      * generated for the watch descriptor.
117      */
118     static struct DeleteSelf {
119         AbsolutePath path;
120         this(this) {
121         }
122     }
123 
124     /// File was modified (e.g., write(2), truncate(2)).
125     static struct Modify {
126         AbsolutePath path;
127         this(this) {
128         }
129     }
130 
131     /// Watched file/directory was itself moved.
132     static struct MoveSelf {
133         AbsolutePath path;
134         this(this) {
135         }
136     }
137 
138     /// Occurs when a file or folder inside a folder is renamed.
139     static struct Rename {
140         AbsolutePath from;
141         AbsolutePath to;
142         this(this) {
143         }
144     }
145 
146     /// File or directory was opened.
147     static struct Open {
148         AbsolutePath path;
149         this(this) {
150         }
151     }
152 }
153 
154 alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite,
155         Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf,
156         Event.Modify, Event.MoveSelf, Event.Rename, Event.Open);
157 
158 /// Construct a FileWatch.
159 auto fileWatch() {
160     int fd = inotify_init1(IN_NONBLOCK);
161     if (fd == -1) {
162         throw new Exception(
163                 "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string);
164     }
165     return FileWatch(fd);
166 }
167 
168 /// Listens for create/modify/removal of files and directories.
169 enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY
170     | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE;
171 
172 /// Listen for events that change the metadata.
173 enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK;
174 
175 /** An instance of a FileWatcher
176  */
177 struct FileWatch {
178     import std.functional : toDelegate;
179 
180     private {
181         int fd;
182         ubyte[1024 * 4] eventBuffer; // 4kb buffer for events
183         struct FDInfo {
184             int wd;
185             bool watched;
186             Path path;
187 
188             this(this) {
189             }
190         }
191 
192         FDInfo[int] directoryMap; // map every watch descriptor to a directory
193     }
194 
195     private this(int fd) {
196         this.fd = fd;
197     }
198 
199     ~this() {
200         if (fd) {
201             foreach (fdinfo; directoryMap.byValue) {
202                 if (fdinfo.watched)
203                     inotify_rm_watch(fd, fdinfo.wd);
204             }
205             close(fd);
206         }
207     }
208 
209     /** Add a path to watch for events.
210      *
211      * Params:
212      *  path = path to watch
213      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
214      *
215      * Returns: true if the path was successfully added.
216      */
217     bool watch(Path path, uint events = ContentEvents) {
218         const wd = inotify_add_watch(fd, path.toStringz, events);
219         if (wd != -1) {
220             const fc = fcntl(fd, F_SETFD, FD_CLOEXEC);
221             if (fc != -1) {
222                 directoryMap[wd] = FDInfo(wd, true, path);
223                 return true;
224             }
225         }
226 
227         return false;
228     }
229 
230     ///
231     bool watch(string p, uint events = ContentEvents) {
232         return watch(Path(p));
233     }
234 
235     private static bool allFiles(string p) {
236         return true;
237     }
238 
239     /** Recursively add the path and all its subdirectories and files to be watched.
240      *
241      * Params:
242      *  pred = only those files and directories that `pred` returns true for are watched, by default every file/directory.
243      *  root = directory to watch together with its content and subdirectories.
244      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
245      *
246      * Returns: paths that failed to be added.
247      */
248     AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents,
249             bool delegate(string) pred = toDelegate(&allFiles)) {
250         import std.algorithm : filter;
251         import my.file : existsAnd;
252         import my.set;
253 
254         auto failed = appender!(AbsolutePath[])();
255 
256         if (!watch(root, events)) {
257             failed.put(AbsolutePath(root));
258         }
259 
260         if (!existsAnd!isDir(root)) {
261             return failed.data;
262         }
263 
264         auto dirs = [AbsolutePath(root)];
265         Set!AbsolutePath visited;
266         while (!dirs.empty) {
267             auto front = dirs[0];
268             dirs = dirs[1 .. $];
269             if (front in visited)
270                 continue;
271             visited.add(front);
272 
273             try {
274                 foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) {
275                     if (!watch(Path(p.name), events)) {
276                         failed.put(AbsolutePath(p.name));
277                     }
278                     if (existsAnd!isDir(Path(p.name))) {
279                         dirs ~= AbsolutePath(p.name);
280                     }
281                 }
282             } catch (Exception e) {
283                 () @trusted { logger.trace(e); }();
284                 logger.trace(e.msg);
285                 failed.put(AbsolutePath(front));
286             }
287         }
288 
289         return failed.data;
290     }
291 
292     ///
293     AbsolutePath[] watchRecurse(string root, uint events = ContentEvents,
294             bool delegate(string) pred = toDelegate(&allFiles)) {
295         return watchRecurse(Path(root), events, pred);
296     }
297 
298     /** The events that have occured since last query.
299      *
300      * Params:
301      *  timeout = max time to wait for events.
302      *
303      * Returns: the events that has occured to the watched paths.
304      */
305     FileChangeEvent[] getEvents(Duration timeout = Duration.zero) {
306         import std.algorithm : min;
307 
308         FileChangeEvent[] events;
309         if (!fd)
310             return events;
311 
312         pollfd pfd;
313         pfd.fd = fd;
314         pfd.events = POLLIN;
315         const code = poll(&pfd, 1, cast(int) min(int.max, timeout.total!"msecs"));
316 
317         if (code < 0) {
318             throw new Exception("Failed to poll events. Error code " ~ errno.to!string);
319         } else if (code == 0) {
320             // timeout triggered
321             return events;
322         } else if ((pfd.revents & POLLNVAL) != 0) {
323             throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string);
324         }
325 
326         const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length);
327         int i = 0;
328         AbsolutePath[uint] cookie;
329         while (true) {
330             auto info = cast(inotify_event*)(eventBuffer.ptr + i);
331 
332             if (info.wd !in directoryMap)
333                 continue;
334 
335             auto fname = () {
336                 string fileName = info.name.ptr.fromStringz.idup;
337                 return AbsolutePath(buildPath(directoryMap[info.wd].path, fileName));
338             }();
339 
340             if ((info.mask & IN_MOVED_TO) == 0) {
341                 if (auto v = info.cookie in cookie) {
342                     events ~= FileChangeEvent(Event.Delete(*v));
343                     cookie.remove(info.cookie);
344                 }
345             }
346 
347             if ((info.mask & IN_ACCESS) != 0) {
348                 events ~= FileChangeEvent(Event.Access(fname));
349             }
350 
351             if ((info.mask & IN_ATTRIB) != 0) {
352                 events ~= FileChangeEvent(Event.Attribute(fname));
353             }
354 
355             if ((info.mask & IN_CLOSE_WRITE) != 0) {
356                 events ~= FileChangeEvent(Event.CloseWrite(fname));
357             }
358 
359             if ((info.mask & IN_CLOSE_NOWRITE) != 0) {
360                 events ~= FileChangeEvent(Event.CloseNoWrite(fname));
361             }
362 
363             if ((info.mask & IN_CREATE) != 0) {
364                 events ~= FileChangeEvent(Event.Create(fname));
365             }
366 
367             if ((info.mask & IN_DELETE) != 0) {
368                 events ~= FileChangeEvent(Event.Delete(fname));
369             }
370 
371             if ((info.mask & IN_DELETE_SELF) != 0) {
372                 // must go via the mapping or there may be trailing junk in fname.
373                 events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath));
374             }
375 
376             if ((info.mask & IN_MODIFY) != 0) {
377                 events ~= FileChangeEvent(Event.Modify(fname));
378             }
379 
380             if ((info.mask & IN_MOVE_SELF) != 0) {
381                 // must go via the mapping or there may be trailing junk in fname.
382                 events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath));
383             }
384 
385             if ((info.mask & IN_MOVED_FROM) != 0) {
386                 cookie[info.cookie] = fname;
387             }
388 
389             if ((info.mask & IN_MOVED_TO) != 0) {
390                 if (auto v = info.cookie in cookie) {
391                     events ~= FileChangeEvent(Event.Rename(*v, fname));
392                     cookie.remove(info.cookie);
393                 } else {
394                     events ~= FileChangeEvent(Event.Create(fname));
395                 }
396             }
397 
398             if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) {
399                 inotify_rm_watch(fd, info.wd);
400                 directoryMap[info.wd].watched = false;
401             }
402 
403             i += inotify_event.sizeof + info.len;
404 
405             if (i >= receivedBytes)
406                 break;
407         }
408 
409         foreach (c; cookie.byValue) {
410             events ~= FileChangeEvent(Event.Delete(AbsolutePath(c)));
411         }
412 
413         return events;
414     }
415 }
416 
417 ///
418 unittest {
419     import core.thread;
420 
421     if (exists("test"))
422         rmdirRecurse("test");
423     scope (exit) {
424         if (exists("test"))
425             rmdirRecurse("test");
426     }
427 
428     auto watcher = fileWatch();
429 
430     mkdir("test");
431     assert(watcher.watch("test"));
432 
433     write("test/a.txt", "abc");
434     auto ev = watcher.getEvents(5.dur!"seconds");
435     assert(ev.length > 0);
436     assert(ev[0].tryMatch!((Event.Create x) {
437             assert(x.path == AbsolutePath("test/a.txt"));
438             return true;
439         }));
440 
441     append("test/a.txt", "def");
442     ev = watcher.getEvents(5.dur!"seconds");
443     assert(ev.length > 0);
444     assert(ev[0].tryMatch!((Event.Modify x) {
445             assert(x.path == AbsolutePath("test/a.txt"));
446             return true;
447         }));
448 
449     rename("test/a.txt", "test/b.txt");
450     ev = watcher.getEvents(5.dur!"seconds");
451     assert(ev.length > 0);
452     assert(ev[0].tryMatch!((Event.Rename x) {
453             assert(x.from == AbsolutePath("test/a.txt"));
454             assert(x.to == AbsolutePath("test/b.txt"));
455             return true;
456         }));
457 
458     remove("test/b.txt");
459     ev = watcher.getEvents(5.dur!"seconds");
460     assert(ev.length > 0);
461     assert(ev[0].tryMatch!((Event.Delete x) {
462             assert(x.path == AbsolutePath("test/b.txt"));
463             return true;
464         }));
465 
466     rmdirRecurse("test");
467     ev = watcher.getEvents(5.dur!"seconds");
468     assert(ev.length > 0);
469     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
470             assert(x.path == AbsolutePath("test"));
471             return true;
472         }));
473 }
474 
475 ///
476 unittest {
477     import std.algorithm : canFind;
478 
479     if (exists("test2"))
480         rmdirRecurse("test2");
481     if (exists("test3"))
482         rmdirRecurse("test3");
483     scope (exit) {
484         if (exists("test2"))
485             rmdirRecurse("test2");
486         if (exists("test3"))
487             rmdirRecurse("test3");
488     }
489 
490     auto watcher = fileWatch();
491     mkdir("test2");
492     assert(watcher.watchRecurse("test2").length == 0);
493 
494     write("test2/a.txt", "abc");
495     auto ev = watcher.getEvents(5.dur!"seconds");
496     assert(ev.length == 3);
497     assert(ev[0].tryMatch!((Event.Create x) {
498             assert(x.path == AbsolutePath("test2/a.txt"));
499             return true;
500         }));
501     assert(ev[1].tryMatch!((Event.Modify x) {
502             assert(x.path == AbsolutePath("test2/a.txt"));
503             return true;
504         }));
505     assert(ev[2].tryMatch!((Event.CloseWrite x) {
506             assert(x.path == AbsolutePath("test2/a.txt"));
507             return true;
508         }));
509 
510     rename("test2/a.txt", "./testfile-a.txt");
511     ev = watcher.getEvents(5.dur!"seconds");
512     assert(ev.length == 1);
513     assert(ev[0].tryMatch!((Event.Delete x) {
514             assert(x.path == AbsolutePath("test2/a.txt"));
515             return true;
516         }));
517 
518     rename("./testfile-a.txt", "test2/b.txt");
519     ev = watcher.getEvents(5.dur!"seconds");
520     assert(ev.length == 1);
521     assert(ev[0].tryMatch!((Event.Create x) {
522             assert(x.path == AbsolutePath("test2/b.txt"));
523             return true;
524         }));
525 
526     remove("test2/b.txt");
527     ev = watcher.getEvents(5.dur!"seconds");
528     assert(ev.length == 1);
529     assert(ev[0].tryMatch!((Event.Delete x) {
530             assert(x.path == AbsolutePath("test2/b.txt"));
531             return true;
532         }));
533 
534     mkdir("test2/mydir");
535     rmdir("test2/mydir");
536     ev = watcher.getEvents(5.dur!"seconds");
537     assert(ev.length == 2);
538     assert(ev[0].tryMatch!((Event.Create x) {
539             assert(x.path == AbsolutePath("test2/mydir"));
540             return true;
541         }));
542     assert(ev[1].tryMatch!((Event.Delete x) {
543             assert(x.path == AbsolutePath("test2/mydir"));
544             return true;
545         }));
546 
547     // test for creation, modification, removal of subdirectory
548     mkdir("test2/subdir");
549     ev = watcher.getEvents(5.dur!"seconds");
550     assert(ev.length == 1);
551     assert(ev[0].tryMatch!((Event.Create x) {
552             assert(x.path == AbsolutePath("test2/subdir"));
553             // add the created directory to be watched
554             watcher.watchRecurse(x.path);
555             return true;
556         }));
557 
558     write("test2/subdir/c.txt", "abc");
559     ev = watcher.getEvents(5.dur!"seconds");
560     assert(ev.length == 3);
561     assert(ev[0].tryMatch!((Event.Create x) {
562             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
563             return true;
564         }));
565 
566     write("test2/subdir/c.txt", "\nabc");
567     ev = watcher.getEvents(5.dur!"seconds");
568     assert(ev.length == 2);
569     assert(ev[0].tryMatch!((Event.Modify x) {
570             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
571             return true;
572         }));
573 
574     rmdirRecurse("test2/subdir");
575     ev = watcher.getEvents(5.dur!"seconds");
576     assert(ev.length == 3);
577     foreach (e; ev) {
578         assert(ev[0].tryMatch!((Event.Delete x) {
579                 assert(canFind([
580                     AbsolutePath("test2/subdir/c.txt"),
581                     AbsolutePath("test2/subdir")
582                 ], x.path));
583                 return true;
584             }, (Event.DeleteSelf x) {
585                 assert(x.path == AbsolutePath("test2/subdir"));
586                 return true;
587             }));
588     }
589 
590     // removal of watched folder
591     rmdirRecurse("test2");
592     ev = watcher.getEvents(5.dur!"seconds");
593     assert(ev.length == 1);
594     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
595             assert(x.path == AbsolutePath("test2"));
596             return true;
597         }));
598 }
599 
600 struct MonitorResult {
601     enum Kind {
602         Access,
603         Attribute,
604         CloseWrite,
605         CloseNoWrite,
606         Create,
607         Delete,
608         DeleteSelf,
609         Modify,
610         MoveSelf,
611         Rename,
612         Open,
613     }
614 
615     Kind kind;
616     AbsolutePath path;
617 }
618 
619 /** Monitor root's for filesystem changes which create/remove/modify
620  * files/directories.
621  */
622 struct Monitor {
623     import std.array : appender;
624     import std.file : isDir;
625     import std.utf : UTFException;
626     import my.filter : GlobFilter;
627     import my.fswatch;
628     import my.set;
629     import sumtype;
630 
631     private {
632         Set!AbsolutePath roots;
633         FileWatch fw;
634         GlobFilter fileFilter;
635         uint events;
636 
637         // roots that has been removed that may be re-added later on. the user
638         // expects them to trigger events.
639         Set!AbsolutePath monitorRoots;
640     }
641 
642     /**
643      * Params:
644      *  roots = directories to recursively monitor
645      */
646     this(AbsolutePath[] roots, GlobFilter fileFilter, uint events = ContentEvents) {
647         this.roots = toSet(roots);
648         this.fileFilter = fileFilter;
649         this.events = events;
650 
651         auto app = appender!(AbsolutePath[])();
652         fw = fileWatch();
653         foreach (r; roots) {
654             app.put(fw.watchRecurse(r, events, (a) {
655                     return isInteresting(fileFilter, a);
656                 }));
657         }
658 
659         logger.trace(!app.data.empty, "unable to watch ", app.data);
660     }
661 
662     static bool isInteresting(GlobFilter fileFilter, string p) nothrow {
663         import my.file;
664 
665         try {
666             const ap = AbsolutePath(p);
667 
668             if (existsAnd!isDir(ap)) {
669                 return true;
670             }
671             return fileFilter.match(ap);
672         } catch (Exception e) {
673             collectException(logger.trace(e.msg));
674         }
675 
676         return false;
677     }
678 
679     /** Wait up to `timeout` for an event to occur for the monitored `roots`.
680      *
681      * Params:
682      *  timeout = how long to wait for the event
683      */
684     MonitorResult[] wait(Duration timeout) {
685         import std.array : array;
686         import std.algorithm : canFind, startsWith, filter;
687 
688         auto rval = appender!(MonitorResult[])();
689 
690         {
691             auto rm = appender!(AbsolutePath[])();
692             foreach (a; monitorRoots.toRange.filter!(a => exists(a))) {
693                 fw.watchRecurse(a, events, a => isInteresting(fileFilter, a));
694                 rm.put(a);
695                 rval.put(MonitorResult(MonitorResult.Kind.Create, a));
696             }
697             foreach (a; rm.data) {
698                 monitorRoots.remove(a);
699             }
700         }
701 
702         if (!rval.data.empty) {
703             // collect whatever events that happend to have queued up together
704             // with the artifically created.
705             timeout = Duration.zero;
706         }
707 
708         try {
709             foreach (e; fw.getEvents(timeout)) {
710                 e.match!((Event.Access x) {
711                     rval.put(MonitorResult(MonitorResult.Kind.Access, x.path));
712                 }, (Event.Attribute x) {
713                     rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path));
714                 }, (Event.CloseWrite x) {
715                     rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path));
716                 }, (Event.CloseNoWrite x) {
717                     rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path));
718                 }, (Event.Create x) {
719                     rval.put(MonitorResult(MonitorResult.Kind.Create, x.path));
720                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a));
721                 }, (Event.Modify x) {
722                     rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path));
723                 }, (Event.MoveSelf x) {
724                     rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path));
725                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a));
726 
727                     if (x.path in roots) {
728                         monitorRoots.add(x.path);
729                     }
730                 }, (Event.Delete x) {
731                     rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path));
732                 }, (Event.DeleteSelf x) {
733                     rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path));
734 
735                     if (x.path in roots) {
736                         monitorRoots.add(x.path);
737                     }
738                 }, (Event.Rename x) {
739                     rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to));
740                 }, (Event.Open x) {
741                     rval.put(MonitorResult(MonitorResult.Kind.Open, x.path));
742                 },);
743             }
744         } catch (Exception e) {
745             logger.trace(e.msg);
746         }
747 
748         return rval.data.filter!(a => fileFilter.match(a.path)).array;
749     }
750 
751     /** Collects events from the monitored `roots` over a period.
752      *
753      * Params:
754      *  collectTime = for how long to clear the queue
755      */
756     MonitorResult[] collect(Duration collectTime) {
757         import std.algorithm : max, min;
758         import std.datetime : Clock;
759 
760         auto rval = appender!(MonitorResult[])();
761         const stopAt = Clock.currTime + collectTime;
762 
763         do {
764             collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs");
765             if (!monitorRoots.empty) {
766                 // must use a hybrid approach of poll + inotify because if a
767                 // root is added it will only be detected by polling.
768                 collectTime = min(10.dur!"msecs", collectTime);
769             }
770 
771             rval.put(wait(collectTime));
772         }
773         while (Clock.currTime < stopAt);
774 
775         return rval.data;
776     }
777 }
778 
779 @("shall re-apply monitoring for a file that is removed")
780 unittest {
781     import my.filter : GlobFilter;
782     import my.test;
783 
784     auto ta = makeTestArea("re-apply monitoring");
785     const testTxt = ta.inSandbox("test.txt").AbsolutePath;
786 
787     write(testTxt, "abc");
788     auto fw = Monitor([testTxt], GlobFilter(["*"], null));
789     write(testTxt, "abcc");
790     assert(!fw.wait(Duration.zero).empty);
791 
792     remove(testTxt);
793     assert(!fw.wait(Duration.zero).empty);
794 
795     write(testTxt, "abcc");
796     assert(!fw.wait(Duration.zero).empty);
797 }