瀏覽代碼

client: Lots of top-level client logic.

Mathias Gottschlag 5 年之前
父節點
當前提交
96beeebd69
共有 5 個文件被更改,包括 104 次插入92 次删除
  1. 2
    0
      src/bin/cli.rs
  2. 87
    33
      src/bin/client.rs
  3. 3
    3
      src/client_side_sync.rs
  4. 10
    3
      src/file_system_watcher.rs
  5. 2
    53
      src/lib.rs

+ 2
- 0
src/bin/cli.rs 查看文件

@@ -37,6 +37,8 @@ pub enum Command {
37 37
     RemoveDirectory {
38 38
         local_directory: PathBuf,
39 39
     },
40
+    Pause,
41
+    Resume,
40 42
     // TODO: Command to update login information in case of login problems.
41 43
 }
42 44
 

+ 87
- 33
src/bin/client.rs 查看文件

@@ -1,11 +1,12 @@
1 1
 use std::fs::create_dir_all;
2 2
 use std::sync::Arc;
3
-use std::ffi::OsStr;
3
+use std::ffi::{OsStr, OsString};
4 4
 
5 5
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
6 6
 use tokio::net::{UnixListener, UnixStream};
7 7
 use tokio::stream::StreamExt;
8 8
 use tokio::sync::{mpsc, Mutex};
9
+use tokio::signal;
9 10
 
10 11
 use twfss::{Database, FileTree, ClientSideSync, FileSystemWatcher, Error, SynchronizationError};
11 12
 
@@ -30,18 +31,22 @@ fn db_path() -> String {
30 31
 }
31 32
 
32 33
 struct SynchronizedDirectory {
34
+    path: OsString,
33 35
     client_side_sync: ClientSideSync,
34 36
     file_system_watcher: FileSystemWatcher,
37
+    last_error: Option<Error>,
35 38
 }
36 39
 
37 40
 impl SynchronizedDirectory {
38 41
     async fn open(database: Arc<Mutex<Database>>, directory: &OsStr, errors: mpsc::Sender<SynchronizationError>) -> Result<SynchronizedDirectory, Error> {
39 42
         let file_tree = Arc::new(Mutex::new(FileTree::open(database, directory)?));
40
-        let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await?;
41
-        let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await?;
43
+        let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await;
44
+        let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await;
42 45
         Ok(SynchronizedDirectory{
46
+            path: directory.to_owned(),
43 47
             client_side_sync,
44 48
             file_system_watcher,
49
+            last_error: None,
45 50
         })
46 51
     }
47 52
 
@@ -65,57 +70,89 @@ async fn main() {
65 70
     // Check whether the client is already running, and exit if it is.
66 71
     // TODO
67 72
 
68
-    // TODO: Register signal handler for graceful shutdown.
73
+    let mut ctrl_c = Box::pin(signal::ctrl_c());
74
+    let (errors_send, mut errors) = mpsc::channel(32);
69 75
 
70
-    // TODO: Error handling. If a directory was removed, remove it from the
71
-    // list of synchronized directories (or mark it for the user).
72
-    let (errors_send, errors) = mpsc::channel(32);
76
+    // We create the socket before starting synchronization, so that the unwrap() below does not
77
+    // cause corruption.
78
+    let mut listener = UnixListener::bind(cli::socket_path()).unwrap();
79
+    let mut incoming = listener.incoming();
73 80
 
74 81
     // Initialize the existing synchronized directories.
75 82
     let mut directories = Vec::new();
76 83
     let db = Arc::new(Mutex::new(Database::create_or_open(&db_path()).unwrap()));
77 84
     for directory in db.lock().await.synchronized_directories() {
78
-        // TODO: Error handling - remove the synchronized directory instead (or mark it for the
79
-        // user).
85
+        // The function cannot fail - we just fetched the directory from the database.
80 86
         let mut sync_dir = SynchronizedDirectory::open(db.clone(), &directory, errors_send.clone()).await.unwrap();
81 87
         sync_dir.unpause().await;
82 88
         directories.push(sync_dir);
83 89
     }
84 90
 
85
-    // Listen for CLI commands.
86
-    // TODO: Graceful shutdown on errors.
87
-    let mut listener = UnixListener::bind(cli::socket_path()).unwrap();
88
-    let mut incoming = listener.incoming();
89
-    while let Some(stream) = incoming.next().await {
90
-        match stream {
91
-            Ok(stream) => {
92
-                let mut stream = stream;
93
-                match handle_cli_client(&mut stream).await {
94
-                    Ok(()) => {}
95
-                    Err(e) => {
96
-                        // Log error and try to send it to the stream.
97
-                        // TODO
98
-                        stream
99
-                            .write_all(format!("Error: {:?}", e).as_bytes())
100
-                            .await
101
-                            .ok();
91
+    loop {
92
+        tokio::select! {
93
+            result = &mut ctrl_c => {
94
+                result.expect("could not listen for Ctrl+C");
95
+                // Ctrl+C was pressed, so we terminate the program.
96
+                break;
97
+            }
98
+            stream = incoming.next() => {
99
+                match stream {
100
+                    Some(Ok(stream)) => {
101
+                        let mut stream = stream;
102
+                        match handle_cli_client(&mut stream, &mut directories, db.clone()).await {
103
+                            Ok(()) => {}
104
+                            Err(e) => {
105
+                                // Log error and try to send it to the stream.
106
+                                // TODO: We want to send a return code as well.
107
+                                stream
108
+                                    .write_all(format!("Error: {:?}", e).as_bytes())
109
+                                    .await
110
+                                    .ok();
111
+                            }
112
+                        };
113
+                    }
114
+                    Some(Err(err)) => {
115
+                        eprintln!("Error while listening on the local unix socket: {:?}", err);
116
+                        break;
102 117
                     }
103
-                };
118
+                    None => {
119
+                        eprintln!("listening for CLI connections failed");
120
+                        break;
121
+                    }
122
+                }
104 123
             }
105
-            Err(err) => {
106
-                eprintln!("Error while listening on the local unix socket: {:?}", err);
107
-                break;
124
+            error = errors.next() => {
125
+                let error = error.unwrap();
126
+                eprintln!("error: {:?}", error.error);
127
+                for directory in directories.iter_mut() {
128
+                    if directory.path == error.directory {
129
+                        // In theory, there could be a race condition here where we pause a
130
+                        // directory that was just recreated, when the directory which caused the
131
+                        // error was already deleted.
132
+                        if error.needs_pause {
133
+                            directory.pause().await;
134
+                        }
135
+                        directory.last_error = Some(error.error);
136
+                        break;
137
+                    }
138
+                }
108 139
             }
109 140
         }
110 141
     }
142
+
143
+    // For a clean shutdown, we need to pause all synchtonized directories. Then,
144
+    // we can be sure that the database is not modified anymore.
145
+    for mut directory in directories.into_iter() {
146
+        directory.pause().await;
147
+    }
111 148
 }
112 149
 
113
-async fn handle_cli_client(stream: &mut UnixStream) -> Result<(), Error> {
150
+async fn handle_cli_client(stream: &mut UnixStream, directories: &mut Vec<SynchronizedDirectory>, _db: Arc<Mutex<Database>>) -> Result<(), Error> {
114 151
     let mut request = String::new();
115 152
     stream.read_to_string(&mut request).await?;
116 153
 
117 154
     let options: cli::Options = serde_json::from_str(&request)?;
118
-    let _verbose = options.verbose;
155
+    let verbose = options.verbose;
119 156
     match options.command {
120 157
         _cmd @ cli::Command::ListDirectories { .. } => {
121 158
             // TODO
@@ -126,9 +163,26 @@ async fn handle_cli_client(stream: &mut UnixStream) -> Result<(), Error> {
126 163
         _cmd @ cli::Command::RemoveDirectory { .. } => {
127 164
             // TODO
128 165
         }
166
+        cli::Command::Pause => {
167
+            for  directory in directories.iter_mut() {
168
+                directory.pause().await;
169
+                if verbose {
170
+                    stream.write_all(format!("Paused {}.\n", directory.path.to_string_lossy()).as_bytes()).await.ok();
171
+                }
172
+            }
173
+            stream.write_all("Synchronization paused.\n".as_bytes()).await.ok();
174
+        }
175
+        cli::Command::Resume => {
176
+            for  directory in directories.iter_mut() {
177
+                directory.unpause().await;
178
+                if verbose {
179
+                    stream.write_all(format!("Resumed {}.\n", directory.path.to_string_lossy()).as_bytes()).await.ok();
180
+                }
181
+            }
182
+            stream.write_all("Synchronization resumed.\n".as_bytes()).await.ok();
183
+        }
129 184
     }
130 185
 
131
-    // TODO
132 186
     Ok(())
133 187
 }
134 188
 

+ 3
- 3
src/client_side_sync.rs 查看文件

@@ -17,7 +17,7 @@ impl ClientSideSync {
17 17
     pub async fn new(
18 18
         file_tree: Arc<Mutex<FileTree>>,
19 19
         errors: mpsc::Sender<SynchronizationError>,
20
-    ) -> Result<ClientSideSync, Error> {
20
+    ) -> ClientSideSync {
21 21
         let new_version_source = file_tree.lock().await.new_version_source();
22 22
         // Spawn the task that connects to the server as soon as unpause() is called.
23 23
         let (state_send, pause_receive) = mpsc::channel(1);
@@ -30,10 +30,10 @@ impl ClientSideSync {
30 30
             sync_task.task_paused(pause_receive).await;
31 31
         });
32 32
 
33
-        Ok(ClientSideSync{
33
+        ClientSideSync{
34 34
             state_send,
35 35
             new_version_source,
36
-        })
36
+        }
37 37
     }
38 38
 
39 39
     // TODO: Deduplicate this code.

+ 10
- 3
src/file_system_watcher.rs 查看文件

@@ -20,15 +20,19 @@ pub struct FileSystemWatcher {
20 20
 }
21 21
 
22 22
 impl FileSystemWatcher {
23
+    // TODO: Let the caller pass a filter to ignore the files, and then apply that filter only when
24
+    // files are not already in the database?
23 25
     pub async fn new(
24 26
         file_tree: Arc<Mutex<FileTree>>,
25 27
         errors: mpsc::Sender<SynchronizationError>,
26
-    ) -> Result<FileSystemWatcher, Error> {
28
+    ) -> FileSystemWatcher {
27 29
         let new_version_source = file_tree.lock().await.new_version_source();
28 30
         // Spawn the task that watches the file system as soon as unpause() is called.
29 31
         let (state_send, pause_receive) = mpsc::channel(1);
30 32
         tokio::spawn(async move {
33
+            let directory = file_tree.lock().await.root_path().to_owned();
31 34
             let mut fsw_task = FileSystemWatcherTask {
35
+                directory,
32 36
                 file_tree,
33 37
                 errors,
34 38
                 new_version_source,
@@ -36,10 +40,10 @@ impl FileSystemWatcher {
36 40
             fsw_task.task_paused(pause_receive).await;
37 41
         });
38 42
 
39
-        Ok(FileSystemWatcher {
43
+        FileSystemWatcher {
40 44
             state_send,
41 45
             new_version_source,
42
-        })
46
+        }
43 47
     }
44 48
 
45 49
     pub async fn pause(&mut self) {
@@ -62,6 +66,7 @@ impl FileSystemWatcher {
62 66
 }
63 67
 
64 68
 struct FileSystemWatcherTask {
69
+    directory: OsString,
65 70
     file_tree: Arc<Mutex<FileTree>>,
66 71
     errors: mpsc::Sender<SynchronizationError>,
67 72
     new_version_source: NewVersionSource,
@@ -146,6 +151,7 @@ impl FileSystemWatcherTask {
146 151
         match self
147 152
             .errors
148 153
             .send(SynchronizationError {
154
+                directory: self.directory.clone(),
149 155
                 needs_pause: true,
150 156
                 error: reason,
151 157
             })
@@ -404,6 +410,7 @@ impl FileSystemWatcherTask {
404 410
         // TODO: Encode the path in the error!
405 411
         self.errors
406 412
             .send(SynchronizationError {
413
+                directory: self.directory.clone(),
407 414
                 needs_pause: false,
408 415
                 error: error,
409 416
             })

+ 2
- 53
src/lib.rs 查看文件

@@ -1,6 +1,7 @@
1 1
 //! Library for file-system synchronization.
2 2
 extern crate rmp_serde as rmps;
3 3
 
4
+use std::ffi::OsString;
4 5
 use std::io;
5 6
 
6 7
 mod client_side_sync;
@@ -87,64 +88,12 @@ pub use file_tree::FileTree;
87 88
 // - last logged in
88 89
 // - conflict (only for server, a second upload-only client logged in since the last logout)
89 90
 
90
-//use std::sync::{Arc, Mutex};
91
-//
92
-//pub mod protocol;
93
-//
94
-//pub struct SynchronizedDirectory {
95
-//    db: Arc<Mutex<Database>>,
96
-//    master: bool,
97
-//    // TODO
98
-//}
99
-//
100
-//impl SynchronizedDirectory {
101
-//    pub fn open(
102
-//        db: Arc<Mutex<Database>>,
103
-//        _path: &str,
104
-//        master: bool,
105
-//    ) -> Result<SynchronizedDirectory, Error> {
106
-//        // TODO
107
-//        Ok(SynchronizedDirectory { db, master })
108
-//    }
109
-//
110
-//    // TODO: Login information.
111
-//    pub fn new(
112
-//        db: Arc<Mutex<Database>>,
113
-//        local_path: &str,
114
-//        remote_path: &str,
115
-//        master: bool,
116
-//    ) -> Result<SynchronizedDirectory, Error> {
117
-//        // TODO
118
-//        Ok(SynchronizedDirectory { db, master })
119
-//    }
120
-//}
121
-//
122
-//pub struct SyncEvent {
123
-//    // TODO
124
-//}
125
-//
126
-//impl SyncEvent {
127
-//    // TODO
128
-//}
129
-
130 91
 pub struct SynchronizationError {
92
+    pub directory: OsString,
131 93
     pub needs_pause: bool,
132 94
     pub error: Error,
133 95
 }
134 96
 
135
-impl<T> From<T> for SynchronizationError
136
-where
137
-    T: Into<Error>,
138
-{
139
-    fn from(e: T) -> SynchronizationError {
140
-        // By default, errors do not set the pause flag.
141
-        SynchronizationError {
142
-            needs_pause: false,
143
-            error: e.into(),
144
-        }
145
-    }
146
-}
147
-
148 97
 #[derive(Debug)]
149 98
 pub enum Error {
150 99
     Io(std::io::Error),

Loading…
取消
儲存