瀏覽代碼

Add FileTree API for central synchronization mechanisms.

Mathias Gottschlag 5 年之前
父節點
當前提交
a9ba0d0c39
共有 7 個檔案被更改,包括 267 行新增54 行删除
  1. 1
    0
      Cargo.lock
  2. 1
    1
      Cargo.toml
  3. 5
    5
      src/bin/client.rs
  4. 8
    0
      src/bin/server.rs
  5. 15
    0
      src/database.rs
  6. 124
    0
      src/filetree.rs
  7. 113
    48
      src/lib.rs

+ 1
- 0
Cargo.lock 查看文件

@@ -877,6 +877,7 @@ version = "0.2.9"
877 877
 source = "registry+https://github.com/rust-lang/crates.io-index"
878 878
 dependencies = [
879 879
  "bytes 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)",
880
+ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
880 881
  "futures-core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
881 882
  "iovec 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
882 883
  "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",

+ 1
- 1
Cargo.toml 查看文件

@@ -11,7 +11,7 @@ serde = { version = "1.0", features = ["derive"] }
11 11
 serde_json = "1.0"
12 12
 dirs = "2.0"
13 13
 byteorder = "1.3"
14
-tokio = { version = "0.2", features = ["fs", "io-util", "macros", "net", "stream", "tcp"] }
14
+tokio = { version = "0.2", features = ["fs", "io-util", "macros", "net", "stream", "sync", "tcp"] }
15 15
 futures = "0.3.1"
16 16
 futures-tokio-compat = { git = "https://github.com/mgottschlag/futures-tokio-compat.git" }
17 17
 async-tungstenite = { version = "0.3", features = ["tokio-runtime"] }

+ 5
- 5
src/bin/client.rs 查看文件

@@ -6,7 +6,7 @@ use tokio::net::{UnixListener, UnixStream};
6 6
 use tokio::stream::StreamExt;
7 7
 use tokio::io::{AsyncWriteExt, AsyncReadExt};
8 8
 
9
-use twfss::{Database, SynchronizedDirectory};
9
+use twfss::Database;
10 10
 
11 11
 mod cli;
12 12
 
@@ -40,13 +40,13 @@ async fn main() {
40 40
     // TODO: Register signal handler for graceful shutdown.
41 41
 
42 42
     // Initialize the existing synchronized directories.
43
-    let mut directories = Vec::new();
44
-    let db = Arc::new(Mutex::new(Database::create_or_open(&db_path()).unwrap()));
45
-    for directory in db.lock().unwrap().synchronized_directories() {
43
+    //let mut directories = Vec::new();
44
+    let _db = Arc::new(Mutex::new(Database::create_or_open(&db_path()).unwrap()));
45
+    /*for directory in db.lock().unwrap().synchronized_directories() {
46 46
         // TODO: Error handling. If the directory was removed, remove it from the
47 47
         // list of synchronized directories.
48 48
         directories.push(SynchronizedDirectory::open(db.clone(), &directory, false).unwrap());
49
-    }
49
+    }*/
50 50
 
51 51
     // Listen for CLI commands.
52 52
     // TODO: Graceful shutdown on errors.

+ 8
- 0
src/bin/server.rs 查看文件

@@ -1,4 +1,6 @@
1 1
 use std::net::SocketAddr;
2
+use std::sync::{Arc, Mutex};
3
+use std::fs::create_dir_all;
2 4
 
3 5
 use async_tungstenite::accept_async;
4 6
 use log::*;
@@ -7,6 +9,8 @@ use tokio::stream::StreamExt;
7 9
 use futures_tokio_compat::Compat;
8 10
 use futures::sink::SinkExt;
9 11
 
12
+use twfss::Database;
13
+
10 14
 fn config_dir() -> String {
11 15
     format!(
12 16
         "{}/.config/twfss",
@@ -29,7 +33,11 @@ fn db_path() -> String {
29 33
 async fn main() {
30 34
     env_logger::init();
31 35
 
36
+    // Create the configuration directory if necessary.
37
+    create_dir_all(config_dir()).unwrap();
38
+
32 39
     // Read configuration.
40
+    let _db = Arc::new(Mutex::new(Database::create_or_open(&db_path()).unwrap()));
33 41
     // TODO
34 42
 
35 43
     // Start a thread that periodically checks whether the maximum database size

+ 15
- 0
src/database.rs 查看文件

@@ -0,0 +1,15 @@
1
+pub struct Database {
2
+    // TODO
3
+}
4
+
5
+impl Database {
6
+    pub fn create_or_open(_path: &str) -> Result<Database, crate::Error> {
7
+        // TODO
8
+        Ok(Database {})
9
+    }
10
+
11
+    pub fn synchronized_directories(&self) -> Vec<String> {
12
+        // TODO
13
+        Vec::new()
14
+    }
15
+}

+ 124
- 0
src/filetree.rs 查看文件

@@ -0,0 +1,124 @@
1
+use std::sync::Arc;
2
+
3
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
4
+use tokio::sync::Mutex;
5
+
6
+use super::Database;
7
+use super::Error;
8
+
9
+pub struct FileTree {
10
+    db: Arc<Mutex<Database>>,
11
+    new_version_notifiers: Vec<UnboundedSender<NewVersionEvent>>,
12
+    next_version_source: u64,
13
+}
14
+
15
+impl FileTree {
16
+    pub fn open(db: Arc<Mutex<Database>>, _directory: &str) -> Result<FileTree, Error> {
17
+        // Check whether the database entry for this file tree exists and fetch the root
18
+        // file ID.
19
+        // TODO
20
+
21
+        // Create a lock file to make sure that the directory is only synchronized once.
22
+        // TODO
23
+
24
+        // Clear up any temporary files which were left over.
25
+        // TODO
26
+
27
+        // Enqueue the root directory to be rechecked against the database.
28
+        // TODO
29
+
30
+        Ok(FileTree {
31
+            db,
32
+            new_version_notifiers: Vec::new(),
33
+            next_version_source: 0,
34
+        })
35
+    }
36
+
37
+    pub fn new_notifier(&mut self) -> NewVersionNotifier {
38
+        let (send, receive) = unbounded_channel();
39
+        self.new_version_notifiers.push(send);
40
+        NewVersionNotifier { receive }
41
+    }
42
+
43
+    pub fn new_version_source(&mut self) -> NewVersionSource {
44
+        let source = NewVersionSource {
45
+            id: self.next_version_source,
46
+        };
47
+        self.next_version_source += 1;
48
+        source
49
+    }
50
+
51
+    pub fn get_file_info(_path: &str) -> Option<FileInfo> {
52
+        // TODO
53
+        panic!("Not yet implemented.");
54
+    }
55
+
56
+    pub fn get_directory_listing(_path: &str) -> Option<Vec<FileInfo>> {
57
+        // TODO
58
+        panic!("Not yet implemented.");
59
+    }
60
+
61
+    pub fn local_file_changed(_source: NewVersionSource, _path: &str, _info: FileInfo) {
62
+        // TODO
63
+        panic!("Not yet implemented.");
64
+    }
65
+
66
+    pub fn local_file_removed(_source: NewVersionSource, _path: &str) {
67
+        // TODO
68
+        panic!("Not yet implemented.");
69
+    }
70
+
71
+    // TODO: Limit on size of all temporary files, make this function return a
72
+    // future which yields a temporary file once enough space is available?
73
+    pub fn create_temporary_file(_target_path: &str) -> Result<TemporaryFile, Error> {
74
+        // TODO
75
+        panic!("Not yet implemented.");
76
+    }
77
+
78
+    pub fn update_file(
79
+        _source: NewVersionSource,
80
+        _path: &str,
81
+        _info: FileInfo,
82
+        _contents: TemporaryFile,
83
+        _origin: OriginInfo,
84
+    ) -> Result<NewVersionEvent, Error> {
85
+        // TODO
86
+        panic!("Not yet implemented.");
87
+    }
88
+
89
+    pub fn create_directory(
90
+        _source: NewVersionSource,
91
+        _path: &str,
92
+        _origin: OriginInfo,
93
+    ) -> Result<NewVersionEvent, Error> {
94
+        // TODO
95
+        panic!("Not yet implemented.");
96
+    }
97
+}
98
+
99
+pub struct NewVersionNotifier {
100
+    // TODO: Probably should not be public.
101
+    pub receive: UnboundedReceiver<NewVersionEvent>,
102
+}
103
+
104
+pub struct NewVersionEvent {
105
+    pub source: NewVersionSource,
106
+    // TODO
107
+}
108
+
109
+#[derive(Clone, Copy, PartialEq, Eq)]
110
+pub struct NewVersionSource {
111
+    id: u64,
112
+}
113
+
114
+pub struct FileInfo {
115
+    // TODO
116
+}
117
+
118
+pub struct TemporaryFile {
119
+    // TODO
120
+}
121
+
122
+pub struct OriginInfo {
123
+    // TODO
124
+}

+ 113
- 48
src/lib.rs 查看文件

@@ -1,57 +1,122 @@
1 1
 use std::io;
2
-use std::sync::{Arc, Mutex};
3 2
 
4
-pub struct SynchronizedDirectory {
5
-    db: Arc<Mutex<Database>>,
6
-    master: bool,
7
-    // TODO
8
-}
3
+mod database;
4
+mod filetree;
9 5
 
10
-impl SynchronizedDirectory {
11
-    pub fn open(
12
-        db: Arc<Mutex<Database>>,
13
-        _path: &str,
14
-        master: bool,
15
-    ) -> Result<SynchronizedDirectory, Error> {
16
-        // TODO
17
-        Ok(SynchronizedDirectory { db, master })
18
-    }
6
+pub use database::Database;
7
+pub use filetree::FileTree;
19 8
 
20
-    // TODO: Login information.
21
-    pub fn new(
22
-        db: Arc<Mutex<Database>>,
23
-        local_path: &str,
24
-        remote_path: &str,
25
-        master: bool,
26
-    ) -> Result<SynchronizedDirectory, Error> {
27
-        // TODO
28
-        Ok(SynchronizedDirectory { db, master })
29
-    }
30
-}
9
+// Sketch of the synchronization algorithm:
10
+//
11
+// if (filesystem_change) {
12
+//     if the change is already represented in the mode in the database, ignore it;
13
+//     scan the directory/file for changes (recalculate hashes if necessary);
14
+//     if necessary, update the local version;
15
+//     notify client connections about the change, push the change to server connections;
16
+//     if a server rejects the change, copy contents to a conflict file and revert;
17
+// }
18
+// if (new_version_notification_from_server) {
19
+//     the server sent us a path and a version for each element in the path;
20
+//     if the path is a file, download the hashes for the file and start downloading
21
+//     content;
22
+//     if the path is a directory, list the directory (recursively) and download changes;
23
+//     update the version whenever an element was locally updated;
24
+//     update the version of directories when all files/directories below it were updated;
25
+//     notify clients;
26
+// }
27
+// if (update_from_client) {
28
+//     compare the client's server version with our version, if different, early reject;
29
+//     download file information and store in temporary file;
30
+//     check whether our version changed, if so, late reject;
31
+//     move file and update;
32
+//     update database version and hashes;
33
+//     push update to server, similar to local file system change;
34
+// }
35
+//
36
+// Synchronization issues between the three code paths:
37
+// - File system operations (except for temporary files) and database changes (final
38
+// conflict checks as well as hash updates?) need to be synchronized.
39
+// - If any operation locks a directory, operations to the files within the directory can
40
+// be delayed due to locking, and have to re-check whether the file in question is still
41
+// the same file (version check + ID check?).
31 42
 
32
-pub struct SyncEvent {
33
-    // TODO
34
-}
35
-
36
-impl SyncEvent {
37
-    // TODO
38
-}
43
+// Sketch of the database structure:
44
+// (TODO: Rework and adapt to reality?)
45
+//
46
+// The server-side database has a list of synchronization roots and stores a file tree for each.
47
+// It also stores all recent/valid synchronization "connection" declarations, to be able to prune
48
+// synchronization roots periodically.
49
+//
50
+// The client-side databse has a list of synchronization roots and stores a file tree for each.
51
+// It also stores one synchronization "connection" for each root, which contains the server address,
52
+// login information and the mode of synchronization.
53
+//
54
+// FileTree:
55
+// - path
56
+// Object (file or directory):
57
+// - path
58
+// - type
59
+// - version
60
+// - remote version
61
+// - remote author (connection which created the last remote version)
62
+// - modification time
63
+// - size
64
+// - checksum
65
+// - content-id (points to file contents, can stay the same for file move)
66
+// Content (for files, one per ):
67
+// - content-id
68
+// - offset
69
+// - length
70
+// - checksum
71
+//
72
+// Synchronization connection:
73
+// - id
74
+// - key
75
+// - path
76
+// - address (only for client)
77
+// - direction
78
+// - last logged in
79
+// - conflict (only for server, a second upload-only client logged in since the last logout)
39 80
 
40
-pub struct Database {
41
-    // TODO
42
-}
43
-
44
-impl Database {
45
-    pub fn create_or_open(_path: &str) -> Result<Database, Error> {
46
-        // TODO
47
-        Ok(Database {})
48
-    }
49
-
50
-    pub fn synchronized_directories(&self) -> Vec<String> {
51
-        // TODO
52
-        Vec::new()
53
-    }
54
-}
81
+//use std::sync::{Arc, Mutex};
82
+//
83
+//pub mod protocol;
84
+//
85
+//pub struct SynchronizedDirectory {
86
+//    db: Arc<Mutex<Database>>,
87
+//    master: bool,
88
+//    // TODO
89
+//}
90
+//
91
+//impl SynchronizedDirectory {
92
+//    pub fn open(
93
+//        db: Arc<Mutex<Database>>,
94
+//        _path: &str,
95
+//        master: bool,
96
+//    ) -> Result<SynchronizedDirectory, Error> {
97
+//        // TODO
98
+//        Ok(SynchronizedDirectory { db, master })
99
+//    }
100
+//
101
+//    // TODO: Login information.
102
+//    pub fn new(
103
+//        db: Arc<Mutex<Database>>,
104
+//        local_path: &str,
105
+//        remote_path: &str,
106
+//        master: bool,
107
+//    ) -> Result<SynchronizedDirectory, Error> {
108
+//        // TODO
109
+//        Ok(SynchronizedDirectory { db, master })
110
+//    }
111
+//}
112
+//
113
+//pub struct SyncEvent {
114
+//    // TODO
115
+//}
116
+//
117
+//impl SyncEvent {
118
+//    // TODO
119
+//}
55 120
 
56 121
 #[derive(Debug)]
57 122
 pub enum Error {

Loading…
取消
儲存