Bladeren bron

More file system watcher code.

Mathias Gottschlag 5 jaren geleden
bovenliggende
commit
83f696a1a4
3 gewijzigde bestanden met toevoegingen van 319 en 74 verwijderingen
  1. 220
    62
      src/file_system_watcher.rs
  2. 86
    12
      src/file_tree.rs
  3. 13
    0
      src/lib.rs

+ 220
- 62
src/file_system_watcher.rs Bestand weergeven

@@ -1,6 +1,7 @@
1 1
 use std::collections::HashMap;
2 2
 use std::ffi::OsString;
3 3
 use std::fs;
4
+use std::path::{Path, PathBuf};
4 5
 use std::sync::Arc;
5 6
 use std::time::Duration;
6 7
 
@@ -10,25 +11,35 @@ use tokio::stream::StreamExt;
10 11
 use tokio::sync::{mpsc, oneshot, Mutex};
11 12
 use tokio::task;
12 13
 
13
-use super::{Error, FileTree, SynchronizationError};
14
+use super::file_tree::{FileInfo, FileTree, FileType, LockedPath, NewVersionSource, OriginInfo};
15
+use super::{Error, SynchronizationError};
14 16
 
15 17
 pub struct FileSystemWatcher {
16 18
     state_send: mpsc::Sender<(StateChange, oneshot::Sender<()>)>,
19
+    new_version_source: NewVersionSource,
17 20
 }
18 21
 
19 22
 impl FileSystemWatcher {
20
-    pub fn new(
23
+    pub async fn new(
21 24
         file_tree: Arc<Mutex<FileTree>>,
22 25
         errors: mpsc::Sender<SynchronizationError>,
23 26
     ) -> Result<FileSystemWatcher, Error> {
27
+        let new_version_source = file_tree.lock().await.new_version_source();
24 28
         // Spawn the task that watches the file system as soon as unpause() is called.
25 29
         let (state_send, pause_receive) = mpsc::channel(1);
26 30
         tokio::spawn(async move {
27
-            let mut fsw_task = FileSystemWatcherTask { file_tree, errors };
31
+            let mut fsw_task = FileSystemWatcherTask {
32
+                file_tree,
33
+                errors,
34
+                new_version_source,
35
+            };
28 36
             fsw_task.task_paused(pause_receive).await;
29 37
         });
30 38
 
31
-        Ok(FileSystemWatcher { state_send })
39
+        Ok(FileSystemWatcher {
40
+            state_send,
41
+            new_version_source,
42
+        })
32 43
     }
33 44
 
34 45
     pub async fn pause(&mut self) {
@@ -53,6 +64,7 @@ impl FileSystemWatcher {
53 64
 struct FileSystemWatcherTask {
54 65
     file_tree: Arc<Mutex<FileTree>>,
55 66
     errors: mpsc::Sender<SynchronizationError>,
67
+    new_version_source: NewVersionSource,
56 68
 }
57 69
 
58 70
 impl FileSystemWatcherTask {
@@ -160,77 +172,102 @@ impl FileSystemWatcherTask {
160 172
         // directory is not readable, we do not want to synchronize it.
161 173
         match event {
162 174
             FileSystemEvent::Stopped(reason) => match reason {
163
-                StopReason::DirectoryRemoved => {
164
-                    return Err(Error::DirectoryRemoved);
165
-                }
175
+                StopReason::DirectoryRemoved => Err(Error::DirectoryRemoved),
166 176
             },
167
-            FileSystemEvent::DirectoryWatched(path) => {
168
-                // Fetch the directory contents as well as the database contents.
169
-                // TODO: We load all entries into a vector, which might be problematic for *very*
170
-                // large directories.
171
-                let fs_entries = match fs::read_dir(&path) {
172
-                    Ok(entries) => entries,
173
-                    Err(_) => return Ok(()),
174
-                };
175
-                let fs_entries = fs_entries
176
-                    .filter(|x| x.is_ok())
177
-                    .map(|x| (x.as_ref().unwrap().file_name(), x.unwrap()))
178
-                    .collect::<HashMap<_, _>>();
179
-                let db_entries = match self.file_tree.lock().await.get_directory_listing(&path) {
180
-                    Some(entries) => entries,
181
-                    None => return Ok(()),
182
-                };
183
-                let db_entries = db_entries
184
-                    .into_iter()
185
-                    .map(|x| (x.file_name.clone(), x))
186
-                    .collect::<HashMap<_, _>>();
187
-
188
-                // Compare file metadata and report changed, deleted and created files and
189
-                // directories.
190
-                for (name, _entry) in fs_entries.iter() {
191
-                    if db_entries.contains_key(name) {
192
-                        // The entry is known, compare the file attributes.
193
-                        // TODO
194
-                        // If the file attributes do not match, compare the file checksums.
195
-                        // TODO
196
-                    } else {
197
-                        // The file seems to be new, insert it into the database.
198
-                        // TODO
199
-                    }
200
-                }
201
-                for (name, _entry) in db_entries.iter() {
202
-                    if !fs_entries.contains_key(name) {
203
-                        // The file has been deleted, remove it from the database.
204
-                        // TODO
205
-                    }
206
-                }
207
-            }
208
-            FileSystemEvent::DirectoryCreated(_path) => {
209
-                // TODO
210
-            }
177
+            FileSystemEvent::DirectoryWatched(path) => self.handle_directory_watched(path).await,
178
+            FileSystemEvent::DirectoryCreated(path) => self.handle_directory_created(path).await,
211 179
             FileSystemEvent::DirectoryModified(_path) => {
212 180
                 // TODO
181
+                Ok(())
213 182
             }
214
-            FileSystemEvent::DirectoryRemoved(_path) => {
215
-                // TODO
216
-            }
183
+            FileSystemEvent::DirectoryRemoved(path) => self.handle_directory_removed(path).await,
217 184
             FileSystemEvent::DirectoryMoved(_from, _to) => {
218 185
                 // TODO
186
+                Ok(())
219 187
             }
220
-            FileSystemEvent::FileCreated(_path) => {
188
+            FileSystemEvent::FileCreated(path) => self.handle_file_modified(path).await,
189
+            FileSystemEvent::FileModified(path) => self.handle_file_modified(path).await,
190
+            FileSystemEvent::FileRemoved(path) => self.handle_file_removed(path).await,
191
+            FileSystemEvent::FileMoved(_from, _to) => {
221 192
                 // TODO
193
+                Ok(())
222 194
             }
223
-            FileSystemEvent::FileModified(_path) => {
224
-                // TODO
195
+            FileSystemEvent::Error(e) => {
196
+                self.send_error_no_pause(e.into(), None).await;
197
+                Ok(())
225 198
             }
226
-            FileSystemEvent::FileRemoved(_path) => {
227
-                // TODO
199
+        }
200
+    }
201
+
202
+    async fn handle_directory_watched(&mut self, path: OsString) -> Result<(), Error> {
203
+        // We simply lock the whole directory to remove the need for
204
+        // double-checking before/after locks.
205
+        // TODO: More fine-grained locking.
206
+        let path_lock = match self.file_tree.lock().await.lock_path(&path).await {
207
+            Some(lock) => lock,
208
+            None => {
209
+                // The entry does not exist anymore.
210
+                return Ok(());
228 211
             }
229
-            FileSystemEvent::FileMoved(_from, _to) => {
212
+        };
213
+
214
+        // Fetch the directory contents as well as the database contents.
215
+        // TODO: We load all entries into a vector, which might be problematic for *very*
216
+        // large directories.
217
+        let fs_entries = match fs::read_dir(&path) {
218
+            Ok(entries) => entries,
219
+            Err(_) => return Ok(()),
220
+        };
221
+        let fs_entries = fs_entries
222
+            .filter(|x| x.is_ok())
223
+            .map(|x| (x.as_ref().unwrap().file_name(), x.unwrap()))
224
+            .collect::<HashMap<_, _>>();
225
+        let db_entries = match path_lock.get_directory_listing(&path) {
226
+            Some(entries) => entries,
227
+            None => return Ok(()),
228
+        };
229
+        let db_entries = db_entries
230
+            .into_iter()
231
+            .map(|x| (x.file_name.clone(), x))
232
+            .collect::<HashMap<_, _>>();
233
+
234
+        // Compare file metadata and report changed, deleted and created files and
235
+        // directories.
236
+        for (name, entry) in fs_entries.iter() {
237
+            if db_entries.contains_key(name) {
238
+                let db_entry = &db_entries[name];
239
+                let mut entry_path = PathBuf::from(&path);
240
+                entry_path.push(name);
241
+
242
+                // The entry is known, compare the file attributes.
243
+                let fs_metadata = match entry.metadata() {
244
+                    Ok(m) => m,
245
+                    Err(_) => continue,
246
+                };
247
+                let local_type = match FileType::from(fs_metadata.file_type()) {
248
+                    Some(t) => t,
249
+                    None => continue,
250
+                };
251
+                // If the type is different, interpret as deletion+creation.
252
+                if local_type != db_entry.file_type {
253
+                    path_lock.local_file_removed(
254
+                        self.new_version_source,
255
+                        entry_path.as_path().as_os_str(),
256
+                    );
257
+                // TODO: Create the new file/directory.
258
+                } else if local_type != FileType::Directory {
259
+                    // If the file attributes do not match, compare the file checksums.
260
+                    // TODO
261
+                }
262
+            } else {
263
+                // The file seems to be new, insert it into the database.
230 264
                 // TODO
231 265
             }
232
-            FileSystemEvent::Error(e) => {
233
-                self.send_error_no_pause(e.into(), None).await;
266
+        }
267
+        for (name, _entry) in db_entries.iter() {
268
+            if !fs_entries.contains_key(name) {
269
+                // The file has been deleted, remove it from the database.
270
+                // TODO
234 271
             }
235 272
         }
236 273
 
@@ -242,6 +279,127 @@ impl FileSystemWatcherTask {
242 279
         Ok(())
243 280
     }
244 281
 
282
+    async fn handle_directory_created<P>(&mut self, path: P) -> Result<(), Error>
283
+    where
284
+        P: AsRef<Path>,
285
+    {
286
+        let path = path.as_ref();
287
+        let parent_path = path.parent().unwrap();
288
+        let (path_lock, fs_metadata, db_metadata) =
289
+            self.prepare_single_path_change(parent_path, path).await;
290
+        if path_lock.is_none() {
291
+            return Ok(());
292
+        }
293
+        if fs_metadata.is_none() || !fs_metadata.unwrap().is_dir() {
294
+            return Ok(());
295
+        }
296
+        if db_metadata.is_some() {
297
+            if db_metadata.unwrap().file_type != FileType::Directory {
298
+                // TODO: Log an error.
299
+            }
300
+            return Ok(());
301
+        }
302
+        path_lock
303
+            .unwrap()
304
+            .create_directory(self.new_version_source, path, OriginInfo::Local)?;
305
+        Ok(())
306
+    }
307
+
308
+    async fn handle_directory_removed<P>(&mut self, path: P) -> Result<(), Error>
309
+    where
310
+        P: AsRef<Path>,
311
+    {
312
+        let path = path.as_ref();
313
+        let parent_path = path.parent().unwrap();
314
+        let (path_lock, fs_metadata, db_metadata) =
315
+            self.prepare_single_path_change(parent_path, path).await;
316
+        if path_lock.is_none() || db_metadata.is_none() {
317
+            return Ok(());
318
+        }
319
+        if fs_metadata.is_some() && fs_metadata.unwrap().is_dir() {
320
+            // The directory exists again.
321
+            return Ok(());
322
+        }
323
+        path_lock
324
+            .unwrap()
325
+            .local_file_removed(self.new_version_source, path);
326
+        Ok(())
327
+    }
328
+
329
+    async fn handle_file_modified<P>(&mut self, path: P) -> Result<(), Error>
330
+    where
331
+        P: AsRef<Path>,
332
+    {
333
+        // This function is also called for new files. If we create a file ourselves, and the user
334
+        // immediately modifies it, we need to interpref "FileCreated" as a modification.
335
+        // However, that means that we also need to expect that the file does not yet exist in the
336
+        // database yet.
337
+        let path = path.as_ref();
338
+        let path_lock = match self.file_tree.lock().await.lock_path(path).await {
339
+            Some(lock) => lock,
340
+            None => {
341
+                // The entry does not exist in the database, lock the parent.
342
+                // TODO
343
+                return Ok(());
344
+            }
345
+        };
346
+        let metadata = match fs::symlink_metadata(path) {
347
+            Ok(metadata) => metadata,
348
+            Err(_) => return Ok(()),
349
+        };
350
+        if metadata.is_dir() {
351
+            // The directory exists again, ignore.
352
+            return Ok(());
353
+        }
354
+        let db_metadata = match path_lock.get_file_info(path) {
355
+            Some(m) => m,
356
+            None => {
357
+                // The directory was already removed from the database - i.e., it was deleted via
358
+                // the network.
359
+                return Ok(());
360
+            }
361
+        };
362
+
363
+        // TODO
364
+        Ok(())
365
+    }
366
+
367
+    async fn handle_file_removed<P>(&mut self, path: P) -> Result<(), Error>
368
+    where
369
+        P: AsRef<Path>,
370
+    {
371
+        let path = path.as_ref();
372
+        let parent_path = path.parent().unwrap();
373
+        let (path_lock, fs_metadata, db_metadata) =
374
+            self.prepare_single_path_change(parent_path, path).await;
375
+        if path_lock.is_none() || db_metadata.is_none() {
376
+            return Ok(());
377
+        }
378
+        // TODO: What about symlinks?
379
+        if fs_metadata.is_some() && fs_metadata.unwrap().is_file() {
380
+            // The file exists again.
381
+            return Ok(());
382
+        }
383
+        path_lock
384
+            .unwrap()
385
+            .local_file_removed(self.new_version_source, path);
386
+        Ok(())
387
+    }
388
+
389
+    async fn prepare_single_path_change(
390
+        &mut self,
391
+        lock_path: &Path,
392
+        path: &Path,
393
+    ) -> (Option<LockedPath>, Option<fs::Metadata>, Option<FileInfo>) {
394
+        let path_lock = match self.file_tree.lock().await.lock_path(lock_path).await {
395
+            Some(lock) => lock,
396
+            None => return (None, None, None),
397
+        };
398
+        let fs_metadata = fs::symlink_metadata(path).ok();
399
+        let db_metadata = path_lock.get_file_info(path);
400
+        (Some(path_lock), fs_metadata, db_metadata)
401
+    }
402
+
245 403
     async fn send_error_no_pause(&mut self, error: Error, _path: Option<OsString>) {
246 404
         // TODO: Encode the path in the error!
247 405
         self.errors

+ 86
- 12
src/file_tree.rs Bestand weergeven

@@ -1,4 +1,6 @@
1 1
 use std::ffi::{OsStr, OsString};
2
+use std::fs;
3
+use std::path::Path;
2 4
 use std::sync::Arc;
3 5
 
4 6
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
@@ -54,29 +56,62 @@ impl FileTree {
54 56
         source
55 57
     }
56 58
 
57
-    pub fn get_file_info(&self, _path: &OsStr) -> Option<FileInfo> {
59
+    // TODO: Limit on size of all temporary files, make this function return a
60
+    // future which yields a temporary file once enough space is available?
61
+    pub fn create_temporary_file(&self, _target_path: &OsStr) -> Result<TemporaryFile, Error> {
58 62
         // TODO
59 63
         panic!("Not yet implemented.");
60 64
     }
61 65
 
62
-    pub fn get_directory_listing(&self, _path: &OsStr) -> Option<Vec<FileInfo>> {
66
+    pub async fn lock_path<P>(&mut self, path: P) -> Option<LockedPath>
67
+    where
68
+        P: AsRef<Path>,
69
+    {
70
+        let _path = path.as_ref();
63 71
         // TODO
64 72
         panic!("Not yet implemented.");
65 73
     }
66 74
 
67
-    pub fn local_file_changed(&self, _source: NewVersionSource, _path: &OsStr, _info: FileInfo) {
75
+    pub async fn lock_two_paths<P1, P2>(&mut self, path1: P1, path2: P2) -> Option<LockedPath>
76
+    where
77
+        P1: AsRef<Path>,
78
+        P2: AsRef<Path>,
79
+    {
80
+        let _path1 = path1.as_ref();
81
+        let _path2 = path2.as_ref();
68 82
         // TODO
69 83
         panic!("Not yet implemented.");
70 84
     }
85
+}
86
+
87
+pub struct LockedPath {
88
+    // TODO
89
+}
71 90
 
72
-    pub fn local_file_removed(&self, _source: NewVersionSource, _path: &OsStr) {
91
+impl LockedPath {
92
+    pub fn get_file_info<P>(&self, path: P) -> Option<FileInfo>
93
+    where
94
+        P: AsRef<Path>,
95
+    {
96
+        let _path = path.as_ref();
73 97
         // TODO
74 98
         panic!("Not yet implemented.");
75 99
     }
76 100
 
77
-    // TODO: Limit on size of all temporary files, make this function return a
78
-    // future which yields a temporary file once enough space is available?
79
-    pub fn create_temporary_file(&self, _target_path: &OsStr) -> Result<TemporaryFile, Error> {
101
+    pub fn get_directory_listing(&self, _path: &OsStr) -> Option<Vec<FileInfo>> {
102
+        // TODO
103
+        panic!("Not yet implemented.");
104
+    }
105
+
106
+    pub fn local_file_changed(&self, _source: NewVersionSource, _path: &OsStr, _info: FileInfo) {
107
+        // TODO
108
+        panic!("Not yet implemented.");
109
+    }
110
+
111
+    pub fn local_file_removed<P>(&self, _source: NewVersionSource, _path: P)
112
+    where
113
+        P: AsRef<Path>,
114
+    {
80 115
         // TODO
81 116
         panic!("Not yet implemented.");
82 117
     }
@@ -93,17 +128,27 @@ impl FileTree {
93 128
         panic!("Not yet implemented.");
94 129
     }
95 130
 
96
-    pub fn create_directory(
131
+    pub fn create_directory<P>(
97 132
         &self,
98 133
         _source: NewVersionSource,
99
-        _path: &OsStr,
134
+        _path: P,
100 135
         _origin: OriginInfo,
101
-    ) -> Result<NewVersionEvent, Error> {
136
+    ) -> Result<NewVersionEvent, Error>
137
+    where
138
+        P: AsRef<Path>,
139
+    {
102 140
         // TODO
103 141
         panic!("Not yet implemented.");
104 142
     }
105 143
 }
106 144
 
145
+impl Drop for LockedPath {
146
+    fn drop(&mut self) {
147
+        // Release the lock again.
148
+        // TODO
149
+    }
150
+}
151
+
107 152
 pub struct NewVersionNotifier {
108 153
     // TODO: Probably should not be public.
109 154
     pub receive: UnboundedReceiver<NewVersionEvent>,
@@ -121,13 +166,42 @@ pub struct NewVersionSource {
121 166
 
122 167
 pub struct FileInfo {
123 168
     pub file_name: OsString,
169
+    pub file_type: FileType,
170
+    /// Local modification time in seconds since the Unix epoch. Only valid for files and symlinks,
171
+    /// not for directories.
172
+    pub local_modification_time: Option<u64>,
173
+    /// Local size of the file in bytes. Only valid for files and symlinks, not for directories.
174
+    pub local_size: Option<u64>,
124 175
     // TODO
125 176
 }
126 177
 
178
+#[derive(PartialEq)]
179
+pub enum FileType {
180
+    File,
181
+    Directory,
182
+    Symlink,
183
+}
184
+
185
+impl FileType {
186
+    pub fn from(ft: fs::FileType) -> Option<FileType> {
187
+        if ft.is_file() {
188
+            Some(FileType::File)
189
+        } else if ft.is_dir() {
190
+            Some(FileType::Directory)
191
+        } else if ft.is_symlink() {
192
+            Some(FileType::Symlink)
193
+        } else {
194
+            None
195
+        }
196
+    }
197
+}
198
+
127 199
 pub struct TemporaryFile {
128 200
     // TODO
129 201
 }
130 202
 
131
-pub struct OriginInfo {
132
-    // TODO
203
+pub enum OriginInfo {
204
+    Local,
205
+    // TODO: For remote sources, include the previous version to detect conflicts?
206
+    Remote,
133 207
 }

+ 13
- 0
src/lib.rs Bestand weergeven

@@ -125,6 +125,19 @@ pub struct SynchronizationError {
125 125
     pub error: Error,
126 126
 }
127 127
 
128
+impl<T> From<T> for SynchronizationError
129
+where
130
+    T: Into<Error>,
131
+{
132
+    fn from(e: T) -> SynchronizationError {
133
+        // By default, errors do not set the pause flag.
134
+        SynchronizationError {
135
+            needs_pause: false,
136
+            error: e.into(),
137
+        }
138
+    }
139
+}
140
+
128 141
 #[derive(Debug)]
129 142
 pub enum Error {
130 143
     Io(std::io::Error),

Laden…
Annuleren
Opslaan