Selaa lähdekoodia

Implement InfluxDB support and reading the configuration from a file.

Mathias Gottschlag 4 vuotta sitten
vanhempi
commit
8fc44d9803

+ 1397
- 43
base-station/software/Cargo.lock
File diff suppressed because it is too large
Näytä tiedosto


+ 4
- 0
base-station/software/Cargo.toml Näytä tiedosto

@@ -29,6 +29,10 @@ nb = "0.1.2"
29 29
 gpio-cdev = { git = "https://github.com/mgottschlag/gpio-cdev", branch = "tokio", features = [ "async-tokio" ], optional = true }
30 30
 chrono = "0.4.11"
31 31
 nrf24l01-stick-driver = { git = "https://github.com/mgottschlag/nrf24l01-stick", optional = true }
32
+influxdb = { version = "0.3", features = [ "derive" ] }
33
+serde = { version = "1.0.119", features = ["derive"] }
34
+toml = "0.5.8"
35
+structopt = "0.3"
32 36
 
33 37
 [patch.crates-io]
34 38
 sysfs_gpio = { git = "https://github.com/mgottschlag/rust-sysfs-gpio", branch = "new-futures" }

+ 5
- 0
base-station/software/default_config.toml Näytä tiedosto

@@ -0,0 +1,5 @@
1
+[influxdb]
2
+enabled = false
3
+# address = "http://localhost:8086"
4
+# user = ""
5
+# password = ""

+ 72
- 0
base-station/software/src/config.rs Näytä tiedosto

@@ -0,0 +1,72 @@
1
+use std::fs;
2
+use std::path::Path;
3
+
4
+use serde::Deserialize;
5
+
6
+#[derive(Deserialize)]
7
+pub struct Config {
8
+    pub influxdb: InfluxDbConfig,
9
+}
10
+
11
+impl Config {
12
+    pub fn load<P: AsRef<Path>>(path: P) -> Result<Config, ConfigError> {
13
+        let file = fs::read(path)?;
14
+        let config = toml::from_slice::<Config>(&file)?;
15
+        config.validate()?;
16
+        Ok(config)
17
+    }
18
+
19
+    fn validate(&self) -> Result<(), ConfigError> {
20
+        self.influxdb.validate()?;
21
+        Ok(())
22
+    }
23
+}
24
+
25
+#[derive(Deserialize)]
26
+pub struct InfluxDbConfig {
27
+    #[serde(default)]
28
+    pub enabled: bool,
29
+    #[serde(default)]
30
+    pub address: String,
31
+    #[serde(default)]
32
+    pub user: String,
33
+    #[serde(default)]
34
+    pub password: String,
35
+}
36
+
37
+impl InfluxDbConfig {
38
+    fn validate(&self) -> Result<(), ConfigError> {
39
+        if (self.user != "") != (self.password != "") {
40
+            return Err(ConfigError::Settings(
41
+                "need to specify both InfluxDB user and password".to_owned(),
42
+            ));
43
+        }
44
+        Ok(())
45
+    }
46
+}
47
+
48
+impl Default for InfluxDbConfig {
49
+    fn default() -> InfluxDbConfig {
50
+        InfluxDbConfig {
51
+            enabled: false,
52
+            address: "http://localhost:8086".to_owned(),
53
+            user: "".to_owned(),
54
+            password: "".to_owned(),
55
+        }
56
+    }
57
+}
58
+
59
+#[derive(thiserror::Error, Debug)]
60
+pub enum ConfigError {
61
+    #[error("I/O error")]
62
+    Io(#[from] std::io::Error),
63
+    #[error("Parser error")]
64
+    Parser(#[from] toml::de::Error),
65
+    #[error("Invalid settings")]
66
+    Settings(String),
67
+}
68
+
69
+fn default_disabled() -> bool {
70
+    // https://github.com/serde-rs/serde/issues/1030
71
+    true
72
+}

+ 39
- 15
base-station/software/src/main.rs Näytä tiedosto

@@ -6,21 +6,26 @@
6 6
 compile_error!("Select an SPI implementation via the corresponding cargo feature!");
7 7
 
8 8
 use std::env;
9
+use std::path::PathBuf;
9 10
 use std::sync::Arc;
10
-use std::time::{Duration, Instant};
11
+use std::time::Duration;
11 12
 
12 13
 use chrono::offset::Local;
14
+use chrono::Utc;
13 15
 use log::{error, info};
14
-use protocol::{GetValues, Packet, Report, Value, ValueType, Values};
15 16
 use rand::{rngs::ThreadRng, Rng};
17
+use structopt::StructOpt;
16 18
 use tokio::sync::Mutex;
17 19
 use tokio::time::delay_for;
18 20
 
21
+use config::Config;
19 22
 use current::CurrentValues;
23
+use protocol::{GetValues, Packet, Report, Value, ValueType, Values};
20 24
 use publish::Publish;
21
-use radio::{Radio, RadioConfig};
25
+use radio::Radio;
22 26
 use tsdb::TimeSeriesDatabase;
23 27
 
28
+mod config;
24 29
 mod current;
25 30
 mod publish;
26 31
 mod radio;
@@ -28,6 +33,19 @@ mod radio;
28 33
 mod spi;
29 34
 mod tsdb;
30 35
 
36
+#[derive(Debug, StructOpt)]
37
+#[structopt(name = "base-station", about = "Smart home base station.")]
38
+struct Options {
39
+    /// Config file
40
+    #[structopt(
41
+        short,
42
+        long,
43
+        default_value = "/etc/smart-home.toml",
44
+        parse(from_os_str)
45
+    )]
46
+    config: PathBuf,
47
+}
48
+
31 49
 #[tokio::main]
32 50
 async fn main() {
33 51
     env::set_var(
@@ -36,14 +54,18 @@ async fn main() {
36 54
     );
37 55
     env_logger::init();
38 56
 
57
+    // Load the configuration.
58
+    let options = Options::from_args();
59
+    let config = Config::load(options.config)
60
+        .expect("could not open config file - did you forget to pass \"-c\"?");
61
+
39 62
     // Initialize radio, MQTT and InfluxDB.
40
-    let radio_config = RadioConfig{
41
-        // TODO: Configure pins.
63
+    let (radio, mut incoming_packets) = Radio::init().await.expect("radio initialization failed");
64
+    let tsdb = if config.influxdb.enabled {
65
+        Some(TimeSeriesDatabase::init(&config.influxdb))
66
+    } else {
67
+        None
42 68
     };
43
-    let (radio, mut incoming_packets) = Radio::init(radio_config)
44
-        .await
45
-        .expect("radio initialization failed");
46
-    let tsdb = TimeSeriesDatabase::init();
47 69
     let publish = Publish::init();
48 70
 
49 71
     let mut app = Application {
@@ -68,7 +90,7 @@ async fn main() {
68 90
 struct Application {
69 91
     radio: Arc<Mutex<Radio>>,
70 92
     publish: Arc<Mutex<Publish>>,
71
-    tsdb: Arc<Mutex<TimeSeriesDatabase>>,
93
+    tsdb: Arc<Mutex<Option<TimeSeriesDatabase>>>,
72 94
     current: CurrentValues,
73 95
     rng: ThreadRng,
74 96
 }
@@ -110,6 +132,7 @@ impl Application {
110 132
     async fn handle_report(&mut self, device_id: u8, report: Report) {
111 133
         let location = radio::get_device_location(device_id);
112 134
         let count = report.count as usize;
135
+        let time = Utc::now();
113 136
         // Update local copy of current values.
114 137
         for i in 0..count {
115 138
             match report.values[i] {
@@ -118,15 +141,15 @@ impl Application {
118 141
                     error!("device tried to report a time");
119 142
                 }
120 143
                 Value::Temperature(temperature) => {
121
-                    let temperature = temperature as f32 / 10.0;
144
+                    let temperature = temperature as f32 / report.values[i].decimal_factor() as f32;
122 145
                     info!("{:?} temperature: {} °C", location, temperature);
123 146
                 }
124 147
                 Value::Pressure(pressure) => {
125
-                    let pressure = pressure as f32 / 100.0;
148
+                    let pressure = pressure as f32 / report.values[i].decimal_factor() as f32;
126 149
                     info!("{:?} pressure: {} HPa", location, pressure);
127 150
                 }
128 151
                 Value::Humidity(humidity) => {
129
-                    let humidity = humidity as f32 / 100.0;
152
+                    let humidity = humidity as f32 / report.values[i].decimal_factor() as f32;
130 153
                     info!("{:?} humidity: {}%", location, humidity);
131 154
                 }
132 155
             }
@@ -145,8 +168,9 @@ impl Application {
145 168
         let tsdb = self.tsdb.clone();
146 169
         tokio::spawn(async move {
147 170
             let mut tsdb = tsdb.lock().await;
148
-            tsdb.insert(Instant::now(), location, &report.values[0..count])
149
-                .await;
171
+            if let Some(tsdb) = tsdb.as_mut() {
172
+                tsdb.insert(time, location, &report.values[0..count]).await;
173
+            }
150 174
         });
151 175
     }
152 176
 

+ 1
- 4
base-station/software/src/radio/gpio.rs Näytä tiedosto

@@ -18,10 +18,9 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
18 18
 use tokio::time::delay_for;
19 19
 
20 20
 use super::super::spi::EmbeddedHalSpidev;
21
-use super::{get_key, RadioConfig};
21
+use super::get_key;
22 22
 
23 23
 pub struct RadioTask {
24
-    config: super::RadioConfig,
25 24
     nrf24: Option<RxMode<NRF24L01<GpioError, Pin, Pin, EmbeddedHalSpidev>>>,
26 25
     irq: Option<AsyncLineEventHandle>,
27 26
     tx: UnboundedReceiver<(u8, Packet)>,
@@ -30,12 +29,10 @@ pub struct RadioTask {
30 29
 
31 30
 impl RadioTask {
32 31
     pub fn new(
33
-        config: RadioConfig,
34 32
         tx: UnboundedReceiver<(u8, Packet)>,
35 33
         rx: UnboundedSender<(u8, Packet)>,
36 34
     ) -> RadioTask {
37 35
         RadioTask {
38
-            config,
39 36
             nrf24: None,
40 37
             irq: None,
41 38
             tx,

+ 2
- 9
base-station/software/src/radio/mod.rs Näytä tiedosto

@@ -25,11 +25,6 @@ const WEATHER_STATION_0_KEY: [u8; 16] = include!("../../../../common/weather_sta
25 25
 const WEATHER_STATION_1_ID: u8 = 0x31;
26 26
 const WEATHER_STATION_1_KEY: [u8; 16] = include!("../../../../common/weather_station_1_key.txt");
27 27
 
28
-/// Hardware configuration.
29
-pub struct RadioConfig {
30
-    // TODO
31
-}
32
-
33 28
 /// Wrapper around the NRF24 radio driver.
34 29
 pub struct Radio {
35 30
     tx: UnboundedSender<(u8, Packet)>,
@@ -37,13 +32,11 @@ pub struct Radio {
37 32
 
38 33
 impl Radio {
39 34
     /// Initializes the radio and returns both radio and the stream of incoming packets.
40
-    pub async fn init(
41
-        config: RadioConfig,
42
-    ) -> Result<(Radio, UnboundedReceiver<(u8, Packet)>), RadioError> {
35
+    pub async fn init() -> Result<(Radio, UnboundedReceiver<(u8, Packet)>), RadioError> {
43 36
         info!("Initializing radio...");
44 37
         let (tx_send, tx_recv) = mpsc::unbounded_channel();
45 38
         let (rx_send, rx_recv) = mpsc::unbounded_channel();
46
-        let mut task = RadioTask::new(config, tx_recv, rx_send);
39
+        let mut task = RadioTask::new(tx_recv, rx_send);
47 40
         task.init().await?;
48 41
         info!("radio initialized.");
49 42
 

+ 3
- 12
base-station/software/src/radio/nrfstick.rs Näytä tiedosto

@@ -4,19 +4,16 @@ use std::time::Duration;
4 4
 
5 5
 use futures_util::stream::StreamExt;
6 6
 use log::{error, info};
7
-use nrf24l01_stick_driver::{
8
-    Configuration, CrcMode, DataRate, Receiver, MAX_PAYLOAD_LEN, NRF24L01,
9
-};
7
+use nrf24l01_stick_driver::{Configuration, CrcMode, DataRate, Receiver, NRF24L01};
10 8
 use protocol::Packet;
11 9
 use rand::Rng;
12 10
 use tokio::select;
13 11
 use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
14 12
 use tokio::time::delay_for;
15 13
 
16
-use super::{get_key, RadioConfig};
14
+use super::get_key;
17 15
 
18 16
 pub struct RadioTask {
19
-    config: super::RadioConfig,
20 17
     nrf: Option<Receiver>,
21 18
 
22 19
     tx: UnboundedReceiver<(u8, Packet)>,
@@ -25,16 +22,10 @@ pub struct RadioTask {
25 22
 
26 23
 impl RadioTask {
27 24
     pub fn new(
28
-        config: RadioConfig,
29 25
         tx: UnboundedReceiver<(u8, Packet)>,
30 26
         rx: UnboundedSender<(u8, Packet)>,
31 27
     ) -> RadioTask {
32
-        RadioTask {
33
-            config,
34
-            nrf: None,
35
-            tx,
36
-            rx,
37
-        }
28
+        RadioTask { nrf: None, tx, rx }
38 29
     }
39 30
 
40 31
     pub async fn init(&mut self) -> Result<(), RadioError> {

+ 95
- 6
base-station/software/src/tsdb.rs Näytä tiedosto

@@ -1,13 +1,18 @@
1 1
 //! Interface to fill/query a timeseries database (InfluxDB).
2 2
 
3
-use std::time::Instant;
3
+use chrono::{DateTime, Utc};
4
+use influxdb::InfluxDbWriteable;
5
+use influxdb::{Client, Timestamp};
6
+use log::error;
4 7
 
8
+use super::config::InfluxDbConfig;
5 9
 use protocol::{Location, Value};
6 10
 
7 11
 // TODO: Configuration mechanism.
8 12
 
9 13
 /// Interface to InfluxDB.
10 14
 pub struct TimeSeriesDatabase {
15
+    client: Client,
11 16
     // TODO
12 17
 }
13 18
 
@@ -17,9 +22,16 @@ impl TimeSeriesDatabase {
17 22
     /// The function does not return an error. Instead, the code will automatically retry
18 23
     /// connection in case the connection cannot be established or the server closes the
19 24
     /// connection.
20
-    pub fn init() -> TimeSeriesDatabase {
21
-        // TODO
22
-        TimeSeriesDatabase {}
25
+    pub fn init(config: &InfluxDbConfig) -> TimeSeriesDatabase {
26
+        // TODO: Configurable address
27
+        let client = Client::new(&config.address, "weather");
28
+        let client = if config.user != "" && config.password != "" {
29
+            client.with_auth(&config.user, &config.password)
30
+        } else {
31
+            client
32
+        };
33
+        // TODO: Ping the server.
34
+        TimeSeriesDatabase { client }
23 35
     }
24 36
 
25 37
     /// Inserts values into the database.
@@ -34,7 +46,84 @@ impl TimeSeriesDatabase {
34 46
     /// * `time` - time at which the values were received
35 47
     /// * `location` - location of the device which sent the values
36 48
     /// * `values` - list of values to be inserted into the database
37
-    pub async fn insert(&mut self, _time: Instant, _location: Location, _values: &[Value]) {
38
-        // TODO
49
+    pub async fn insert(&mut self, time: DateTime<Utc>, location: Location, values: &[Value]) {
50
+        let location = location.to_str();
51
+
52
+        for value in values {
53
+            if let Err(e) = self.try_insert(time, location.to_owned(), value).await {
54
+                error!("Could not insert value into TSDB: {:?}", e);
55
+                // TODO: Ping the server and do not try to write anything until ping succeds to
56
+                // reduce debug spam.
57
+            }
58
+        }
59
+    }
60
+
61
+    async fn try_insert(
62
+        &mut self,
63
+        time: DateTime<Utc>,
64
+        location: String,
65
+        value: &Value,
66
+    ) -> Result<(), TsdbError> {
67
+        match *value {
68
+            Value::Temperature(temperature) => {
69
+                let temperature = temperature as f32 / value.decimal_factor() as f32;
70
+                let reading = TemperatureReading {
71
+                    time: time,
72
+                    temperature,
73
+                    location,
74
+                };
75
+                self.client.query(&reading.into_query("weather")).await?;
76
+            }
77
+            Value::Pressure(pressure) => {
78
+                let pressure = pressure as f32 / value.decimal_factor() as f32;
79
+                let reading = PressureReading {
80
+                    time: time,
81
+                    pressure,
82
+                    location,
83
+                };
84
+                self.client.query(&reading.into_query("weather")).await?;
85
+            }
86
+            Value::Humidity(humidity) => {
87
+                let humidity = humidity as f32 / value.decimal_factor() as f32;
88
+                let reading = HumidityReading {
89
+                    time: time,
90
+                    humidity,
91
+                    location,
92
+                };
93
+                self.client.query(&reading.into_query("weather")).await?;
94
+            }
95
+            _ => {}
96
+        }
97
+        Ok(())
39 98
     }
40 99
 }
100
+
101
+#[derive(InfluxDbWriteable)]
102
+struct TemperatureReading {
103
+    time: DateTime<Utc>,
104
+    temperature: f32,
105
+    #[tag]
106
+    location: String,
107
+}
108
+
109
+#[derive(InfluxDbWriteable)]
110
+struct PressureReading {
111
+    time: DateTime<Utc>,
112
+    pressure: f32,
113
+    #[tag]
114
+    location: String,
115
+}
116
+
117
+#[derive(InfluxDbWriteable)]
118
+struct HumidityReading {
119
+    time: DateTime<Utc>,
120
+    humidity: f32,
121
+    #[tag]
122
+    location: String,
123
+}
124
+
125
+#[derive(thiserror::Error, Debug)]
126
+pub enum TsdbError {
127
+    #[error("Database error")]
128
+    Database(#[from] influxdb::Error),
129
+}

+ 20
- 0
common/rust-protocol/src/lib.rs Näytä tiedosto

@@ -236,6 +236,16 @@ impl Location {
236 236
             _ => Err(Error::InvalidLocation),
237 237
         }
238 238
     }
239
+
240
+    pub fn to_str(self) -> &'static str {
241
+        match self {
242
+            Self::Livingroom => "Livingroom",
243
+            Self::Bathroom => "Bathroom",
244
+            Self::Bedroom => "Bedroom",
245
+            Self::Kitchen => "Kitchen",
246
+            Self::Balcony => "Balcony",
247
+        }
248
+    }
239 249
 }
240 250
 
241 251
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -322,6 +332,16 @@ impl Value {
322 332
             ValueType::Humidity => Self::Humidity(value as u16),
323 333
         }
324 334
     }
335
+
336
+    pub fn decimal_factor(&self) -> u32 {
337
+        match self {
338
+            Self::Invalid => 1,
339
+            Self::Time(_) => 1,
340
+            Self::Temperature(_) => 10,
341
+            Self::Pressure(_) => 100,
342
+            Self::Humidity(_) => 100,
343
+        }
344
+    }
325 345
 }
326 346
 
327 347
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]

Loading…
Peruuta
Tallenna