소스 검색

Add a pipe stream for unit tests and a function for test timeouts.

Mathias Gottschlag 5 년 전
부모
커밋
d710da4748
1개의 변경된 파일87개의 추가작업 그리고 0개의 파일을 삭제
  1. 87
    0
      src/network/mod.rs

+ 87
- 0
src/network/mod.rs 파일 보기

@@ -42,3 +42,90 @@ pub struct Packet<Payload> {
42 42
     pub response: bool, // TODO: Not really required.
43 43
     pub payload: Payload,
44 44
 }
45
+
46
+#[cfg(test)]
47
+pub mod test_utils {
48
+    use futures::prelude::*;
49
+    use futures::sink::Sink;
50
+    use futures::stream::Stream;
51
+    use futures::task::Context;
52
+    use futures::task::Poll;
53
+    use std::pin::Pin;
54
+    use std::time::Duration;
55
+    use tokio::sync::mpsc;
56
+
57
+    pub struct Pipe {
58
+        sender: Pin<Box<mpsc::UnboundedSender<Result<Vec<u8>, PipeError>>>>,
59
+        receiver: Pin<Box<mpsc::UnboundedReceiver<Result<Vec<u8>, PipeError>>>>,
60
+    }
61
+
62
+    impl Pipe {
63
+        pub fn new() -> (Self, Self) {
64
+            let (send1, recv1) = mpsc::unbounded_channel();
65
+            let (send2, recv2) = mpsc::unbounded_channel();
66
+            (
67
+                Self {
68
+                    sender: Box::pin(send1),
69
+                    receiver: Box::pin(recv2),
70
+                },
71
+                Self {
72
+                    sender: Box::pin(send2),
73
+                    receiver: Box::pin(recv1),
74
+                },
75
+            )
76
+        }
77
+
78
+        pub fn inject_error(&mut self) {
79
+            self.sender.send(Err(PipeError::Injected)).unwrap();
80
+        }
81
+    }
82
+
83
+    impl Sink<Vec<u8>> for Pipe {
84
+        type Error = PipeError;
85
+
86
+        fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
87
+            Poll::Ready(Ok(()))
88
+        }
89
+
90
+        fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
91
+            self.sender.send(Ok(item)).map_err(|_| PipeError::Closed)
92
+        }
93
+
94
+        fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
95
+            Poll::Ready(Ok(()))
96
+        }
97
+
98
+        fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
99
+            Poll::Ready(Ok(()))
100
+        }
101
+    }
102
+
103
+    impl Stream for Pipe {
104
+        type Item = Result<Vec<u8>, PipeError>;
105
+
106
+        fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
107
+            // Safe, as we will not move self_.
108
+            let self_ = unsafe { self.get_unchecked_mut() };
109
+
110
+            Pin::as_mut(&mut self_.receiver).poll_next(cx)
111
+        }
112
+    }
113
+
114
+    #[derive(Debug, PartialEq)]
115
+    pub enum PipeError {
116
+        Closed,
117
+        Injected,
118
+    }
119
+
120
+    pub async fn panic_after<T>(d: Duration, task: T) -> T::Output
121
+    where
122
+        T: Future + Send + 'static,
123
+        T::Output: Send + 'static,
124
+    {
125
+        let task = tokio::spawn(task);
126
+        match tokio::time::timeout(d, task).await {
127
+            Err(_) => panic!("function timed out ({:?})", d),
128
+            Ok(x) => x.unwrap(),
129
+        }
130
+    }
131
+}

Loading…
취소
저장