Mathias Gottschlag il y a 5 ans
Parent
révision
1b6b9ec44e

+ 1
- 0
.rustfmt.toml Voir le fichier

@@ -0,0 +1 @@
1
+edition = "2018"

+ 11
- 0
Cargo.lock Voir le fichier

@@ -1283,6 +1283,7 @@ dependencies = [
1283 1283
  "thiserror 1.0.19 (registry+https://github.com/rust-lang/crates.io-index)",
1284 1284
  "tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
1285 1285
  "url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
1286
+ "uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
1286 1287
 ]
1287 1288
 
1288 1289
 [[package]]
@@ -1336,6 +1337,15 @@ name = "utf-8"
1336 1337
 version = "0.7.5"
1337 1338
 source = "registry+https://github.com/rust-lang/crates.io-index"
1338 1339
 
1340
+[[package]]
1341
+name = "uuid"
1342
+version = "0.8.1"
1343
+source = "registry+https://github.com/rust-lang/crates.io-index"
1344
+dependencies = [
1345
+ "rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
1346
+ "serde 1.0.111 (registry+https://github.com/rust-lang/crates.io-index)",
1347
+]
1348
+
1339 1349
 [[package]]
1340 1350
 name = "vcpkg"
1341 1351
 version = "0.2.9"
@@ -1628,6 +1638,7 @@ dependencies = [
1628 1638
 "checksum unicode-xid 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
1629 1639
 "checksum url 2.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "829d4a8476c35c9bf0bbce5a3b23f4106f79728039b726d292bb93bc106787cb"
1630 1640
 "checksum utf-8 0.7.5 (registry+https://github.com/rust-lang/crates.io-index)" = "05e42f7c18b8f902290b009cde6d651262f956c98bc51bca4cd1d511c9cd85c7"
1641
+"checksum uuid 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "9fde2f6a4bea1d6e007c4ad38c6839fa71cbb63b6dbf5b595aa38dc9b1093c11"
1631 1642
 "checksum vcpkg 0.2.9 (registry+https://github.com/rust-lang/crates.io-index)" = "55d1e41d56121e07f1e223db0a4def204e45c85425f6a16d462fd07c8d10d74c"
1632 1643
 "checksum vec_map 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
1633 1644
 "checksum version_check 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"

+ 1
- 0
Cargo.toml Voir le fichier

@@ -23,6 +23,7 @@ fslock = "0.1"
23 23
 rusqlite = "0.23"
24 24
 thiserror = "1.0"
25 25
 pin-project = "0.4"
26
+uuid = { version = "0.8", features = ["serde", "v4"] }
26 27
 
27 28
 [dev-dependencies]
28 29
 criterion = "0.3"

+ 7
- 0
coverage.sh Voir le fichier

@@ -0,0 +1,7 @@
1
+#/bin/bash
2
+set -e
3
+export CARGO_INCREMENTAL=0
4
+export RUSTFLAGS="-Zprofile -Ccodegen-units=1 -Cinline-threshold=0 -Clink-dead-code -Coverflow-checks=off -Zno-landing-pads"
5
+cargo +nightly build
6
+cargo +nightly test
7
+grcov ./target/debug/ -s . -t html --llvm --branch --ignore-not-existing -o ./target/debug/coverage/

+ 5
- 0
src/bin/multi-user-wrapper.rs Voir le fichier

@@ -0,0 +1,5 @@
1
+#[tokio::main]
2
+async fn main() {
3
+    env_logger::init();
4
+    // TODO
5
+}

+ 14
- 0
src/bin/server.rs Voir le fichier

@@ -9,6 +9,8 @@ use log::*;
9 9
 use tokio::net::{TcpListener, TcpStream};
10 10
 use tokio::stream::StreamExt;
11 11
 
12
+use twfss::network::server::{RPCResponse, RPCServer};
13
+use twfss::protocol::PacketPayload;
12 14
 use twfss::{paths, Database};
13 15
 
14 16
 #[tokio::main]
@@ -63,3 +65,15 @@ async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
63 65
         }
64 66
     }
65 67
 }
68
+
69
+struct ServerConnection {
70
+    // TODO
71
+}
72
+
73
+impl RPCServer for ServerConnection {
74
+    type PacketType = PacketPayload;
75
+
76
+    fn incoming_call(&mut self, call: Self::PacketType, response: RPCResponse<Self::PacketType>) {
77
+        // TODO
78
+    }
79
+}

+ 5
- 1
src/database.rs Voir le fichier

@@ -1,10 +1,10 @@
1 1
 use std::ffi::OsString;
2 2
 
3 3
 use rusqlite::Connection;
4
+//use uuid::Uuid;
4 5
 
5 6
 pub struct Database {
6 7
     db: Connection,
7
-    // TODO
8 8
 }
9 9
 
10 10
 impl Database {
@@ -24,3 +24,7 @@ impl Database {
24 24
         Vec::new()
25 25
     }
26 26
 }
27
+
28
+pub struct SynchronizedDirectoryInfo {
29
+    //root_id: Uuid,
30
+}

+ 0
- 1
src/lib.rs Voir le fichier

@@ -2,7 +2,6 @@
2 2
 extern crate rmp_serde as rmps;
3 3
 
4 4
 use std::ffi::OsString;
5
-use std::io;
6 5
 
7 6
 use thiserror::Error;
8 7
 

+ 48
- 0
src/network/integration_test.rs Voir le fichier

@@ -0,0 +1,48 @@
1
+use log::*;
2
+use tokio::net::{TcpListener, TcpStream};
3
+use tokio::stream::StreamExt;
4
+
5
+pub struct LowLevelIntegrationTestServer {
6
+    // TODO
7
+}
8
+
9
+impl LowLevelIntegrationTestServer {
10
+    pub async fn start(_bind_address: &str) -> LowLevelIntegrationTestServer {
11
+        // TODO
12
+        let addr = "127.0.0.1:12345";
13
+        let mut listener = TcpListener::bind(&addr)
14
+            .await
15
+            .expect("cannot bind to server port");
16
+        let mut incoming = listener.incoming();
17
+        info!("Listening on {}.", addr);
18
+
19
+        while let Some(Ok(stream)) = incoming.next().await {
20
+            let peer = stream
21
+                .peer_addr()
22
+                .expect("connected streams should have a peer address");
23
+
24
+            tokio::spawn(Self::accept_connection(peer, stream));
25
+        }
26
+        LowLevelIntegrationTestServer {}
27
+    }
28
+
29
+    pub async fn stop(self) {
30
+        // TODO
31
+    }
32
+
33
+    async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
34
+        let mut ws_stream = accept_async(TokioAdapter(stream))
35
+            .await
36
+            .expect("Failed to accept");
37
+
38
+        info!("New WebSocket connection from {}.", peer);
39
+
40
+        while let Some(msg) = ws_stream.next().await {
41
+            let msg = msg.expect("Failed to get request");
42
+            // TODO
43
+            if msg.is_text() || msg.is_binary() {
44
+                ws_stream.send(msg).await.expect("Failed to send response");
45
+            }
46
+        }
47
+    }
48
+}

+ 1
- 0
src/network/mod.rs Voir le fichier

@@ -29,6 +29,7 @@
29 29
 
30 30
 pub mod client;
31 31
 pub mod idalloc;
32
+pub mod integration_test;
32 33
 pub mod server;
33 34
 pub mod websocket;
34 35
 

+ 179
- 18
src/network/server.rs Voir le fichier

@@ -1,33 +1,194 @@
1
-// Trait that is implemented by any RPC server to process incoming calls.
2
-trait RPCServer {
1
+use std::fmt;
2
+
3
+use futures::sink::{Sink, SinkExt};
4
+use futures::stream::{Stream, StreamExt};
5
+use log::{error, info};
6
+use rmps::Serializer;
7
+use serde::de::DeserializeOwned;
8
+use serde::Serialize;
9
+use tokio::sync::mpsc;
10
+
11
+use super::{Packet, SERVER_EVENT_ID};
12
+
13
+/// Trait that is implemented by any RPC server to process incoming calls.
14
+pub trait RPCServer {
3 15
     type PacketType;
4
-    type NetworkError;
5 16
 
6
-    fn incoming_call<Response>(&mut self, call: Self::PacketType, response: Response)
7
-    where
8
-        Response: RPCResponse<PacketType = Self::PacketType, NetworkError = Self::NetworkError>;
17
+    fn incoming_call(&mut self, call: Self::PacketType, response: RPCResponse<Self::PacketType>);
9 18
 }
10 19
 
11 20
 // TODO: Document that types implementing this trait should also implement Drop and should return
12 21
 // an error if the type is dropped without send() being called.
13
-trait RPCResponse {
14
-    type PacketType;
15
-    type NetworkError;
22
+pub struct RPCResponse<PacketType> {
23
+    send_packet: mpsc::UnboundedSender<Packet<PacketType>>,
24
+    id: u32,
25
+    error_value: Option<PacketType>,
26
+}
16 27
 
17
-    fn send(self, packet: &Self::PacketType) -> Result<(), Self::NetworkError>;
28
+impl<PacketType> RPCResponse<PacketType>
29
+where
30
+    PacketType: Serialize,
31
+{
32
+    pub fn send(self, packet: PacketType) {
33
+        // We ignore errors here, as they mean that the connection was just closed.
34
+        self.send_packet
35
+            .send(Packet {
36
+                payload: packet,
37
+                id: self.id,
38
+            })
39
+            .ok();
40
+    }
18 41
 }
19 42
 
20
-pub struct LowLevelIntegrationTestServer {
21
-    // TODO
43
+impl<PacketType> Drop for RPCResponse<PacketType> {
44
+    fn drop(&mut self) {
45
+        // Send an error response?
46
+        self.send_packet
47
+            .send(Packet {
48
+                payload: self.error_value.take().unwrap(),
49
+                id: self.id,
50
+            })
51
+            .ok();
52
+    }
22 53
 }
23 54
 
24
-impl LowLevelIntegrationTestServer {
25
-    pub async fn start(_bind_address: &str) -> LowLevelIntegrationTestServer {
26
-        // TODO
27
-        LowLevelIntegrationTestServer {}
55
+/// Wraps an `RPCServer` and adds the network protocol implementation around it.
56
+pub struct RPCServerWrapper<Server: RPCServer, Connection> {
57
+    server: Server,
58
+    connection: Connection,
59
+    server_events: ServerEventStream<Server::PacketType>,
60
+    send_packet: mpsc::UnboundedSender<Packet<Server::PacketType>>,
61
+    sent_packets: mpsc::UnboundedReceiver<Packet<Server::PacketType>>,
62
+    error_value: Server::PacketType,
63
+}
64
+
65
+impl<Server: RPCServer, Connection, ConnectionError> RPCServerWrapper<Server, Connection>
66
+where
67
+    Server::PacketType: DeserializeOwned + Serialize + Clone + Send + 'static,
68
+    Connection: Stream<Item = Vec<u8>> + Sink<Vec<u8>, Error = ConnectionError> + Unpin,
69
+    ConnectionError: fmt::Debug,
70
+{
71
+    pub fn new(
72
+        server: Server,
73
+        connection: Connection,
74
+        server_events: ServerEventStream<Server::PacketType>,
75
+        error_value: Server::PacketType,
76
+    ) -> Self {
77
+        let (send_packet, sent_packets) = mpsc::unbounded_channel();
78
+        Self {
79
+            server,
80
+            connection,
81
+            server_events,
82
+            send_packet,
83
+            sent_packets,
84
+            error_value,
85
+        }
28 86
     }
29 87
 
30
-    pub async fn stop(self) {
31
-        // TODO
88
+    pub async fn run(&mut self) {
89
+        loop {
90
+            tokio::select! {
91
+                packet = &mut self.connection.next() => {
92
+                    match packet {
93
+                        Some(data) => {
94
+                            if !self.packet_received(data) {
95
+                                // Disconnect on errors.
96
+                                break;
97
+                            }
98
+                        }
99
+                        None => {
100
+                            // Stream closed.
101
+                            println!("Disconnected.");
102
+                            break;
103
+                        }
104
+                    }
105
+                }
106
+                cmd = &mut self.sent_packets.next() => {
107
+                    match cmd {
108
+                        Some(packet) => {
109
+                            if !self.send_packet(packet).await {
110
+                                // Disconnect on errors.
111
+                                break;
112
+                            }
113
+                        }
114
+                        None => {
115
+                            // This should never happen.
116
+                            break;
117
+                        }
118
+                    }
119
+                }
120
+                cmd = &mut self.server_events.receive.next() => {
121
+                    match cmd {
122
+                        Some(packet) => {
123
+                            if !self.send_packet(Packet{payload: packet, id: SERVER_EVENT_ID}).await {
124
+                                // Disconnect on errors.
125
+                                break;
126
+                            }
127
+                        }
128
+                        None => {
129
+                            // This should never happen.
130
+                            break;
131
+                        }
132
+                    }
133
+                }
134
+            }
135
+        }
32 136
     }
137
+
138
+    fn packet_received(&mut self, packet: Vec<u8>) -> bool {
139
+        match rmps::decode::from_slice(&packet) as Result<Packet<Server::PacketType>, _> {
140
+            Ok(deserialized) => {
141
+                if deserialized.id == SERVER_EVENT_ID {
142
+                    // Clients must never send server events.
143
+                    info!("Client sent SERVER_EVENT_ID!");
144
+                    return false;
145
+                } else {
146
+                    let response = RPCResponse {
147
+                        send_packet: self.send_packet.clone(),
148
+                        id: deserialized.id,
149
+                        error_value: Some(self.error_value.clone()),
150
+                    };
151
+                    self.server.incoming_call(deserialized.payload, response);
152
+                }
153
+            }
154
+            Err(e) => {
155
+                error!("Invalid packet received: {:?}", e);
156
+                return false;
157
+            }
158
+        };
159
+        return true;
160
+    }
161
+
162
+    async fn send_packet(&mut self, packet: Packet<Server::PacketType>) -> bool {
163
+        let mut serialized = Vec::new();
164
+        packet
165
+            .serialize(
166
+                &mut Serializer::new(&mut serialized)
167
+                    .with_struct_map()
168
+                    .with_string_variants(),
169
+            )
170
+            .unwrap();
171
+        match self.connection.send(serialized).await {
172
+            Ok(()) => true,
173
+            Err(e) => {
174
+                info!("Could not send packet: {:?}", e);
175
+                false
176
+            }
177
+        }
178
+    }
179
+}
180
+
181
+pub fn server_events<PacketType>() -> (ServerEventSink<PacketType>, ServerEventStream<PacketType>) {
182
+    let (send, receive) = mpsc::unbounded_channel();
183
+    (ServerEventSink { send }, ServerEventStream { receive })
184
+}
185
+
186
+/// Stream of asynchronous packets sent to the clients.
187
+#[derive(Clone)]
188
+pub struct ServerEventSink<PacketType> {
189
+    send: mpsc::UnboundedSender<PacketType>,
190
+}
191
+
192
+pub struct ServerEventStream<PacketType> {
193
+    receive: mpsc::UnboundedReceiver<PacketType>,
33 194
 }

+ 1
- 2
src/network/websocket.rs Voir le fichier

@@ -5,8 +5,7 @@ use async_tungstenite::WebSocketStream;
5 5
 use futures::prelude::*;
6 6
 use futures::sink::Sink;
7 7
 use futures::stream::Stream;
8
-use futures::task::Context;
9
-use futures::task::Poll;
8
+use futures::task::{Context, Poll};
10 9
 use pin_project::pin_project;
11 10
 
12 11
 #[pin_project]

+ 12
- 0
src/online_test.rs Voir le fichier

@@ -0,0 +1,12 @@
1
+trait OnlineTest {
2
+    // TODO
3
+}
4
+
5
+/// Tests whether the system is online by connecting the specified address.
6
+struct GenericOnlineTest {
7
+    // TODO
8
+}
9
+
10
+impl OnlineTest for GenericOnlineTest {
11
+    // TODO
12
+}

Loading…
Annuler
Enregistrer