Browse Source

base-station: Add code to publish data to MQTT.

Mathias Gottschlag 5 years ago
parent
commit
c78d3da2e6

+ 56
- 0
base-station/software/Cargo.lock View File

30
  "linux-embedded-hal",
30
  "linux-embedded-hal",
31
  "log",
31
  "log",
32
  "mqtt-protocol",
32
  "mqtt-protocol",
33
+ "thiserror",
33
 ]
34
 ]
34
 
35
 
35
 [[package]]
36
 [[package]]
216
  "void",
217
  "void",
217
 ]
218
 ]
218
 
219
 
220
+[[package]]
221
+name = "proc-macro2"
222
+version = "1.0.10"
223
+source = "registry+https://github.com/rust-lang/crates.io-index"
224
+checksum = "df246d292ff63439fea9bc8c0a270bed0e390d5ebd4db4ba15aba81111b5abe3"
225
+dependencies = [
226
+ "unicode-xid",
227
+]
228
+
219
 [[package]]
229
 [[package]]
220
 name = "quick-error"
230
 name = "quick-error"
221
 version = "1.2.3"
231
 version = "1.2.3"
222
 source = "registry+https://github.com/rust-lang/crates.io-index"
232
 source = "registry+https://github.com/rust-lang/crates.io-index"
223
 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
233
 checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
224
 
234
 
235
+[[package]]
236
+name = "quote"
237
+version = "1.0.3"
238
+source = "registry+https://github.com/rust-lang/crates.io-index"
239
+checksum = "2bdc6c187c65bca4260c9011c9e3132efe4909da44726bad24cf7572ae338d7f"
240
+dependencies = [
241
+ "proc-macro2",
242
+]
243
+
225
 [[package]]
244
 [[package]]
226
 name = "regex"
245
 name = "regex"
227
 version = "1.3.7"
246
 version = "1.3.7"
296
  "nix",
315
  "nix",
297
 ]
316
 ]
298
 
317
 
318
+[[package]]
319
+name = "syn"
320
+version = "1.0.17"
321
+source = "registry+https://github.com/rust-lang/crates.io-index"
322
+checksum = "0df0eb663f387145cab623dea85b09c2c5b4b0aef44e945d928e682fce71bb03"
323
+dependencies = [
324
+ "proc-macro2",
325
+ "quote",
326
+ "unicode-xid",
327
+]
328
+
299
 [[package]]
329
 [[package]]
300
 name = "sysfs_gpio"
330
 name = "sysfs_gpio"
301
 version = "0.5.4"
331
 version = "0.5.4"
323
  "libc",
353
  "libc",
324
 ]
354
 ]
325
 
355
 
356
+[[package]]
357
+name = "thiserror"
358
+version = "1.0.15"
359
+source = "registry+https://github.com/rust-lang/crates.io-index"
360
+checksum = "54b3d3d2ff68104100ab257bb6bb0cb26c901abe4bd4ba15961f3bf867924012"
361
+dependencies = [
362
+ "thiserror-impl",
363
+]
364
+
365
+[[package]]
366
+name = "thiserror-impl"
367
+version = "1.0.15"
368
+source = "registry+https://github.com/rust-lang/crates.io-index"
369
+checksum = "ca972988113b7715266f91250ddb98070d033c62a011fa0fcc57434a649310dd"
370
+dependencies = [
371
+ "proc-macro2",
372
+ "quote",
373
+ "syn",
374
+]
375
+
326
 [[package]]
376
 [[package]]
327
 name = "thread_local"
377
 name = "thread_local"
328
 version = "1.0.1"
378
 version = "1.0.1"
332
  "lazy_static",
382
  "lazy_static",
333
 ]
383
 ]
334
 
384
 
385
+[[package]]
386
+name = "unicode-xid"
387
+version = "0.2.0"
388
+source = "registry+https://github.com/rust-lang/crates.io-index"
389
+checksum = "826e7639553986605ec5979c7dd957c7895e93eabed50ab2ffa7f6128a75097c"
390
+
335
 [[package]]
391
 [[package]]
336
 name = "void"
392
 name = "void"
337
 version = "1.0.2"
393
 version = "1.0.2"

+ 1
- 0
base-station/software/Cargo.toml View File

13
 mqtt-protocol = "0.8"
13
 mqtt-protocol = "0.8"
14
 env_logger = "0.7"
14
 env_logger = "0.7"
15
 log = "0.4"
15
 log = "0.4"
16
+thiserror = "1.0"

+ 44
- 3
base-station/software/src/main.rs View File

1
 use std::env;
1
 use std::env;
2
-use std::thread;
2
+use std::io;
3
+use std::thread::sleep;
3
 use std::time::Duration;
4
 use std::time::Duration;
4
 
5
 
6
+use log::error;
7
+use ::mqtt::control::variable_header::ConnectReturnCode;
8
+
9
+use crate::mqtt::MQTT;
10
+
11
+mod mqtt;
5
 mod radio;
12
 mod radio;
6
 mod spi;
13
 mod spi;
7
 
14
 
14
 
21
 
15
     radio::start();
22
     radio::start();
16
 
23
 
17
-    // TODO
24
+    // Whenever the MQTT connection returns an error, we wait for a couple of seconds and then try
25
+    // to reconnect.
26
+    loop {
27
+        // TODO: Pass events from ratio to mqtt connection.
28
+        if let Err(e) = run_mqtt_connection() {
29
+            error!("MQTT error: {:?}", e);
30
+        }
31
+        sleep(Duration::from_secs(3));
32
+    }
33
+}
34
+
35
+fn run_mqtt_connection() -> Result<(), Error> {
36
+    let mut m = MQTT::connect("127.0.0.1:1883")?;
37
+
38
+    let mut counter = 0;
18
     loop {
39
     loop {
19
-        thread::sleep(Duration::from_secs(10));
40
+        // TODO: Real values
41
+        m.publish("gottschlag/livingroom/temperature", &format!("{}", counter))?;
42
+
43
+        sleep(Duration::from_secs(3));
44
+        counter += 1;
45
+    }
46
+}
47
+
48
+#[derive(thiserror::Error, Debug)]
49
+pub enum Error {
50
+    #[error("I/O error")]
51
+    Io(#[from] io::Error),
52
+    #[error("connection refused, return code {0:?}")]
53
+    ConnectionRefused(ConnectReturnCode),
54
+    #[error("radio error: {0:?}")]
55
+    Radio(embedded_nrf24l01::Error<std::io::Error>)
56
+}
57
+
58
+impl From<embedded_nrf24l01::Error<std::io::Error>> for Error {
59
+    fn from(e: embedded_nrf24l01::Error<std::io::Error>) -> Self {
60
+        Error::Radio(e)
20
     }
61
     }
21
 }
62
 }

+ 88
- 0
base-station/software/src/mqtt.rs View File

1
+use std::io::Write;
2
+use std::net::TcpStream;
3
+use std::thread;
4
+
5
+use log::{info, error};
6
+use mqtt::control::variable_header::ConnectReturnCode;
7
+use mqtt::packet::*;
8
+use mqtt::{Decodable, Encodable, QualityOfService};
9
+use mqtt::{TopicFilter, TopicName};
10
+
11
+use super::Error;
12
+
13
+pub struct MQTT {
14
+    stream: TcpStream,
15
+}
16
+
17
+impl MQTT {
18
+    pub fn connect(server_addr: &str) -> Result<MQTT, Error> {
19
+        info!("Connecting to MQTT broker at {}...", server_addr);
20
+        let mut stream = TcpStream::connect(server_addr)?;
21
+
22
+        let mut conn = ConnectPacket::new("MQTT", "72711cd2-faaf-44f0-8490-b92eb063ff4b");
23
+        conn.set_clean_session(true);
24
+        let mut buf = Vec::new();
25
+        conn.encode(&mut buf).unwrap();
26
+        stream.write_all(&buf[..])?;
27
+
28
+        let connack = ConnackPacket::decode(&mut stream).unwrap();
29
+        if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted {
30
+            panic!(
31
+                "Failed to connect to server, return code {:?}",
32
+                connack.connect_return_code()
33
+            );
34
+        }
35
+        info!("Connected to MQTT broker.");
36
+
37
+        // We need to listen for packets from the server to reply to ping requests.
38
+        // TODO: More protocol handling.
39
+        let mut cloned_stream = stream.try_clone().unwrap();
40
+        thread::spawn(move || {
41
+            loop {
42
+                let packet = match VariablePacket::decode(&mut cloned_stream) {
43
+                    Ok(pk) => pk,
44
+                    Err(err) => {
45
+                        error!("could not decode received packet: {:?}", err);
46
+                        continue;
47
+                    }
48
+                };
49
+
50
+                match packet {
51
+                    VariablePacket::PingreqPacket(..) => {
52
+                        let pingresp = PingrespPacket::new();
53
+                        info!("Sending Ping response {:?}", pingresp);
54
+                        pingresp.encode(&mut cloned_stream).unwrap();
55
+                    }
56
+                    VariablePacket::DisconnectPacket(..) => {
57
+                        // Nothing to do here, the other end closes the connection.
58
+                        break;
59
+                    }
60
+                    _ => {},
61
+                }
62
+            }
63
+        });
64
+
65
+        Ok(MQTT{
66
+            stream
67
+        })
68
+    }
69
+
70
+    pub fn publish(&mut self, topic: &str, value: &str) -> Result<(), Error> {
71
+        // Publish the value.
72
+        let topic = TopicName::new(topic).unwrap();
73
+        let publish_packet = PublishPacket::new(topic, QoSWithPacketIdentifier::Level0, value);
74
+        let mut buf = Vec::new();
75
+        publish_packet.encode(&mut buf).unwrap();
76
+        self.stream.write_all(&buf[..])?;
77
+
78
+        // Check for any packets from the server to see whether we were disconnected.
79
+        // TODO
80
+        Ok(())
81
+    }
82
+}
83
+
84
+impl Drop for MQTT {
85
+    fn drop(&mut self) {
86
+        // TODO: Stop the receive thread.
87
+    }
88
+}

+ 8
- 8
base-station/software/src/radio.rs View File

1
 use std::convert::TryInto;
1
 use std::convert::TryInto;
2
-use std::io;
3
 use std::thread;
2
 use std::thread;
4
 use std::time::{Duration, Instant};
3
 use std::time::{Duration, Instant};
5
 
4
 
10
 use log::{error, info};
9
 use log::{error, info};
11
 
10
 
12
 use crate::spi::EmbeddedHalSpidev;
11
 use crate::spi::EmbeddedHalSpidev;
12
+use crate::Error;
13
 
13
 
14
 pub fn start() {
14
 pub fn start() {
15
     // TODO: Channels for sensor readings.
15
     // TODO: Channels for sensor readings.
27
     });
27
     });
28
 }
28
 }
29
 
29
 
30
-fn radio_thread() -> Result<(), io::Error> {
30
+fn radio_thread() -> Result<(), Error> {
31
     info!("Initializing radio...");
31
     info!("Initializing radio...");
32
 
32
 
33
     // The NRF module is connected as follows:
33
     // The NRF module is connected as follows:
63
     irq.set_direction(Direction::In).unwrap();
63
     irq.set_direction(Direction::In).unwrap();
64
 
64
 
65
     // Initialize the radio module.
65
     // Initialize the radio module.
66
-    let mut nrf24 = NRF24L01::new(ce, cs, spi).unwrap();
67
-    nrf24.set_frequency(0x32).unwrap();
68
-    nrf24.set_rf(DataRate::R2Mbps, 3).unwrap();
69
-    nrf24.set_crc(Some(CrcMode::OneByte)).unwrap();
70
-    nrf24.set_auto_retransmit(250, 3).unwrap();
66
+    let mut nrf24 = NRF24L01::new(ce, cs, spi)?;
67
+    nrf24.set_frequency(0x32)?;
68
+    nrf24.set_rf(DataRate::R2Mbps, 3)?;
69
+    nrf24.set_crc(Some(CrcMode::OneByte))?;
70
+    nrf24.set_auto_retransmit(250, 3)?;
71
     nrf24
71
     nrf24
72
         .set_pipes_rx_enable(&[true, false, false, false, false, false])
72
         .set_pipes_rx_enable(&[true, false, false, false, false, false])
73
-        .unwrap();
73
+        ?;
74
     nrf24
74
     nrf24
75
         .set_rx_addr(0, &[0x56, 0x34, 0x12, 0x00, 0x00])
75
         .set_rx_addr(0, &[0x56, 0x34, 0x12, 0x00, 0x00])
76
         .unwrap();
76
         .unwrap();

Loading…
Cancel
Save