//! Module which implements a low-level packet protocol over websockets.
//!
//! The low-level protocol needs to implement two basic pieces of functionality: It needs to be
//! able to implement remote procedure calls from the client to the server, and it needs to
//! provide a method for the server to send asynchronous events. A protocol for both cases is
//! outlined below, where each packet is sent as one WebSocket message.
//!
//! Remote procedure call:
//! - The client allocates an ID from the range between 0..2^31-1 that is not currently in use
//!   for another RPC in progress.
//! - The client prepares a packet with this ID and the appropriate payload and sends the packet to
//!   the server.
//! - The server receives the packet, processes the request and sends a packet back with the
//!   identical ID.
//! - In case of an error, the payload has the type "Error" which in turn contains the error
//!   type.
//! - Upon reception of the response, the client is free to reuse the ID for further RPCs.
//!
//! Note that this protocol does not allow the client to cancel an RPC that is currently in
//! progress - implementing such functionality is difficult, error-prone, and most likely not
//! worth the effort.
//!
//! Server-side event:
//! - The server creates a packet with the reserved ID 2^32-1 (`SERVER_EVENT_ID`) and the event
//!   content and sends it to the client.
//!
//! Note that the client does not have any method to acknowledge event reception. As events are
//! asynchronous compared to RPCs on the same connection, the event type needs to be designed in
//! a way that no such temporal information is necessary to process the events.
//! TODO: Example to demonstrate the problem.
pub mod client; pub mod idalloc; pub mod server; pub mod websocket; use serde::{Deserialize, Serialize}; pub const SERVER_EVENT_ID: u32 = u32::max_value(); #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub struct Packet { pub id: u32, pub payload: Payload, } #[cfg(test)] pub mod test_utils { use futures::prelude::*; use futures::sink::Sink; use futures::stream::Stream; use futures::task::Context; use futures::task::Poll; use std::pin::Pin; use std::time::Duration; use tokio::sync::mpsc; pub struct Pipe { sender: Pin, PipeError>>>>, receiver: Pin, PipeError>>>>, } impl Pipe { pub fn new() -> (Self, Self) { let (send1, recv1) = mpsc::unbounded_channel(); let (send2, recv2) = mpsc::unbounded_channel(); ( Self { sender: Box::pin(send1), receiver: Box::pin(recv2), }, Self { sender: Box::pin(send2), receiver: Box::pin(recv1), }, ) } pub fn inject_error(&mut self) { self.sender.send(Err(PipeError::Injected)).unwrap(); } } impl Sink> for Pipe { type Error = PipeError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(Ok(())) } fn start_send(self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { self.sender.send(Ok(item)).map_err(|_| PipeError::Closed) } fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(Ok(())) } fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll> { Poll::Ready(Ok(())) } } impl Stream for Pipe { type Item = Result, PipeError>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { // Safe, as we will not move self_. let self_ = unsafe { self.get_unchecked_mut() }; Pin::as_mut(&mut self_.receiver).poll_next(cx) } } #[derive(Debug, PartialEq)] pub enum PipeError { Closed, Injected, } pub async fn panic_after(d: Duration, task: T) -> T::Output where T: Future + Send + 'static, T::Output: Send + 'static, { let task = tokio::spawn(task); match tokio::time::timeout(d, task).await { Err(_) => panic!("function timed out ({:?})", d), Ok(x) => x.unwrap(), } } }