浏览代码

More CLI client code.

Mathias Gottschlag 4 年前
父节点
当前提交
02e3c9f54f
共有 4 个文件被更改,包括 146 次插入23 次删除
  1. 129
    22
      src/bin/client.rs
  2. 6
    0
      src/database.rs
  3. 9
    0
      src/file_tree.rs
  4. 2
    1
      src/lib.rs

+ 129
- 22
src/bin/client.rs 查看文件

1
-use std::ffi::{OsStr, OsString};
1
+use std::ffi::OsStr;
2
 use std::fs::{self, create_dir_all, remove_file};
2
 use std::fs::{self, create_dir_all, remove_file};
3
 use std::io::Write;
3
 use std::io::Write;
4
+use std::path::{Path, PathBuf};
4
 use std::sync::Arc;
5
 use std::sync::Arc;
5
 
6
 
6
 use env_logger::Env;
7
 use env_logger::Env;
19
 mod cli;
20
 mod cli;
20
 
21
 
21
 struct SynchronizedDirectory {
22
 struct SynchronizedDirectory {
22
-    path: OsString,
23
+    path: PathBuf,
23
     client_side_sync: ClientSideSync,
24
     client_side_sync: ClientSideSync,
24
     file_system_watcher: FileSystemWatcher,
25
     file_system_watcher: FileSystemWatcher,
25
     last_error: Option<Error>,
26
     last_error: Option<Error>,
36
         let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await;
37
         let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await;
37
         let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await;
38
         let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await;
38
         Ok(SynchronizedDirectory {
39
         Ok(SynchronizedDirectory {
39
-            path: directory.to_owned(),
40
+            path: PathBuf::from(directory),
40
             client_side_sync,
41
             client_side_sync,
41
             file_system_watcher,
42
             file_system_watcher,
42
             last_error: None,
43
             last_error: None,
44
         })
45
         })
45
     }
46
     }
46
 
47
 
48
+    async fn create(
49
+        database: Arc<Mutex<Database>>,
50
+        directory: &OsStr,
51
+        remote_url: String,
52
+        errors: mpsc::Sender<SynchronizationError>,
53
+    ) -> Result<SynchronizedDirectory, Error> {
54
+        // TODO: First, connect to the server and test whether we can access the URL?
55
+        // At this point, the directory is still paused.
56
+        // TODO: Return an error if the directory is not empty (unless the user specified an
57
+        // override option).
58
+        let file_tree = Arc::new(Mutex::new(FileTree::create(
59
+            database, directory, remote_url,
60
+        )?));
61
+        let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await;
62
+        let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await;
63
+        Ok(SynchronizedDirectory {
64
+            path: PathBuf::from(directory),
65
+            client_side_sync,
66
+            file_system_watcher,
67
+            last_error: None,
68
+            paused: true,
69
+        })
70
+    }
71
+
72
+    async fn remove(mut self, database: &Arc<Mutex<Database>>) -> Result<(), Error> {
73
+        let path = self.path.clone();
74
+        self.pause().await;
75
+        drop(self);
76
+
77
+        database
78
+            .lock()
79
+            .await
80
+            .remove_synchronized_directory(path.as_ref())?;
81
+        Ok(())
82
+    }
83
+
47
     async fn unpause(&mut self) {
84
     async fn unpause(&mut self) {
48
         self.client_side_sync.unpause().await;
85
         self.client_side_sync.unpause().await;
49
         self.file_system_watcher.unpause().await;
86
         self.file_system_watcher.unpause().await;
109
                     Some(Ok(stream)) => {
146
                     Some(Ok(stream)) => {
110
                         let mut stream = stream;
147
                         let mut stream = stream;
111
                         let mut output = Vec::new();
148
                         let mut output = Vec::new();
112
-                        match handle_cli_client(&mut stream, &mut output, &mut directories, db.clone()).await {
149
+                        match handle_cli_client(&mut stream, &mut output, &mut directories, db.clone(), &errors_send).await {
113
                             Ok(()) => {
150
                             Ok(()) => {
114
                                 stream.write_all(&0i32.to_be_bytes()).await.ok();
151
                                 stream.write_all(&0i32.to_be_bytes()).await.ok();
115
                                 stream.write_all(&output).await.ok();
152
                                 stream.write_all(&output).await.ok();
166
     stream: &mut UnixStream,
203
     stream: &mut UnixStream,
167
     output: &mut Vec<u8>,
204
     output: &mut Vec<u8>,
168
     directories: &mut Vec<SynchronizedDirectory>,
205
     directories: &mut Vec<SynchronizedDirectory>,
169
-    _db: Arc<Mutex<Database>>,
206
+    db: Arc<Mutex<Database>>,
207
+    errors_send: &mpsc::Sender<SynchronizationError>,
170
 ) -> Result<(), Error> {
208
 ) -> Result<(), Error> {
171
     let mut request = String::new();
209
     let mut request = String::new();
172
     stream.read_to_string(&mut request).await?;
210
     stream.read_to_string(&mut request).await?;
177
     let options: cli::Options = serde_json::from_str(&request)?;
215
     let options: cli::Options = serde_json::from_str(&request)?;
178
     let verbose = options.verbose;
216
     let verbose = options.verbose;
179
     match options.command {
217
     match options.command {
180
-        _cmd @ cli::Command::ListDirectories { .. } => {
218
+        cli::Command::ListDirectories { status } => {
181
             write!(output, "{} directories\n", directories.len()).ok();
219
             write!(output, "{} directories\n", directories.len()).ok();
182
             for directory in directories.iter() {
220
             for directory in directories.iter() {
183
-                write!(
184
-                    output,
185
-                    "\nDirectory: {}\nActive: {}\nLast error: {}\n",
186
-                    directory.path.to_string_lossy(),
187
-                    !directory.paused,
188
-                    if directory.last_error.is_some() {
189
-                        format!("{:?}", directory.last_error.as_ref().unwrap())
190
-                    } else {
191
-                        "None".to_owned()
192
-                    }
193
-                )
194
-                .ok();
221
+                if status {
222
+                    write!(
223
+                        output,
224
+                        "\nDirectory: {}\nActive: {}\nLast error: {}\n",
225
+                        directory.path.to_string_lossy(),
226
+                        !directory.paused,
227
+                        if directory.last_error.is_some() {
228
+                            format!("{:?}", directory.last_error.as_ref().unwrap())
229
+                        } else {
230
+                            "None".to_owned()
231
+                        }
232
+                    )
233
+                    .ok();
234
+                } else {
235
+                    write!(output, "{}\n", directory.path.to_string_lossy()).ok();
236
+                }
195
             }
237
             }
196
         }
238
         }
197
-        _cmd @ cli::Command::AddDirectory { .. } => {
198
-            // TODO
239
+        cli::Command::AddDirectory {
240
+            local_directory,
241
+            remote_url,
242
+        } => {
243
+            let local_directory = fs::canonicalize(local_directory)?;
244
+            // We need to check that we are not synchronizing directories that are within
245
+            // other synchronized directories.
246
+            for directory in directories.iter() {
247
+                if directory.path.starts_with(&local_directory)
248
+                    || local_directory.starts_with(&directory.path)
249
+                {
250
+                    return Err(Error::InvalidParam(format!(
251
+                        "cannot add the directory {}, as the directory {} is already synchronized.",
252
+                        local_directory.to_string_lossy(),
253
+                        directory.path.to_string_lossy()
254
+                    )));
255
+                }
256
+            }
257
+
258
+            let mut new_dir = SynchronizedDirectory::create(
259
+                db.clone(),
260
+                local_directory.as_os_str(),
261
+                remote_url,
262
+                errors_send.clone(),
263
+            )
264
+            .await?;
265
+            // We unpause if the new directory if there is at least one other unpaused directory.
266
+            for directory in directories.iter_mut() {
267
+                if !directory.paused {
268
+                    new_dir.unpause().await;
269
+                    break;
270
+                }
271
+            }
272
+            directories.push(new_dir);
273
+            write!(
274
+                output,
275
+                "Directory added: {}.\n",
276
+                local_directory.to_string_lossy()
277
+            )
278
+            .ok();
199
         }
279
         }
200
-        _cmd @ cli::Command::RemoveDirectory { .. } => {
201
-            // TODO
280
+        cli::Command::RemoveDirectory { local_directory } => {
281
+            let local_directory = fs::canonicalize(local_directory)?;
282
+            let mut sync_dir = None;
283
+            for i in 0..directories.len() {
284
+                if local_directory == directories[i].path.as_ref() as &Path {
285
+                    sync_dir = Some(directories.remove(i));
286
+                    break;
287
+                }
288
+            }
289
+
290
+            match sync_dir {
291
+                Some(sync_dir) => {
292
+                    sync_dir.remove(&db).await?;
293
+                    write!(
294
+                        output,
295
+                        "Directory removed: {}.\n",
296
+                        local_directory.to_string_lossy()
297
+                    )
298
+                    .ok();
299
+                }
300
+                None => {
301
+                    write!(
302
+                        output,
303
+                        "Unknown path: {}.\n",
304
+                        local_directory.to_string_lossy()
305
+                    )
306
+                    .ok();
307
+                }
308
+            }
202
         }
309
         }
203
         cli::Command::Pause => {
310
         cli::Command::Pause => {
204
             for directory in directories.iter_mut() {
311
             for directory in directories.iter_mut() {

+ 6
- 0
src/database.rs 查看文件

1
 use std::ffi::OsString;
1
 use std::ffi::OsString;
2
+use std::path::Path;
2
 
3
 
3
 use rusqlite::Connection;
4
 use rusqlite::Connection;
4
 
5
 
23
         // TODO
24
         // TODO
24
         Vec::new()
25
         Vec::new()
25
     }
26
     }
27
+
28
+    pub fn remove_synchronized_directory(&self, path: &Path) -> Result<(), crate::Error> {
29
+        // TODO
30
+        panic!("Not yet implemented.");
31
+    }
26
 }
32
 }

+ 9
- 0
src/file_tree.rs 查看文件

38
         })
38
         })
39
     }
39
     }
40
 
40
 
41
+    pub fn create(
42
+        db: Arc<Mutex<Database>>,
43
+        directory: &OsStr,
44
+        remote_url: String,
45
+    ) -> Result<FileTree, Error> {
46
+        // TODO
47
+        panic!("Not yet implemented.");
48
+    }
49
+
41
     pub fn root_path(&self) -> &OsStr {
50
     pub fn root_path(&self) -> &OsStr {
42
         &self.root_path
51
         &self.root_path
43
     }
52
     }

+ 2
- 1
src/lib.rs 查看文件

2
 extern crate rmp_serde as rmps;
2
 extern crate rmp_serde as rmps;
3
 
3
 
4
 use std::ffi::OsString;
4
 use std::ffi::OsString;
5
-use std::io;
6
 
5
 
7
 use thiserror::Error;
6
 use thiserror::Error;
8
 
7
 
107
     Sqlite(#[from] rusqlite::Error),
106
     Sqlite(#[from] rusqlite::Error),
108
     #[error("directory removed")]
107
     #[error("directory removed")]
109
     DirectoryRemoved,
108
     DirectoryRemoved,
109
+    #[error("invalid parameter: {0}")]
110
+    InvalidParam(String),
110
 }
111
 }
111
 
112
 
112
 impl From<fswatcher::Error> for Error {
113
 impl From<fswatcher::Error> for Error {

正在加载...
取消
保存