From 6c83a2c5d0340cec75005576382ede44c079b471 Mon Sep 17 00:00:00 2001 From: Isabelle L Date: Sun, 10 May 2020 23:21:11 -0500 Subject: [PATCH] idfk bro i'm out of it atm --- Cargo.lock | 10 +++++ Cargo.toml | 1 + client_config.toml | 1 + src/client.rs | 2 +- src/config.rs | 15 ++++++++ src/lib.rs | 3 +- src/packet.rs | 64 +++++++++++--------------------- src/packet/join.rs | 38 +++++++++++++++++++ src/packet/message.rs | 42 +++++++++++++++++++++ src/server.rs | 85 +++++++++++++++++++++++++++++++------------ 10 files changed, 193 insertions(+), 68 deletions(-) create mode 100644 client_config.toml create mode 100644 src/config.rs create mode 100644 src/packet/join.rs create mode 100644 src/packet/message.rs diff --git a/Cargo.lock b/Cargo.lock index 52a31af..f944257 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,6 +433,7 @@ dependencies = [ "serde", "serde_json", "structopt", + "toml", "uuid", ] @@ -737,6 +738,15 @@ dependencies = [ "winapi 0.3.8", ] +[[package]] +name = "toml" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc92d160b1eef40665be3a05630d003936a3bc7da7421277846c2613e92c71a" +dependencies = [ + "serde", +] + [[package]] name = "unicode-segmentation" version = "1.6.0" diff --git a/Cargo.toml b/Cargo.toml index 87912f7..a6d380b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,3 +14,4 @@ uuid = { version = "0.8.1", features = ["v4"] } lazy_static = "1.4.0" futures = "0.3.5" futures-util = "0.3.5" +toml = "0.5.6" diff --git a/client_config.toml b/client_config.toml new file mode 100644 index 0000000..d052b8c --- /dev/null +++ b/client_config.toml @@ -0,0 +1 @@ +username="Isabelle" \ No newline at end of file diff --git a/src/client.rs b/src/client.rs index 17e49de..702aa33 100644 --- a/src/client.rs +++ b/src/client.rs @@ -14,5 +14,5 @@ pub async fn client(port: u16) -> Result<()> { Message::new("Isabelle".to_owned(), "Hello Server".to_owned()).try_into()?; message.write(&mut stream).await?; - Ok(()) + loop {} } diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..c58fa16 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,15 @@ +use crate::Result; +use serde::Deserialize; + +#[derive(Deserialize)] +pub struct ClientConfig { + user: String, +} + +impl ClientConfig { + pub fn load() -> Result { + let config = std::fs::read_to_string("./client_config.toml")?; + let config: ClientConfig = toml::from_str(&config)?; + Ok(config) + } +} diff --git a/src/lib.rs b/src/lib.rs index 8fc3ddc..b386b5f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ // modules mod client; +mod config; #[allow(dead_code)] mod packet; mod server; @@ -9,5 +10,5 @@ pub use client::client; pub use server::server; // lazy idiot error/result type -pub type Error = Box; +pub type Error = std::io::Error; pub type Result = std::result::Result; diff --git a/src/packet.rs b/src/packet.rs index 1faf4e2..bcfb9bd 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,11 +1,15 @@ // namespacing -use crate::{Error, Result}; +use crate::Result; use async_std::net::TcpStream; use async_std::prelude::*; -use chrono::prelude::*; -use serde::{Deserialize, Serialize}; +use futures_util::io::ReadHalf; use std::convert::TryInto; +mod join; +pub use join::Join; +mod message; +pub use message::Message; + /// structured [packet type byte][four bytes of packet length][contents of packet] pub struct NetworkPacket(Vec); @@ -25,6 +29,11 @@ impl std::convert::Into for Packet { } } +pub trait Sendable { + fn to_packet(self) -> Packet; + fn from_packet(packet: Packet) -> Self; +} + /// contains data to be turned into a network packet or into a more specific packet pub struct Packet { pub packet_type: PacketType, @@ -38,9 +47,13 @@ impl Packet { } /// read a packet from a tcpstream - pub async fn read(stream: &mut TcpStream) -> Result { + pub async fn read(stream: &mut ReadHalf) -> Result> { let mut info_buf = [0u8; 5]; - stream.read(&mut info_buf).await?; + let check = stream.read(&mut info_buf).await?; + if check == 0 { + return Ok(None); + } + let packet_type = PacketType::from_u8(info_buf[0]).unwrap(); let length = u32::from_le_bytes(info_buf[1..5].try_into().unwrap()) as usize; @@ -48,13 +61,13 @@ impl Packet { let mut contents: Vec = vec![0; length]; stream.read(&mut contents).await?; - Ok(Packet::new(packet_type, contents)) + Ok(Some(Packet::new(packet_type, contents))) } /// write a packet to the tcpstream pub async fn write(self, stream: &mut TcpStream) -> Result<()> { let network_packet: NetworkPacket = self.into(); - let _ = stream.write(&network_packet.0).await?; + stream.write(&network_packet.0).await?; Ok(()) } } @@ -63,6 +76,7 @@ impl Packet { #[repr(u8)] pub enum PacketType { Message = 0, + Join = 1, } impl PacketType { @@ -74,39 +88,3 @@ impl PacketType { } } } - -/// a Message -#[derive(Deserialize, Serialize, Debug)] -pub struct Message { - user: String, - contents: String, - timestamp: i64, -} - -impl Message { - /// create a new message - pub fn new(user: String, contents: String) -> Self { - let timestamp = Utc::now().timestamp(); - Self { user, contents, timestamp } - } -} - -impl std::convert::TryFrom for Message { - type Error = Error; - - fn try_from(packet: Packet) -> Result { - let packet_contents = &String::from_utf8(packet.packet_contents)?; - let message: Message = serde_json::from_str(packet_contents)?; - Ok(message) - } -} - -impl std::convert::TryInto for Message { - type Error = Error; - - fn try_into(self) -> Result { - let packet_contents: Vec = serde_json::to_string(&self)?.into_bytes(); - let packet_type = PacketType::Message; - Ok(Packet { packet_type, packet_contents }) - } -} diff --git a/src/packet/join.rs b/src/packet/join.rs new file mode 100644 index 0000000..545fa69 --- /dev/null +++ b/src/packet/join.rs @@ -0,0 +1,38 @@ +use crate::packet::{Packet, PacketType}; +use crate::{Error, Result}; +use chrono::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Debug)] +pub struct Join { + user: String, + timestamp: i64, +} + +impl Join { + fn new(user: String) -> Self { + let timestamp = Utc::now().timestamp(); + Self { user, timestamp } + } +} + +impl std::convert::TryFrom for Join { + type Error = Error; + + fn try_from(packet: Packet) -> Result { + let packet_contents = + &String::from_utf8(packet.packet_contents).expect("could not decode as utf8"); + let message: Join = serde_json::from_str(packet_contents)?; + Ok(message) + } +} + +impl std::convert::TryInto for Join { + type Error = Error; + + fn try_into(self) -> Result { + let packet_contents: Vec = serde_json::to_string(&self)?.into_bytes(); + let packet_type = PacketType::Join; + Ok(Packet { packet_type, packet_contents }) + } +} diff --git a/src/packet/message.rs b/src/packet/message.rs new file mode 100644 index 0000000..8d23b80 --- /dev/null +++ b/src/packet/message.rs @@ -0,0 +1,42 @@ +// namespacing +use crate::packet::{Packet, PacketType}; +use crate::{Error, Result}; +use chrono::prelude::*; +use serde::{Deserialize, Serialize}; + +/// a Message +#[derive(Deserialize, Serialize, Debug)] +pub struct Message { + user: String, + contents: String, + timestamp: i64, +} + +impl Message { + /// create a new message + pub fn new(user: String, contents: String) -> Self { + let timestamp = Utc::now().timestamp(); + Self { user, contents, timestamp } + } +} + +impl std::convert::TryFrom for Message { + type Error = Error; + + fn try_from(packet: Packet) -> Result { + let packet_contents = + &String::from_utf8(packet.packet_contents).expect("could not decode as utf8"); + let message: Message = serde_json::from_str(packet_contents)?; + Ok(message) + } +} + +impl std::convert::TryInto for Message { + type Error = Error; + + fn try_into(self) -> Result { + let packet_contents: Vec = serde_json::to_string(&self)?.into_bytes(); + let packet_type = PacketType::Message; + Ok(Packet { packet_type, packet_contents }) + } +} diff --git a/src/server.rs b/src/server.rs index 6dce178..7427ac3 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,15 +1,17 @@ // namespacing -/*use crate::packet::{Message, Packet, PacketType};*/ -use crate::Result; -use async_std::net::{TcpListener, TcpStream}; -use async_std::prelude::*; -/*use async_std::task;*/ -use futures::io::WriteHalf; -use lazy_static::lazy_static; -use std::collections::HashMap; -/*use std::convert::TryFrom;*/ +use crate::{ + packet::{Join, Message, Packet, PacketType}, + Result, +}; +use async_std::{ + net::{TcpListener, TcpStream}, + prelude::*, + task, +}; +use futures::io::{ReadHalf, WriteHalf}; use futures_util::io::AsyncReadExt; -use std::sync::Mutex; +use lazy_static::lazy_static; +use std::{collections::HashMap, convert::TryFrom, sync::Mutex}; use uuid::Uuid; lazy_static! { @@ -24,23 +26,60 @@ pub async fn server(port: u16) -> Result<()> { let mut incoming = listener.incoming(); while let Some(stream) = incoming.next().await { - let mut stream = stream?; - println!("new stream from: {}", &stream.peer_addr()?.ip()); + let stream = stream?; + let stream_addr = &stream.peer_addr()?.ip(); + + println!("new stream from: {}", &stream_addr); + let (read, write) = stream.split(); let stream_id = Uuid::new_v4(); - WRITE_STREAMS.lock()?.insert(stream_id, write); - - // handle stream - /* task::spawn(async { - loop { - let packet = Packet::read(&mut stream).await?; - let message = match packet.packet_type { - PacketType::Message => Message::try_from(packet), - }; - println!("{:?}", message); + + WRITE_STREAMS.lock().expect("could not aqcuire lock").insert(stream_id.clone(), write); + task::spawn(handle_stream(read, stream_id)); + } + + Ok(()) +} + +async fn handle_stream(mut stream: ReadHalf, stream_id: Uuid) -> Result<()> { + loop { + let packet = match Packet::read(&mut stream).await { + Ok(packet) => packet, + Err(err) => { + println!("error reading packet: {:?}", err); + return Ok(()); + } + }; + + let packet = if let Some(packet) = packet { + println!("got packet"); + packet + } else { + break; + }; + + match packet.packet_type { + PacketType::Message => { + let msg = Message::try_from(packet)?; + println!("{:?}", msg); + } + PacketType::Join => { + let join = Join::try_from(packet)?; + println!("{:?}", join); } - });*/ + } + + println!("packet processed"); } + println!("disconnecting"); + + WRITE_STREAMS.lock().expect("failed to aqcuire lock").remove(&stream_id); Ok(()) } + +/* +async fn relay_packet() -> Result<()> { + let locked_stream = WRITE_STREAMS.lock(). + +}*/