Browse Source

base-station: Broadcast weather data via MQTT.

Mathias Gottschlag 5 years ago
parent
commit
bb01d6b5d6
2 changed files with 56 additions and 16 deletions
  1. 40
    9
      base-station/software/src/main.rs
  2. 16
    7
      base-station/software/src/radio.rs

+ 40
- 9
base-station/software/src/main.rs View File

2
 use std::io;
2
 use std::io;
3
 use std::thread::sleep;
3
 use std::thread::sleep;
4
 use std::time::Duration;
4
 use std::time::Duration;
5
+use std::sync::mpsc;
5
 
6
 
6
 use log::error;
7
 use log::error;
7
 use ::mqtt::control::variable_header::ConnectReturnCode;
8
 use ::mqtt::control::variable_header::ConnectReturnCode;
19
     );
20
     );
20
     env_logger::init();
21
     env_logger::init();
21
 
22
 
22
-    radio::start();
23
+    let (sensor_send, sensor_recv) = mpsc::channel();
24
+
25
+    radio::start(sensor_send);
23
 
26
 
24
     // Whenever the MQTT connection returns an error, we wait for a couple of seconds and then try
27
     // Whenever the MQTT connection returns an error, we wait for a couple of seconds and then try
25
     // to reconnect.
28
     // to reconnect.
26
     loop {
29
     loop {
27
         // TODO: Pass events from ratio to mqtt connection.
30
         // TODO: Pass events from ratio to mqtt connection.
28
-        if let Err(e) = run_mqtt_connection() {
31
+        if let Err(e) = run_mqtt_connection(&sensor_recv) {
29
             error!("MQTT error: {:?}", e);
32
             error!("MQTT error: {:?}", e);
30
         }
33
         }
31
         sleep(Duration::from_secs(3));
34
         sleep(Duration::from_secs(3));
32
     }
35
     }
33
 }
36
 }
34
 
37
 
35
-fn run_mqtt_connection() -> Result<(), Error> {
38
+fn run_mqtt_connection(updates: &mpsc::Receiver<SensorUpdate>) -> Result<(), Error> {
36
     let mut m = MQTT::connect("127.0.0.1:1883")?;
39
     let mut m = MQTT::connect("127.0.0.1:1883")?;
37
 
40
 
38
-    let mut counter = 0;
39
     loop {
41
     loop {
40
-        // TODO: Real values
41
-        m.publish("gottschlag/livingroom/temperature", &format!("{}", counter))?;
42
-
43
-        sleep(Duration::from_secs(3));
44
-        counter += 1;
42
+        let update = updates.recv().unwrap();
43
+        let location = match update.location {
44
+            0 => "livingroom",
45
+            _ => {
46
+                error!("Unknown location in sensor update.");
47
+                continue;
48
+            },
49
+        };
50
+        for value in update.data.iter() {
51
+            match value {
52
+                SensorData::Temperature(value) => {
53
+                    m.publish(&format!("gottschlag/{}/temperature", location), &format!("{}", value))?;
54
+                },
55
+                SensorData::Humidity(value) => {
56
+                    m.publish(&format!("gottschlag/{}/humidity", location), &format!("{}", value))?;
57
+                },
58
+                SensorData::Pressure(value) => {
59
+                    m.publish(&format!("gottschlag/{}/pressure", location), &format!("{}", value))?;
60
+                }
61
+            }
62
+        }
45
     }
63
     }
46
 }
64
 }
47
 
65
 
66
+pub struct SensorUpdate {
67
+    location: u32,
68
+    data: Vec<SensorData>,
69
+}
70
+
71
+pub enum SensorData {
72
+    Temperature(f32),
73
+    Humidity(f32),
74
+    Pressure(f32),
75
+}
76
+
77
+
78
+
48
 #[derive(thiserror::Error, Debug)]
79
 #[derive(thiserror::Error, Debug)]
49
 pub enum Error {
80
 pub enum Error {
50
     #[error("I/O error")]
81
     #[error("I/O error")]

+ 16
- 7
base-station/software/src/radio.rs View File

1
 use std::convert::TryInto;
1
 use std::convert::TryInto;
2
 use std::thread;
2
 use std::thread;
3
 use std::time::{Duration, Instant};
3
 use std::time::{Duration, Instant};
4
+use std::sync::mpsc;
4
 
5
 
5
 use embedded_nrf24l01::{Configuration, CrcMode, DataRate, NRF24L01};
6
 use embedded_nrf24l01::{Configuration, CrcMode, DataRate, NRF24L01};
6
 use linux_embedded_hal::spidev::{SpiModeFlags, Spidev, SpidevOptions};
7
 use linux_embedded_hal::spidev::{SpiModeFlags, Spidev, SpidevOptions};
10
 
11
 
11
 use crate::spi::EmbeddedHalSpidev;
12
 use crate::spi::EmbeddedHalSpidev;
12
 use crate::Error;
13
 use crate::Error;
14
+use crate::{SensorUpdate, SensorData};
13
 
15
 
14
-pub fn start() {
16
+pub fn start(updates: mpsc::Sender<SensorUpdate>) {
15
     // TODO: Channels for sensor readings.
17
     // TODO: Channels for sensor readings.
16
     thread::spawn(move || {
18
     thread::spawn(move || {
17
         // Whenever a error occurs, reinitialize the radio module shortly later.
19
         // Whenever a error occurs, reinitialize the radio module shortly later.
18
         loop {
20
         loop {
19
-            match radio_thread() {
21
+            match radio_thread(&updates) {
20
                 Ok(()) => {}
22
                 Ok(()) => {}
21
                 Err(e) => {
23
                 Err(e) => {
22
                     error!("Radio error: {:?}", e);
24
                     error!("Radio error: {:?}", e);
27
     });
29
     });
28
 }
30
 }
29
 
31
 
30
-fn radio_thread() -> Result<(), Error> {
32
+fn radio_thread(updates: &mpsc::Sender<SensorUpdate>) -> Result<(), Error> {
31
     info!("Initializing radio...");
33
     info!("Initializing radio...");
32
 
34
 
33
     // The NRF module is connected as follows:
35
     // The NRF module is connected as follows:
90
     loop {
92
     loop {
91
         if let Some(pipe) = nrf24.can_read().unwrap() {
93
         if let Some(pipe) = nrf24.can_read().unwrap() {
92
             let payload = nrf24.read().unwrap();
94
             let payload = nrf24.read().unwrap();
93
-            info!("packet received on pipe {}: {:?}", pipe, payload.as_ref());
95
+            info!("packet received on pipe {}: {:?}, {}", pipe, payload.as_ref(), payload.len());
94
             let pressure = u32::from_le_bytes(payload.as_ref()[0..4].try_into().unwrap());
96
             let pressure = u32::from_le_bytes(payload.as_ref()[0..4].try_into().unwrap());
95
             let temperature = u32::from_le_bytes(payload.as_ref()[4..8].try_into().unwrap());
97
             let temperature = u32::from_le_bytes(payload.as_ref()[4..8].try_into().unwrap());
96
             let humidity = u32::from_le_bytes(payload.as_ref()[8..12].try_into().unwrap());
98
             let humidity = u32::from_le_bytes(payload.as_ref()[8..12].try_into().unwrap());
97
-            info!("pressure: {}", pressure as f32 / 100.0);
98
-            info!("temperature: {}", temperature as f32 / 100.0);
99
-            info!("humidity: {}", humidity as f32 / 1024.0);
99
+            let pressure = pressure as f32 / 100.0;
100
+            let temperature = temperature as f32 / 100.0;
101
+            let humidity = humidity as f32 / 1024.0;
102
+            info!("pressure: {}", pressure);
103
+            info!("temperature: {}", temperature);
104
+            info!("humidity: {}", humidity);
105
+            updates.send(SensorUpdate{
106
+                location: 0,
107
+                data: vec![SensorData::Temperature(temperature), SensorData::Pressure(pressure), SensorData::Humidity(humidity)],
108
+            }).unwrap();
100
             let end = Instant::now();
109
             let end = Instant::now();
101
             let elapsed = end.duration_since(start);
110
             let elapsed = end.duration_since(start);
102
             info!("Debug: {:?}", elapsed);
111
             info!("Debug: {:?}", elapsed);

Loading…
Cancel
Save