|
|
@@ -24,7 +24,8 @@ impl FileSystemWatcher {
|
|
24
|
24
|
// Spawn the task that watches the file system as soon as unpause() is called.
|
|
25
|
25
|
let (state_send, pause_receive) = mpsc::channel(1);
|
|
26
|
26
|
tokio::spawn(async move {
|
|
27
|
|
- Self::task_paused(file_tree, pause_receive, errors).await;
|
|
|
27
|
+ let mut fsw_task = FileSystemWatcherTask { file_tree, errors };
|
|
|
28
|
+ fsw_task.task_paused(pause_receive).await;
|
|
28
|
29
|
});
|
|
29
|
30
|
|
|
30
|
31
|
Ok(FileSystemWatcher { state_send })
|
|
|
@@ -47,11 +48,17 @@ impl FileSystemWatcher {
|
|
47
|
48
|
.unwrap();
|
|
48
|
49
|
receive.await.unwrap();
|
|
49
|
50
|
}
|
|
|
51
|
+}
|
|
|
52
|
+
|
|
|
53
|
+struct FileSystemWatcherTask {
|
|
|
54
|
+ file_tree: Arc<Mutex<FileTree>>,
|
|
|
55
|
+ errors: mpsc::Sender<SynchronizationError>,
|
|
|
56
|
+}
|
|
50
|
57
|
|
|
|
58
|
+impl FileSystemWatcherTask {
|
|
51
|
59
|
async fn task_paused(
|
|
52
|
|
- mut file_tree: Arc<Mutex<FileTree>>,
|
|
|
60
|
+ &mut self,
|
|
53
|
61
|
mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
54
|
|
- mut errors: mpsc::Sender<SynchronizationError>,
|
|
55
|
62
|
) {
|
|
56
|
63
|
let mut paused = true;
|
|
57
|
64
|
|
|
|
@@ -68,8 +75,7 @@ impl FileSystemWatcher {
|
|
68
|
75
|
} else {
|
|
69
|
76
|
// We were unpaused, initialize the file system watcher and start waiting for
|
|
70
|
77
|
// that as well.
|
|
71
|
|
- let next_state =
|
|
72
|
|
- Self::task_unpaused(&mut file_tree, &mut state_receive, &mut errors).await;
|
|
|
78
|
+ let next_state = self.task_unpaused(&mut state_receive).await;
|
|
73
|
79
|
match next_state {
|
|
74
|
80
|
StateChange::Pause => paused = true,
|
|
75
|
81
|
StateChange::Unpause => paused = false,
|
|
|
@@ -80,14 +86,13 @@ impl FileSystemWatcher {
|
|
80
|
86
|
}
|
|
81
|
87
|
|
|
82
|
88
|
async fn task_unpaused(
|
|
83
|
|
- file_tree: &mut Arc<Mutex<FileTree>>,
|
|
|
89
|
+ &mut self,
|
|
84
|
90
|
state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
85
|
|
- errors: &mut mpsc::Sender<SynchronizationError>,
|
|
86
|
91
|
) -> StateChange {
|
|
87
|
|
- let root_path = file_tree.lock().await.root_path().to_owned();
|
|
|
92
|
+ let root_path = self.file_tree.lock().await.root_path().to_owned();
|
|
88
|
93
|
let fsw = match fswatcher::FileSystemWatcher::new(&root_path) {
|
|
89
|
94
|
Ok(fsw) => fsw,
|
|
90
|
|
- Err(e) => return Self::wait_for_pause(e.into(), state_receive, errors).await,
|
|
|
95
|
+ Err(e) => return self.wait_for_pause(e.into(), state_receive).await,
|
|
91
|
96
|
};
|
|
92
|
97
|
let mut file_events = FileEventDelay::new(fsw, Duration::from_secs(3));
|
|
93
|
98
|
|
|
|
@@ -100,9 +105,7 @@ impl FileSystemWatcher {
|
|
100
|
105
|
Either::Left((event, _state_future)) => {
|
|
101
|
106
|
// We cannot get any end-of-stream None here - we would first get a "Stopped"
|
|
102
|
107
|
// file event.
|
|
103
|
|
- if let Err(error) =
|
|
104
|
|
- Self::handle_file_event(file_tree, errors, event.unwrap()).await
|
|
105
|
|
- {
|
|
|
108
|
+ if let Err(error) = self.handle_file_event(event.unwrap()).await {
|
|
106
|
109
|
error_needs_pause = Some(error);
|
|
107
|
110
|
}
|
|
108
|
111
|
}
|
|
|
@@ -118,15 +121,18 @@ impl FileSystemWatcher {
|
|
118
|
121
|
}
|
|
119
|
122
|
};
|
|
120
|
123
|
}
|
|
121
|
|
- return Self::wait_for_pause(error_needs_pause.unwrap(), state_receive, errors).await;
|
|
|
124
|
+ return self
|
|
|
125
|
+ .wait_for_pause(error_needs_pause.unwrap(), state_receive)
|
|
|
126
|
+ .await;
|
|
122
|
127
|
}
|
|
123
|
128
|
|
|
124
|
129
|
async fn wait_for_pause(
|
|
|
130
|
+ &mut self,
|
|
125
|
131
|
reason: Error,
|
|
126
|
132
|
state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
127
|
|
- errors: &mut mpsc::Sender<SynchronizationError>,
|
|
128
|
133
|
) -> StateChange {
|
|
129
|
|
- match errors
|
|
|
134
|
+ match self
|
|
|
135
|
+ .errors
|
|
130
|
136
|
.send(SynchronizationError {
|
|
131
|
137
|
needs_pause: true,
|
|
132
|
138
|
error: reason,
|
|
|
@@ -148,11 +154,7 @@ impl FileSystemWatcher {
|
|
148
|
154
|
}
|
|
149
|
155
|
}
|
|
150
|
156
|
|
|
151
|
|
- async fn handle_file_event(
|
|
152
|
|
- file_tree: &mut Arc<Mutex<FileTree>>,
|
|
153
|
|
- errors: &mut mpsc::Sender<SynchronizationError>,
|
|
154
|
|
- event: FileSystemEvent,
|
|
155
|
|
- ) -> Result<(), Error> {
|
|
|
157
|
+ async fn handle_file_event(&mut self, event: FileSystemEvent) -> Result<(), Error> {
|
|
156
|
158
|
// When we get an I/O error, we simply ignore the corresponding file/directory. Most
|
|
157
|
159
|
// likely, it has been deleted, and we will get a deletion event later. If a file or
|
|
158
|
160
|
// directory is not readable, we do not want to synchronize it.
|
|
|
@@ -174,7 +176,7 @@ impl FileSystemWatcher {
|
|
174
|
176
|
.filter(|x| x.is_ok())
|
|
175
|
177
|
.map(|x| (x.as_ref().unwrap().file_name(), x.unwrap()))
|
|
176
|
178
|
.collect::<HashMap<_, _>>();
|
|
177
|
|
- let db_entries = match file_tree.lock().await.get_directory_listing(&path) {
|
|
|
179
|
+ let db_entries = match self.file_tree.lock().await.get_directory_listing(&path) {
|
|
178
|
180
|
Some(entries) => entries,
|
|
179
|
181
|
None => return Ok(()),
|
|
180
|
182
|
};
|
|
|
@@ -228,7 +230,7 @@ impl FileSystemWatcher {
|
|
228
|
230
|
// TODO
|
|
229
|
231
|
}
|
|
230
|
232
|
FileSystemEvent::Error(e) => {
|
|
231
|
|
- Self::send_error_no_pause(e.into(), None, errors).await;
|
|
|
233
|
+ self.send_error_no_pause(e.into(), None).await;
|
|
232
|
234
|
}
|
|
233
|
235
|
}
|
|
234
|
236
|
|
|
|
@@ -240,13 +242,9 @@ impl FileSystemWatcher {
|
|
240
|
242
|
Ok(())
|
|
241
|
243
|
}
|
|
242
|
244
|
|
|
243
|
|
- async fn send_error_no_pause(
|
|
244
|
|
- error: Error,
|
|
245
|
|
- _path: Option<OsString>,
|
|
246
|
|
- errors: &mut mpsc::Sender<SynchronizationError>,
|
|
247
|
|
- ) {
|
|
|
245
|
+ async fn send_error_no_pause(&mut self, error: Error, _path: Option<OsString>) {
|
|
248
|
246
|
// TODO: Encode the path in the error!
|
|
249
|
|
- errors
|
|
|
247
|
+ self.errors
|
|
250
|
248
|
.send(SynchronizationError {
|
|
251
|
249
|
needs_pause: false,
|
|
252
|
250
|
error: error,
|