Quellcode durchsuchen

base-station/software: Reimplement MQTT support and integrate with tokio.

Mathias Gottschlag vor 4 Jahren
Ursprung
Commit
857d4df37a

+ 19
- 2
base-station/software/Cargo.lock Datei anzeigen

@@ -790,7 +790,7 @@ dependencies = [
790 790
  "indexmap",
791 791
  "slab",
792 792
  "tokio 0.2.20",
793
- "tokio-util",
793
+ "tokio-util 0.3.1",
794 794
  "tracing",
795 795
  "tracing-futures",
796 796
 ]
@@ -1314,10 +1314,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
1314 1314
 checksum = "28e75553120607736d0a248f483fb0682e0279bb9a6f30a7c76c56a9a6ac9ade"
1315 1315
 dependencies = [
1316 1316
  "byteorder",
1317
+ "bytes 1.0.1",
1317 1318
  "lazy_static",
1318 1319
  "log",
1319 1320
  "regex",
1320 1321
  "thiserror",
1322
+ "tokio 1.10.1",
1323
+ "tokio-util 0.6.7",
1321 1324
 ]
1322 1325
 
1323 1326
 [[package]]
@@ -1392,7 +1395,7 @@ dependencies = [
1392 1395
  "thiserror",
1393 1396
  "tokio 0.2.20",
1394 1397
  "tokio-serial",
1395
- "tokio-util",
1398
+ "tokio-util 0.3.1",
1396 1399
 ]
1397 1400
 
1398 1401
 [[package]]
@@ -2444,6 +2447,20 @@ dependencies = [
2444 2447
  "tokio 0.2.20",
2445 2448
 ]
2446 2449
 
2450
+[[package]]
2451
+name = "tokio-util"
2452
+version = "0.6.7"
2453
+source = "registry+https://github.com/rust-lang/crates.io-index"
2454
+checksum = "1caa0b0c8d94a049db56b5acf8cba99dc0623aab1b26d5b5f5e2d945846b3592"
2455
+dependencies = [
2456
+ "bytes 1.0.1",
2457
+ "futures-core",
2458
+ "futures-sink",
2459
+ "log",
2460
+ "pin-project-lite 0.2.4",
2461
+ "tokio 1.10.1",
2462
+]
2463
+
2447 2464
 [[package]]
2448 2465
 name = "toml"
2449 2466
 version = "0.5.8"

+ 1
- 1
base-station/software/Cargo.toml Datei anzeigen

@@ -15,7 +15,7 @@ nrf24l01-stick = [ "nrf24l01-stick-driver" ]
15 15
 linux-embedded-hal = "0.3"
16 16
 embedded-nrf24l01 = { git = "https://github.com/mgottschlag/embedded-nrf24l01", branch = "wip" }
17 17
 embedded-hal = "0.2.3"
18
-mqtt-protocol = "0.11"
18
+mqtt-protocol = { version = "0.11", features = [ "tokio-codec" ] }
19 19
 env_logger = "0.9"
20 20
 log = "0.4.14"
21 21
 thiserror = "1.0"

+ 4
- 0
base-station/software/default_config.toml Datei anzeigen

@@ -3,3 +3,7 @@ enabled = false
3 3
 # address = "http://localhost:8086"
4 4
 # user = ""
5 5
 # password = ""
6
+
7
+[mqtt]
8
+enabled = false
9
+# address = "http://localhost:1883"

+ 25
- 0
base-station/software/src/config.rs Datei anzeigen

@@ -6,6 +6,7 @@ use serde::Deserialize;
6 6
 #[derive(Deserialize)]
7 7
 pub struct Config {
8 8
     pub influxdb: InfluxDbConfig,
9
+    pub mqtt: MQTTConfig,
9 10
 }
10 11
 
11 12
 impl Config {
@@ -18,6 +19,7 @@ impl Config {
18 19
 
19 20
     fn validate(&self) -> Result<(), ConfigError> {
20 21
         self.influxdb.validate()?;
22
+        self.mqtt.validate()?;
21 23
         Ok(())
22 24
     }
23 25
 }
@@ -56,6 +58,29 @@ impl Default for InfluxDbConfig {
56 58
     }
57 59
 }
58 60
 
61
+#[derive(Clone, Deserialize)]
62
+pub struct MQTTConfig {
63
+    #[serde(default)]
64
+    pub enabled: bool,
65
+    #[serde(default)]
66
+    pub address: String,
67
+}
68
+
69
+impl MQTTConfig {
70
+    fn validate(&self) -> Result<(), ConfigError> {
71
+        Ok(())
72
+    }
73
+}
74
+
75
+impl Default for MQTTConfig {
76
+    fn default() -> MQTTConfig {
77
+        MQTTConfig {
78
+            enabled: false,
79
+            address: "http://localhost:1883".to_owned(),
80
+        }
81
+    }
82
+}
83
+
59 84
 #[derive(thiserror::Error, Debug)]
60 85
 pub enum ConfigError {
61 86
     #[error("I/O error")]

+ 9
- 3
base-station/software/src/main.rs Datei anzeigen

@@ -66,7 +66,11 @@ async fn main() {
66 66
     } else {
67 67
         None
68 68
     };
69
-    let publish = Publish::init();
69
+    let publish = if config.influxdb.enabled {
70
+        Some(Publish::init(&config.mqtt))
71
+    } else {
72
+        None
73
+    };
70 74
 
71 75
     let mut app = Application {
72 76
         radio: Arc::new(Mutex::new(radio)),
@@ -89,7 +93,7 @@ async fn main() {
89 93
 /// Type which implements packet handling and responds to requests from devices.
90 94
 struct Application {
91 95
     radio: Arc<Mutex<Radio>>,
92
-    publish: Arc<Mutex<Publish>>,
96
+    publish: Arc<Mutex<Option<Publish>>>,
93 97
     tsdb: Arc<Mutex<Option<TimeSeriesDatabase>>>,
94 98
     current: CurrentValues,
95 99
     rng: ThreadRng,
@@ -161,7 +165,9 @@ impl Application {
161 165
         let r = report.clone();
162 166
         tokio::spawn(async move {
163 167
             let mut publish = publish.lock().await;
164
-            publish.publish(location, &r.values[0..count]).await;
168
+            if let Some(publish) = publish.as_mut() {
169
+                publish.publish(location, &r.values[0..count]).await;
170
+            }
165 171
         });
166 172
 
167 173
         // Insert values into TSDB.

+ 0
- 86
base-station/software/src/mqtt.rs Datei anzeigen

@@ -1,86 +0,0 @@
1
-use std::io::Write;
2
-use std::net::TcpStream;
3
-use std::thread;
4
-
5
-use log::{error, info};
6
-use mqtt::control::variable_header::ConnectReturnCode;
7
-use mqtt::packet::*;
8
-use mqtt::{Decodable, Encodable, QualityOfService};
9
-use mqtt::{TopicFilter, TopicName};
10
-
11
-use super::Error;
12
-
13
-pub(crate) struct MQTT {
14
-    stream: TcpStream,
15
-}
16
-
17
-impl MQTT {
18
-    pub(crate) fn connect(server_addr: &str) -> Result<MQTT, Error> {
19
-        info!("Connecting to MQTT broker at {}...", server_addr);
20
-        let mut stream = TcpStream::connect(server_addr)?;
21
-
22
-        let mut conn = ConnectPacket::new("MQTT", "72711cd2-faaf-44f0-8490-b92eb063ff4b");
23
-        conn.set_clean_session(true);
24
-        let mut buf = Vec::new();
25
-        conn.encode(&mut buf).unwrap();
26
-        stream.write_all(&buf[..])?;
27
-
28
-        let connack = ConnackPacket::decode(&mut stream).unwrap();
29
-        if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted {
30
-            panic!(
31
-                "Failed to connect to server, return code {:?}",
32
-                connack.connect_return_code()
33
-            );
34
-        }
35
-        info!("Connected to MQTT broker.");
36
-
37
-        // We need to listen for packets from the server to reply to ping requests.
38
-        // TODO: More protocol handling.
39
-        let mut cloned_stream = stream.try_clone().unwrap();
40
-        thread::spawn(move || {
41
-            loop {
42
-                let packet = match VariablePacket::decode(&mut cloned_stream) {
43
-                    Ok(pk) => pk,
44
-                    Err(err) => {
45
-                        error!("could not decode received packet: {:?}", err);
46
-                        continue;
47
-                    }
48
-                };
49
-
50
-                match packet {
51
-                    VariablePacket::PingreqPacket(..) => {
52
-                        let pingresp = PingrespPacket::new();
53
-                        info!("Sending Ping response {:?}", pingresp);
54
-                        pingresp.encode(&mut cloned_stream).unwrap();
55
-                    }
56
-                    VariablePacket::DisconnectPacket(..) => {
57
-                        // Nothing to do here, the other end closes the connection.
58
-                        break;
59
-                    }
60
-                    _ => {}
61
-                }
62
-            }
63
-        });
64
-
65
-        Ok(MQTT { stream })
66
-    }
67
-
68
-    pub(crate) fn publish(&mut self, topic: &str, value: &str) -> Result<(), Error> {
69
-        // Publish the value.
70
-        let topic = TopicName::new(topic).unwrap();
71
-        let publish_packet = PublishPacket::new(topic, QoSWithPacketIdentifier::Level0, value);
72
-        let mut buf = Vec::new();
73
-        publish_packet.encode(&mut buf).unwrap();
74
-        self.stream.write_all(&buf[..])?;
75
-
76
-        // Check for any packets from the server to see whether we were disconnected.
77
-        // TODO
78
-        Ok(())
79
-    }
80
-}
81
-
82
-impl Drop for MQTT {
83
-    fn drop(&mut self) {
84
-        // TODO: Stop the receive thread.
85
-    }
86
-}

+ 212
- 8
base-station/software/src/publish.rs Datei anzeigen

@@ -1,12 +1,32 @@
1 1
 //! Interface to a publish values via MQTT.
2 2
 
3
-use protocol::{Location, Value};
3
+use std::collections::HashMap;
4
+use std::mem;
5
+use std::ops::DerefMut;
6
+use std::sync::Arc;
7
+use std::time::Duration;
8
+
9
+use log::{error, info};
10
+use mqtt::control::variable_header::ConnectReturnCode;
11
+use mqtt::packet::*;
12
+use mqtt::{Encodable, TopicName};
13
+use tokio::io::AsyncWriteExt;
14
+use tokio::net::TcpStream;
15
+use tokio::sync::{Mutex, Notify};
16
+use tokio::time::sleep;
4 17
 
5
-// TODO: Configuration mechanism.
18
+use super::config::MQTTConfig;
19
+use protocol::{Location, Value};
6 20
 
7 21
 /// Interface to MQTT.
8 22
 pub struct Publish {
9
-    // TODO
23
+    // We only want to send the latest values if we do not have a connection, so we simply buffer
24
+    // the values until the sender thread collects them.
25
+    to_be_published: Arc<Mutex<HashMap<Location, Vec<Value>>>>,
26
+    trigger: Arc<Notify>,
27
+    /*config: MQTTConfig,
28
+    //publish_send: UnboundedSender<(Location, Vec<Value>)>,
29
+    stream: Option<OwnedWriteHalf>,*/
10 30
 }
11 31
 
12 32
 impl Publish {
@@ -15,9 +35,184 @@ impl Publish {
15 35
     /// The function does not return an error. Instead, the code will automatically retry
16 36
     /// connection in case the connection cannot be established or the server closes the
17 37
     /// connection.
18
-    pub fn init() -> Publish {
19
-        // TODO
20
-        Publish {}
38
+    pub fn init(config: &MQTTConfig) -> Publish {
39
+        let to_be_published = Arc::new(Mutex::new(HashMap::new()));
40
+        let to_be_published2 = to_be_published.clone();
41
+        let trigger = Arc::new(Notify::new());
42
+        let trigger2 = trigger.clone();
43
+        let config = config.clone();
44
+
45
+        tokio::spawn(async move {
46
+            let mut currently_sending = HashMap::new();
47
+            loop {
48
+                // When we are not connected, try to connect.
49
+                let stream = match Self::connect(&config).await {
50
+                    Some(stream) => stream,
51
+                    None => {
52
+                        sleep(Duration::from_millis(500)).await;
53
+                        continue;
54
+                    }
55
+                };
56
+                let (mut read, write) = stream.into_split();
57
+                let write = Arc::new(Mutex::new(write));
58
+
59
+                // When we are connected, listen for ping packets and send a response.
60
+                {
61
+                    let write = write.clone();
62
+                    tokio::spawn(async move {
63
+                        loop {
64
+                            let packet = match VariablePacket::parse(&mut read).await {
65
+                                Ok(packet) => packet,
66
+                                Err(e) => {
67
+                                    error!("could not receive MQTT packet: {:?}", e);
68
+                                    return;
69
+                                }
70
+                            };
71
+
72
+                            match packet {
73
+                                VariablePacket::PingreqPacket(..) => {
74
+                                    let pingresp = PingrespPacket::new();
75
+                                    let mut data = Vec::new();
76
+                                    pingresp.encode(&mut data).unwrap();
77
+                                    let mut write = write.lock().await;
78
+                                    if let Err(e) = write.write_all(&data[..]).await {
79
+                                        error!("could not reply to MQTT ping packet: {:?}", e);
80
+                                        return;
81
+                                    }
82
+                                }
83
+                                VariablePacket::DisconnectPacket(..) => {
84
+                                    // Nothing to do here, the other end closes the connection.
85
+                                    error!("MQTT server closed the connection.");
86
+                                    return;
87
+                                }
88
+                                _ => {}
89
+                            }
90
+                        }
91
+                    });
92
+                }
93
+
94
+                // Whenever we receive values to be sent, send them until the server disconnects or
95
+                // until we are told to shut down.
96
+                loop {
97
+                    // Fetch values to be sent.
98
+                    let to_be_published = {
99
+                        let mut new = HashMap::<Location, Vec<Value>>::new();
100
+                        mem::swap(&mut new, to_be_published2.lock().await.deref_mut());
101
+                        new
102
+                    };
103
+                    for (location, values) in to_be_published {
104
+                        // TODO: Merge values.
105
+                        currently_sending.insert(location, values);
106
+                    }
107
+
108
+                    // If there is nothing else to send, wait and try again.
109
+                    if currently_sending.len() == 0 {
110
+                        trigger2.notified().await;
111
+                        continue;
112
+                    }
113
+
114
+                    // Publish one value, then start again.
115
+                    let first_location = *currently_sending.keys().next().unwrap();
116
+                    let count = currently_sending[&first_location].len();
117
+                    let first_value = currently_sending[&first_location][count - 1];
118
+                    let (label, value) = match first_value {
119
+                        Value::Temperature(temperature) => {
120
+                            let temperature =
121
+                                temperature as f32 / first_value.decimal_factor() as f32;
122
+                            ("temperature", temperature)
123
+                        }
124
+                        Value::Pressure(pressure) => {
125
+                            let pressure = pressure as f32 / first_value.decimal_factor() as f32;
126
+                            ("pressure", pressure)
127
+                        }
128
+                        Value::Humidity(humidity) => {
129
+                            let humidity = humidity as f32 / first_value.decimal_factor() as f32;
130
+                            ("humidity", humidity)
131
+                        }
132
+                        _ => {
133
+                            // Invalid value, remove it from the queue and try again.
134
+                            currently_sending.get_mut(&first_location).unwrap().pop();
135
+                            if currently_sending[&first_location].len() == 0 {
136
+                                currently_sending.remove(&first_location);
137
+                            }
138
+                            continue;
139
+                        }
140
+                    };
141
+                    let topic = format!(
142
+                        "smarthome/{}/{}",
143
+                        first_location.to_str().to_lowercase(),
144
+                        label
145
+                    );
146
+                    let topic = TopicName::new(topic).unwrap();
147
+                    let publish_packet = PublishPacket::new(
148
+                        topic,
149
+                        QoSWithPacketIdentifier::Level0,
150
+                        format!("{}", value),
151
+                    );
152
+                    let mut buf = Vec::new();
153
+                    publish_packet.encode(&mut buf).unwrap();
154
+                    if let Err(e) = write.lock().await.write_all(&buf[..]).await {
155
+                        error!("could not send value to MQTT: {:?}", e);
156
+                        break;
157
+                    } else {
158
+                        // The value was sent, remove it from the queue.
159
+                        currently_sending.get_mut(&first_location).unwrap().pop();
160
+                        if currently_sending[&first_location].len() == 0 {
161
+                            currently_sending.remove(&first_location);
162
+                        }
163
+                    }
164
+                }
165
+            }
166
+        });
167
+        Publish {
168
+            to_be_published,
169
+            trigger,
170
+        }
171
+    }
172
+
173
+    async fn connect(config: &MQTTConfig) -> Option<TcpStream> {
174
+        info!("Connecting to MQTT broker...");
175
+        let mut stream = match TcpStream::connect(&config.address).await {
176
+            Ok(stream) => stream,
177
+            Err(e) => {
178
+                error!("Failed to connect to the MQTT broker: {:?}", e);
179
+                return None;
180
+            }
181
+        };
182
+
183
+        let mut conn = ConnectPacket::new("72711cd2-faaf-44f0-8490-b92eb063ff4b");
184
+        conn.set_clean_session(true);
185
+        let mut buf = Vec::new();
186
+        conn.encode(&mut buf).unwrap();
187
+        match stream.write_all(&buf[..]).await {
188
+            Ok(stream) => stream,
189
+            Err(e) => {
190
+                error!("Failed to send MQTT connect packet: {:?}", e);
191
+                return None;
192
+            }
193
+        };
194
+
195
+        let connack = match VariablePacket::parse(&mut stream).await {
196
+            Ok(VariablePacket::ConnackPacket(connack)) => connack,
197
+            Ok(_) => {
198
+                error!("Received wrong MQTT packet, expected CONNACK!");
199
+                return None;
200
+            }
201
+            Err(e) => {
202
+                error!("Failed to receive MQTT CONNACK packet: {:?}", e);
203
+                return None;
204
+            }
205
+        };
206
+        if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted {
207
+            error!(
208
+                "Failed to connect to server, return code {:?}",
209
+                connack.connect_return_code()
210
+            );
211
+            return None;
212
+        }
213
+
214
+        info!("Connected to MQTT broker.");
215
+        Some(stream)
21 216
     }
22 217
 
23 218
     /// Publishes values received from a device.
@@ -30,7 +225,16 @@ impl Publish {
30 225
     ///
31 226
     /// * `location` - the location of the device
32 227
     /// * `values` - a list of values to be published
33
-    pub async fn publish(&mut self, _location: Location, _values: &[Value]) {
34
-        // TODO
228
+    pub async fn publish(&mut self, location: Location, values: &[Value]) {
229
+        let mut buffer = self.to_be_published.lock().await;
230
+        // TODO: Merge the values?
231
+        *buffer.entry(location).or_insert(Vec::new()) = values.to_vec();
232
+        self.trigger.notify_one();
233
+    }
234
+}
235
+
236
+impl Drop for Publish {
237
+    fn drop(&mut self) {
238
+        // TODO: Stop the thread.
35 239
     }
36 240
 }

Laden…
Abbrechen
Speichern