Kaynağa Gözat

Implement RX interrupt handling using gpio-cdev.

Sending still uses busy waiting, using interrupts there requires some
additional restructuring.
Mathias Gottschlag 5 yıl önce
ebeveyn
işleme
223442b0a6

+ 17
- 1
base-station/software/Cargo.lock Dosyayı Görüntüle

@@ -35,9 +35,11 @@ dependencies = [
35 35
  "env_logger",
36 36
  "futures",
37 37
  "futures-util",
38
+ "gpio-cdev",
38 39
  "linux-embedded-hal",
39 40
  "log",
40 41
  "mqtt-protocol",
42
+ "nb",
41 43
  "protocol",
42 44
  "rand",
43 45
  "sysfs_gpio",
@@ -109,10 +111,11 @@ dependencies = [
109 111
 [[package]]
110 112
 name = "embedded-nrf24l01"
111 113
 version = "0.2.0"
112
-source = "git+https://github.com/mgottschlag/embedded-nrf24l01?branch=wip#73fe8c9d16a3501c3e11b1320f66750643798d54"
114
+source = "git+https://github.com/mgottschlag/embedded-nrf24l01?branch=wip#e2d5148d169973550dade11456d6c9b2f58577eb"
113 115
 dependencies = [
114 116
  "bitfield",
115 117
  "embedded-hal",
118
+ "nb",
116 119
 ]
117 120
 
118 121
 [[package]]
@@ -256,6 +259,19 @@ dependencies = [
256 259
  "wasi",
257 260
 ]
258 261
 
262
+[[package]]
263
+name = "gpio-cdev"
264
+version = "0.3.0"
265
+source = "git+https://github.com/mgottschlag/gpio-cdev?branch=tokio#21d1704319fd747f67276149afbd7a285700de20"
266
+dependencies = [
267
+ "bitflags",
268
+ "futures",
269
+ "libc",
270
+ "mio",
271
+ "nix",
272
+ "tokio",
273
+]
274
+
259 275
 [[package]]
260 276
 name = "hermit-abi"
261 277
 version = "0.1.11"

+ 2
- 0
base-station/software/Cargo.toml Dosyayı Görüntüle

@@ -20,6 +20,8 @@ tokio = { version = "0.2", features = [ "full" ] }
20 20
 sysfs_gpio = { version = "0.5.4", features = [ "use_tokio" ] }
21 21
 futures = "0.3.5"
22 22
 futures-util = "0.3.5"
23
+nb = "0.1.2"
24
+gpio-cdev = { git = "https://github.com/mgottschlag/gpio-cdev", branch = "tokio", features = [ "async-tokio" ] }
23 25
 
24 26
 [patch.crates-io]
25 27
 sysfs_gpio = { git = "https://github.com/mgottschlag/rust-sysfs-gpio", branch = "new-futures" }

+ 53
- 30
base-station/software/src/radio.rs Dosyayı Görüntüle

@@ -8,8 +8,9 @@ use embedded_hal::blocking::spi::Transfer;
8 8
 use embedded_hal::digital::v2::OutputPin;
9 9
 use embedded_nrf24l01::{Configuration, CrcMode, DataRate, RxMode, NRF24L01};
10 10
 use futures_util::stream::StreamExt;
11
+use gpio_cdev::{AsyncLineEventHandle, Chip, EventRequestFlags, LineRequestFlags};
11 12
 use linux_embedded_hal::spidev::{SpiModeFlags, Spidev, SpidevOptions};
12
-use linux_embedded_hal::sysfs_gpio::{Direction, Edge, Error as GpioError};
13
+use linux_embedded_hal::sysfs_gpio::{Direction, Error as GpioError};
13 14
 use linux_embedded_hal::Pin;
14 15
 use log::{error, info};
15 16
 use protocol::{Location, Packet};
@@ -77,7 +78,7 @@ impl Radio {
77 78
 struct RadioTask {
78 79
     config: RadioConfig,
79 80
     nrf24: Option<RxMode<NRF24L01<GpioError, Pin, Pin, EmbeddedHalSpidev>>>,
80
-    irq: Option<Pin>,
81
+    irq: Option<AsyncLineEventHandle>,
81 82
     tx: UnboundedReceiver<(u8, Packet)>,
82 83
     rx: UnboundedSender<(u8, Packet)>,
83 84
 }
@@ -109,11 +110,14 @@ impl RadioTask {
109 110
         cs.export().unwrap();
110 111
         cs.set_direction(Direction::Out).unwrap();
111 112
         cs.set_high().unwrap();
113
+        let mut chip = Chip::new("/dev/gpiochip0")?;
112 114
         let irq_nr = get_pin_number('G', 9);
113
-        let irq = Pin::new(irq_nr);
114
-        irq.export().unwrap();
115
-        irq.set_direction(Direction::In).unwrap();
116
-        irq.set_edge(Edge::FallingEdge)?;
115
+        let line = chip.get_line(irq_nr as u32)?;
116
+        let irq = AsyncLineEventHandle::new(line.events(
117
+            LineRequestFlags::INPUT,
118
+            EventRequestFlags::BOTH_EDGES,
119
+            "gpioevents",
120
+        )?)?;
117 121
 
118 122
         // Configure SPI.
119 123
         let mut spi = Spidev::open("/dev/spidev0.0").unwrap();
@@ -139,7 +143,7 @@ impl RadioTask {
139 143
                     break;
140 144
                 }
141 145
             }
142
-            println!("radio not yet responding correctly ({:x})...", read_aw[1]);
146
+            info!("radio not yet responding correctly ({:x})...", read_aw[1]);
143 147
             delay_for(Duration::from_millis(1000)).await;
144 148
         }
145 149
 
@@ -183,10 +187,9 @@ impl RadioTask {
183 187
 
184 188
     async fn run_with_initialized_radio(&mut self) -> Result<(), RadioError> {
185 189
         // Wait for an RX interrupt of until we have to send a packet.
186
-        let mut irq_states = self.irq.as_mut().unwrap().get_value_stream().unwrap();
187 190
         loop {
188 191
             select! {
189
-                _pin_value = irq_states.next() => {
192
+                _pin_event = self.irq.as_mut().unwrap().next() => {
190 193
                     // Try to receive until there are no packets left in the RX fifo and until the
191 194
                     // interrupt pin is high again.
192 195
                     self.receive_packets().await?;
@@ -201,26 +204,26 @@ impl RadioTask {
201 204
     }
202 205
 
203 206
     async fn receive_packets(&mut self) -> Result<(), RadioError> {
204
-        let rx = self.nrf24.as_mut().unwrap();
205
-        let pipe = match rx.can_read()? {
206
-            Some(p) => p,
207
-            None => return Ok(()),
208
-        };
209
-        let payload = rx.read()?;
210
-        info!(
211
-            "packet received on pipe {}: {:x?}, {}",
212
-            pipe,
213
-            payload.as_ref(),
214
-            payload.len()
215
-        );
216
-        if payload.len() != 32 {
217
-            return Ok(());
218
-        }
219
-
220
-        let mut payload: [u8; 32] = payload.as_ref().try_into().unwrap();
221
-        self.handle_packet(&mut payload).await;
207
+        loop {
208
+            let rx = self.nrf24.as_mut().unwrap();
209
+            let pipe = match rx.can_read()? {
210
+                Some(p) => p,
211
+                None => return Ok(()),
212
+            };
213
+            let payload = rx.read()?;
214
+            info!(
215
+                "packet received on pipe {}: {:x?}, {}",
216
+                pipe,
217
+                payload.as_ref(),
218
+                payload.len()
219
+            );
220
+            if payload.len() != 32 {
221
+                return Ok(());
222
+            }
222 223
 
223
-        Ok(())
224
+            let mut payload: [u8; 32] = payload.as_ref().try_into().unwrap();
225
+            self.handle_packet(&mut payload).await;
226
+        }
224 227
     }
225 228
 
226 229
     async fn handle_packet(&mut self, packet: &mut [u8]) {
@@ -256,6 +259,7 @@ impl RadioTask {
256 259
 
257 260
     async fn send_packet(&mut self, device_id: u8, packet: Packet) -> Result<(), RadioError> {
258 261
         let salt = rand::thread_rng().gen::<u64>();
262
+        // TODO: Fix salt (device bit, device id).
259 263
         let mut tx = self
260 264
             .nrf24
261 265
             .take()
@@ -265,13 +269,31 @@ impl RadioTask {
265 269
             .map_err(|(_, e)| RadioError::Radio(e))?;
266 270
         let mut encoded = [0u8; 32];
267 271
         let key = get_key(device_id);
268
-        // TODO: Check whether the packet arrived.
272
+
273
+        info!("sending packet: {:?}", packet);
269 274
         if packet.encode_and_encrypt(key.unwrap(), salt, &mut encoded) {
270 275
             tx.set_tx_addr(&[0xB3, 0xB3, 0xB3, 0xB3, device_id])
271 276
                 .unwrap();
272 277
             tx.set_rx_addr(0, &[0xB3, 0xB3, 0xB3, 0xB3, device_id])
273 278
                 .unwrap();
274 279
             tx.send(&encoded).unwrap();
280
+
281
+            // Wait for the packet to be sent and check whether it was acknowledged.
282
+            loop {
283
+                delay_for(Duration::from_millis(1)).await;
284
+                match tx.poll_send() {
285
+                    Ok(status) => {
286
+                        if !status {
287
+                            error!("packet lost.");
288
+                        }
289
+                        break;
290
+                    }
291
+                    Err(nb::Error::WouldBlock) => {
292
+                        // TODO: Wait for interrupt.
293
+                    }
294
+                    Err(nb::Error::Other(e)) => return Err(e.into()),
295
+                }
296
+            }
275 297
         } else {
276 298
             info!("could not encode packet {:?}", packet);
277 299
         }
@@ -281,7 +303,6 @@ impl RadioTask {
281 303
                 .rx()
282 304
                 .map_err(|(_, e)| RadioError::Radio(e))?,
283 305
         );
284
-        info!("packet sent: {:?}", packet);
285 306
         Ok(())
286 307
     }
287 308
 }
@@ -292,6 +313,8 @@ pub enum RadioError {
292 313
     Io(#[from] io::Error),
293 314
     #[error("GPIO error")]
294 315
     Gpio(#[from] GpioError),
316
+    #[error("GPIO error (gpio-cdev)")]
317
+    GpioCdev(#[from] gpio_cdev::errors::Error),
295 318
     #[error("radio error: {0:?}")]
296 319
     Radio(embedded_nrf24l01::Error<std::io::Error>),
297 320
 }

Loading…
İptal
Kaydet