two-way file system sync
Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.

client.rs 6.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. use std::fs::create_dir_all;
  2. use std::sync::Arc;
  3. use std::ffi::{OsStr, OsString};
  4. use tokio::io::{AsyncReadExt, AsyncWriteExt};
  5. use tokio::net::{UnixListener, UnixStream};
  6. use tokio::stream::StreamExt;
  7. use tokio::sync::{mpsc, Mutex};
  8. use tokio::signal;
  9. use twfss::{Database, FileTree, ClientSideSync, FileSystemWatcher, Error, SynchronizationError, paths};
  10. mod cli;
  11. fn config_dir() -> String {
  12. format!(
  13. "{}/.config/twfss",
  14. dirs::home_dir().unwrap().to_str().unwrap().to_owned()
  15. )
  16. }
  17. fn data_dir() -> String {
  18. format!(
  19. "{}/.local/share/twfss-client",
  20. dirs::home_dir().unwrap().to_str().unwrap().to_owned()
  21. )
  22. }
  23. fn db_path() -> String {
  24. format!("{}/client.db", data_dir())
  25. }
  26. struct SynchronizedDirectory {
  27. path: OsString,
  28. client_side_sync: ClientSideSync,
  29. file_system_watcher: FileSystemWatcher,
  30. last_error: Option<Error>,
  31. }
  32. impl SynchronizedDirectory {
  33. async fn open(database: Arc<Mutex<Database>>, directory: &OsStr, errors: mpsc::Sender<SynchronizationError>) -> Result<SynchronizedDirectory, Error> {
  34. let file_tree = Arc::new(Mutex::new(FileTree::open(database, directory)?));
  35. let client_side_sync = ClientSideSync::new(file_tree.clone(), errors.clone()).await;
  36. let file_system_watcher = FileSystemWatcher::new(file_tree, errors).await;
  37. Ok(SynchronizedDirectory{
  38. path: directory.to_owned(),
  39. client_side_sync,
  40. file_system_watcher,
  41. last_error: None,
  42. })
  43. }
  44. async fn unpause(&mut self) {
  45. self.client_side_sync.unpause().await;
  46. self.file_system_watcher.unpause().await;
  47. }
  48. async fn pause(&mut self) {
  49. self.client_side_sync.pause().await;
  50. self.file_system_watcher.pause().await;
  51. }
  52. }
  53. #[tokio::main]
  54. async fn main() {
  55. // Create the data directories if necessary.
  56. create_dir_all(config_dir()).unwrap();
  57. create_dir_all(data_dir()).unwrap();
  58. // Check whether the client is already running, and exit if it is.
  59. // TODO
  60. let mut ctrl_c = Box::pin(signal::ctrl_c());
  61. let (errors_send, mut errors) = mpsc::channel(32);
  62. // We create the socket before starting synchronization, so that the unwrap() below does not
  63. // cause corruption.
  64. let mut listener = UnixListener::bind(paths::client_socket()).unwrap();
  65. let mut incoming = listener.incoming();
  66. // Initialize the existing synchronized directories.
  67. let mut directories = Vec::new();
  68. let db = Arc::new(Mutex::new(Database::create_or_open(&db_path()).unwrap()));
  69. for directory in db.lock().await.synchronized_directories() {
  70. // The function cannot fail - we just fetched the directory from the database.
  71. let mut sync_dir = SynchronizedDirectory::open(db.clone(), &directory, errors_send.clone()).await.unwrap();
  72. sync_dir.unpause().await;
  73. directories.push(sync_dir);
  74. }
  75. loop {
  76. tokio::select! {
  77. result = &mut ctrl_c => {
  78. result.expect("could not listen for Ctrl+C");
  79. println!("Ctrl+C pressed, terminating...");
  80. // Ctrl+C was pressed, so we terminate the program.
  81. break;
  82. }
  83. stream = incoming.next() => {
  84. match stream {
  85. Some(Ok(stream)) => {
  86. let mut stream = stream;
  87. match handle_cli_client(&mut stream, &mut directories, db.clone()).await {
  88. Ok(()) => {}
  89. Err(e) => {
  90. // Log error and try to send it to the stream.
  91. // TODO: We want to send a return code as well.
  92. stream
  93. .write_all(format!("Error: {:?}", e).as_bytes())
  94. .await
  95. .ok();
  96. }
  97. };
  98. }
  99. Some(Err(err)) => {
  100. eprintln!("Error while listening on the local unix socket: {:?}", err);
  101. break;
  102. }
  103. None => {
  104. eprintln!("listening for CLI connections failed");
  105. break;
  106. }
  107. }
  108. }
  109. error = errors.next() => {
  110. let error = error.unwrap();
  111. eprintln!("error: {:?}", error.error);
  112. for directory in directories.iter_mut() {
  113. if directory.path == error.directory {
  114. // In theory, there could be a race condition here where we pause a
  115. // directory that was just recreated, when the directory which caused the
  116. // error was already deleted.
  117. if error.needs_pause {
  118. directory.pause().await;
  119. }
  120. directory.last_error = Some(error.error);
  121. break;
  122. }
  123. }
  124. }
  125. }
  126. }
  127. // For a clean shutdown, we need to pause all synchtonized directories. Then,
  128. // we can be sure that the database is not modified anymore.
  129. for mut directory in directories.into_iter() {
  130. directory.pause().await;
  131. }
  132. }
  133. async fn handle_cli_client(stream: &mut UnixStream, directories: &mut Vec<SynchronizedDirectory>, _db: Arc<Mutex<Database>>) -> Result<(), Error> {
  134. let mut request = String::new();
  135. stream.read_to_string(&mut request).await?;
  136. let options: cli::Options = serde_json::from_str(&request)?;
  137. let verbose = options.verbose;
  138. match options.command {
  139. _cmd @ cli::Command::ListDirectories { .. } => {
  140. // TODO
  141. }
  142. _cmd @ cli::Command::AddDirectory { .. } => {
  143. // TODO
  144. }
  145. _cmd @ cli::Command::RemoveDirectory { .. } => {
  146. // TODO
  147. }
  148. cli::Command::Pause => {
  149. for directory in directories.iter_mut() {
  150. directory.pause().await;
  151. if verbose {
  152. stream.write_all(format!("Paused {}.\n", directory.path.to_string_lossy()).as_bytes()).await.ok();
  153. }
  154. }
  155. stream.write_all("Synchronization paused.\n".as_bytes()).await.ok();
  156. }
  157. cli::Command::Resume => {
  158. for directory in directories.iter_mut() {
  159. directory.unpause().await;
  160. if verbose {
  161. stream.write_all(format!("Resumed {}.\n", directory.path.to_string_lossy()).as_bytes()).await.ok();
  162. }
  163. }
  164. stream.write_all("Synchronization resumed.\n".as_bytes()).await.ok();
  165. }
  166. }
  167. Ok(())
  168. }