|
|
@@ -7,18 +7,21 @@ use tokio::stream::StreamExt;
|
|
7
|
7
|
use tokio::sync::{mpsc, oneshot, Mutex};
|
|
8
|
8
|
use tokio::task;
|
|
9
|
9
|
|
|
10
|
|
-use super::{Error, FileTree};
|
|
|
10
|
+use super::{Error, FileTree, SynchronizationError};
|
|
11
|
11
|
|
|
12
|
12
|
pub struct FileSystemWatcher {
|
|
13
|
13
|
state_send: mpsc::Sender<(StateChange, oneshot::Sender<()>)>,
|
|
14
|
14
|
}
|
|
15
|
15
|
|
|
16
|
16
|
impl FileSystemWatcher {
|
|
17
|
|
- pub fn new(file_tree: Arc<Mutex<FileTree>>) -> Result<FileSystemWatcher, Error> {
|
|
|
17
|
+ pub fn new(
|
|
|
18
|
+ file_tree: Arc<Mutex<FileTree>>,
|
|
|
19
|
+ errors: mpsc::Sender<SynchronizationError>,
|
|
|
20
|
+ ) -> Result<FileSystemWatcher, Error> {
|
|
18
|
21
|
// Spawn the task that watches the file system as soon as unpause() is called.
|
|
19
|
22
|
let (state_send, pause_receive) = mpsc::channel(1);
|
|
20
|
23
|
tokio::spawn(async move {
|
|
21
|
|
- Self::task_paused(file_tree, pause_receive).await;
|
|
|
24
|
+ Self::task_paused(file_tree, pause_receive, errors).await;
|
|
22
|
25
|
});
|
|
23
|
26
|
|
|
24
|
27
|
Ok(FileSystemWatcher { state_send })
|
|
|
@@ -45,6 +48,7 @@ impl FileSystemWatcher {
|
|
45
|
48
|
async fn task_paused(
|
|
46
|
49
|
mut file_tree: Arc<Mutex<FileTree>>,
|
|
47
|
50
|
mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
|
51
|
+ mut errors: mpsc::Sender<SynchronizationError>,
|
|
48
|
52
|
) {
|
|
49
|
53
|
let mut paused = true;
|
|
50
|
54
|
|
|
|
@@ -61,7 +65,8 @@ impl FileSystemWatcher {
|
|
61
|
65
|
} else {
|
|
62
|
66
|
// We were unpaused, initialize the file system watcher and start waiting for
|
|
63
|
67
|
// that as well.
|
|
64
|
|
- let next_state = Self::task_unpaused(&mut file_tree, &mut state_receive).await;
|
|
|
68
|
+ let next_state =
|
|
|
69
|
+ Self::task_unpaused(&mut file_tree, &mut state_receive, &mut errors).await;
|
|
65
|
70
|
match next_state {
|
|
66
|
71
|
StateChange::Pause => paused = true,
|
|
67
|
72
|
StateChange::Unpause => paused = false,
|
|
|
@@ -74,19 +79,23 @@ impl FileSystemWatcher {
|
|
74
|
79
|
async fn task_unpaused(
|
|
75
|
80
|
file_tree: &mut Arc<Mutex<FileTree>>,
|
|
76
|
81
|
state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
|
82
|
+ errors: &mut mpsc::Sender<SynchronizationError>,
|
|
77
|
83
|
) -> StateChange {
|
|
78
|
84
|
let root_path = file_tree.lock().await.root_path().to_owned();
|
|
79
|
|
- // TODO: Proper error handling.
|
|
80
|
|
- let mut file_events = FileEventDelay::new(
|
|
81
|
|
- fswatcher::FileSystemWatcher::new(&root_path).unwrap(),
|
|
82
|
|
- Duration::from_secs(3),
|
|
83
|
|
- );
|
|
|
85
|
+ let fsw = match fswatcher::FileSystemWatcher::new(&root_path) {
|
|
|
86
|
+ Ok(fsw) => fsw,
|
|
|
87
|
+ Err(e) => return Self::wait_for_pause(e.into(), state_receive, errors).await,
|
|
|
88
|
+ };
|
|
|
89
|
+ let mut file_events = FileEventDelay::new(fsw, Duration::from_secs(3));
|
|
84
|
90
|
|
|
85
|
91
|
loop {
|
|
|
92
|
+ // We only want to wait for file system events until a state change pauses the file
|
|
|
93
|
+ // system watcher.
|
|
86
|
94
|
match future::select(Box::pin(file_events.next()), Box::pin(state_receive.recv())).await
|
|
87
|
95
|
{
|
|
88
|
96
|
Either::Left((event, _state_future)) => {
|
|
89
|
|
- // TODO: Error handling, handle None event.
|
|
|
97
|
+ // We cannot get any end-of-stream None here - we would first get a "Stopped"
|
|
|
98
|
+ // file event.
|
|
90
|
99
|
Self::handle_file_event(file_tree, event.unwrap()).await;
|
|
91
|
100
|
}
|
|
92
|
101
|
Either::Right((next_state, _file_event_future)) => {
|
|
|
@@ -96,13 +105,41 @@ impl FileSystemWatcher {
|
|
96
|
105
|
send.send(()).unwrap();
|
|
97
|
106
|
return StateChange::Pause;
|
|
98
|
107
|
}
|
|
99
|
|
- (StateChange::Terminate, _) => return StateChange::Pause,
|
|
|
108
|
+ (StateChange::Terminate, _) => return StateChange::Terminate,
|
|
100
|
109
|
};
|
|
101
|
110
|
}
|
|
102
|
111
|
};
|
|
103
|
112
|
}
|
|
104
|
113
|
}
|
|
105
|
114
|
|
|
|
115
|
+ async fn wait_for_pause(
|
|
|
116
|
+ reason: Error,
|
|
|
117
|
+ state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
|
|
|
118
|
+ errors: &mut mpsc::Sender<SynchronizationError>,
|
|
|
119
|
+ ) -> StateChange {
|
|
|
120
|
+ match errors
|
|
|
121
|
+ .send(SynchronizationError {
|
|
|
122
|
+ needs_pause: true,
|
|
|
123
|
+ error: reason,
|
|
|
124
|
+ })
|
|
|
125
|
+ .await
|
|
|
126
|
+ {
|
|
|
127
|
+ Ok(()) => (),
|
|
|
128
|
+ Err(_) => panic!("Could not send synchronization error!"),
|
|
|
129
|
+ };
|
|
|
130
|
+ loop {
|
|
|
131
|
+ match state_receive.recv().await.unwrap() {
|
|
|
132
|
+ (StateChange::Unpause, send) => send.send(()).unwrap(),
|
|
|
133
|
+ (StateChange::Pause, send) => {
|
|
|
134
|
+ send.send(()).unwrap();
|
|
|
135
|
+ return StateChange::Pause;
|
|
|
136
|
+ }
|
|
|
137
|
+ (StateChange::Terminate, _) => return StateChange::Terminate,
|
|
|
138
|
+ }
|
|
|
139
|
+ }
|
|
|
140
|
+ // TODO
|
|
|
141
|
+ }
|
|
|
142
|
+
|
|
106
|
143
|
async fn handle_file_event(_file_tree: &mut Arc<Mutex<FileTree>>, event: FileSystemEvent) {
|
|
107
|
144
|
match event {
|
|
108
|
145
|
FileSystemEvent::Stopped(_reason) => {
|