use std::sync::Arc; use tokio::sync::{mpsc, oneshot, Mutex}; use super::file_tree::{FileTree, NewVersionSource}; use super::{Error, SynchronizationError}; pub struct ClientSideSync { state_send: mpsc::Sender<(StateChange, oneshot::Sender<()>)>, new_version_source: NewVersionSource, } // TODO: Parametrize the server protocol! // TODO: Can one directory be synchronized to multiple servers? Cycles are then possible! impl ClientSideSync { // TODO: Restrict the size of the files downloaded by the synchronization client? pub async fn new( file_tree: Arc>, errors: mpsc::Sender, ) -> Result { let new_version_source = file_tree.lock().await.new_version_source(); // Spawn the task that connects to the server as soon as unpause() is called. let (state_send, pause_receive) = mpsc::channel(1); tokio::spawn(async move { let mut sync_task = ClientSideSyncTask { file_tree, errors, new_version_source, }; sync_task.task_paused(pause_receive).await; }); Ok(ClientSideSync{ state_send, new_version_source, }) } // TODO: Deduplicate this code. pub async fn pause(&mut self) { let (send, receive) = oneshot::channel(); self.state_send .send((StateChange::Pause, send)) .await .unwrap(); receive.await.unwrap(); } pub async fn unpause(&mut self) { let (send, receive) = oneshot::channel(); self.state_send .send((StateChange::Unpause, send)) .await .unwrap(); receive.await.unwrap(); } } struct ClientSideSyncTask { file_tree: Arc>, errors: mpsc::Sender, new_version_source: NewVersionSource, } impl ClientSideSyncTask { async fn task_paused( &mut self, mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>, ) { let mut paused = true; loop { if paused { // If we are paused, we only need to wait for state changes. let (new_state, send) = state_receive.recv().await.unwrap(); match new_state { StateChange::Pause => paused = true, StateChange::Unpause => paused = false, StateChange::Terminate => break, // No reply, see drop(). } send.send(()).unwrap(); } else { // We were unpaused, initialize the file system watcher and start waiting for // that as well. let next_state = self.task_unpaused(&mut state_receive).await; match next_state { StateChange::Pause => paused = true, StateChange::Unpause => paused = false, StateChange::Terminate => break, } } } } async fn task_unpaused( &mut self, state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>, ) -> StateChange { // Connect to the server. // TODO // Start listening to local events, remote events and pause commands. // TODO // Disconnect again. // TODO panic!("Not yet implemented."); } } #[derive(Debug)] enum StateChange { Pause, Unpause, Terminate, }