use std::sync::Arc; use std::time::Duration; use fswatcher::{FileEventDelay, FileSystemEvent, StopReason}; use futures::future::{self, Either}; use tokio::stream::StreamExt; use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task; use super::{Error, FileTree, SynchronizationError}; pub struct FileSystemWatcher { state_send: mpsc::Sender<(StateChange, oneshot::Sender<()>)>, } impl FileSystemWatcher { pub fn new( file_tree: Arc>, errors: mpsc::Sender, ) -> Result { // Spawn the task that watches the file system as soon as unpause() is called. let (state_send, pause_receive) = mpsc::channel(1); tokio::spawn(async move { Self::task_paused(file_tree, pause_receive, errors).await; }); Ok(FileSystemWatcher { state_send }) } pub async fn pause(&mut self) { let (send, receive) = oneshot::channel(); self.state_send .send((StateChange::Pause, send)) .await .unwrap(); receive.await.unwrap(); } pub async fn unpause(&mut self) { let (send, receive) = oneshot::channel(); self.state_send .send((StateChange::Unpause, send)) .await .unwrap(); receive.await.unwrap(); } async fn task_paused( mut file_tree: Arc>, mut state_receive: mpsc::Receiver<(StateChange, oneshot::Sender<()>)>, mut errors: mpsc::Sender, ) { let mut paused = true; loop { if paused { // If we are paused, we only need to wait for state changes. let (new_state, send) = state_receive.recv().await.unwrap(); match new_state { StateChange::Pause => paused = true, StateChange::Unpause => paused = false, StateChange::Terminate => break, // No reply, see drop(). } send.send(()).unwrap(); } else { // We were unpaused, initialize the file system watcher and start waiting for // that as well. let next_state = Self::task_unpaused(&mut file_tree, &mut state_receive, &mut errors).await; match next_state { StateChange::Pause => paused = true, StateChange::Unpause => paused = false, StateChange::Terminate => break, } } } } async fn task_unpaused( file_tree: &mut Arc>, state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>, errors: &mut mpsc::Sender, ) -> StateChange { let root_path = file_tree.lock().await.root_path().to_owned(); let fsw = match fswatcher::FileSystemWatcher::new(&root_path) { Ok(fsw) => fsw, Err(e) => return Self::wait_for_pause(e.into(), state_receive, errors).await, }; let mut file_events = FileEventDelay::new(fsw, Duration::from_secs(3)); let mut error_needs_pause = None; while error_needs_pause.is_none() { // We only want to wait for file system events until a state change pauses the file // system watcher. match future::select(Box::pin(file_events.next()), Box::pin(state_receive.recv())).await { Either::Left((event, _state_future)) => { // We cannot get any end-of-stream None here - we would first get a "Stopped" // file event. if let Err(error) = Self::handle_file_event(file_tree, errors, event.unwrap()).await { error_needs_pause = Some(error); } } Either::Right((next_state, _file_event_future)) => { match next_state.unwrap() { (StateChange::Unpause, send) => send.send(()).unwrap(), (StateChange::Pause, send) => { send.send(()).unwrap(); return StateChange::Pause; } (StateChange::Terminate, _) => return StateChange::Terminate, }; } }; } return Self::wait_for_pause(error_needs_pause.unwrap(), state_receive, errors).await; } async fn wait_for_pause( reason: Error, state_receive: &mut mpsc::Receiver<(StateChange, oneshot::Sender<()>)>, errors: &mut mpsc::Sender, ) -> StateChange { match errors .send(SynchronizationError { needs_pause: true, error: reason, }) .await { Ok(()) => (), Err(_) => panic!("Could not send synchronization error!"), }; loop { match state_receive.recv().await.unwrap() { (StateChange::Unpause, send) => send.send(()).unwrap(), (StateChange::Pause, send) => { send.send(()).unwrap(); return StateChange::Pause; } (StateChange::Terminate, _) => return StateChange::Terminate, } } } async fn handle_file_event( _file_tree: &mut Arc>, errors: &mut mpsc::Sender, event: FileSystemEvent, ) -> Result<(), Error> { match event { FileSystemEvent::Stopped(reason) => match reason { StopReason::DirectoryRemoved => { return Err(Error::DirectoryRemoved); } }, FileSystemEvent::DirectoryWatched(_path) => { // TODO } FileSystemEvent::DirectoryCreated(_path) => { // TODO } FileSystemEvent::DirectoryModified(_path) => { // TODO } FileSystemEvent::DirectoryRemoved(_path) => { // TODO } FileSystemEvent::DirectoryMoved(_from, _to) => { // TODO } FileSystemEvent::FileCreated(_path) => { // TODO } FileSystemEvent::FileModified(_path) => { // TODO } FileSystemEvent::FileRemoved(_path) => { // TODO } FileSystemEvent::FileMoved(_from, _to) => { // TODO } FileSystemEvent::Error(e) => { errors .send(SynchronizationError { needs_pause: false, error: e.into(), }) .await .ok(); } } // Yield to other parts of the program to minimize the impact of scanning large // directory trees. // TODO: Check whether this is really necessary and how large the impact on // performance is. task::yield_now().await; Ok(()) } } impl Drop for FileSystemWatcher { fn drop(&mut self) { // The receiver is immediately dropped, so the task must not send a reply. // TODO: This is racy, the task should be synchronously paused first. let (send, _receive) = oneshot::channel(); // Here, try_send() always succeeds, because the channel always has enough capacity. // pause() and unpause() are synchronous and wait until the task has emptied the channel. self.state_send .try_send((StateChange::Terminate, send)) .unwrap(); } } struct InotifyBuffer { data: [u8; 1024], } impl AsMut<[u8]> for InotifyBuffer { fn as_mut<'a>(&'a mut self) -> &'a mut [u8] { &mut self.data } } impl AsRef<[u8]> for InotifyBuffer { fn as_ref<'a>(&'a self) -> &'a [u8] { &self.data } } #[derive(Debug)] enum StateChange { Pause, Unpause, Terminate, }