//! Interface to fill/query a timeseries database (InfluxDB). use chrono::{DateTime, Utc}; use influxdb::InfluxDbWriteable; use influxdb::{Client, Timestamp}; use log::error; use super::config::InfluxDbConfig; use protocol::{Location, Value}; // TODO: Configuration mechanism. /// Interface to InfluxDB. pub struct TimeSeriesDatabase { client: Client, // TODO } impl TimeSeriesDatabase { /// Initializes the connection to InfluxDB. /// /// The function does not return an error. Instead, the code will automatically retry /// connection in case the connection cannot be established or the server closes the /// connection. pub fn init(config: &InfluxDbConfig) -> TimeSeriesDatabase { // TODO: Configurable address let client = Client::new(&config.address, "weather"); let client = if config.user != "" && config.password != "" { client.with_auth(&config.user, &config.password) } else { client }; // TODO: Ping the server. TimeSeriesDatabase { client } } /// Inserts values into the database. /// /// If there is no current connection to InfluxDB and the connection cannot be reestablished /// before the application is stopped, the values are lost. If multiple calls to this function /// occur while there is no current connection to InfluxDB, the values are queued and all /// values are inserted at a later time. /// /// # Arguments /// /// * `time` - time at which the values were received /// * `location` - location of the device which sent the values /// * `values` - list of values to be inserted into the database pub async fn insert(&mut self, time: DateTime, location: Location, values: &[Value]) { let location = location.to_str(); for value in values { if let Err(e) = self.try_insert(time, location.to_owned(), value).await { error!("Could not insert value into TSDB: {:?}", e); // TODO: Ping the server and do not try to write anything until ping succeds to // reduce debug spam. } } } async fn try_insert( &mut self, time: DateTime, location: String, value: &Value, ) -> Result<(), TsdbError> { match *value { Value::Temperature(temperature) => { let temperature = temperature as f32 / value.decimal_factor() as f32; let reading = TemperatureReading { time: time, temperature, location, }; self.client.query(&reading.into_query("weather")).await?; } Value::Pressure(pressure) => { let pressure = pressure as f32 / value.decimal_factor() as f32; let reading = PressureReading { time: time, pressure, location, }; self.client.query(&reading.into_query("weather")).await?; } Value::Humidity(humidity) => { let humidity = humidity as f32 / value.decimal_factor() as f32; let reading = HumidityReading { time: time, humidity, location, }; self.client.query(&reading.into_query("weather")).await?; } _ => {} } Ok(()) } } #[derive(InfluxDbWriteable)] struct TemperatureReading { time: DateTime, temperature: f32, #[influxdb(tag)] location: String, } #[derive(InfluxDbWriteable)] struct PressureReading { time: DateTime, pressure: f32, #[influxdb(tag)] location: String, } #[derive(InfluxDbWriteable)] struct HumidityReading { time: DateTime, humidity: f32, #[influxdb(tag)] location: String, } #[derive(thiserror::Error, Debug)] pub enum TsdbError { #[error("Database error")] Database(#[from] influxdb::Error), }