Verified Commit 236f33df authored by Katharina Fey's avatar Katharina Fey 🏴
Browse files

Refactoring ratman-types abstraction

The previous work done via !69 was incomplete in this matter.  This
commit fixes the abstraction for the ratman client-lib and attempts to
normalise the usage of `Recipient` and `Message` types
parent c34b531c
Pipeline #3178 passed with stages
in 7 minutes and 40 seconds
......@@ -2,7 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later WITH LicenseRef-AppStore
use ratman_client::{Identity, RatmanIpc, Receive_Type};
use ratman_client::{RatmanIpc, Receive_Type};
#[async_std::main]
......
......@@ -5,7 +5,7 @@
//! Address resolution table module
use async_std::{
net::{Ipv6Addr, SocketAddrV6},
net::SocketAddrV6,
sync::{Arc, RwLock},
};
use std::collections::BTreeMap;
......@@ -68,6 +68,7 @@ impl AddrTable {
self.ips.read().await.get(&id).cloned()
}
#[allow(unused)]
pub(crate) async fn all(&self) -> Vec<SocketAddrV6> {
self.ips.read().await.values().cloned().collect()
}
......
......@@ -2,9 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later WITH LicenseRef-AppStore
//! netmod-udp is a UDP overlay for Ratman
#![allow(warnings)]
#[macro_use]
extern crate tracing;
......@@ -19,9 +16,8 @@ pub(crate) use framing::{Envelope, FrameExt};
use async_std::{sync::Arc, task};
use async_trait::async_trait;
use netmod::{Endpoint as EndpointExt, Frame, Recipient, Result, Target};
use netmod::{Endpoint as EndpointExt, Frame, Result, Target};
use pnet::datalink::interfaces;
use std::net::ToSocketAddrs;
#[derive(Clone)]
pub struct Endpoint {
......@@ -57,7 +53,6 @@ impl EndpointExt for Endpoint {
let inner = bincode::serialize(&frame).unwrap();
let env = Envelope::Data(inner);
match target {
/// Sending to a user,
Target::Single(ref id) => {
self.socket
.send(&env, self.addrs.ip(*id).await.unwrap())
......
......@@ -3,12 +3,12 @@
use crate::{AddrTable, Envelope, FrameExt};
use async_std::{
future::{self, Future},
net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV6, UdpSocket},
net::{Ipv6Addr, SocketAddr, SocketAddrV6, UdpSocket},
pin::Pin,
sync::{Arc, RwLock},
task::{self, Poll},
};
use netmod::{Frame, Target};
use netmod::Target;
use std::collections::VecDeque;
use std::ffi::CString;
use task_notify::Notify;
......@@ -74,12 +74,16 @@ impl Socket {
/// Send a multicast with an Envelope
pub(crate) async fn multicast(&self, env: &Envelope) {
self.sock
if let Err(e) = self
.sock
.send_to(
&env.as_bytes(),
SocketAddrV6::new(MULTI.clone(), self.port, 0, self.scope),
)
.await;
.await
{
error!("failed to multicast frame: {}", e);
}
}
pub(crate) async fn next(&self) -> FrameExt {
......@@ -123,7 +127,7 @@ impl Socket {
match arc.sock.local_addr() {
Ok(SocketAddr::V6(local)) if local == peer => continue,
Ok(_) => {}
data => {
_data => {
warn!("failed to verify local-loop integrety. this might caus issues!");
}
};
......@@ -169,6 +173,6 @@ fn test_init() {
let table = Arc::new(AddrTable::new());
let sock = Socket::new("br42", 12322, table).await;
println!("Multicasting");
sock.multicast(&Envelope::Announce);
sock.multicast(&Envelope::Announce).await;
});
}
......@@ -14,8 +14,8 @@
//! ## Version numbers
//!
//! The client library MAJOR and MINOR version follow a particular
//! Ratman release. So for example, version `0.3.0` of this crate is
//! built against version `0.3.0` of `ratmand`. Because Ratman itself
//! Ratman release. So for example, version `0.4.0` of this crate is
//! built against version `0.4.0` of `ratmand`. Because Ratman itself
//! follows semantic versioning, this crate is in turn also
//! semantically versioned.
//!
......@@ -35,14 +35,14 @@ use async_std::{
net::TcpStream,
task,
};
pub use types::{api::Receive_Type, Error, Identity, Result};
pub use types::{api::Receive_Type, Error, Identity, Message, Result, TimePair};
use types::{
api::{
self, ApiMessageEnum,
Peers_Type::{DISCOVER, RESP},
Setup_Type::ACK,
},
encode_message, parse_message, read_with_length, write_with_length, Message,
encode_message, parse_message, read_with_length, write_with_length,
};
/// An IPC handle for a particular address
......
......@@ -38,7 +38,7 @@ use crate::{Message, Recipient, Router};
use async_std::{net::TcpListener, task::spawn};
use state::{DaemonState, OnlineMap};
use tracing_subscriber::{filter::LevelFilter, fmt, EnvFilter};
use types::Result;
use types::{Recipient as TypesRecipient, Result};
pub use peers::attach_peers;
......@@ -101,8 +101,8 @@ async fn run_relay(r: Router, online: OnlineMap) {
id,
sender,
match recipient {
Recipient::User(id) => Some(id),
Recipient::Flood(_) => None,
Recipient::User(addr) => TypesRecipient::Standard(vec![addr]),
Recipient::Flood(ns) => TypesRecipient::Flood(ns),
},
payload,
format!("{:?}", timesig),
......
......@@ -17,7 +17,8 @@ use types::{
all_peers, api_peers, api_setup, online_ack, ApiMessageEnum, Peers, Peers_Type, Receive,
Send, Setup, Setup_Type, Setup_oneof__id,
},
encode_message, parse_message, write_with_length, Error as ParseError, Result as ParseResult,
encode_message, parse_message, write_with_length, Error as ParseError,
Recipient as TypesRecipient, Result as ParseResult,
};
async fn handle_send(r: &Router, online: &OnlineMap, _self: Identity, send: Send) -> Result<()> {
......@@ -38,8 +39,8 @@ async fn handle_send(r: &Router, online: &OnlineMap, _self: Identity, send: Send
*id,
*sender,
match recipient {
Recipient::User(id) => Some(*id),
Recipient::Flood(_) => None,
Recipient::User(id) => TypesRecipient::Standard(vec![*id]),
Recipient::Flood(ns) => TypesRecipient::Flood(*ns),
},
payload.clone(),
format!("{:?}", timesig),
......
......@@ -10,12 +10,14 @@ use types::api::{Send, Send_Type};
pub(crate) fn send_to_message(s: Send) -> Vec<Message> {
// Take the set of recipients from the message and turn it into a
// set of Ratman recipients
let recipients: Vec<_> = match s.field_type {
let recipient: Vec<_> = match s.field_type {
Send_Type::DEFAULT => s
.get_msg()
.recipients
.iter()
.map(|r| Recipient::User(Identity::from_bytes(&r)))
.get_recipient()
.get_std()
.get_standard()
.into_iter()
.map(|addr| Recipient::User(Identity::from_bytes(&addr)))
.collect(),
Send_Type::FLOOD => vec![Recipient::Flood(Identity::from_bytes(s.scope.as_slice()))],
};
......@@ -23,7 +25,7 @@ pub(crate) fn send_to_message(s: Send) -> Vec<Message> {
// Then create a new message for each recipient (if the type is
// "flood" then only a single message gets created)
recipients
recipient
.into_iter()
.map(|recipient| Message {
id: MsgId::random(),
......
......@@ -4,20 +4,32 @@
syntax = "proto3";
message StandardRecipient {
repeated bytes standard = 1;
}
// Recipient state information
message Recipient {
oneof inner {
StandardRecipient std = 1;
// A message gets generated for each recipient
bytes flood_scope = 2;
}
}
// A complete message type that maps onto a Ratman message
message Message {
// Message ID filled in by Ratman
optional bytes id = 1;
// Sender address information
bytes sender = 2;
// A group of recipients (this will be broken up into multiple
// messages)
repeated bytes recipients = 3;
// Recipient information
Recipient recipient = 3;
// Timestamp filled in by Ratman
optional string time = 4;
/////////////////////////
// Main message payload
bytes payload = 10;
// Payload signature
......
......@@ -17,7 +17,7 @@ pub mod api;
mod message;
mod timepair;
pub use message::Message;
pub use message::{Message, Recipient};
pub use timepair::TimePair;
pub use error::{Error, Result};
......
......@@ -2,11 +2,6 @@
//
// SPDX-License-Identifier: AGPL-3.0-or-later WITH LicenseRef-AppStore
//! A message is only ever addressed to a single node, or everyone on
//! the network. The signature is required to be present, if a
//! payload is. The payload can be empty, which can be used to create
//! a ping, or using the 16 byte id as payload. In these cases,
//! the sigature can also be empty.
use serde::{Deserialize, Serialize};
#[cfg(feature = "proto")]
......@@ -15,11 +10,39 @@ pub use crate::proto::message::Message as ProtoMessage;
use crate::timepair::TimePair;
use ratman_identity::Identity;
/// Specify the message recipient
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Recipient {
Standard(Vec<Identity>),
Flood(Identity),
}
impl Recipient {
/// Create a standard message recipient
pub fn standard<T: Into<Vec<Identity>>>(addrs: T) -> Self {
Self::Standard(addrs.into())
}
/// Create a flood message recipient
pub fn flood(namespace: Identity) -> Self {
Self::Flood(namespace)
}
}
impl From<Vec<Identity>> for Recipient {
fn from(vec: Vec<Identity>) -> Self {
Self::Standard(vec)
}
}
/// Main Ratman message type
///
/// A message can either be addressed to a single recipient, or a
/// namespace on the network.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Message {
id: Identity,
sender: Identity,
recipients: Vec<Identity>,
recipient: Recipient,
time: TimePair,
payload: Vec<u8>,
signature: Vec<u8>,
......@@ -28,17 +51,17 @@ pub struct Message {
impl Message {
pub fn new(
sender: Identity,
recipients: Vec<Identity>,
recipient: impl Into<Recipient>,
payload: Vec<u8>,
signature: Vec<u8>,
) -> Self {
Message {
id: Identity::random(),
sender: sender,
recipients: recipients,
recipient: recipient.into(),
time: TimePair::sending(),
payload: payload,
signature: signature,
sender,
payload,
signature,
}
}
......@@ -55,7 +78,7 @@ impl Message {
pub fn received(
id: Identity,
sender: Identity,
recipient: Option<Identity>,
recipient: Recipient,
payload: Vec<u8>,
timesig: String,
sign: Vec<u8>,
......@@ -63,9 +86,28 @@ impl Message {
let mut inner = ProtoMessage::new();
inner.set_id(id.as_bytes().to_vec());
inner.set_sender(sender.as_bytes().to_vec());
if let Some(r) = recipient {
inner.set_recipients(vec![r.as_bytes().to_vec()].into());
use crate::proto::message::{Recipient as ProtoRecipient, StandardRecipient};
let mut r = ProtoRecipient::new();
match recipient {
Recipient::Standard(addrs) => {
let mut std_r = StandardRecipient::new();
std_r.set_standard(
addrs
.into_iter()
.map(|id| id.as_bytes().to_vec())
.collect::<Vec<_>>()
.into(),
);
r.set_std(std_r);
}
Recipient::Flood(ns) => {
r.set_flood_scope(ns.as_bytes().to_vec().into());
}
}
inner.set_recipient(r);
inner.set_time(timesig);
inner.set_payload(payload);
inner.set_signature(sign);
......@@ -77,16 +119,27 @@ impl Message {
/// Implement RAW `From` protobuf type message
#[cfg(feature = "proto")]
impl From<ProtoMessage> for Message {
fn from(msg: ProtoMessage) -> Self {
fn from(mut msg: ProtoMessage) -> Self {
use crate::proto::message::Recipient_oneof_inner;
let mut r = msg.take_recipient();
let recipient = match r.inner {
Some(Recipient_oneof_inner::std(ref mut std)) => Recipient::Standard(
std.take_standard()
.into_iter()
.map(|id| Identity::from_bytes(&id))
.collect(),
),
Some(Recipient_oneof_inner::flood_scope(ref id)) => {
Recipient::Flood(Identity::from_bytes(id))
}
_ => unreachable!(),
};
Message {
id: Identity::from_bytes(msg.get_id()),
sender: Identity::from_bytes(msg.get_id()),
recipients: msg
.get_recipients()
.iter()
.map(|r| Identity::from_bytes(r))
.collect::<Vec<Identity>>()
.into(),
recipient,
time: TimePair::from_string(msg.get_time()),
payload: msg.get_payload().to_vec(),
signature: msg.get_signature().to_vec(),
......@@ -100,13 +153,28 @@ impl From<Message> for ProtoMessage {
fn from(msg: Message) -> ProtoMessage {
let mut inner = ProtoMessage::new();
inner.set_sender(msg.sender.as_bytes().to_vec());
inner.set_recipients(
msg.recipients
.iter()
.map(|r| r.as_bytes().to_vec())
.collect::<Vec<_>>()
.into(),
);
use crate::proto::message::{Recipient as ProtoRecipient, StandardRecipient};
let mut r = ProtoRecipient::new();
match msg.recipient {
Recipient::Standard(addrs) => {
let mut std_r = StandardRecipient::new();
std_r.set_standard(
addrs
.into_iter()
.map(|id| id.as_bytes().to_vec())
.collect::<Vec<_>>()
.into(),
);
r.set_std(std_r);
}
Recipient::Flood(ns) => {
r.set_flood_scope(ns.as_bytes().to_vec().into());
}
}
inner.set_recipient(r);
inner.set_payload(msg.payload);
inner.set_signature(msg.signature);
inner
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment