Няма описание
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use std::io::Write;
  2. use std::net::TcpStream;
  3. use std::thread;
  4. use log::{error, info};
  5. use mqtt::control::variable_header::ConnectReturnCode;
  6. use mqtt::packet::*;
  7. use mqtt::{Decodable, Encodable, QualityOfService};
  8. use mqtt::{TopicFilter, TopicName};
  9. use super::Error;
  10. pub struct MQTT {
  11. stream: TcpStream,
  12. }
  13. impl MQTT {
  14. pub fn connect(server_addr: &str) -> Result<MQTT, Error> {
  15. info!("Connecting to MQTT broker at {}...", server_addr);
  16. let mut stream = TcpStream::connect(server_addr)?;
  17. let mut conn = ConnectPacket::new("MQTT", "72711cd2-faaf-44f0-8490-b92eb063ff4b");
  18. conn.set_clean_session(true);
  19. let mut buf = Vec::new();
  20. conn.encode(&mut buf).unwrap();
  21. stream.write_all(&buf[..])?;
  22. let connack = ConnackPacket::decode(&mut stream).unwrap();
  23. if connack.connect_return_code() != ConnectReturnCode::ConnectionAccepted {
  24. panic!(
  25. "Failed to connect to server, return code {:?}",
  26. connack.connect_return_code()
  27. );
  28. }
  29. info!("Connected to MQTT broker.");
  30. // We need to listen for packets from the server to reply to ping requests.
  31. // TODO: More protocol handling.
  32. let mut cloned_stream = stream.try_clone().unwrap();
  33. thread::spawn(move || {
  34. loop {
  35. let packet = match VariablePacket::decode(&mut cloned_stream) {
  36. Ok(pk) => pk,
  37. Err(err) => {
  38. error!("could not decode received packet: {:?}", err);
  39. continue;
  40. }
  41. };
  42. match packet {
  43. VariablePacket::PingreqPacket(..) => {
  44. let pingresp = PingrespPacket::new();
  45. info!("Sending Ping response {:?}", pingresp);
  46. pingresp.encode(&mut cloned_stream).unwrap();
  47. }
  48. VariablePacket::DisconnectPacket(..) => {
  49. // Nothing to do here, the other end closes the connection.
  50. break;
  51. }
  52. _ => {}
  53. }
  54. }
  55. });
  56. Ok(MQTT { stream })
  57. }
  58. pub fn publish(&mut self, topic: &str, value: &str) -> Result<(), Error> {
  59. // Publish the value.
  60. let topic = TopicName::new(topic).unwrap();
  61. let publish_packet = PublishPacket::new(topic, QoSWithPacketIdentifier::Level0, value);
  62. let mut buf = Vec::new();
  63. publish_packet.encode(&mut buf).unwrap();
  64. self.stream.write_all(&buf[..])?;
  65. // Check for any packets from the server to see whether we were disconnected.
  66. // TODO
  67. Ok(())
  68. }
  69. }
  70. impl Drop for MQTT {
  71. fn drop(&mut self) {
  72. // TODO: Stop the receive thread.
  73. }
  74. }