瀏覽代碼

Switch to async-std and add a websocket server.

Mathias Gottschlag 6 年之前
父節點
當前提交
490837dafe
共有 4 個檔案被更改,包括 914 行新增18 行删除
  1. 823
    0
      Cargo.lock
  2. 5
    0
      Cargo.toml
  3. 23
    18
      src/bin/client.rs
  4. 63
    0
      src/bin/server.rs

+ 823
- 0
Cargo.lock
文件差異過大導致無法顯示
查看文件


+ 5
- 0
Cargo.toml 查看文件

@@ -11,3 +11,8 @@ serde = { version = "1.0", features = ["derive"] }
11 11
 serde_json = "1.0"
12 12
 dirs = "2.0"
13 13
 byteorder = "1.3"
14
+async-std = "1.0"
15
+async-tungstenite = { version = "0.3", features = ["async-std-runtime"] }
16
+futures = "0.3"
17
+env_logger = "0.7"
18
+log = "0.4"

+ 23
- 18
src/bin/client.rs 查看文件

@@ -1,8 +1,10 @@
1 1
 use std::fs::create_dir_all;
2
-use std::io::{self, Read, Write};
3
-use std::os::unix::net::{UnixListener, UnixStream};
2
+use std::io;
4 3
 use std::sync::{Arc, Mutex};
5
-use std::thread;
4
+
5
+use async_std::os::unix::net::{UnixListener, UnixStream};
6
+use async_std::prelude::*;
7
+use async_std::task;
6 8
 
7 9
 use twfss::{Database, SynchronizedDirectory};
8 10
 
@@ -27,6 +29,10 @@ fn db_path() -> String {
27 29
 }
28 30
 
29 31
 fn main() {
32
+    task::block_on(run());
33
+}
34
+
35
+async fn run() {
30 36
     // Create the data directories if necessary.
31 37
     create_dir_all(config_dir()).unwrap();
32 38
     create_dir_all(data_dir()).unwrap();
@@ -47,21 +53,20 @@ fn main() {
47 53
 
48 54
     // Listen for CLI commands.
49 55
     // TODO: Graceful shutdown on errors.
50
-    let listener = UnixListener::bind(cli::socket_path()).unwrap();
51
-    for stream in listener.incoming() {
56
+    let listener = UnixListener::bind(cli::socket_path()).await.unwrap();
57
+    let mut incoming = listener.incoming();
58
+    while let Some(stream) = incoming.next().await {
52 59
         match stream {
53 60
             Ok(stream) => {
54
-                thread::spawn(move || {
55
-                    let mut stream = stream;
56
-                    match handle_cli_client(&mut stream) {
57
-                        Ok(()) => {}
58
-                        Err(e) => {
59
-                            // Log error and try to send it to the stream.
60
-                            // TODO
61
-                            write!(stream, "Error: {:?}", e).ok();
62
-                        }
63
-                    };
64
-                });
61
+                let mut stream = stream;
62
+                match handle_cli_client(&mut stream).await {
63
+                    Ok(()) => {}
64
+                    Err(e) => {
65
+                        // Log error and try to send it to the stream.
66
+                        // TODO
67
+                        write!(stream, "Error: {:?}", e).await.ok();
68
+                    }
69
+                };
65 70
             }
66 71
             Err(err) => {
67 72
                 eprintln!("Error while listening on the local unix socket: {:?}", err);
@@ -71,9 +76,9 @@ fn main() {
71 76
     }
72 77
 }
73 78
 
74
-fn handle_cli_client(stream: &mut UnixStream) -> Result<(), Error> {
79
+async fn handle_cli_client(stream: &mut UnixStream) -> Result<(), Error> {
75 80
     let mut request = String::new();
76
-    stream.read_to_string(&mut request)?;
81
+    stream.read_to_string(&mut request).await?;
77 82
 
78 83
     let options: cli::Options = serde_json::from_str(&request)?;
79 84
     let _verbose = options.verbose;

+ 63
- 0
src/bin/server.rs 查看文件

@@ -1,3 +1,66 @@
1
+use async_std::net::{SocketAddr, TcpListener, TcpStream};
2
+use async_std::task;
3
+use async_tungstenite::accept_async;
4
+use futures::{SinkExt, StreamExt};
5
+use log::*;
6
+
7
+fn config_dir() -> String {
8
+    format!(
9
+        "{}/.config/twfss",
10
+        dirs::home_dir().unwrap().to_str().unwrap().to_owned()
11
+    )
12
+}
13
+
14
+fn data_dir() -> String {
15
+    format!(
16
+        "{}/.local/share/twfss-server",
17
+        dirs::home_dir().unwrap().to_str().unwrap().to_owned()
18
+    )
19
+}
20
+
21
+fn db_path() -> String {
22
+    format!("{}/server.db", data_dir())
23
+}
24
+
1 25
 fn main() {
26
+    task::block_on(run());
27
+}
28
+
29
+async fn run() {
30
+    env_logger::init();
31
+
32
+    // Read configuration.
33
+    // TODO
34
+
35
+    // Start a thread that periodically checks whether the maximum database size
36
+    // has been exceeded, and if has, whether a directory has not been used for
37
+    // a long time. In that case, the file database for that directory is deleted.
2 38
     // TODO
39
+
40
+    // Start the websocket server and listen for connections.
41
+    let addr = "127.0.0.1:1199";
42
+    let listener = TcpListener::bind(&addr).await.unwrap();
43
+    info!("Listening on {}.", addr);
44
+
45
+    while let Ok((stream, _)) = listener.accept().await {
46
+        let peer = stream
47
+            .peer_addr()
48
+            .expect("connected streams should have a peer address");
49
+
50
+        async_std::task::spawn(accept_connection(peer, stream));
51
+    }
52
+}
53
+
54
+async fn accept_connection(peer: SocketAddr, stream: TcpStream) {
55
+    let mut ws_stream = accept_async(stream).await.expect("Failed to accept");
56
+
57
+    info!("New WebSocket connection from {}.", peer);
58
+
59
+    while let Some(msg) = ws_stream.next().await {
60
+        let msg = msg.expect("Failed to get request");
61
+        // TODO
62
+        if msg.is_text() || msg.is_binary() {
63
+            ws_stream.send(msg).await.expect("Failed to send response");
64
+        }
65
+    }
3 66
 }

Loading…
取消
儲存