two-way file system sync
您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

client.rs 6.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. use std::time::Duration;
  2. use async_tungstenite::{tokio::connect_async, tungstenite::Message, WebSocketStream};
  3. use core::pin::Pin;
  4. use futures::future::{self, Either, Future};
  5. use futures::task::{Context, Poll};
  6. use tokio::stream::Stream;
  7. use tokio::stream::StreamExt;
  8. use tokio::sync::{mpsc, oneshot};
  9. use tokio::time::{delay_for, Delay};
  10. use url::Url;
  11. use super::packet::IncomingPacket;
  12. pub fn create<StreamType, ErrorType, PacketType>(
  13. stream: StreamType,
  14. ) -> (
  15. RPCInterface<StreamType, PacketType>,
  16. ServerEventStream<PacketType>,
  17. )
  18. where
  19. StreamType: Stream<Item = Result<Vec<u8>, ErrorType>> + Unpin + Send + 'static,
  20. {
  21. let (cmd_send, cmd_receive) = mpsc::channel(1);
  22. tokio::spawn(async move {
  23. // TODO: Remove type annotations.
  24. client_connection_task::<StreamType, ErrorType, PacketType>(stream, cmd_receive).await;
  25. });
  26. // TODO
  27. panic!("Not yet implemented.");
  28. }
  29. enum ClientTaskCommand {
  30. Close,
  31. SendPackets,
  32. }
  33. struct OutgoingCallMap {
  34. // TODO
  35. }
  36. async fn client_connection_task<StreamType, ErrorType, PacketType>(
  37. mut stream: StreamType,
  38. mut cmd_receive: mpsc::Receiver<ClientTaskCommand>,
  39. ) where
  40. StreamType: Stream<Item = Result<Vec<u8>, ErrorType>> + Unpin,
  41. {
  42. let mut connection_closed = false;
  43. loop {
  44. match future::select(Box::pin(stream.next()), Box::pin(cmd_receive.recv())).await {
  45. Either::Left((packet, _cmd_future)) => {
  46. match packet {
  47. Some(data) => {
  48. // TODO
  49. }
  50. None => {
  51. // Stream closed. We still need to return errors for any
  52. // packet sent by the client, so do not exit the loop
  53. // immediately but rather return an error for any
  54. // incoming packet.
  55. connection_closed = true;
  56. break;
  57. }
  58. }
  59. }
  60. Either::Right((cmd, _stream_future)) => {
  61. match cmd {
  62. Some(ClientTaskCommand::Close) => {
  63. // The RPC interface for this connection was dropped, so
  64. // we just exit the loop and close the connection.
  65. // We need to return errors for all pending packets
  66. // first, though.
  67. // TODO
  68. break;
  69. }
  70. Some(ClientTaskCommand::SendPackets) => {
  71. // TODO
  72. }
  73. None => {
  74. // This should never happen.
  75. break;
  76. }
  77. }
  78. }
  79. };
  80. }
  81. if connection_closed {
  82. // The stream was closed, but the command channel was not. We still need
  83. // to return errors for any packet sent by the client, so do not exit
  84. // the loop immediately but rather return an error for any incoming
  85. // packet.
  86. client_connection_closed(cmd_receive).await;
  87. } else {
  88. // The command channel was closed, but the stream was not. Gracefully
  89. // close the stream.
  90. // TODO
  91. }
  92. }
  93. async fn client_connection_closed(mut cmd_receive: mpsc::Receiver<ClientTaskCommand>) {
  94. loop {
  95. match cmd_receive.recv().await {
  96. Some(ClientTaskCommand::Close) => {
  97. // The connection was already closed.
  98. break;
  99. }
  100. Some(ClientTaskCommand::SendPackets) => {
  101. // The connection was closed, so return an error for any
  102. // packet sent.
  103. // TODO
  104. }
  105. None => {
  106. // This should never happen.
  107. break;
  108. }
  109. }
  110. }
  111. }
  112. pub struct RPCInterface<StreamType, PacketType> {
  113. stream: StreamType,
  114. _packet: Option<PacketType>, // TODO: Remove.
  115. }
  116. impl<StreamType, PacketType> RPCInterface<StreamType, PacketType> {
  117. fn call(_call: &PacketType) -> RPC<StreamType, PacketType> {
  118. // TODO
  119. panic!("Not yet implemented.");
  120. }
  121. }
  122. pub struct RPC<PacketType, ErrorType> {
  123. result: Pin<Box<oneshot::Receiver<Result<IncomingPacket<PacketType>, ErrorType>>>>,
  124. timeout: Option<Pin<Box<Delay>>>,
  125. }
  126. impl<PacketType, ErrorType> RPC<PacketType, ErrorType> {
  127. pub fn with_timeout(mut self, timeout: Duration) -> Self {
  128. self.timeout = Some(Box::pin(delay_for(timeout)));
  129. self
  130. }
  131. }
  132. impl<PacketType, ErrorType> Future for RPC<PacketType, ErrorType> {
  133. type Output = Result<IncomingPacket<PacketType>, RPCError<ErrorType>>;
  134. fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
  135. // Safe, as we will not move self_.
  136. let self_ = unsafe { self.get_unchecked_mut() };
  137. // Try receiving from the receiver.
  138. match Pin::as_mut(&mut self_.result).poll(cx) {
  139. Poll::Ready(Ok(result)) => return Poll::Ready(result.map_err(|x| RPCError::Stream(x))),
  140. Poll::Ready(Err(_)) => {
  141. // The channel was closed.
  142. return Poll::Ready(Err(RPCError::Closed));
  143. }
  144. Poll::Pending => (),
  145. }
  146. // Nothing received, check the timeout instead.
  147. if self_.timeout.is_some() {
  148. match Pin::as_mut(&mut self_.timeout.as_mut().unwrap()).poll(cx) {
  149. Poll::Ready(()) => return Poll::Ready(Err(RPCError::Timeout)),
  150. Poll::Pending => (),
  151. }
  152. }
  153. Poll::Pending
  154. }
  155. }
  156. // TODO: Do we even need this type?
  157. pub struct ServerEventStream<PacketType> {
  158. packets: Pin<Box<mpsc::Receiver<IncomingPacket<PacketType>>>>,
  159. }
  160. impl<PacketType> Stream for ServerEventStream<PacketType> {
  161. type Item = IncomingPacket<PacketType>;
  162. fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
  163. // Safe, as we will not move self_.
  164. let self_ = unsafe { self.get_unchecked_mut() };
  165. Pin::as_mut(&mut self_.packets).poll_next(cx)
  166. }
  167. }
  168. pub enum RPCError<StreamError> {
  169. Closed,
  170. Timeout,
  171. Stream(StreamError)
  172. }
  173. pub async fn low_level_integration_test_client(address: String) {
  174. use super::websocket::WebSocketConnection;
  175. let url = Url::parse(&address).unwrap();
  176. let (mut ws_stream, _) = connect_async(url).await.unwrap();
  177. let mut conn = WebSocketConnection::new(ws_stream);
  178. let (rpc_interface, server_events) = create::<_, _, u32>(conn);
  179. // TODO
  180. panic!("Not yet implemented.");
  181. }