From 088c9b9878dddcb91146a72a9974eeb580cf7999 Mon Sep 17 00:00:00 2001 From: Isabelle L Date: Thu, 14 May 2020 00:34:04 -0500 Subject: [PATCH] removed packet module and most of the server/client code in preparation for using ilmp --- Cargo.lock | 26 +++++++++--- Cargo.toml | 1 + src/client.rs | 15 +++---- src/lib.rs | 1 - src/packet.rs | 93 ------------------------------------------- src/packet/join.rs | 32 --------------- src/packet/message.rs | 36 ----------------- src/server.rs | 61 ++++++++++------------------ 8 files changed, 47 insertions(+), 218 deletions(-) delete mode 100644 src/packet.rs delete mode 100644 src/packet/join.rs delete mode 100644 src/packet/message.rs diff --git a/Cargo.lock b/Cargo.lock index baf210e..a1cf0e5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,6 +309,19 @@ dependencies = [ "libc", ] +[[package]] +name = "ilmp" +version = "0.1.0" +dependencies = [ + "chrono", + "futures", + "futures-util", + "orion", + "serde", + "serde_json", + "uuid", +] + [[package]] name = "iovec" version = "0.1.4" @@ -435,6 +448,7 @@ dependencies = [ "chrono", "futures", "futures-util", + "ilmp", "lazy_static", "orion", "serde", @@ -651,18 +665,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" [[package]] name = "serde" -version = "1.0.107" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eba7550f2cdf88ffc23ab0f1607133486c390a8c0f89b57e589b9654ee15e04d" +checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.107" +version = "1.0.110" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10be45e22e5597d4b88afcc71f9d7bfadcd604bf0c78a3ab4582b8d2b37f39f3" +checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984" dependencies = [ "proc-macro2", "quote", @@ -671,9 +685,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.52" +version = "1.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd" +checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 6e0a6d6..a787cbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ chrono = "0.4.11" futures = "0.3.5" toml = "0.5.6" orion = "0.15.1" +ilmp = {path = "./ilmp"} diff --git a/src/client.rs b/src/client.rs index 81a0a05..e2ce55b 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,5 @@ // namespacing use crate::config::ClientConfig as Config; -use crate::packet::{Join, Message, Packet, Sendable}; use crate::Result; use async_std::net::TcpStream; use futures_util::io::AsyncReadExt; @@ -10,16 +9,12 @@ pub async fn client(port: u16) -> Result<()> { let config = Config::load()?; let stream = TcpStream::connect(format!("127.0.0.1:{}", &port)).await?; - println!("connection established to: {}:{}", stream.peer_addr()?.ip(), port); + println!( + "connection established to: {}:{}", + stream.peer_addr()?.ip(), + port + ); let (_read, mut write) = stream.split(); - let join: Packet = Join::new(config.user).to_packet()?; - join.write(&mut write).await?; - - // testing stuffs - let message: Packet = - Message::new("Isabelle".to_owned(), "Hello Server".to_owned()).to_packet()?; - message.write(&mut write).await?; - loop {} } diff --git a/src/lib.rs b/src/lib.rs index 714cc07..9ae3249 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,6 @@ // modules mod client; mod config; -mod packet; mod server; // re-exports diff --git a/src/packet.rs b/src/packet.rs deleted file mode 100644 index 6b554d9..0000000 --- a/src/packet.rs +++ /dev/null @@ -1,93 +0,0 @@ -// namespacing -use crate::Result; -use async_std::net::TcpStream; -use async_std::prelude::*; -use futures_util::io::{ReadHalf, WriteHalf}; -use std::convert::TryInto; - -mod join; -pub use join::Join; -mod message; -pub use message::Message; - -/// structured [packet type byte][four bytes of packet length][contents of packet] -pub struct NetworkPacket(Vec); - -impl std::convert::Into for Packet { - fn into(self) -> NetworkPacket { - let mut contents: Vec = Vec::new(); - - // packet type byte - contents.push(self.packet_type as u8); - // write the packet length - let contents_length = self.packet_contents.len() as u32; - contents.extend_from_slice(&contents_length.to_le_bytes()); - // write the rest of the contents - contents.extend_from_slice(&self.packet_contents); - - NetworkPacket(contents) - } -} - -pub trait Sendable: Sized { - fn to_packet(self) -> Result; - fn from_packet(packet: Packet) -> Result; -} - -/// contains data to be turned into a network packet or into a more specific packet -#[derive(Debug, Clone)] -pub struct Packet { - pub packet_type: PacketType, - packet_contents: Vec, -} - -impl Packet { - /// create a new packet - pub fn new(packet_type: PacketType, packet_contents: Vec) -> Self { - Self { packet_type, packet_contents } - } - - /// read a packet from a tcpstream - pub async fn read(stream: &mut ReadHalf) -> Result> { - let mut info_buf = [0u8; 5]; - let check = stream.read(&mut info_buf).await?; - if check == 0 { - return Ok(None); - } - - let packet_type = PacketType::from_u8(info_buf[0]).unwrap(); - - let length = u32::from_le_bytes(info_buf[1..5].try_into().unwrap()) as usize; - - let mut contents: Vec = vec![0; length]; - stream.read(&mut contents).await?; - - Ok(Some(Packet::new(packet_type, contents))) - } - - /// write a packet to the tcpstream - pub async fn write(self, stream: &mut WriteHalf) -> Result<()> { - let network_packet: NetworkPacket = self.into(); - stream.write(&network_packet.0).await?; - Ok(()) - } -} - -/// represent the specific packet type -#[derive(Debug, Clone)] -#[repr(u8)] -pub enum PacketType { - Message = 0, - Join = 1, -} - -impl PacketType { - /// returns the PacketType if the u8 is a valid packet type - pub fn from_u8(packet_type: u8) -> Option { - match packet_type { - 0 => Some(Self::Message), - 1 => Some(Self::Join), - _ => None, - } - } -} diff --git a/src/packet/join.rs b/src/packet/join.rs deleted file mode 100644 index e0b759a..0000000 --- a/src/packet/join.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::packet::{Packet, PacketType}; -use crate::Result; -use chrono::prelude::*; -use serde::{Deserialize, Serialize}; - -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct Join { - user: String, - timestamp: i64, -} - -impl Join { - pub fn new(user: String) -> Self { - let timestamp = Utc::now().timestamp(); - Self { user, timestamp } - } -} - -impl crate::packet::Sendable for Join { - fn to_packet(self) -> Result { - let packet_contents: Vec = serde_json::to_string(&self)?.into_bytes(); - let packet_type = PacketType::Join; - Ok(Packet { packet_type, packet_contents }) - } - - fn from_packet(packet: Packet) -> Result { - let packet_contents = - &String::from_utf8(packet.packet_contents).expect("could not decode as utf8"); - let join: Join = serde_json::from_str(packet_contents)?; - Ok(join) - } -} diff --git a/src/packet/message.rs b/src/packet/message.rs deleted file mode 100644 index ee6e1a4..0000000 --- a/src/packet/message.rs +++ /dev/null @@ -1,36 +0,0 @@ -// namespacing -use crate::packet::{Packet, PacketType}; -use crate::Result; -use chrono::prelude::*; -use serde::{Deserialize, Serialize}; - -/// a Message -#[derive(Deserialize, Serialize, Debug, Clone)] -pub struct Message { - user: String, - contents: String, - timestamp: i64, -} - -impl Message { - /// create a new message - pub fn new(user: String, contents: String) -> Self { - let timestamp = Utc::now().timestamp(); - Self { user, contents, timestamp } - } -} - -impl crate::packet::Sendable for Message { - fn to_packet(self) -> Result { - let packet_contents: Vec = serde_json::to_string(&self)?.into_bytes(); - let packet_type = PacketType::Message; - Ok(Packet { packet_type, packet_contents }) - } - - fn from_packet(packet: Packet) -> Result { - let packet_contents = - &String::from_utf8(packet.packet_contents).expect("could not decode as utf8"); - let message: Message = serde_json::from_str(packet_contents)?; - Ok(message) - } -} diff --git a/src/server.rs b/src/server.rs index b4c83df..0b5d179 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,5 @@ // namespacing -use crate::{ - packet::{Join, Message, Packet, PacketType, Sendable}, - Result, -}; +use crate::Result; use async_std::{ net::{TcpListener, TcpStream}, task, @@ -21,7 +18,11 @@ lazy_static! { /// wraps the server pub async fn server(port: u16) -> Result<()> { let listener = TcpListener::bind(format!("127.0.0.1:{}", &port)).await?; - println!("online as server at: {}:{}", listener.local_addr()?.ip(), port); + println!( + "online as server at: {}:{}", + listener.local_addr()?.ip(), + port + ); let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { @@ -33,7 +34,10 @@ pub async fn server(port: u16) -> Result<()> { let (read, write) = stream.split(); let stream_id = Uuid::new_v4(); - WRITE_STREAMS.lock().expect("could not aqcuire lock").insert(stream_id.clone(), write); + WRITE_STREAMS + .lock() + .expect("could not aqcuire lock") + .insert(stream_id.clone(), write); task::spawn(handle_stream(read, stream_id)); } @@ -41,53 +45,30 @@ pub async fn server(port: u16) -> Result<()> { } async fn handle_stream(mut stream: ReadHalf, stream_id: Uuid) -> Result<()> { - loop { - let packet = match Packet::read(&mut stream).await { - Ok(packet) => packet, - Err(err) => { - println!("error reading packet: {:?}", err); - return Ok(()); - } - }; - - let packet = if let Some(packet) = packet { - println!("got packet"); - packet - } else { - break; - }; - - match packet.packet_type { - PacketType::Message => { - let msg = Message::from_packet(packet)?; - println!("{:?}", msg); - task::spawn(relay_packet(msg)); - } - PacketType::Join => { - let join = Join::from_packet(packet)?; - println!("{:?}", join); - task::spawn(relay_packet(join)); - } - } - - println!("packet processed"); - } + loop {} println!("disconnecting"); - WRITE_STREAMS.lock().expect("failed to aqcuire lock").remove(&stream_id); + WRITE_STREAMS + .lock() + .expect("failed to aqcuire lock") + .remove(&stream_id); Ok(()) } -async fn relay_packet(packet: T) -> Result<()> { +/*async fn relay_packet(packet: T) -> Result<()> { let mut locked_write_streams = WRITE_STREAMS.lock().expect("failed to aqcuire lock"); let stream = futures::stream::iter(locked_write_streams.iter_mut()); let packet = &packet; stream.for_each_concurrent(None, |(_, mut stream)| async move { - let packet = packet.clone().to_packet().expect("failed to convert to packet"); + let packet = packet + .clone() + .to_packet() + .expect("failed to convert to packet"); // in case any of the writes fail just ignore them let _ = packet.write(&mut stream); }); Ok(()) } +*/