Explorar el Código

Remove all unsafe code via pin_project.

Mathias Gottschlag hace 5 años
padre
commit
3f1462a9fb
Se han modificado 5 ficheros con 41 adiciones y 47 borrados
  1. 1
    0
      Cargo.lock
  2. 1
    0
      Cargo.toml
  3. 12
    9
      src/network/client.rs
  4. 4
    4
      src/network/mod.rs
  5. 23
    34
      src/network/websocket.rs

+ 1
- 0
Cargo.lock Ver fichero

@@ -1273,6 +1273,7 @@ dependencies = [
1273 1273
  "fswatcher 0.1.0 (git+https://github.com/mgottschlag/fswatcher-rs.git)",
1274 1274
  "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
1275 1275
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
1276
+ "pin-project 0.4.20 (registry+https://github.com/rust-lang/crates.io-index)",
1276 1277
  "range-set 0.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
1277 1278
  "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
1278 1279
  "rusqlite 0.23.1 (registry+https://github.com/rust-lang/crates.io-index)",

+ 1
- 0
Cargo.toml Ver fichero

@@ -22,6 +22,7 @@ range-set = "*"
22 22
 fslock = "0.1"
23 23
 rusqlite = "0.23"
24 24
 thiserror = "1.0"
25
+pin-project = "0.4"
25 26
 
26 27
 [dev-dependencies]
27 28
 criterion = "0.3"

+ 12
- 9
src/network/client.rs Ver fichero

@@ -8,6 +8,7 @@ use futures::sink::{Sink, SinkExt};
8 8
 use futures::stream::{Stream, StreamExt};
9 9
 use futures::task::{Context, Poll};
10 10
 use log::error;
11
+use pin_project::pin_project;
11 12
 use rmps::Serializer;
12 13
 use serde::de::DeserializeOwned;
13 14
 use serde::Serialize;
@@ -240,7 +241,7 @@ impl<PacketType, ErrorType> RPCInterface<PacketType, ErrorType> {
240 241
         }
241 242
 
242 243
         RPC {
243
-            result: Box::pin(receiver),
244
+            result: receiver,
244 245
             timeout: None,
245 246
         }
246 247
     }
@@ -260,14 +261,17 @@ struct CallData<PacketType, ErrorType> {
260 261
     result: oneshot::Sender<Result<PacketType, RPCError<ErrorType>>>,
261 262
 }
262 263
 
264
+#[pin_project]
263 265
 pub struct RPC<PacketType, ErrorType> {
264
-    result: Pin<Box<oneshot::Receiver<Result<PacketType, RPCError<ErrorType>>>>>,
265
-    timeout: Option<Pin<Box<Delay>>>,
266
+    #[pin]
267
+    result: oneshot::Receiver<Result<PacketType, RPCError<ErrorType>>>,
268
+    #[pin]
269
+    timeout: Option<Delay>,
266 270
 }
267 271
 
268 272
 impl<PacketType, ErrorType> RPC<PacketType, ErrorType> {
269 273
     pub fn with_timeout(mut self, timeout: Duration) -> Self {
270
-        self.timeout = Some(Box::pin(delay_for(timeout)));
274
+        self.timeout = Some(delay_for(timeout));
271 275
         self
272 276
     }
273 277
 }
@@ -276,11 +280,10 @@ impl<PacketType, ErrorType> Future for RPC<PacketType, ErrorType> {
276 280
     type Output = Result<PacketType, RPCError<ErrorType>>;
277 281
 
278 282
     fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
279
-        // Safe, as we will not move self_.
280
-        let self_ = unsafe { self.get_unchecked_mut() };
283
+        let this = self.project();
281 284
 
282 285
         // Try receiving from the receiver.
283
-        match Pin::as_mut(&mut self_.result).poll(cx) {
286
+        match this.result.poll(cx) {
284 287
             Poll::Ready(Ok(result)) => return Poll::Ready(result),
285 288
             Poll::Ready(Err(_)) => {
286 289
                 // The channel was closed.
@@ -290,8 +293,8 @@ impl<PacketType, ErrorType> Future for RPC<PacketType, ErrorType> {
290 293
         }
291 294
 
292 295
         // Nothing received, check the timeout instead.
293
-        if self_.timeout.is_some() {
294
-            match Pin::as_mut(&mut self_.timeout.as_mut().unwrap()).poll(cx) {
296
+        if this.timeout.is_some() {
297
+            match this.timeout.as_pin_mut().unwrap().poll(cx) {
295 298
                 Poll::Ready(()) => return Poll::Ready(Err(RPCError::Timeout)),
296 299
                 Poll::Pending => (),
297 300
             }

+ 4
- 4
src/network/mod.rs Ver fichero

@@ -49,12 +49,15 @@ pub mod test_utils {
49 49
     use futures::stream::Stream;
50 50
     use futures::task::Context;
51 51
     use futures::task::Poll;
52
+    use pin_project::pin_project;
52 53
     use std::pin::Pin;
53 54
     use std::time::Duration;
54 55
     use tokio::sync::mpsc;
55 56
 
57
+    #[pin_project]
56 58
     pub struct Pipe {
57 59
         sender: Pin<Box<mpsc::UnboundedSender<Result<Vec<u8>, PipeError>>>>,
60
+        #[pin]
58 61
         receiver: Pin<Box<mpsc::UnboundedReceiver<Result<Vec<u8>, PipeError>>>>,
59 62
     }
60 63
 
@@ -103,10 +106,7 @@ pub mod test_utils {
103 106
         type Item = Result<Vec<u8>, PipeError>;
104 107
 
105 108
         fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
106
-            // Safe, as we will not move self_.
107
-            let self_ = unsafe { self.get_unchecked_mut() };
108
-
109
-            Pin::as_mut(&mut self_.receiver).poll_next(cx)
109
+            self.project().receiver.poll_next(cx)
110 110
         }
111 111
     }
112 112
 

+ 23
- 34
src/network/websocket.rs Ver fichero

@@ -1,3 +1,5 @@
1
+use std::pin::Pin;
2
+
1 3
 use async_tungstenite::tungstenite::{Error as WsError, Message};
2 4
 use async_tungstenite::WebSocketStream;
3 5
 use futures::prelude::*;
@@ -5,17 +7,17 @@ use futures::sink::Sink;
5 7
 use futures::stream::Stream;
6 8
 use futures::task::Context;
7 9
 use futures::task::Poll;
8
-use std::pin::Pin;
10
+use pin_project::pin_project;
9 11
 
12
+#[pin_project]
10 13
 pub struct WebSocketConnection<S> {
11
-    stream: Pin<Box<WebSocketStream<S>>>,
14
+    #[pin]
15
+    stream: WebSocketStream<S>,
12 16
 }
13 17
 
14 18
 impl<S> WebSocketConnection<S> {
15
-    pub fn new(s: WebSocketStream<S>) -> Self {
16
-        Self {
17
-            stream: Box::pin(s),
18
-        }
19
+    pub fn new(stream: WebSocketStream<S>) -> Self {
20
+        Self { stream }
19 21
     }
20 22
 }
21 23
 
@@ -26,31 +28,19 @@ where
26 28
     type Error = WsError;
27 29
 
28 30
     fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
29
-        // Safe, as we will not move self_.
30
-        let self_ = unsafe { self.get_unchecked_mut() };
31
-
32
-        Pin::as_mut(&mut self_.stream).poll_ready(cx)
31
+        self.project().stream.poll_ready(cx)
33 32
     }
34 33
 
35 34
     fn start_send(self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
36
-        // Safe, as we will not move self_.
37
-        let self_ = unsafe { self.get_unchecked_mut() };
38
-
39
-        Pin::as_mut(&mut self_.stream).start_send(Message::Binary(item))
35
+        self.project().stream.start_send(Message::Binary(item))
40 36
     }
41 37
 
42 38
     fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
43
-        // Safe, as we will not move self_.
44
-        let self_ = unsafe { self.get_unchecked_mut() };
45
-
46
-        Pin::as_mut(&mut self_.stream).poll_flush(cx)
39
+        self.project().stream.poll_flush(cx)
47 40
     }
48 41
 
49 42
     fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
50
-        // Safe, as we will not move self_.
51
-        let self_ = unsafe { self.get_unchecked_mut() };
52
-
53
-        Pin::as_mut(&mut self_.stream).poll_close(cx)
43
+        self.project().stream.poll_close(cx)
54 44
     }
55 45
 }
56 46
 
@@ -64,18 +54,17 @@ where
64 54
         self: Pin<&mut Self>,
65 55
         cx: &mut Context,
66 56
     ) -> Poll<Option<<WebSocketConnection<S> as Stream>::Item>> {
67
-        // Safe, as we will not move self_.
68
-        let self_ = unsafe { self.get_unchecked_mut() };
69
-
70
-        match Pin::as_mut(&mut self_.stream).poll_next(cx) {
71
-            Poll::Ready(Some(Ok(Message::Binary(data)))) => Poll::Ready(Some(Ok(data))),
72
-            Poll::Ready(Some(Ok(_))) => unsafe {
73
-                // Ignore other message types, try again.
74
-                Pin::new_unchecked(self_).poll_next(cx)
75
-            },
76
-            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
77
-            Poll::Ready(None) => Poll::Ready(None),
78
-            Poll::Pending => Poll::Pending,
57
+        let mut this = self.project();
58
+        loop {
59
+            match this.stream.as_mut().poll_next(cx) {
60
+                Poll::Ready(Some(Ok(Message::Binary(data)))) => return Poll::Ready(Some(Ok(data))),
61
+                Poll::Ready(Some(Ok(_))) => {
62
+                    // Ignore other message types, try again.
63
+                }
64
+                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
65
+                Poll::Ready(None) => return Poll::Ready(None),
66
+                Poll::Pending => return Poll::Pending,
67
+            }
79 68
         }
80 69
     }
81 70
 }

Loading…
Cancelar
Guardar