Browse Source

base-station: Reimplement radio code.

Mathias Gottschlag 5 years ago
parent
commit
1344ba308b

+ 141
- 2
base-station/software/Cargo.lock View File

33
  "embedded-hal",
33
  "embedded-hal",
34
  "embedded-nrf24l01",
34
  "embedded-nrf24l01",
35
  "env_logger",
35
  "env_logger",
36
+ "futures",
37
+ "futures-util",
36
  "linux-embedded-hal",
38
  "linux-embedded-hal",
37
  "log",
39
  "log",
38
  "mqtt-protocol",
40
  "mqtt-protocol",
39
  "protocol",
41
  "protocol",
40
  "rand",
42
  "rand",
43
+ "sysfs_gpio",
41
  "thiserror",
44
  "thiserror",
42
  "tokio",
45
  "tokio",
43
 ]
46
 ]
147
 source = "registry+https://github.com/rust-lang/crates.io-index"
150
 source = "registry+https://github.com/rust-lang/crates.io-index"
148
 checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
151
 checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
149
 
152
 
153
+[[package]]
154
+name = "futures"
155
+version = "0.3.5"
156
+source = "registry+https://github.com/rust-lang/crates.io-index"
157
+checksum = "1e05b85ec287aac0dc34db7d4a569323df697f9c55b99b15d6b4ef8cde49f613"
158
+dependencies = [
159
+ "futures-channel",
160
+ "futures-core",
161
+ "futures-executor",
162
+ "futures-io",
163
+ "futures-sink",
164
+ "futures-task",
165
+ "futures-util",
166
+]
167
+
168
+[[package]]
169
+name = "futures-channel"
170
+version = "0.3.5"
171
+source = "registry+https://github.com/rust-lang/crates.io-index"
172
+checksum = "f366ad74c28cca6ba456d95e6422883cfb4b252a83bed929c83abfdbbf2967d5"
173
+dependencies = [
174
+ "futures-core",
175
+ "futures-sink",
176
+]
177
+
150
 [[package]]
178
 [[package]]
151
 name = "futures-core"
179
 name = "futures-core"
152
 version = "0.3.5"
180
 version = "0.3.5"
153
 source = "registry+https://github.com/rust-lang/crates.io-index"
181
 source = "registry+https://github.com/rust-lang/crates.io-index"
154
 checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
182
 checksum = "59f5fff90fd5d971f936ad674802482ba441b6f09ba5e15fd8b39145582ca399"
155
 
183
 
184
+[[package]]
185
+name = "futures-executor"
186
+version = "0.3.5"
187
+source = "registry+https://github.com/rust-lang/crates.io-index"
188
+checksum = "10d6bb888be1153d3abeb9006b11b02cf5e9b209fda28693c31ae1e4e012e314"
189
+dependencies = [
190
+ "futures-core",
191
+ "futures-task",
192
+ "futures-util",
193
+]
194
+
195
+[[package]]
196
+name = "futures-io"
197
+version = "0.3.5"
198
+source = "registry+https://github.com/rust-lang/crates.io-index"
199
+checksum = "de27142b013a8e869c14957e6d2edeef89e97c289e69d042ee3a49acd8b51789"
200
+
201
+[[package]]
202
+name = "futures-macro"
203
+version = "0.3.5"
204
+source = "registry+https://github.com/rust-lang/crates.io-index"
205
+checksum = "d0b5a30a4328ab5473878237c447333c093297bded83a4983d10f4deea240d39"
206
+dependencies = [
207
+ "proc-macro-hack",
208
+ "proc-macro2",
209
+ "quote",
210
+ "syn",
211
+]
212
+
213
+[[package]]
214
+name = "futures-sink"
215
+version = "0.3.5"
216
+source = "registry+https://github.com/rust-lang/crates.io-index"
217
+checksum = "3f2032893cb734c7a05d85ce0cc8b8c4075278e93b24b66f9de99d6eb0fa8acc"
218
+
219
+[[package]]
220
+name = "futures-task"
221
+version = "0.3.5"
222
+source = "registry+https://github.com/rust-lang/crates.io-index"
223
+checksum = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
224
+dependencies = [
225
+ "once_cell",
226
+]
227
+
228
+[[package]]
229
+name = "futures-util"
230
+version = "0.3.5"
231
+source = "registry+https://github.com/rust-lang/crates.io-index"
232
+checksum = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
233
+dependencies = [
234
+ "futures-channel",
235
+ "futures-core",
236
+ "futures-io",
237
+ "futures-macro",
238
+ "futures-sink",
239
+ "futures-task",
240
+ "memchr",
241
+ "pin-project",
242
+ "pin-utils",
243
+ "proc-macro-hack",
244
+ "proc-macro-nested",
245
+ "slab",
246
+]
247
+
156
 [[package]]
248
 [[package]]
157
 name = "getrandom"
249
 name = "getrandom"
158
 version = "0.1.14"
250
 version = "0.1.14"
381
  "libc",
473
  "libc",
382
 ]
474
 ]
383
 
475
 
476
+[[package]]
477
+name = "once_cell"
478
+version = "1.3.1"
479
+source = "registry+https://github.com/rust-lang/crates.io-index"
480
+checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
481
+
482
+[[package]]
483
+name = "pin-project"
484
+version = "0.4.14"
485
+source = "registry+https://github.com/rust-lang/crates.io-index"
486
+checksum = "2bbe07cee13ca15295ce93a5b1094d63e0420603e91ffda4f86d4478988916f2"
487
+dependencies = [
488
+ "pin-project-internal",
489
+]
490
+
491
+[[package]]
492
+name = "pin-project-internal"
493
+version = "0.4.14"
494
+source = "registry+https://github.com/rust-lang/crates.io-index"
495
+checksum = "1b789ec51a10e5a985a9863ef8791412523334d6240fab2cf40dd9fd47496dc6"
496
+dependencies = [
497
+ "proc-macro2",
498
+ "quote",
499
+ "syn",
500
+]
501
+
384
 [[package]]
502
 [[package]]
385
 name = "pin-project-lite"
503
 name = "pin-project-lite"
386
 version = "0.1.5"
504
 version = "0.1.5"
387
 source = "registry+https://github.com/rust-lang/crates.io-index"
505
 source = "registry+https://github.com/rust-lang/crates.io-index"
388
 checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f"
506
 checksum = "f7505eeebd78492e0f6108f7171c4948dbb120ee8119d9d77d0afa5469bef67f"
389
 
507
 
508
+[[package]]
509
+name = "pin-utils"
510
+version = "0.1.0"
511
+source = "registry+https://github.com/rust-lang/crates.io-index"
512
+checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
513
+
390
 [[package]]
514
 [[package]]
391
 name = "ppv-lite86"
515
 name = "ppv-lite86"
392
 version = "0.2.6"
516
 version = "0.2.6"
393
 source = "registry+https://github.com/rust-lang/crates.io-index"
517
 source = "registry+https://github.com/rust-lang/crates.io-index"
394
 checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
518
 checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
395
 
519
 
520
+[[package]]
521
+name = "proc-macro-hack"
522
+version = "0.5.15"
523
+source = "registry+https://github.com/rust-lang/crates.io-index"
524
+checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63"
525
+
526
+[[package]]
527
+name = "proc-macro-nested"
528
+version = "0.1.4"
529
+source = "registry+https://github.com/rust-lang/crates.io-index"
530
+checksum = "8e946095f9d3ed29ec38de908c22f95d9ac008e424c7bcae54c75a79c527c694"
531
+
396
 [[package]]
532
 [[package]]
397
 name = "proc-macro2"
533
 name = "proc-macro2"
398
 version = "1.0.10"
534
 version = "1.0.10"
588
 [[package]]
724
 [[package]]
589
 name = "sysfs_gpio"
725
 name = "sysfs_gpio"
590
 version = "0.5.4"
726
 version = "0.5.4"
591
-source = "registry+https://github.com/rust-lang/crates.io-index"
592
-checksum = "24961a55846623d8e4f6cec38718945116fed8d6970336a7110710a07aa9b5d1"
727
+source = "git+https://github.com/mgottschlag/rust-sysfs-gpio?branch=new-futures#c412bdc6d18910db8d66cea3db2dbad96af780b2"
593
 dependencies = [
728
 dependencies = [
729
+ "futures",
730
+ "mio",
594
  "nix",
731
  "nix",
732
+ "pin-project-lite",
733
+ "tokio",
595
 ]
734
 ]
596
 
735
 
597
 [[package]]
736
 [[package]]

+ 7
- 1
base-station/software/Cargo.toml View File

16
 thiserror = "1.0"
16
 thiserror = "1.0"
17
 protocol = { path = "../../common/rust-protocol" }
17
 protocol = { path = "../../common/rust-protocol" }
18
 rand = "0.7.3"
18
 rand = "0.7.3"
19
-tokio = { version="0.2", features=[ "full" ] }
19
+tokio = { version = "0.2", features = [ "full" ] }
20
+sysfs_gpio = { version = "0.5.4", features = [ "use_tokio" ] }
21
+futures = "0.3.5"
22
+futures-util = "0.3.5"
23
+
24
+[patch.crates-io]
25
+sysfs_gpio = { git = "https://github.com/mgottschlag/rust-sysfs-gpio", branch = "new-futures" }

+ 3
- 27
base-station/software/src/main.rs View File

3
 //! TODO: Document configuration and hardware setup.
3
 //! TODO: Document configuration and hardware setup.
4
 
4
 
5
 use std::env;
5
 use std::env;
6
-use std::io;
7
 use std::sync::Arc;
6
 use std::sync::Arc;
8
-use std::time::Instant;
7
+use std::time::{Duration, Instant};
9
 
8
 
10
 use log::{error, info};
9
 use log::{error, info};
11
 use protocol::{GetValues, Packet, Report, Value, Values};
10
 use protocol::{GetValues, Packet, Report, Value, Values};
12
 use rand::{rngs::ThreadRng, Rng};
11
 use rand::{rngs::ThreadRng, Rng};
13
 use tokio::sync::Mutex;
12
 use tokio::sync::Mutex;
13
+use tokio::time::delay_for;
14
 
14
 
15
 use current::CurrentValues;
15
 use current::CurrentValues;
16
 use publish::Publish;
16
 use publish::Publish;
93
     }
93
     }
94
 
94
 
95
     async fn handle_get_salt(&mut self, device_id: u8) {
95
     async fn handle_get_salt(&mut self, device_id: u8) {
96
+        delay_for(Duration::from_millis(1)).await;
96
         let salt = (self.rng.gen::<u64>() & !0xff & !(1 << 63)) | device_id as u64;
97
         let salt = (self.rng.gen::<u64>() & !0xff & !(1 << 63)) | device_id as u64;
97
         // TODO: Store the salt as the expected salt in Radio.
98
         // TODO: Store the salt as the expected salt in Radio.
98
         self.radio
99
         self.radio
206
         }
207
         }
207
     }
208
     }
208
 }*/
209
 }*/
209
-
210
-/*pub(crate) struct SensorUpdate {
211
-    location: u32,
212
-    data: Vec<SensorData>,
213
-}
214
-
215
-pub(crate) enum SensorData {
216
-    Temperature(f32),
217
-    Humidity(f32),
218
-    Pressure(f32),
219
-}*/
220
-
221
-#[derive(thiserror::Error, Debug)]
222
-pub(crate) enum Error {
223
-    #[error("I/O error")]
224
-    Io(#[from] io::Error),
225
-    #[error("radio error: {0:?}")]
226
-    Radio(embedded_nrf24l01::Error<std::io::Error>),
227
-}
228
-
229
-impl From<embedded_nrf24l01::Error<std::io::Error>> for Error {
230
-    fn from(e: embedded_nrf24l01::Error<std::io::Error>) -> Self {
231
-        Error::Radio(e)
232
-    }
233
-}

+ 149
- 183
base-station/software/src/radio.rs View File

1
 //! Wrapper around the NRF24 driver which provides tokio-compatible asynchronous interfaces.
1
 //! Wrapper around the NRF24 driver which provides tokio-compatible asynchronous interfaces.
2
 
2
 
3
-/*use std::convert::TryInto;
4
-use std::sync::mpsc;
5
-use std::thread;
6
-use std::thread::sleep;
7
-use std::time::{Duration, Instant};*/
3
+use std::convert::TryInto;
4
+use std::io;
5
+use std::time::Duration;
8
 
6
 
9
-/*use embedded_hal::blocking::spi::Transfer;
7
+use embedded_hal::blocking::spi::Transfer;
10
 use embedded_hal::digital::v2::OutputPin;
8
 use embedded_hal::digital::v2::OutputPin;
11
 use embedded_nrf24l01::{Configuration, CrcMode, DataRate, RxMode, NRF24L01};
9
 use embedded_nrf24l01::{Configuration, CrcMode, DataRate, RxMode, NRF24L01};
10
+use futures_util::stream::StreamExt;
12
 use linux_embedded_hal::spidev::{SpiModeFlags, Spidev, SpidevOptions};
11
 use linux_embedded_hal::spidev::{SpiModeFlags, Spidev, SpidevOptions};
13
-use linux_embedded_hal::sysfs_gpio::Direction;
14
-use linux_embedded_hal::sysfs_gpio::Error as GpioError;
12
+use linux_embedded_hal::sysfs_gpio::{Direction, Edge, Error as GpioError};
15
 use linux_embedded_hal::Pin;
13
 use linux_embedded_hal::Pin;
16
 use log::{error, info};
14
 use log::{error, info};
17
-use rand::rngs::ThreadRng;
18
-use rand::Rng;*/
19
 use protocol::{Location, Packet};
15
 use protocol::{Location, Packet};
20
-use tokio::sync::mpsc::UnboundedReceiver;
16
+use rand::Rng;
17
+use tokio::select;
18
+use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
19
+use tokio::time::delay_for;
21
 
20
 
22
-/*use super::spi::EmbeddedHalSpidev;
23
-use super::Error;
24
-use super::{SensorData, SensorUpdate};*/
21
+use super::spi::EmbeddedHalSpidev;
25
 
22
 
26
 const DISPLAY_ID: u8 = 0x20;
23
 const DISPLAY_ID: u8 = 0x20;
27
-//const DISPLAY_KEY: [u8; 16] = include!("../../../common/display_key.txt");
24
+const DISPLAY_KEY: [u8; 16] = include!("../../../common/display_key.txt");
28
 const WEATHER_STATION_0_ID: u8 = 0x30;
25
 const WEATHER_STATION_0_ID: u8 = 0x30;
29
-//const WEATHER_STATION_0_KEY: [u8; 16] = include!("../../../common/weather_station_0_key.txt");
26
+const WEATHER_STATION_0_KEY: [u8; 16] = include!("../../../common/weather_station_0_key.txt");
30
 
27
 
31
 /// Hardware configuration.
28
 /// Hardware configuration.
32
 pub struct RadioConfig {
29
 pub struct RadioConfig {
35
 
32
 
36
 /// Wrapper around the NRF24 radio driver.
33
 /// Wrapper around the NRF24 radio driver.
37
 pub struct Radio {
34
 pub struct Radio {
38
-    // TODO
35
+    tx: UnboundedSender<(u8, Packet)>,
39
 }
36
 }
40
 
37
 
41
 impl Radio {
38
 impl Radio {
42
     /// Initializes the radio and returns both radio and the stream of incoming packets.
39
     /// Initializes the radio and returns both radio and the stream of incoming packets.
43
     pub async fn init(
40
     pub async fn init(
44
-        _config: RadioConfig,
41
+        config: RadioConfig,
45
     ) -> Result<(Radio, UnboundedReceiver<(u8, Packet)>), RadioError> {
42
     ) -> Result<(Radio, UnboundedReceiver<(u8, Packet)>), RadioError> {
46
-        // TODO
47
-        panic!("Not yet implemented.");
43
+        info!("Initializing radio...");
44
+        let (tx_send, tx_recv) = mpsc::unbounded_channel();
45
+        let (rx_send, rx_recv) = mpsc::unbounded_channel();
46
+        let mut task = RadioTask {
47
+            config,
48
+            nrf24: None,
49
+            irq: None,
50
+            tx: tx_recv,
51
+            rx: rx_send,
52
+        };
53
+        task.init().await?;
54
+        info!("radio initialized.");
55
+
56
+        // Start a task for packet handling.
57
+        tokio::spawn(async move {
58
+            task.run().await;
59
+        });
60
+
61
+        Ok((Radio { tx: tx_send }, rx_recv))
48
     }
62
     }
49
 
63
 
50
     /// Sends a packet to the specified device.
64
     /// Sends a packet to the specified device.
55
     ///
69
     ///
56
     /// * `device_id` - the ID of the targeted device
70
     /// * `device_id` - the ID of the targeted device
57
     /// * `packet` - the packet to send
71
     /// * `packet` - the packet to send
58
-    pub fn send_packet_async(&mut self, _device_id: u8, _packet: Packet) {
59
-        // TODO
60
-        panic!("Not yet implemented.");
72
+    pub fn send_packet_async(&mut self, device_id: u8, packet: Packet) {
73
+        self.tx.send((device_id, packet)).unwrap();
61
     }
74
     }
62
 }
75
 }
63
 
76
 
64
-#[derive(Clone, Copy, Debug)]
65
-pub enum RadioError {
66
-    // TODO
67
-}
68
-
69
-/*pub(crate) fn start(updates: mpsc::Sender<SensorUpdate>) {
70
-    // TODO: Channels for sensor readings.
71
-    thread::spawn(move || {
72
-        // Whenever a error occurs, reinitialize the radio module shortly later.
73
-        loop {
74
-            match radio_thread(&updates) {
75
-                Ok(()) => {}
76
-                Err(e) => {
77
-                    error!("Radio error: {:?}", e);
78
-                }
79
-            }
80
-            thread::sleep(Duration::from_secs(3));
81
-        }
82
-    });
83
-}
84
-
85
-fn radio_thread(updates: &mpsc::Sender<SensorUpdate>) -> Result<(), Error> {
86
-    let mut radio = RadioImpl::init(updates)?;
87
-    radio.run_loop()?;
88
-    Ok(())
89
-}
90
-
91
-struct RadioImpl<'a> {
92
-    rx: Option<RxMode<NRF24L01<GpioError, Pin, Pin, EmbeddedHalSpidev>>>,
93
-    updates: &'a mpsc::Sender<SensorUpdate>,
94
-    rng: ThreadRng,
77
+struct RadioTask {
78
+    config: RadioConfig,
79
+    nrf24: Option<RxMode<NRF24L01<GpioError, Pin, Pin, EmbeddedHalSpidev>>>,
80
+    irq: Option<Pin>,
81
+    tx: UnboundedReceiver<(u8, Packet)>,
82
+    rx: UnboundedSender<(u8, Packet)>,
95
 }
83
 }
96
 
84
 
97
-impl<'a> RadioImpl<'a> {
98
-    fn init(updates: &'a mpsc::Sender<SensorUpdate>) -> Result<RadioImpl<'a>, Error> {
99
-        info!("Initializing radio...");
100
-
85
+impl RadioTask {
86
+    async fn init(&mut self) -> Result<(), RadioError> {
101
         // The NRF module is connected as follows:
87
         // The NRF module is connected as follows:
102
         // - CE: PA1
88
         // - CE: PA1
103
         // - CS: PG8
89
         // - CS: PG8
127
         let irq = Pin::new(irq_nr);
113
         let irq = Pin::new(irq_nr);
128
         irq.export().unwrap();
114
         irq.export().unwrap();
129
         irq.set_direction(Direction::In).unwrap();
115
         irq.set_direction(Direction::In).unwrap();
116
+        irq.set_edge(Edge::FallingEdge)?;
130
 
117
 
131
         // Configure SPI.
118
         // Configure SPI.
132
         let mut spi = Spidev::open("/dev/spidev0.0").unwrap();
119
         let mut spi = Spidev::open("/dev/spidev0.0").unwrap();
140
 
127
 
141
         // HACK: Cycle power until transfers to the module seem to work.
128
         // HACK: Cycle power until transfers to the module seem to work.
142
         loop {
129
         loop {
143
-            pwr.set_high().unwrap();
144
-            sleep(Duration::from_millis(10));
145
-            pwr.set_low().unwrap();
146
-            sleep(Duration::from_millis(10));
147
-            cs.set_low().unwrap();
130
+            pwr.set_high()?;
131
+            delay_for(Duration::from_millis(10)).await;
132
+            pwr.set_low()?;
133
+            delay_for(Duration::from_millis(10)).await;
134
+            cs.set_low()?;
148
             let mut read_aw = [0x03, 0x00];
135
             let mut read_aw = [0x03, 0x00];
149
             if spi.transfer(&mut read_aw).is_ok() {
136
             if spi.transfer(&mut read_aw).is_ok() {
150
                 // Correct address field width?
137
                 // Correct address field width?
152
                     break;
139
                     break;
153
                 }
140
                 }
154
             }
141
             }
142
+            println!("radio not yet responding correctly ({:x})...", read_aw[1]);
143
+            delay_for(Duration::from_millis(1000)).await;
155
         }
144
         }
156
 
145
 
157
         // Initialize the radio module.
146
         // Initialize the radio module.
160
         nrf24.set_rf(DataRate::R2Mbps, 3)?;
149
         nrf24.set_rf(DataRate::R2Mbps, 3)?;
161
         nrf24.set_crc(Some(CrcMode::OneByte))?;
150
         nrf24.set_crc(Some(CrcMode::OneByte))?;
162
         nrf24.set_auto_retransmit(250, 3)?;
151
         nrf24.set_auto_retransmit(250, 3)?;
163
-        nrf24.set_pipes_rx_enable(&[true, true, false, false, false, false])?; // TODO enable pipe 0 once the base station receives messages
164
-        nrf24
165
-            .set_rx_addr(0, &[0xB3, 0xB3, 0xB3, 0xB3, 0x00])
166
-            .unwrap();
167
-        nrf24
168
-            .set_rx_addr(1, &[0xB3, 0xB3, 0xB3, 0xB3, 0x00])
169
-            .unwrap();
170
-        nrf24.flush_rx().unwrap();
171
-        nrf24.flush_tx().unwrap();
172
-        nrf24.set_auto_ack(&[true; 6]).unwrap();
173
-        info!("auto ack: {:?}", nrf24.get_auto_ack().unwrap());
174
-        nrf24.set_pipes_rx_lengths(&[Some(32); 6]).unwrap();
175
-
176
-        info!("width: {}", nrf24.get_address_width().unwrap());
177
-
178
-        let rx = nrf24.rx().unwrap();
179
-
180
-        Ok(RadioImpl {
181
-            rx: Some(rx),
182
-            updates,
183
-            rng: rand::thread_rng(),
184
-        })
185
-    }
152
+        nrf24.set_pipes_rx_enable(&[true, true, false, false, false, false])?;
153
+        nrf24.set_rx_addr(0, &[0xB3, 0xB3, 0xB3, 0xB3, 0x00])?;
154
+        nrf24.set_rx_addr(1, &[0xB3, 0xB3, 0xB3, 0xB3, 0x00])?;
155
+        nrf24.flush_rx()?;
156
+        nrf24.flush_tx()?;
157
+        nrf24.set_auto_ack(&[true; 6])?;
158
+        info!("auto ack: {:?}", nrf24.get_auto_ack()?);
159
+        nrf24.set_pipes_rx_lengths(&[Some(32); 6])?;
160
+
161
+        self.nrf24 = Some(nrf24.rx().unwrap());
162
+        self.irq = Some(irq);
186
 
163
 
187
-    fn run_loop(&mut self) -> Result<(), Error> {
188
-        let mut start = Instant::now();
164
+        Ok(())
165
+    }
189
 
166
 
190
-        // Receive data.
191
-        info!("Starting to receive:");
167
+    async fn run(&mut self) {
192
         loop {
168
         loop {
193
-            sleep(Duration::from_millis(1));
194
-            let rx = self.rx.as_mut().unwrap();
195
-            if let Some(pipe) = rx.can_read().unwrap() {
196
-                let payload = rx.read().unwrap();
197
-                info!(
198
-                    "packet received on pipe {}: {:x?}, {}",
199
-                    pipe,
200
-                    payload.as_ref(),
201
-                    payload.len()
202
-                );
203
-                if payload.len() != 32 {
204
-                    continue;
169
+            let e = self.run_with_initialized_radio().await;
170
+            error!("Error in radio task: {:?}", e);
171
+            self.nrf24 = None;
172
+            // Try to reinitialize the radio.
173
+            loop {
174
+                info!("Trying to reinitialize radio: {:?}", e);
175
+                match self.init().await {
176
+                    Ok(_) => break,
177
+                    Err(e) => error!("Radio reinitialization failed: {:?}", e),
205
                 }
178
                 }
179
+                delay_for(Duration::from_secs(3)).await;
180
+            }
181
+        }
182
+    }
206
 
183
 
207
-                let mut payload: [u8; 32] = payload.as_ref().try_into().unwrap();
208
-                self.handle_packet(&mut payload)?;
209
-
210
-                let end = Instant::now();
211
-                let elapsed = end.duration_since(start);
212
-                info!("Debug: {:?}", elapsed);
213
-                start = end;
184
+    async fn run_with_initialized_radio(&mut self) -> Result<(), RadioError> {
185
+        // Wait for an RX interrupt of until we have to send a packet.
186
+        let mut irq_states = self.irq.as_mut().unwrap().get_value_stream().unwrap();
187
+        loop {
188
+            select! {
189
+                _pin_value = irq_states.next() => {
190
+                    // Try to receive until there are no packets left in the RX fifo and until the
191
+                    // interrupt pin is high again.
192
+                    self.receive_packets().await?;
193
+                },
194
+                packet = self.tx.next() => {
195
+                    let (device_id, packet) = packet.unwrap();
196
+                    // Switch to TX mode and send the packet.
197
+                    self.send_packet(device_id, packet).await?;
198
+                },
214
             }
199
             }
215
         }
200
         }
216
     }
201
     }
217
 
202
 
218
-    fn handle_packet(&mut self, payload: &mut [u8]) -> Result<(), Error> {
203
+    async fn receive_packets(&mut self) -> Result<(), RadioError> {
204
+        let rx = self.nrf24.as_mut().unwrap();
205
+        let pipe = match rx.can_read()? {
206
+            Some(p) => p,
207
+            None => return Ok(()),
208
+        };
209
+        let payload = rx.read()?;
210
+        info!(
211
+            "packet received on pipe {}: {:x?}, {}",
212
+            pipe,
213
+            payload.as_ref(),
214
+            payload.len()
215
+        );
216
+        if payload.len() != 32 {
217
+            return Ok(());
218
+        }
219
+
220
+        let mut payload: [u8; 32] = payload.as_ref().try_into().unwrap();
221
+        self.handle_packet(&mut payload).await;
222
+
223
+        Ok(())
224
+    }
225
+
226
+    async fn handle_packet(&mut self, packet: &mut [u8]) {
219
         // Get the key of the device.
227
         // Get the key of the device.
220
-        let device_id = payload[0];
228
+        let device_id = packet[0];
221
         let key = match get_key(device_id) {
229
         let key = match get_key(device_id) {
222
             Some(k) => k,
230
             Some(k) => k,
223
             None => {
231
             None => {
224
                 info!("packet from unknown device {:02x}", device_id);
232
                 info!("packet from unknown device {:02x}", device_id);
225
-                return Ok(());
233
+                return;
226
             }
234
             }
227
         };
235
         };
228
 
236
 
229
         // Decode the packet.
237
         // Decode the packet.
230
-        let packet = match Packet::decrypt_and_decode(key, payload) {
238
+        let packet = match Packet::decrypt_and_decode(key, packet) {
231
             Err(e) => {
239
             Err(e) => {
232
                 info!(
240
                 info!(
233
                     "invalid packet from device {:02x}, error {:?}",
241
                     "invalid packet from device {:02x}, error {:?}",
234
                     device_id, e
242
                     device_id, e
235
                 );
243
                 );
236
-                return Ok(());
244
+                return;
237
             }
245
             }
238
             Ok(p) => p,
246
             Ok(p) => p,
239
         };
247
         };
243
 
251
 
244
         info!("packet from {}: {:?}", device_id, packet);
252
         info!("packet from {}: {:?}", device_id, packet);
245
 
253
 
246
-        match packet {
247
-            Packet::GetSalt => {
248
-                self.send_salt(device_id)?;
249
-            }
250
-            Packet::Salt(_) => {
251
-                error!("received Salt packet from device.");
252
-            }
253
-            Packet::Report(payload) => {
254
-                let location = get_location(device_id);
255
-                let count = payload.count;
256
-                for i in 0..count {
257
-                    match payload.values[i as usize] {
258
-                        Value::Invalid => {}
259
-                        Value::Time(_) => {
260
-                            error!("device tried to report a time");
261
-                        }
262
-                        Value::Temperature(temperature) => {
263
-                            let temperature = temperature as f32 / 10.0;
264
-                            info!("{:?} temperature: {} °C", location, temperature);
265
-                        }
266
-                        Value::Pressure(pressure) => {
267
-                            let pressure = pressure as f32 / 100.0;
268
-                            info!("{:?} pressure: {} HPa", location, pressure);
269
-                        }
270
-                        Value::Humidity(humidity) => {
271
-                            let humidity = humidity as f32 / 100.0;
272
-                            info!("{:?} humidity: {}%", location, humidity);
273
-                        }
274
-                    }
275
-                }
276
-                // TODO: Send values via MQTT
277
-                /*
278
-                updates
279
-                    .send(SensorUpdate {
280
-                        location: 0,
281
-                        data: vec![
282
-                            SensorData::Temperature(temperature),
283
-                            SensorData::Pressure(pressure),
284
-                            SensorData::Humidity(humidity),
285
-                        ],
286
-                    })
287
-                    .unwrap();
288
-                */
289
-            }
290
-            Packet::GetValues(_payload) => {
291
-                error!("GetValues not yet implemented.");
292
-                // TODO
293
-            }
294
-            Packet::Values(_payload) => {
295
-                error!("received Values packet from device.");
296
-            }
297
-        }
298
-        Ok(())
254
+        self.rx.send((device_id, packet)).unwrap();
299
     }
255
     }
300
 
256
 
301
-    fn send_salt(&mut self, device_id: u8) -> Result<(), Error> {
302
-        let salt = (self.rng.gen::<u64>() & !0xff & !(1 << 63)) | device_id as u64;
303
-        let packet = Packet::Salt(salt);
304
-        self.send_packet(device_id, packet)
305
-    }
306
-
307
-    fn send_packet(&mut self, device_id: u8, packet: Packet) -> Result<(), Error> {
308
-        let salt = self.rng.gen::<u64>();
257
+    async fn send_packet(&mut self, device_id: u8, packet: Packet) -> Result<(), RadioError> {
258
+        let salt = rand::thread_rng().gen::<u64>();
309
         let mut tx = self
259
         let mut tx = self
310
-            .rx
260
+            .nrf24
311
             .take()
261
             .take()
312
             .unwrap()
262
             .unwrap()
313
             .standby()
263
             .standby()
314
             .tx()
264
             .tx()
315
-            .map_err(|(_, e)| Error::Radio(e))?;
265
+            .map_err(|(_, e)| RadioError::Radio(e))?;
316
         let mut encoded = [0u8; 32];
266
         let mut encoded = [0u8; 32];
317
         let key = get_key(device_id);
267
         let key = get_key(device_id);
318
         // TODO: Check whether the packet arrived.
268
         // TODO: Check whether the packet arrived.
325
         } else {
275
         } else {
326
             info!("could not encode packet {:?}", packet);
276
             info!("could not encode packet {:?}", packet);
327
         }
277
         }
328
-        self.rx = Some(
278
+        self.nrf24 = Some(
329
             tx.standby()
279
             tx.standby()
330
-                .map_err(Error::Radio)?
280
+                .map_err(RadioError::Radio)?
331
                 .rx()
281
                 .rx()
332
-                .map_err(|(_, e)| Error::Radio(e))?,
282
+                .map_err(|(_, e)| RadioError::Radio(e))?,
333
         );
283
         );
334
         info!("packet sent: {:?}", packet);
284
         info!("packet sent: {:?}", packet);
335
         Ok(())
285
         Ok(())
336
     }
286
     }
337
-}*/
287
+}
288
+
289
+#[derive(thiserror::Error, Debug)]
290
+pub enum RadioError {
291
+    #[error("I/O error")]
292
+    Io(#[from] io::Error),
293
+    #[error("GPIO error")]
294
+    Gpio(#[from] GpioError),
295
+    #[error("radio error: {0:?}")]
296
+    Radio(embedded_nrf24l01::Error<std::io::Error>),
297
+}
298
+
299
+impl From<embedded_nrf24l01::Error<std::io::Error>> for RadioError {
300
+    fn from(e: embedded_nrf24l01::Error<std::io::Error>) -> Self {
301
+        RadioError::Radio(e)
302
+    }
303
+}
338
 
304
 
339
 pub fn get_device_location(device_id: u8) -> Location {
305
 pub fn get_device_location(device_id: u8) -> Location {
340
     match device_id {
306
     match device_id {
344
     }
310
     }
345
 }
311
 }
346
 
312
 
347
-/*fn get_key(device_id: u8) -> Option<&'static [u8]> {
313
+fn get_key(device_id: u8) -> Option<&'static [u8]> {
348
     match device_id {
314
     match device_id {
349
         DISPLAY_ID => Some(&DISPLAY_KEY),
315
         DISPLAY_ID => Some(&DISPLAY_KEY),
350
         WEATHER_STATION_0_ID => Some(&WEATHER_STATION_0_KEY),
316
         WEATHER_STATION_0_ID => Some(&WEATHER_STATION_0_KEY),
354
 
320
 
355
 fn get_pin_number(c: char, n: u64) -> u64 {
321
 fn get_pin_number(c: char, n: u64) -> u64 {
356
     (c as u64 - 'A' as u64) * 32 + n
322
     (c as u64 - 'A' as u64) * 32 + n
357
-}*/
323
+}

Loading…
Cancel
Save