Browse Source

Add code to pause/unpause the file system watcher.

Mathias Gottschlag 5 years ago
parent
commit
01c954f0db
3 changed files with 86 additions and 2 deletions
  1. 83
    0
      src/file_system_watcher.rs
  2. 1
    2
      src/filetree.rs
  3. 2
    0
      src/lib.rs

+ 83
- 0
src/file_system_watcher.rs View File

@@ -0,0 +1,83 @@
1
+use std::sync::Arc;
2
+
3
+use tokio::sync::{mpsc, oneshot, Mutex};
4
+
5
+use super::{Error, FileTree};
6
+
7
+pub struct FileSystemWatcher {
8
+    state_send: mpsc::Sender<(StateChange, oneshot::Sender<()>)>,
9
+}
10
+
11
+impl FileSystemWatcher {
12
+    pub fn new(file_tree: Arc<Mutex<FileTree>>) -> Result<FileSystemWatcher, Error> {
13
+        // Spawn the task that watches the file system as soon as unpause() is called.
14
+        let (state_send, pause_receive) = mpsc::channel(1);
15
+        tokio::spawn(async move {
16
+            Self::task_paused(file_tree, pause_receive).await;
17
+        });
18
+
19
+        Ok(FileSystemWatcher { state_send })
20
+    }
21
+
22
+    pub async fn pause(&mut self) {
23
+        let (send, receive) = oneshot::channel();
24
+        self.state_send
25
+            .send((StateChange::Pause, send))
26
+            .await
27
+            .unwrap();
28
+        receive.await.unwrap();
29
+    }
30
+
31
+    pub async fn unpause(&mut self) {
32
+        let (send, receive) = oneshot::channel();
33
+        self.state_send
34
+            .send((StateChange::Unpause, send))
35
+            .await
36
+            .unwrap();
37
+        receive.await.unwrap();
38
+    }
39
+
40
+    async fn task_paused(
41
+        _file_tree: Arc<Mutex<FileTree>>,
42
+        mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>,
43
+    ) {
44
+        let mut paused = true;
45
+
46
+        loop {
47
+            if paused {
48
+                // If we are paused, we only need to wait for state changes.
49
+                let (new_state, send) = state_receive.recv().await.unwrap();
50
+                match new_state {
51
+                    StateChange::Pause => paused = true,
52
+                    StateChange::Unpause => paused = false,
53
+                    StateChange::Terminate => break, // No reply, see drop().
54
+                }
55
+                send.send(()).unwrap();
56
+            } else {
57
+                // We were unpaused, initialize the file system watcher and start waiting for
58
+                // that as well.
59
+                // TODO
60
+            }
61
+        }
62
+    }
63
+}
64
+
65
+impl Drop for FileSystemWatcher {
66
+    fn drop(&mut self) {
67
+        // The receiver is immediately dropped, so the task must not send a reply.
68
+        // TODO: This is racy, the task should be synchronously paused first.
69
+        let (send, _receive) = oneshot::channel();
70
+        // Here, try_send() always succeeds, because the channel always has enough capacity.
71
+        // pause() and unpause() are synchronous and wait until the task has emptied the channel.
72
+        self.state_send
73
+            .try_send((StateChange::Terminate, send))
74
+            .unwrap();
75
+    }
76
+}
77
+
78
+#[derive(Debug)]
79
+enum StateChange {
80
+    Pause,
81
+    Unpause,
82
+    Terminate,
83
+}

+ 1
- 2
src/filetree.rs View File

@@ -3,8 +3,7 @@ use std::sync::Arc;
3 3
 use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
4 4
 use tokio::sync::Mutex;
5 5
 
6
-use super::Database;
7
-use super::Error;
6
+use super::{Database, Error};
8 7
 
9 8
 pub struct FileTree {
10 9
     db: Arc<Mutex<Database>>,

+ 2
- 0
src/lib.rs View File

@@ -1,9 +1,11 @@
1 1
 use std::io;
2 2
 
3 3
 mod database;
4
+mod file_system_watcher;
4 5
 mod filetree;
5 6
 
6 7
 pub use database::Database;
8
+pub use file_system_watcher::FileSystemWatcher;
7 9
 pub use filetree::FileTree;
8 10
 
9 11
 // Sketch of the synchronization algorithm:

Loading…
Cancel
Save