Bläddra i källkod

removed packet module and most of the server/client code in preparation for using ilmp

master
Isabelle L. 5 år sedan
förälder
incheckning
088c9b9878
8 ändrade filer med 47 tillägg och 218 borttagningar
  1. +20
    -6
      Cargo.lock
  2. +1
    -0
      Cargo.toml
  3. +5
    -10
      src/client.rs
  4. +0
    -1
      src/lib.rs
  5. +0
    -93
      src/packet.rs
  6. +0
    -32
      src/packet/join.rs
  7. +0
    -36
      src/packet/message.rs
  8. +21
    -40
      src/server.rs

+ 20
- 6
Cargo.lock Visa fil

@@ -309,6 +309,19 @@ dependencies = [
"libc",
]

[[package]]
name = "ilmp"
version = "0.1.0"
dependencies = [
"chrono",
"futures",
"futures-util",
"orion",
"serde",
"serde_json",
"uuid",
]

[[package]]
name = "iovec"
version = "0.1.4"
@@ -435,6 +448,7 @@ dependencies = [
"chrono",
"futures",
"futures-util",
"ilmp",
"lazy_static",
"orion",
"serde",
@@ -651,18 +665,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"

[[package]]
name = "serde"
version = "1.0.107"
version = "1.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba7550f2cdf88ffc23ab0f1607133486c390a8c0f89b57e589b9654ee15e04d"
checksum = "99e7b308464d16b56eba9964e4972a3eee817760ab60d88c3f86e1fecb08204c"
dependencies = [
"serde_derive",
]

[[package]]
name = "serde_derive"
version = "1.0.107"
version = "1.0.110"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10be45e22e5597d4b88afcc71f9d7bfadcd604bf0c78a3ab4582b8d2b37f39f3"
checksum = "818fbf6bfa9a42d3bfcaca148547aa00c7b915bec71d1757aa2d44ca68771984"
dependencies = [
"proc-macro2",
"quote",
@@ -671,9 +685,9 @@ dependencies = [

[[package]]
name = "serde_json"
version = "1.0.52"
version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7894c8ed05b7a3a279aeb79025fdec1d3158080b75b98a08faf2806bb799edd"
checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2"
dependencies = [
"itoa",
"ryu",


+ 1
- 0
Cargo.toml Visa fil

@@ -16,3 +16,4 @@ chrono = "0.4.11"
futures = "0.3.5"
toml = "0.5.6"
orion = "0.15.1"
ilmp = {path = "./ilmp"}

+ 5
- 10
src/client.rs Visa fil

@@ -1,6 +1,5 @@
// namespacing
use crate::config::ClientConfig as Config;
use crate::packet::{Join, Message, Packet, Sendable};
use crate::Result;
use async_std::net::TcpStream;
use futures_util::io::AsyncReadExt;
@@ -10,16 +9,12 @@ pub async fn client(port: u16) -> Result<()> {
let config = Config::load()?;

let stream = TcpStream::connect(format!("127.0.0.1:{}", &port)).await?;
println!("connection established to: {}:{}", stream.peer_addr()?.ip(), port);
println!(
"connection established to: {}:{}",
stream.peer_addr()?.ip(),
port
);
let (_read, mut write) = stream.split();

let join: Packet = Join::new(config.user).to_packet()?;
join.write(&mut write).await?;

// testing stuffs
let message: Packet =
Message::new("Isabelle".to_owned(), "Hello Server".to_owned()).to_packet()?;
message.write(&mut write).await?;

loop {}
}

+ 0
- 1
src/lib.rs Visa fil

@@ -1,7 +1,6 @@
// modules
mod client;
mod config;
mod packet;
mod server;

// re-exports


+ 0
- 93
src/packet.rs Visa fil

@@ -1,93 +0,0 @@
// namespacing
use crate::Result;
use async_std::net::TcpStream;
use async_std::prelude::*;
use futures_util::io::{ReadHalf, WriteHalf};
use std::convert::TryInto;

mod join;
pub use join::Join;
mod message;
pub use message::Message;

/// structured [packet type byte][four bytes of packet length][contents of packet]
pub struct NetworkPacket(Vec<u8>);

impl std::convert::Into<NetworkPacket> for Packet {
fn into(self) -> NetworkPacket {
let mut contents: Vec<u8> = Vec::new();

// packet type byte
contents.push(self.packet_type as u8);
// write the packet length
let contents_length = self.packet_contents.len() as u32;
contents.extend_from_slice(&contents_length.to_le_bytes());
// write the rest of the contents
contents.extend_from_slice(&self.packet_contents);

NetworkPacket(contents)
}
}

pub trait Sendable: Sized {
fn to_packet(self) -> Result<Packet>;
fn from_packet(packet: Packet) -> Result<Self>;
}

/// contains data to be turned into a network packet or into a more specific packet
#[derive(Debug, Clone)]
pub struct Packet {
pub packet_type: PacketType,
packet_contents: Vec<u8>,
}

impl Packet {
/// create a new packet
pub fn new(packet_type: PacketType, packet_contents: Vec<u8>) -> Self {
Self { packet_type, packet_contents }
}

/// read a packet from a tcpstream
pub async fn read(stream: &mut ReadHalf<TcpStream>) -> Result<Option<Packet>> {
let mut info_buf = [0u8; 5];
let check = stream.read(&mut info_buf).await?;
if check == 0 {
return Ok(None);
}

let packet_type = PacketType::from_u8(info_buf[0]).unwrap();

let length = u32::from_le_bytes(info_buf[1..5].try_into().unwrap()) as usize;

let mut contents: Vec<u8> = vec![0; length];
stream.read(&mut contents).await?;

Ok(Some(Packet::new(packet_type, contents)))
}

/// write a packet to the tcpstream
pub async fn write(self, stream: &mut WriteHalf<TcpStream>) -> Result<()> {
let network_packet: NetworkPacket = self.into();
stream.write(&network_packet.0).await?;
Ok(())
}
}

/// represent the specific packet type
#[derive(Debug, Clone)]
#[repr(u8)]
pub enum PacketType {
Message = 0,
Join = 1,
}

impl PacketType {
/// returns the PacketType if the u8 is a valid packet type
pub fn from_u8(packet_type: u8) -> Option<Self> {
match packet_type {
0 => Some(Self::Message),
1 => Some(Self::Join),
_ => None,
}
}
}

+ 0
- 32
src/packet/join.rs Visa fil

@@ -1,32 +0,0 @@
use crate::packet::{Packet, PacketType};
use crate::Result;
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Join {
user: String,
timestamp: i64,
}

impl Join {
pub fn new(user: String) -> Self {
let timestamp = Utc::now().timestamp();
Self { user, timestamp }
}
}

impl crate::packet::Sendable for Join {
fn to_packet(self) -> Result<Packet> {
let packet_contents: Vec<u8> = serde_json::to_string(&self)?.into_bytes();
let packet_type = PacketType::Join;
Ok(Packet { packet_type, packet_contents })
}

fn from_packet(packet: Packet) -> Result<Self> {
let packet_contents =
&String::from_utf8(packet.packet_contents).expect("could not decode as utf8");
let join: Join = serde_json::from_str(packet_contents)?;
Ok(join)
}
}

+ 0
- 36
src/packet/message.rs Visa fil

@@ -1,36 +0,0 @@
// namespacing
use crate::packet::{Packet, PacketType};
use crate::Result;
use chrono::prelude::*;
use serde::{Deserialize, Serialize};

/// a Message
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct Message {
user: String,
contents: String,
timestamp: i64,
}

impl Message {
/// create a new message
pub fn new(user: String, contents: String) -> Self {
let timestamp = Utc::now().timestamp();
Self { user, contents, timestamp }
}
}

impl crate::packet::Sendable for Message {
fn to_packet(self) -> Result<Packet> {
let packet_contents: Vec<u8> = serde_json::to_string(&self)?.into_bytes();
let packet_type = PacketType::Message;
Ok(Packet { packet_type, packet_contents })
}

fn from_packet(packet: Packet) -> Result<Self> {
let packet_contents =
&String::from_utf8(packet.packet_contents).expect("could not decode as utf8");
let message: Message = serde_json::from_str(packet_contents)?;
Ok(message)
}
}

+ 21
- 40
src/server.rs Visa fil

@@ -1,8 +1,5 @@
// namespacing
use crate::{
packet::{Join, Message, Packet, PacketType, Sendable},
Result,
};
use crate::Result;
use async_std::{
net::{TcpListener, TcpStream},
task,
@@ -21,7 +18,11 @@ lazy_static! {
/// wraps the server
pub async fn server(port: u16) -> Result<()> {
let listener = TcpListener::bind(format!("127.0.0.1:{}", &port)).await?;
println!("online as server at: {}:{}", listener.local_addr()?.ip(), port);
println!(
"online as server at: {}:{}",
listener.local_addr()?.ip(),
port
);
let mut incoming = listener.incoming();

while let Some(stream) = incoming.next().await {
@@ -33,7 +34,10 @@ pub async fn server(port: u16) -> Result<()> {
let (read, write) = stream.split();
let stream_id = Uuid::new_v4();

WRITE_STREAMS.lock().expect("could not aqcuire lock").insert(stream_id.clone(), write);
WRITE_STREAMS
.lock()
.expect("could not aqcuire lock")
.insert(stream_id.clone(), write);
task::spawn(handle_stream(read, stream_id));
}

@@ -41,53 +45,30 @@ pub async fn server(port: u16) -> Result<()> {
}

async fn handle_stream(mut stream: ReadHalf<TcpStream>, stream_id: Uuid) -> Result<()> {
loop {
let packet = match Packet::read(&mut stream).await {
Ok(packet) => packet,
Err(err) => {
println!("error reading packet: {:?}", err);
return Ok(());
}
};

let packet = if let Some(packet) = packet {
println!("got packet");
packet
} else {
break;
};

match packet.packet_type {
PacketType::Message => {
let msg = Message::from_packet(packet)?;
println!("{:?}", msg);
task::spawn(relay_packet(msg));
}
PacketType::Join => {
let join = Join::from_packet(packet)?;
println!("{:?}", join);
task::spawn(relay_packet(join));
}
}

println!("packet processed");
}
loop {}
println!("disconnecting");

WRITE_STREAMS.lock().expect("failed to aqcuire lock").remove(&stream_id);
WRITE_STREAMS
.lock()
.expect("failed to aqcuire lock")
.remove(&stream_id);

Ok(())
}

async fn relay_packet<T: Clone + Sendable>(packet: T) -> Result<()> {
/*async fn relay_packet<T: Clone + Sendable>(packet: T) -> Result<()> {
let mut locked_write_streams = WRITE_STREAMS.lock().expect("failed to aqcuire lock");
let stream = futures::stream::iter(locked_write_streams.iter_mut());

let packet = &packet;
stream.for_each_concurrent(None, |(_, mut stream)| async move {
let packet = packet.clone().to_packet().expect("failed to convert to packet");
let packet = packet
.clone()
.to_packet()
.expect("failed to convert to packet");
// in case any of the writes fail just ignore them
let _ = packet.write(&mut stream);
});
Ok(())
}
*/

Laddar…
Avbryt
Spara