浏览代码

Stub out more of the file system watcher structure.

父节点
当前提交
91e28880ea
共有 4 个文件被更改,包括 105 次插入3 次删除
  1. 24
    0
      Cargo.lock
  2. 1
    1
      Cargo.toml
  3. 73
    1
      src/file_system_watcher.rs
  4. 7
    1
      src/filetree.rs

+ 24
- 0
Cargo.lock 查看文件

432
  "unicode-normalization 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
432
  "unicode-normalization 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
433
 ]
433
 ]
434
 
434
 
435
+[[package]]
436
+name = "inotify"
437
+version = "0.8.0"
438
+source = "registry+https://github.com/rust-lang/crates.io-index"
439
+dependencies = [
440
+ "bitflags 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
441
+ "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
442
+ "inotify-sys 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
443
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
444
+ "mio 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)",
445
+ "tokio 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
446
+]
447
+
448
+[[package]]
449
+name = "inotify-sys"
450
+version = "0.1.3"
451
+source = "registry+https://github.com/rust-lang/crates.io-index"
452
+dependencies = [
453
+ "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
454
+]
455
+
435
 [[package]]
456
 [[package]]
436
 name = "input_buffer"
457
 name = "input_buffer"
437
 version = "0.2.0"
458
 version = "0.2.0"
926
  "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
947
  "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
927
  "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
948
  "futures 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
928
  "futures-tokio-compat 0.1.0 (git+https://github.com/mgottschlag/futures-tokio-compat.git)",
949
  "futures-tokio-compat 0.1.0 (git+https://github.com/mgottschlag/futures-tokio-compat.git)",
950
+ "inotify 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
929
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
951
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
930
  "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
952
  "serde 1.0.104 (registry+https://github.com/rust-lang/crates.io-index)",
931
  "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
953
  "serde_json 1.0.44 (registry+https://github.com/rust-lang/crates.io-index)",
1103
 "checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
1125
 "checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
1104
 "checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
1126
 "checksum humantime 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f"
1105
 "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
1127
 "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
1128
+"checksum inotify 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c45f8a4a890357e626ac878be77e43d9a29ca17f67c477517c26ee108d32d973"
1129
+"checksum inotify-sys 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)" = "e74a1aa87c59aeff6ef2cc2fa62d41bc43f54952f55652656b18a02fd5e356c0"
1106
 "checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
1130
 "checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
1107
 "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
1131
 "checksum iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
1108
 "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"
1132
 "checksum itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)" = "501266b7edd0174f8530248f87f99c88fbe60ca4ef3dd486835b8d8d53136f7f"

+ 1
- 1
Cargo.toml 查看文件

5
 edition = "2018"
5
 edition = "2018"
6
 
6
 
7
 [dependencies]
7
 [dependencies]
8
-#fswatcher = { path = "./fswatcher" }
9
 structopt = "0.3"
8
 structopt = "0.3"
10
 serde = { version = "1.0", features = ["derive"] }
9
 serde = { version = "1.0", features = ["derive"] }
11
 serde_json = "1.0"
10
 serde_json = "1.0"
17
 async-tungstenite = { version = "0.3", features = ["tokio-runtime"] }
16
 async-tungstenite = { version = "0.3", features = ["tokio-runtime"] }
18
 env_logger = "0.7"
17
 env_logger = "0.7"
19
 log = "0.4"
18
 log = "0.4"
19
+inotify = "0.8"

+ 73
- 1
src/file_system_watcher.rs 查看文件

1
+use std::collections::VecDeque;
1
 use std::sync::Arc;
2
 use std::sync::Arc;
3
+use std::path::Path;
2
 
4
 
5
+use inotify::{Inotify, WatchMask};
3
 use tokio::sync::{mpsc, oneshot, Mutex};
6
 use tokio::sync::{mpsc, oneshot, Mutex};
7
+use tokio::task;
4
 
8
 
5
 use super::{Error, FileTree};
9
 use super::{Error, FileTree};
6
 
10
 
38
     }
42
     }
39
 
43
 
40
     async fn task_paused(
44
     async fn task_paused(
41
-        _file_tree: Arc<Mutex<FileTree>>,
45
+        mut file_tree: Arc<Mutex<FileTree>>,
42
         mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
46
         mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
43
     ) {
47
     ) {
44
         let mut paused = true;
48
         let mut paused = true;
56
             } else {
60
             } else {
57
                 // We were unpaused, initialize the file system watcher and start waiting for
61
                 // We were unpaused, initialize the file system watcher and start waiting for
58
                 // that as well.
62
                 // that as well.
63
+                let next_state = Self::task_unpaused(&mut file_tree, &mut state_receive).await;
64
+                match next_state {
65
+                    StateChange::Pause => paused = true,
66
+                    StateChange::Unpause => paused = false,
67
+                    StateChange::Terminate => break,
68
+                }
69
+            }
70
+        }
71
+    }
72
+
73
+    async fn task_unpaused(
74
+        file_tree: &mut Arc<Mutex<FileTree>>,
75
+        _state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
76
+    ) -> StateChange {
77
+        let mut inotify = Inotify::init().expect("Failed to initialize inotify");
78
+        let mut new_directories = VecDeque::new();
79
+        new_directories.push_back(file_tree.lock().await.root_path().to_owned());
80
+
81
+        let mut inotify_buffer = InotifyBuffer { data: [0; 1024] };
82
+        let _inotify_stream = inotify.event_stream(&mut inotify_buffer).unwrap();
83
+        //let watches_by_path = BTreeMap::new();
84
+        //let paths_by_watch = HashMap::new();
85
+
86
+        loop {
87
+            if new_directories.is_empty() {
88
+                // If all the inotify watches are up-to-date, simply wait for any events.
59
                 // TODO
89
                 // TODO
90
+            } else {
91
+                // Install an inotify watch for the directory and check the directory contents
92
+                // against the database.
93
+                let new_directory = new_directories.pop_front().unwrap();
94
+                // TODO: Do not follow links!
95
+                // TODO: Is ONLYDIR correct?
96
+                // TODO: We want to delay events for 2-3 seconds, so that we limit the rate of
97
+                // modification events and so that we are maybe able to detect moved
98
+                // files/directories across different directories!
99
+                if Path::new(&new_directory).is_dir() {
100
+                    let watch = inotify.add_watch(new_directory, WatchMask::ATTRIB | WatchMask::CLOSE_WRITE | WatchMask::CREATE | WatchMask::DELETE | WatchMask::DELETE_SELF | WatchMask::MODIFY | WatchMask::MOVE | WatchMask::EXCL_UNLINK | WatchMask::ONLYDIR);
101
+                    // TODO
102
+                }
103
+
104
+                // Proceed recursively at the next loop iteration.
105
+                // TODO
106
+
107
+                // Poll for events (without blocking, so that we immediately process the next
108
+                // directory if nothing happened).
109
+                // TODO
110
+
111
+                // Yield to other parts of the program to minimize the impact of scanning large
112
+                // directory trees.
113
+                // TODO: Check whether this is really necessary and how large the impact on
114
+                // performance is.
115
+                task::yield_now().await;
60
             }
116
             }
61
         }
117
         }
62
     }
118
     }
75
     }
131
     }
76
 }
132
 }
77
 
133
 
134
+struct InotifyBuffer {
135
+    data: [u8; 1024],
136
+}
137
+
138
+impl AsMut<[u8]> for InotifyBuffer {
139
+    fn as_mut<'a>(&'a mut self) -> &'a mut [u8] {
140
+        &mut self.data
141
+    }
142
+}
143
+
144
+impl AsRef<[u8]> for InotifyBuffer {
145
+    fn as_ref<'a>(&'a self) -> &'a [u8] {
146
+        &self.data
147
+    }
148
+}
149
+
78
 #[derive(Debug)]
150
 #[derive(Debug)]
79
 enum StateChange {
151
 enum StateChange {
80
     Pause,
152
     Pause,

+ 7
- 1
src/filetree.rs 查看文件

9
     db: Arc<Mutex<Database>>,
9
     db: Arc<Mutex<Database>>,
10
     new_version_notifiers: Vec<UnboundedSender<NewVersionEvent>>,
10
     new_version_notifiers: Vec<UnboundedSender<NewVersionEvent>>,
11
     next_version_source: u64,
11
     next_version_source: u64,
12
+    root_path: String,
12
 }
13
 }
13
 
14
 
14
 impl FileTree {
15
 impl FileTree {
15
-    pub fn open(db: Arc<Mutex<Database>>, _directory: &str) -> Result<FileTree, Error> {
16
+    pub fn open(db: Arc<Mutex<Database>>, directory: &str) -> Result<FileTree, Error> {
16
         // Check whether the database entry for this file tree exists and fetch the root
17
         // Check whether the database entry for this file tree exists and fetch the root
17
         // file ID.
18
         // file ID.
18
         // TODO
19
         // TODO
30
             db,
31
             db,
31
             new_version_notifiers: Vec::new(),
32
             new_version_notifiers: Vec::new(),
32
             next_version_source: 0,
33
             next_version_source: 0,
34
+            root_path: directory.to_owned(),
33
         })
35
         })
34
     }
36
     }
35
 
37
 
38
+    pub fn root_path(&self) -> &str {
39
+        &self.root_path
40
+    }
41
+
36
     pub fn new_notifier(&mut self) -> NewVersionNotifier {
42
     pub fn new_notifier(&mut self) -> NewVersionNotifier {
37
         let (send, receive) = unbounded_channel();
43
         let (send, receive) = unbounded_channel();
38
         self.new_version_notifiers.push(send);
44
         self.new_version_notifiers.push(send);

正在加载...
取消
保存