1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 6 This is based on webfreak's 7 [fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the 8 API as it where because I needed to be able to watch multiple directories, 9 filter what files are to be watched and to be robust against broken symlinks. 10 11 Lets say you want to watch a directory for changes and add all directories to 12 be watched too. 13 14 --- 15 auto fw = fileWatch(); 16 fw.watchRecurse("my_dir"); 17 while (true) { 18 auto ev = fw.wait; 19 foreach (e; ev) { 20 e.match!( 21 (Event.Access x) => writeln(x), 22 (Event.Attribute x) => writeln(x), 23 (Event.CloseWrite x) => writeln(x), 24 (Event.CloseNoWrite x) => writeln(x), 25 (Event.Create x) { fw.watchRecurse(x.path); }, 26 (Event.Delete x) => writeln(x), 27 (Event.DeleteSelf x) => writeln(x), 28 (Event.Modify x) => writeln(x), 29 (Event.MoveSelf x) => writeln(x), 30 (Event.Rename x) => writeln(x), 31 (Event.Open x) => writeln(x), 32 ); 33 } 34 } 35 --- 36 */ 37 module my.fswatch; 38 39 import core.sys.linux.errno : errno; 40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC; 41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC, 42 IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE, 43 IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, 44 IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK; 45 import core.sys.linux.unistd : close, read; 46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL; 47 import core.thread : Thread; 48 import core.time : dur, Duration; 49 import logger = std.experimental.logger; 50 import std.array : appender, empty, array; 51 import std.conv : to; 52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append, 53 rename, remove, exists, SpanMode, mkdir, rmdir; 54 import std.path : buildPath; 55 import std.range : isInputRange; 56 import std.string : toStringz, fromStringz; 57 import std.exception : collectException; 58 59 import sumtype; 60 61 import my.path : AbsolutePath, Path; 62 import my.set; 63 64 struct Event { 65 /// An overflow occured. Unknown what events actually triggered. 66 static struct Overflow { 67 } 68 69 /// File was accessed (e.g., read(2), execve(2)). 70 static struct Access { 71 AbsolutePath path; 72 } 73 74 /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps 75 * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count 76 * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)), 77 * and user/group ID (e.g., chown(2)). 78 */ 79 static struct Attribute { 80 AbsolutePath path; 81 } 82 83 /// File opened for writing was closed. 84 static struct CloseWrite { 85 AbsolutePath path; 86 } 87 88 /// File or directory not opened for writing was closed. 89 static struct CloseNoWrite { 90 AbsolutePath path; 91 } 92 93 /** File/directory created in watched directory (e.g., open(2) O_CREAT, 94 * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket). 95 */ 96 static struct Create { 97 AbsolutePath path; 98 } 99 100 /// File/directory deleted from watched directory. 101 static struct Delete { 102 AbsolutePath path; 103 } 104 105 /** Watched file/directory was itself deleted. (This event also occurs if 106 * an object is moved to another filesystem, since mv(1) in effect copies 107 * the file to the other filesystem and then deletes it from the original 108 * filesys‐ tem.) In addition, an IN_IGNORED event will subsequently be 109 * generated for the watch descriptor. 110 */ 111 static struct DeleteSelf { 112 AbsolutePath path; 113 } 114 115 /// File was modified (e.g., write(2), truncate(2)). 116 static struct Modify { 117 AbsolutePath path; 118 } 119 120 /// Watched file/directory was itself moved. 121 static struct MoveSelf { 122 AbsolutePath path; 123 } 124 125 /// Occurs when a file or folder inside a folder is renamed. 126 static struct Rename { 127 AbsolutePath from; 128 AbsolutePath to; 129 } 130 131 /// File or directory was opened. 132 static struct Open { 133 AbsolutePath path; 134 } 135 } 136 137 alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite, 138 Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf, 139 Event.Modify, Event.MoveSelf, Event.Rename, Event.Open, Event.Overflow); 140 141 /// Construct a FileWatch. 142 auto fileWatch() { 143 int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); 144 if (fd == -1) { 145 throw new Exception( 146 "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string); 147 } 148 return FileWatch(fd); 149 } 150 151 /// Listens for create/modify/removal of files and directories. 152 enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY 153 | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE; 154 155 /// Listen for events that change the metadata. 156 enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK; 157 158 /** An instance of a FileWatcher 159 */ 160 struct FileWatch { 161 import std.functional : toDelegate; 162 163 private { 164 FdPoller poller; 165 int fd; 166 ubyte[1024 * 4] eventBuffer; // 4kb buffer for events 167 struct FDInfo { 168 int wd; 169 bool watched; 170 Path path; 171 172 this(this) { 173 } 174 } 175 176 FDInfo[int] directoryMap; // map every watch descriptor to a directory 177 } 178 179 private this(int fd) { 180 this.fd = fd; 181 poller.put(FdPoll(fd), [PollEvent.in_]); 182 } 183 184 ~this() { 185 if (fd) { 186 foreach (fdinfo; directoryMap.byValue) { 187 if (fdinfo.watched) 188 inotify_rm_watch(fd, fdinfo.wd); 189 } 190 close(fd); 191 } 192 } 193 194 /** Add a path to watch for events. 195 * 196 * Params: 197 * path = path to watch 198 * events = events to watch for. See man inotify and core.sys.linux.sys.inotify. 199 * 200 * Returns: true if the path was successfully added. 201 */ 202 bool watch(Path path, uint events = ContentEvents) { 203 const wd = inotify_add_watch(fd, path.toStringz, events); 204 if (wd != -1) { 205 const fc = fcntl(fd, F_SETFD, FD_CLOEXEC); 206 if (fc != -1) { 207 directoryMap[wd] = FDInfo(wd, true, path); 208 return true; 209 } 210 } 211 212 return false; 213 } 214 215 /// 216 bool watch(string p, uint events = ContentEvents) { 217 return watch(Path(p)); 218 } 219 220 private static bool allFiles(string p) { 221 return true; 222 } 223 224 /** Recursively add the path and all its subdirectories and files to be watched. 225 * 226 * Params: 227 * pred = only those files and directories that `pred` returns true for are watched, by default every file/directory. 228 * root = directory to watch together with its content and subdirectories. 229 * events = events to watch for. See man inotify and core.sys.linux.sys.inotify. 230 * 231 * Returns: paths that failed to be added. 232 */ 233 AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents, 234 bool delegate(string) pred = toDelegate(&allFiles)) { 235 import std.algorithm : filter; 236 import my.file : existsAnd; 237 238 auto failed = appender!(AbsolutePath[])(); 239 240 if (!watch(root, events)) { 241 failed.put(AbsolutePath(root)); 242 } 243 244 if (!existsAnd!isDir(root)) { 245 return failed.data; 246 } 247 248 auto dirs = [AbsolutePath(root)]; 249 Set!AbsolutePath visited; 250 while (!dirs.empty) { 251 auto front = dirs[0]; 252 dirs = dirs[1 .. $]; 253 if (front in visited) 254 continue; 255 visited.add(front); 256 257 try { 258 foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) { 259 if (!watch(Path(p.name), events)) { 260 failed.put(AbsolutePath(p.name)); 261 } 262 if (existsAnd!isDir(Path(p.name))) { 263 dirs ~= AbsolutePath(p.name); 264 } 265 } 266 } catch (Exception e) { 267 () @trusted { logger.trace(e); }(); 268 logger.trace(e.msg); 269 failed.put(AbsolutePath(front)); 270 } 271 } 272 273 return failed.data; 274 } 275 276 /// 277 AbsolutePath[] watchRecurse(string root, uint events = ContentEvents, 278 bool delegate(string) pred = toDelegate(&allFiles)) { 279 return watchRecurse(Path(root), events, pred); 280 } 281 282 /** The events that have occured since last query. 283 * 284 * Params: 285 * timeout = max time to wait for events. 286 * 287 * Returns: the events that has occured to the watched paths. 288 */ 289 FileChangeEvent[] getEvents(Duration timeout = Duration.zero) { 290 import std.algorithm : min; 291 292 FileChangeEvent[] events; 293 if (!fd) 294 return events; 295 296 auto res = poller.wait(timeout); 297 298 if (res.empty) { 299 return events; 300 } 301 302 if (res[0].status[PollStatus.nval]) { 303 throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string); 304 } 305 306 if (!res[0].status[PollStatus.in_]) { 307 // no events to read 308 return events; 309 } 310 311 const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length); 312 int i = 0; 313 AbsolutePath[uint] cookie; 314 while (true) { 315 auto info = cast(inotify_event*)(eventBuffer.ptr + i); 316 317 if (info.wd == -1) { 318 events ~= FileChangeEvent(Event.Overflow.init); 319 } 320 if (info.wd !in directoryMap) 321 continue; 322 323 auto fname = () { 324 string fileName = info.name.ptr.fromStringz.idup; 325 return AbsolutePath(buildPath(directoryMap[info.wd].path, fileName)); 326 }(); 327 328 if ((info.mask & IN_MOVED_TO) == 0) { 329 if (auto v = info.cookie in cookie) { 330 events ~= FileChangeEvent(Event.Delete(*v)); 331 cookie.remove(info.cookie); 332 } 333 } 334 335 if ((info.mask & IN_ACCESS) != 0) { 336 events ~= FileChangeEvent(Event.Access(fname)); 337 } 338 339 if ((info.mask & IN_ATTRIB) != 0) { 340 events ~= FileChangeEvent(Event.Attribute(fname)); 341 } 342 343 if ((info.mask & IN_CLOSE_WRITE) != 0) { 344 events ~= FileChangeEvent(Event.CloseWrite(fname)); 345 } 346 347 if ((info.mask & IN_CLOSE_NOWRITE) != 0) { 348 events ~= FileChangeEvent(Event.CloseNoWrite(fname)); 349 } 350 351 if ((info.mask & IN_CREATE) != 0) { 352 events ~= FileChangeEvent(Event.Create(fname)); 353 } 354 355 if ((info.mask & IN_DELETE) != 0) { 356 events ~= FileChangeEvent(Event.Delete(fname)); 357 } 358 359 if ((info.mask & IN_DELETE_SELF) != 0) { 360 // must go via the mapping or there may be trailing junk in fname. 361 events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath)); 362 } 363 364 if ((info.mask & IN_MODIFY) != 0) { 365 events ~= FileChangeEvent(Event.Modify(fname)); 366 } 367 368 if ((info.mask & IN_MOVE_SELF) != 0) { 369 // must go via the mapping or there may be trailing junk in fname. 370 events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath)); 371 } 372 373 if ((info.mask & IN_MOVED_FROM) != 0) { 374 cookie[info.cookie] = fname; 375 } 376 377 if ((info.mask & IN_MOVED_TO) != 0) { 378 if (auto v = info.cookie in cookie) { 379 events ~= FileChangeEvent(Event.Rename(*v, fname)); 380 cookie.remove(info.cookie); 381 } else { 382 events ~= FileChangeEvent(Event.Create(fname)); 383 } 384 } 385 386 if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) { 387 inotify_rm_watch(fd, info.wd); 388 directoryMap[info.wd].watched = false; 389 } 390 391 i += inotify_event.sizeof + info.len; 392 393 if (i >= receivedBytes) 394 break; 395 } 396 397 foreach (c; cookie.byValue) { 398 events ~= FileChangeEvent(Event.Delete(AbsolutePath(c))); 399 } 400 401 return events; 402 } 403 } 404 405 /// 406 unittest { 407 import core.thread; 408 409 if (exists("test")) 410 rmdirRecurse("test"); 411 scope (exit) { 412 if (exists("test")) 413 rmdirRecurse("test"); 414 } 415 416 auto watcher = fileWatch(); 417 418 mkdir("test"); 419 assert(watcher.watch("test")); 420 421 write("test/a.txt", "abc"); 422 auto ev = watcher.getEvents(5.dur!"seconds"); 423 assert(ev.length > 0); 424 assert(ev[0].tryMatch!((Event.Create x) { 425 assert(x.path == AbsolutePath("test/a.txt")); 426 return true; 427 })); 428 429 append("test/a.txt", "def"); 430 ev = watcher.getEvents(5.dur!"seconds"); 431 assert(ev.length > 0); 432 assert(ev[0].tryMatch!((Event.Modify x) { 433 assert(x.path == AbsolutePath("test/a.txt")); 434 return true; 435 })); 436 437 rename("test/a.txt", "test/b.txt"); 438 ev = watcher.getEvents(5.dur!"seconds"); 439 assert(ev.length > 0); 440 assert(ev[0].tryMatch!((Event.Rename x) { 441 assert(x.from == AbsolutePath("test/a.txt")); 442 assert(x.to == AbsolutePath("test/b.txt")); 443 return true; 444 })); 445 446 remove("test/b.txt"); 447 ev = watcher.getEvents(5.dur!"seconds"); 448 assert(ev.length > 0); 449 assert(ev[0].tryMatch!((Event.Delete x) { 450 assert(x.path == AbsolutePath("test/b.txt")); 451 return true; 452 })); 453 454 rmdirRecurse("test"); 455 ev = watcher.getEvents(5.dur!"seconds"); 456 assert(ev.length > 0); 457 assert(ev[0].tryMatch!((Event.DeleteSelf x) { 458 assert(x.path == AbsolutePath("test")); 459 return true; 460 })); 461 } 462 463 /// 464 unittest { 465 import std.algorithm : canFind; 466 467 if (exists("test2")) 468 rmdirRecurse("test2"); 469 if (exists("test3")) 470 rmdirRecurse("test3"); 471 scope (exit) { 472 if (exists("test2")) 473 rmdirRecurse("test2"); 474 if (exists("test3")) 475 rmdirRecurse("test3"); 476 } 477 478 auto watcher = fileWatch(); 479 mkdir("test2"); 480 assert(watcher.watchRecurse("test2").length == 0); 481 482 write("test2/a.txt", "abc"); 483 auto ev = watcher.getEvents(5.dur!"seconds"); 484 assert(ev.length == 3); 485 assert(ev[0].tryMatch!((Event.Create x) { 486 assert(x.path == AbsolutePath("test2/a.txt")); 487 return true; 488 })); 489 assert(ev[1].tryMatch!((Event.Modify x) { 490 assert(x.path == AbsolutePath("test2/a.txt")); 491 return true; 492 })); 493 assert(ev[2].tryMatch!((Event.CloseWrite x) { 494 assert(x.path == AbsolutePath("test2/a.txt")); 495 return true; 496 })); 497 498 rename("test2/a.txt", "./testfile-a.txt"); 499 ev = watcher.getEvents(5.dur!"seconds"); 500 assert(ev.length == 1); 501 assert(ev[0].tryMatch!((Event.Delete x) { 502 assert(x.path == AbsolutePath("test2/a.txt")); 503 return true; 504 })); 505 506 rename("./testfile-a.txt", "test2/b.txt"); 507 ev = watcher.getEvents(5.dur!"seconds"); 508 assert(ev.length == 1); 509 assert(ev[0].tryMatch!((Event.Create x) { 510 assert(x.path == AbsolutePath("test2/b.txt")); 511 return true; 512 })); 513 514 remove("test2/b.txt"); 515 ev = watcher.getEvents(5.dur!"seconds"); 516 assert(ev.length == 1); 517 assert(ev[0].tryMatch!((Event.Delete x) { 518 assert(x.path == AbsolutePath("test2/b.txt")); 519 return true; 520 })); 521 522 mkdir("test2/mydir"); 523 rmdir("test2/mydir"); 524 ev = watcher.getEvents(5.dur!"seconds"); 525 assert(ev.length == 2); 526 assert(ev[0].tryMatch!((Event.Create x) { 527 assert(x.path == AbsolutePath("test2/mydir")); 528 return true; 529 })); 530 assert(ev[1].tryMatch!((Event.Delete x) { 531 assert(x.path == AbsolutePath("test2/mydir")); 532 return true; 533 })); 534 535 // test for creation, modification, removal of subdirectory 536 mkdir("test2/subdir"); 537 ev = watcher.getEvents(5.dur!"seconds"); 538 assert(ev.length == 1); 539 assert(ev[0].tryMatch!((Event.Create x) { 540 assert(x.path == AbsolutePath("test2/subdir")); 541 // add the created directory to be watched 542 watcher.watchRecurse(x.path); 543 return true; 544 })); 545 546 write("test2/subdir/c.txt", "abc"); 547 ev = watcher.getEvents(5.dur!"seconds"); 548 assert(ev.length == 3); 549 assert(ev[0].tryMatch!((Event.Create x) { 550 assert(x.path == AbsolutePath("test2/subdir/c.txt")); 551 return true; 552 })); 553 554 write("test2/subdir/c.txt", "\nabc"); 555 ev = watcher.getEvents(5.dur!"seconds"); 556 assert(ev.length == 2); 557 assert(ev[0].tryMatch!((Event.Modify x) { 558 assert(x.path == AbsolutePath("test2/subdir/c.txt")); 559 return true; 560 })); 561 562 rmdirRecurse("test2/subdir"); 563 ev = watcher.getEvents(5.dur!"seconds"); 564 assert(ev.length == 3); 565 foreach (e; ev) { 566 assert(ev[0].tryMatch!((Event.Delete x) { 567 assert(canFind([ 568 AbsolutePath("test2/subdir/c.txt"), 569 AbsolutePath("test2/subdir") 570 ], x.path)); 571 return true; 572 }, (Event.DeleteSelf x) { 573 assert(x.path == AbsolutePath("test2/subdir")); 574 return true; 575 })); 576 } 577 578 // removal of watched folder 579 rmdirRecurse("test2"); 580 ev = watcher.getEvents(5.dur!"seconds"); 581 assert(ev.length == 1); 582 assert(ev[0].tryMatch!((Event.DeleteSelf x) { 583 assert(x.path == AbsolutePath("test2")); 584 return true; 585 })); 586 } 587 588 struct MonitorResult { 589 enum Kind { 590 Access, 591 Attribute, 592 CloseWrite, 593 CloseNoWrite, 594 Create, 595 Delete, 596 DeleteSelf, 597 Modify, 598 MoveSelf, 599 Rename, 600 Open, 601 Overflow, 602 } 603 604 Kind kind; 605 AbsolutePath path; 606 } 607 608 /** Monitor root's for filesystem changes which create/remove/modify 609 * files/directories. 610 */ 611 struct Monitor { 612 import std.array : appender; 613 import std.file : isDir; 614 import std.utf : UTFException; 615 import my.filter : GlobFilter; 616 import my.fswatch; 617 618 private { 619 Set!AbsolutePath roots; 620 FileWatch fw; 621 GlobFilter fileFilter; 622 uint events; 623 624 // roots that has been removed that may be re-added later on. the user 625 // expects them to trigger events. 626 Set!AbsolutePath monitorRoots; 627 } 628 629 /** 630 * Params: 631 * roots = directories to recursively monitor 632 */ 633 this(AbsolutePath[] roots, GlobFilter fileFilter, uint events = ContentEvents) { 634 this.roots = toSet(roots); 635 this.fileFilter = fileFilter; 636 this.events = events; 637 638 auto app = appender!(AbsolutePath[])(); 639 fw = fileWatch(); 640 foreach (r; roots) { 641 app.put(fw.watchRecurse(r, events, (a) { 642 return isInteresting(fileFilter, a); 643 })); 644 } 645 646 logger.trace(!app.data.empty, "unable to watch ", app.data); 647 } 648 649 static bool isInteresting(GlobFilter fileFilter, string p) nothrow { 650 import my.file; 651 652 try { 653 const ap = AbsolutePath(p); 654 655 if (existsAnd!isDir(ap)) { 656 return true; 657 } 658 return fileFilter.match(ap); 659 } catch (Exception e) { 660 collectException(logger.trace(e.msg)); 661 } 662 663 return false; 664 } 665 666 /** Wait up to `timeout` for an event to occur for the monitored `roots`. 667 * 668 * Params: 669 * timeout = how long to wait for the event 670 */ 671 MonitorResult[] wait(Duration timeout) { 672 import std.array : array; 673 import std.algorithm : canFind, startsWith, filter; 674 675 auto rval = appender!(MonitorResult[])(); 676 677 { 678 auto rm = appender!(AbsolutePath[])(); 679 foreach (a; monitorRoots.toRange.filter!(a => exists(a))) { 680 fw.watchRecurse(a, events, a => isInteresting(fileFilter, a)); 681 rm.put(a); 682 rval.put(MonitorResult(MonitorResult.Kind.Create, a)); 683 } 684 foreach (a; rm.data) { 685 monitorRoots.remove(a); 686 } 687 } 688 689 if (!rval.data.empty) { 690 // collect whatever events that happend to have queued up together 691 // with the artifically created. 692 timeout = Duration.zero; 693 } 694 695 try { 696 foreach (e; fw.getEvents(timeout)) { 697 e.match!((Event.Overflow x) { 698 rval.put(MonitorResult(MonitorResult.Kind.Overflow)); 699 }, (Event.Access x) { 700 rval.put(MonitorResult(MonitorResult.Kind.Access, x.path)); 701 }, (Event.Attribute x) { 702 rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path)); 703 }, (Event.CloseWrite x) { 704 rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path)); 705 }, (Event.CloseNoWrite x) { 706 rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path)); 707 }, (Event.Create x) { 708 rval.put(MonitorResult(MonitorResult.Kind.Create, x.path)); 709 fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a)); 710 }, (Event.Modify x) { 711 rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path)); 712 }, (Event.MoveSelf x) { 713 rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path)); 714 fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a)); 715 716 if (x.path in roots) { 717 monitorRoots.add(x.path); 718 } 719 }, (Event.Delete x) { 720 rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path)); 721 }, (Event.DeleteSelf x) { 722 rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path)); 723 724 if (x.path in roots) { 725 monitorRoots.add(x.path); 726 } 727 }, (Event.Rename x) { 728 rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to)); 729 }, (Event.Open x) { 730 rval.put(MonitorResult(MonitorResult.Kind.Open, x.path)); 731 },); 732 } 733 } catch (Exception e) { 734 logger.trace(e.msg); 735 } 736 737 return rval.data.filter!(a => fileFilter.match(a.path)).array; 738 } 739 740 /** Collects events from the monitored `roots` over a period. 741 * 742 * Params: 743 * collectTime = for how long to clear the queue 744 */ 745 MonitorResult[] collect(Duration collectTime) { 746 import std.algorithm : max, min; 747 import std.datetime : Clock; 748 749 auto rval = appender!(MonitorResult[])(); 750 const stopAt = Clock.currTime + collectTime; 751 752 do { 753 collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs"); 754 if (!monitorRoots.empty) { 755 // must use a hybrid approach of poll + inotify because if a 756 // root is added it will only be detected by polling. 757 collectTime = min(10.dur!"msecs", collectTime); 758 } 759 760 rval.put(wait(collectTime)); 761 } 762 while (Clock.currTime < stopAt); 763 764 return rval.data; 765 } 766 } 767 768 @("shall re-apply monitoring for a file that is removed") 769 unittest { 770 import my.filter : GlobFilter; 771 import my.test; 772 773 auto ta = makeTestArea("re-apply monitoring"); 774 const testTxt = ta.inSandbox("test.txt").AbsolutePath; 775 776 write(testTxt, "abc"); 777 auto fw = Monitor([testTxt], GlobFilter(["*"], null)); 778 write(testTxt, "abcc"); 779 assert(!fw.wait(Duration.zero).empty); 780 781 remove(testTxt); 782 assert(!fw.wait(Duration.zero).empty); 783 784 write(testTxt, "abcc"); 785 assert(!fw.wait(Duration.zero).empty); 786 } 787 788 /** A file descriptor to poll. 789 */ 790 struct FdPoll { 791 int value; 792 } 793 794 /// Uses the linux poll syscall to wait for activity on the file descriptors. 795 struct FdPoller { 796 import std.algorithm : min, filter; 797 798 private { 799 pollfd[] fds; 800 PollResult[] results; 801 } 802 803 void put(FdPoll fd, PollEvent[] evs) { 804 import core.sys.posix.poll; 805 806 pollfd pfd; 807 pfd.fd = fd.value; 808 foreach (e; evs) { 809 final switch (e) with (PollEvent) { 810 case in_: 811 pfd.events |= POLLIN; 812 break; 813 case out_: 814 pfd.events |= POLLOUT; 815 break; 816 } 817 } 818 fds ~= pfd; 819 820 // they must be the same length or else `wait` will fail. 821 results.length = fds.length; 822 } 823 824 void remove(FdPoll fd) { 825 fds = fds.filter!(a => a.fd != fd.value).array; 826 827 results.length = fds.length; 828 } 829 830 PollResult[] wait(Duration timeout = Duration.zero) { 831 import core.sys.posix.poll; 832 import std.bitmanip : BitArray; 833 834 const code = poll(&fds[0], fds.length, cast(int) min(int.max, timeout.total!"msecs")); 835 836 if (code < 0) { 837 import core.stdc.errno : errno, EINTR; 838 839 if (errno == EINTR) { 840 // poll just interrupted. try again. 841 return (PollResult[]).init; 842 } 843 844 throw new Exception("Failed to poll events. Error code " ~ errno.to!string); 845 } else if (code == 0) { 846 // timeout triggered 847 return (PollResult[]).init; 848 } 849 850 size_t idx; 851 foreach (a; fds.filter!(a => a.revents != 0)) { 852 PollResult res; 853 res.status = BitArray([ 854 (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0, 855 (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0, 856 (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0, 857 ]); 858 res.fd = FdPoll(a.fd); 859 results[idx] = res; 860 idx++; 861 } 862 863 return results[0 .. idx]; 864 } 865 } 866 867 /// Type of event to poll for. 868 enum PollEvent { 869 in_, 870 out_, 871 } 872 873 /// What each bit in `PollResult.status` represent. 874 enum PollStatus { 875 // There is data to read. 876 in_, 877 // Writing is now possible, though a write larger that the available 878 // space in a socket or pipe will still block (unless O_NONBLOCK is set). 879 out_, 880 // There is some exceptional condition on the file descriptor. Possibilities include: 881 // * There is out-of-band data on a TCP socket (see tcp(7)). 882 // * A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)). 883 // * A cgroup.events file has been modified (see cgroups(7)). 884 pri, 885 // Error condition (only returned in revents; ignored in events). This bit 886 // is also set for a file descriptor referring to the write end of a pipe 887 // when the read end has been closed. 888 error, 889 // Hang up (only returned in revents; ignored in events). Note that when 890 // reading from a channel such as a pipe or a stream socket, this event 891 // merely indicates that the peer closed its end of the channel. 892 // Subsequent reads from the channel will re‐ turn 0 (end of file) only 893 // after all outstanding data in the channel has been consumed. 894 hup, 895 /// Invalid request: fd not open (only returned in revents; ignored in events). 896 nval, 897 } 898 899 /// File descriptors that triggered. 900 struct PollResult { 901 import std.bitmanip : BitArray; 902 903 BitArray status; 904 FdPoll fd; 905 }