It now binds and sends packets. They are not correct but they are sent.

This commit is contained in:
Adam Ierymenko 2022-06-20 15:11:01 -04:00
parent 51817ed557
commit 36a105ecbf
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
8 changed files with 140 additions and 66 deletions

View file

@ -18,7 +18,7 @@ impl Poly1305 {
#[inline(always)] #[inline(always)]
pub fn update(&mut self, data: &[u8]) { pub fn update(&mut self, data: &[u8]) {
self.0.update(poly1305::Block::from_slice(data)); self.0.update_padded(data);
} }
#[inline(always)] #[inline(always)]

View file

@ -30,7 +30,7 @@ impl ToString for Event {
fn to_string(&self) -> String { fn to_string(&self) -> String {
match self { match self {
Event::Online(online) => format!("[vl1] online == {}", online), Event::Online(online) => format!("[vl1] online == {}", online),
Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l, f, m), Event::Debug(l, f, m) => format!("[debug] {}:{} {}", l.split("/").last().unwrap(), f, m),
Event::SecurityWarning(w) => format!("[global] security warning: {}", w), Event::SecurityWarning(w) => format!("[global] security warning: {}", w),
Event::FatalError(e) => format!("[global] FATAL: {}", e), Event::FatalError(e) => format!("[global] FATAL: {}", e),
Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()), Event::IdentityAutoGenerated(id) => format!("[vl1] identity auto-generated: {}", id.to_string()),

View file

@ -85,11 +85,7 @@ impl<I: Interface> NetworkHypervisor<I> {
} }
/// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false. /// Call add_update_default_root_set if there are no roots defined, otherwise do nothing and return false.
pub fn add_update_default_root_set_if_none(&self) -> bool { pub fn add_update_default_root_set_if_none(&self) {
if self.vl1.has_roots_defined() { assert!(self.add_update_default_root_set());
false
} else {
self.add_update_default_root_set()
}
} }
} }

View file

@ -10,15 +10,15 @@ pub use zerotier_core_crypto::varint;
pub(crate) const ZEROES: [u8; 64] = [0_u8; 64]; pub(crate) const ZEROES: [u8; 64] = [0_u8; 64];
#[cfg(target_feature = "debug_events")] #[cfg(feature = "debug_events")]
#[allow(unused_macros)] #[allow(unused_macros)]
macro_rules! debug_event { macro_rules! debug_event {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => { ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {
$si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?))).await; $si.event(crate::Event::Debug(file!(), line!(), format!($fmt, $($($arg)*)?)));
} }
} }
#[cfg(not(target_feature = "debug_events"))] #[cfg(not(feature = "debug_events"))]
#[allow(unused_macros)] #[allow(unused_macros)]
macro_rules! debug_event { macro_rules! debug_event {
($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {}; ($si:expr, $fmt:expr $(, $($arg:tt)*)?) => {};

View file

@ -32,7 +32,10 @@ pub trait SystemInterface: Sync + Send + 'static {
type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString; type LocalInterface: Sync + Send + Sized + Hash + PartialEq + Eq + Clone + ToString;
/// An event occurred. /// An event occurred.
async fn event(&self, event: Event); ///
/// This isn't async to avoid all kinds of issues in code that deals with locks. If you need
/// it to be async use a channel or something.
fn event(&self, event: Event);
/// A USER_MESSAGE packet was received. /// A USER_MESSAGE packet was received.
async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]); async fn user_message(&self, source: &Identity, message_type: u64, message: &[u8]);
@ -123,15 +126,17 @@ const ROOT_SYNC_INTERVAL_MS: i64 = 1000;
struct BackgroundTaskIntervals { struct BackgroundTaskIntervals {
root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>, root_sync: IntervalGate<{ ROOT_SYNC_INTERVAL_MS }>,
root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>, root_hello: IntervalGate<{ ROOT_HELLO_INTERVAL }>,
peers: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>, root_spam_hello: IntervalGate<{ ROOT_HELLO_SPAM_INTERVAL }>,
paths: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>, peer_service: IntervalGate<{ crate::vl1::peer::SERVICE_INTERVAL_MS }>,
whois: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>, path_service: IntervalGate<{ crate::vl1::path::SERVICE_INTERVAL_MS }>,
whois_service: IntervalGate<{ crate::vl1::whoisqueue::SERVICE_INTERVAL_MS }>,
} }
struct RootInfo<SI: SystemInterface> { struct RootInfo<SI: SystemInterface> {
roots: HashMap<Arc<Peer<SI>>, Vec<Endpoint>>, roots: HashMap<Arc<Peer<SI>>, Vec<Endpoint>>,
sets: HashMap<String, RootSet>, sets: HashMap<String, RootSet>,
sets_modified: bool, sets_modified: bool,
online: bool,
} }
enum PathKey<'a, SI: SystemInterface> { enum PathKey<'a, SI: SystemInterface> {
@ -224,7 +229,7 @@ impl<SI: SystemInterface> Node<SI> {
return Err(InvalidParameterError("no identity found and auto-generate not enabled")); return Err(InvalidParameterError("no identity found and auto-generate not enabled"));
} else { } else {
let id = Identity::generate(); let id = Identity::generate();
si.event(Event::IdentityAutoGenerated(id.clone())).await; si.event(Event::IdentityAutoGenerated(id.clone()));
si.save_node_identity(&id).await; si.save_node_identity(&id).await;
id id
} }
@ -237,10 +242,12 @@ impl<SI: SystemInterface> Node<SI> {
let old = id.clone(); let old = id.clone();
if id.upgrade()? { if id.upgrade()? {
si.save_node_identity(&id).await; si.save_node_identity(&id).await;
si.event(Event::IdentityAutoUpgraded(old, id.clone())).await; si.event(Event::IdentityAutoUpgraded(old, id.clone()));
} }
} }
debug_event!(si, "[vl1] loaded identity {}", id.to_string());
Ok(Self { Ok(Self {
instance_id: zerotier_core_crypto::random::get_bytes_secure(), instance_id: zerotier_core_crypto::random::get_bytes_secure(),
identity: id, identity: id,
@ -251,6 +258,7 @@ impl<SI: SystemInterface> Node<SI> {
roots: HashMap::new(), roots: HashMap::new(),
sets: HashMap::new(), sets: HashMap::new(),
sets_modified: false, sets_modified: false,
online: false,
}), }),
best_root: RwLock::new(None), best_root: RwLock::new(None),
whois: WhoisQueue::new(), whois: WhoisQueue::new(),
@ -267,13 +275,79 @@ impl<SI: SystemInterface> Node<SI> {
self.peers.read().get(&a).cloned() self.peers.read().get(&a).cloned()
} }
pub fn is_online(&self) -> bool {
self.roots.read().online
}
fn update_best_root(&self, si: &SI, time_ticks: i64) {
let roots = self.roots.read();
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut best: Option<&Arc<Peer<SI>>> = None;
let mut latest_hello_reply = 0;
for (r, _) in roots.roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t > latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r);
}
}
if let Some(best) = best {
let mut best_root = self.best_root.write();
if let Some(best_root) = best_root.as_mut() {
if !Arc::ptr_eq(best_root, best) {
debug_event!(si, "[vl1] new best root: {} (replaced {})", best.identity.address.to_string(), best_root.identity.address.to_string());
*best_root = best.clone();
}
} else {
debug_event!(si, "[vl1] new best root: {} (was empty)", best.identity.address.to_string());
let _ = best_root.insert(best.clone());
}
} else {
if let Some(old_best) = self.best_root.write().take() {
debug_event!(si, "[vl1] new best root: NONE (replaced {})", old_best.identity.address.to_string());
}
}
// Determine if the node is online by whether there is a currently reachable root.
if (time_ticks - latest_hello_reply) < (ROOT_HELLO_INTERVAL * 2) && best.is_some() {
if !roots.online {
drop(roots);
self.roots.write().online = true;
si.event(Event::Online(true));
}
} else if roots.online {
drop(roots);
self.roots.write().online = false;
si.event(Event::Online(false));
}
}
pub async fn do_background_tasks(&self, si: &SI) -> Duration { pub async fn do_background_tasks(&self, si: &SI) -> Duration {
let tt = si.time_ticks(); let tt = si.time_ticks();
let (root_sync, root_hello, peer_check, path_check, whois_check) = { let (root_sync, root_hello, mut root_spam_hello, peer_service, path_service, whois_service) = {
let mut intervals = self.intervals.lock(); let mut intervals = self.intervals.lock();
(intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.peers.gate(tt), intervals.paths.gate(tt), intervals.whois.gate(tt)) (intervals.root_sync.gate(tt), intervals.root_hello.gate(tt), intervals.root_spam_hello.gate(tt), intervals.peer_service.gate(tt), intervals.path_service.gate(tt), intervals.whois_service.gate(tt))
}; };
// We only "spam" if we are offline.
if root_spam_hello && self.is_online() {
root_spam_hello = false;
}
debug_event!(
si,
"[vl1] do_background_tasks:{}{}{}{}{}{}",
if root_sync { " root_sync" } else { "" },
if root_hello { " root_hello" } else { "" },
if root_spam_hello { " root_spam_hello" } else { "" },
if peer_service { " peer_service" } else { "" },
if path_service { " path_service" } else { "" },
if whois_service { " whois_service" } else { "" },
);
if root_sync { if root_sync {
if { if {
let mut roots = self.roots.write(); let mut roots = self.roots.write();
@ -284,6 +358,8 @@ impl<SI: SystemInterface> Node<SI> {
false false
} }
} { } {
debug_event!(si, "[vl1] root sets modified, synchronizing internal data structures");
let (mut old_root_identities, address_collisions, new_roots, bad_identities) = { let (mut old_root_identities, address_collisions, new_roots, bad_identities) = {
let roots = self.roots.read(); let roots = self.roots.read();
@ -309,11 +385,12 @@ impl<SI: SystemInterface> Node<SI> {
for (_, rc) in roots.sets.iter() { for (_, rc) in roots.sets.iter() {
for m in rc.members.iter() { for m in rc.members.iter() {
if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) { if m.endpoints.is_some() && !address_collisions.contains(&m.identity.address) {
debug_event!(si, "[vl1] examining root {} with {} endpoints", m.identity.address.to_string(), m.endpoints.as_ref().map_or(0, |e| e.len()));
let peers = self.peers.upgradable_read(); let peers = self.peers.upgradable_read();
if let Some(peer) = peers.get(&m.identity.address) { if let Some(peer) = peers.get(&m.identity.address) {
new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); new_roots.insert(peer.clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else { } else {
if let Some(peer) = Peer::<SI>::new(&self.identity, m.identity.clone(), si.time_clock()) { if let Some(peer) = Peer::<SI>::new(&self.identity, m.identity.clone(), si.time_clock(), tt) {
new_roots.insert(parking_lot::RwLockUpgradableReadGuard::upgrade(peers).entry(m.identity.address).or_insert_with(|| Arc::new(peer)).clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect()); new_roots.insert(parking_lot::RwLockUpgradableReadGuard::upgrade(peers).entry(m.identity.address).or_insert_with(|| Arc::new(peer)).clone(), m.endpoints.as_ref().unwrap().iter().cloned().collect());
} else { } else {
bad_identities.push(m.identity.clone()); bad_identities.push(m.identity.clone());
@ -327,10 +404,10 @@ impl<SI: SystemInterface> Node<SI> {
}; };
for c in address_collisions.iter() { for c in address_collisions.iter() {
si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string()))).await; si.event(Event::SecurityWarning(format!("address/identity collision in root sets! address {} collides across root sets or with an existing peer and is being ignored as a root!", c.to_string())));
} }
for i in bad_identities.iter() { for i in bad_identities.iter() {
si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string()))).await; si.event(Event::SecurityWarning(format!("bad identity detected for address {} in at least one root set, ignoring (error creating peer object)", i.address.to_string())));
} }
let mut new_root_identities: Vec<Identity> = new_roots.iter().map(|(p, _)| p.identity.clone()).collect(); let mut new_root_identities: Vec<Identity> = new_roots.iter().map(|(p, _)| p.identity.clone()).collect();
@ -338,37 +415,19 @@ impl<SI: SystemInterface> Node<SI> {
old_root_identities.sort_unstable(); old_root_identities.sort_unstable();
new_root_identities.sort_unstable(); new_root_identities.sort_unstable();
if !old_root_identities.eq(&new_root_identities) { if !old_root_identities.eq(&new_root_identities) {
let mut best: Option<Arc<Peer<SI>>> = None; self.roots.write().roots = new_roots;
si.event(Event::UpdatedRoots(old_root_identities, new_root_identities));
{
let mut roots = self.roots.write();
roots.roots = new_roots;
// The best root is the one that has replied to a HELLO most recently. Since we send HELLOs in unison
// this is a proxy for latency and also causes roots that fail to reply to drop out quickly.
let mut latest_hello_reply = 0;
for (r, _) in roots.roots.iter() {
let t = r.last_hello_reply_time_ticks.load(Ordering::Relaxed);
if t >= latest_hello_reply {
latest_hello_reply = t;
let _ = best.insert(r.clone());
}
}
}
*(self.best_root.write()) = best;
//debug_event!(si, "new best root: {}", best.as_ref().map_or("none".into(), |p| p.identity.address.to_string()));
//si.event(Event::UpdatedRoots(old_root_identities, new_root_identities)).await;
} }
} }
self.update_best_root(si, tt);
} }
// Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint // Say HELLO to all roots periodically. For roots we send HELLO to every single endpoint
// they have, which is a behavior that differs from normal peers. This allows roots to // they have, which is a behavior that differs from normal peers. This allows roots to
// e.g. see our IPv4 and our IPv6 address which can be important for us to learn our // e.g. see our IPv4 and our IPv6 address which can be important for us to learn our
// external addresses from them. // external addresses from them.
if root_hello { if root_hello || root_spam_hello {
let roots = { let roots = {
let roots = self.roots.read(); let roots = self.roots.read();
let mut roots_copy = Vec::with_capacity(roots.roots.len()); let mut roots_copy = Vec::with_capacity(roots.roots.len());
@ -385,7 +444,7 @@ impl<SI: SystemInterface> Node<SI> {
} }
} }
if peer_check { if peer_service {
// Service all peers, removing any whose service() method returns false AND that are not // Service all peers, removing any whose service() method returns false AND that are not
// roots. Roots on the other hand remain in the peer list as long as they are roots. // roots. Roots on the other hand remain in the peer list as long as they are roots.
let mut dead_peers = Vec::new(); let mut dead_peers = Vec::new();
@ -402,7 +461,7 @@ impl<SI: SystemInterface> Node<SI> {
} }
} }
if path_check { if path_service {
// Service all paths, removing expired or invalid ones. This is done in two passes to // Service all paths, removing expired or invalid ones. This is done in two passes to
// avoid introducing latency into a flow. // avoid introducing latency into a flow.
let mut dead_paths = Vec::new(); let mut dead_paths = Vec::new();
@ -428,18 +487,19 @@ impl<SI: SystemInterface> Node<SI> {
} }
} }
if whois_check { if whois_service {
self.whois.service(si, self, tt); self.whois.service(si, self, tt);
} }
Duration::from_millis((ROOT_SYNC_INTERVAL_MS.min(crate::vl1::whoisqueue::SERVICE_INTERVAL_MS).min(crate::vl1::path::SERVICE_INTERVAL_MS).min(crate::vl1::peer::SERVICE_INTERVAL_MS) as u64) / 2) Duration::from_millis(1000)
} }
pub async fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) { pub async fn handle_incoming_physical_packet<PH: InnerProtocolInterface>(&self, si: &SI, ph: &PH, source_endpoint: &Endpoint, source_local_socket: &SI::LocalSocket, source_local_interface: &SI::LocalInterface, mut data: PooledPacketBuffer) {
debug_event!( debug_event!(
si, si,
"<< #{} ->{} from {} length {} via socket {}@{}", "[vl1] #{} {}->{} via {} length {} via socket {}@{}",
data.bytes_fixed_at::<8>(0).map_or("????????????????".into(), |pid| zerotier_core_crypto::hex::to_string(pid)), data.bytes_fixed_at::<8>(0).map_or("????????????????".into(), |pid| zerotier_core_crypto::hex::to_string(pid)),
data.bytes_fixed_at::<5>(13).map_or("??????????".into(), |src| zerotier_core_crypto::hex::to_string(src)),
data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| zerotier_core_crypto::hex::to_string(dest)), data.bytes_fixed_at::<5>(8).map_or("??????????".into(), |dest| zerotier_core_crypto::hex::to_string(dest)),
source_endpoint.to_string(), source_endpoint.to_string(),
data.len(), data.len(),
@ -455,11 +515,13 @@ impl<SI: SystemInterface> Node<SI> {
path.log_receive_anything(time_ticks); path.log_receive_anything(time_ticks);
if fragment_header.is_fragment() { if fragment_header.is_fragment() {
#[cfg(debug_assertions)]
let fragment_header_id = u64::from_be_bytes(fragment_header.id);
debug_event!(si, "-- #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments()); debug_event!(si, "-- #{:0>16x} fragment {} of {} received", u64::from_be_bytes(fragment_header.id), fragment_header.fragment_no(), fragment_header.total_fragments());
if let Some(assembled_packet) = path.receive_fragment(fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) { if let Some(assembled_packet) = path.receive_fragment(fragment_header.packet_id(), fragment_header.fragment_no(), fragment_header.total_fragments(), data, time_ticks) {
if let Some(frag0) = assembled_packet.frags[0].as_ref() { if let Some(frag0) = assembled_packet.frags[0].as_ref() {
debug_event!(si, "-- #{:0>16x} packet fully assembled!", u64::from_be_bytes(fragment_header.id)); debug_event!(si, "-- #{:0>16x} packet fully assembled!", fragment_header_id);
if let Ok(packet_header) = frag0.struct_at::<PacketHeader>(0) { if let Ok(packet_header) = frag0.struct_at::<PacketHeader>(0) {
if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(source) = Address::from_bytes(&packet_header.src) {
@ -473,8 +535,9 @@ impl<SI: SystemInterface> Node<SI> {
} }
} }
} else { } else {
#[cfg(debug_assertions)]
if let Ok(packet_header) = data.struct_at::<PacketHeader>(0) { if let Ok(packet_header) = data.struct_at::<PacketHeader>(0) {
debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(fragment_header.id)); debug_event!(si, "-- #{:0>16x} is unfragmented", u64::from_be_bytes(packet_header.id));
if let Some(source) = Address::from_bytes(&packet_header.src) { if let Some(source) = Address::from_bytes(&packet_header.src) {
if let Some(peer) = self.peer(source) { if let Some(peer) = self.peer(source) {
@ -486,17 +549,28 @@ impl<SI: SystemInterface> Node<SI> {
} }
} }
} else { } else {
#[cfg(debug_assertions)]
let debug_packet_id;
if fragment_header.is_fragment() { if fragment_header.is_fragment() {
debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); #[cfg(debug_assertions)]
{
debug_packet_id = u64::from_be_bytes(fragment_header.id);
}
debug_event!(si, "-- #{:0>16x} forwarding packet fragment to {}", debug_packet_id, dest.to_string());
if fragment_header.increment_hops() > FORWARD_MAX_HOPS { if fragment_header.increment_hops() > FORWARD_MAX_HOPS {
debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", debug_packet_id);
return; return;
} }
} else { } else {
if let Ok(packet_header) = data.struct_mut_at::<PacketHeader>(0) { if let Ok(packet_header) = data.struct_mut_at::<PacketHeader>(0) {
debug_event!(si, "-- #{:0>16x} forwarding packet to {}", u64::from_be_bytes(fragment_header.id), dest.to_string()); #[cfg(debug_assertions)]
{
debug_packet_id = u64::from_be_bytes(packet_header.id);
}
debug_event!(si, "-- #{:0>16x} forwarding packet to {}", debug_packet_id, dest.to_string());
if packet_header.increment_hops() > FORWARD_MAX_HOPS { if packet_header.increment_hops() > FORWARD_MAX_HOPS {
debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(fragment_header.id)); debug_event!(si, "-- #{:0>16x} discarded: max hops exceeded!", u64::from_be_bytes(packet_header.id));
return; return;
} }
} else { } else {
@ -507,7 +581,7 @@ impl<SI: SystemInterface> Node<SI> {
if let Some(peer) = self.peer(dest) { if let Some(peer) = self.peer(dest) {
// TODO: SHOULD we forward? Need a way to check. // TODO: SHOULD we forward? Need a way to check.
peer.forward(si, time_ticks, data.as_ref()).await; peer.forward(si, time_ticks, data.as_ref()).await;
debug_event!(si, "-- #{:0>16x} forwarded successfully", u64::from_be_bytes(fragment_header.id)); debug_event!(si, "-- #{:0>16x} forwarded successfully", debug_packet_id);
} }
} }
} }

View file

@ -52,6 +52,7 @@ pub struct Peer<SI: SystemInterface> {
last_receive_time_ticks: AtomicI64, last_receive_time_ticks: AtomicI64,
pub(crate) last_hello_reply_time_ticks: AtomicI64, pub(crate) last_hello_reply_time_ticks: AtomicI64,
last_forward_time_ticks: AtomicI64, last_forward_time_ticks: AtomicI64,
create_time_ticks: i64,
// Counter for assigning sequential message IDs. // Counter for assigning sequential message IDs.
message_id_counter: AtomicU64, message_id_counter: AtomicU64,
@ -164,7 +165,7 @@ impl<SI: SystemInterface> Peer<SI> {
/// ///
/// This only returns None if this_node_identity does not have its secrets or if some /// This only returns None if this_node_identity does not have its secrets or if some
/// fatal error occurs performing key agreement between the two identities. /// fatal error occurs performing key agreement between the two identities.
pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_clock: i64) -> Option<Peer<SI>> { pub(crate) fn new(this_node_identity: &Identity, id: Identity, time_clock: i64, time_ticks: i64) -> Option<Peer<SI>> {
this_node_identity.agree(&id).map(|static_secret| -> Self { this_node_identity.agree(&id).map(|static_secret| -> Self {
Self { Self {
identity: id, identity: id,
@ -175,6 +176,7 @@ impl<SI: SystemInterface> Peer<SI> {
last_receive_time_ticks: AtomicI64::new(0), last_receive_time_ticks: AtomicI64::new(0),
last_forward_time_ticks: AtomicI64::new(0), last_forward_time_ticks: AtomicI64::new(0),
last_hello_reply_time_ticks: AtomicI64::new(0), last_hello_reply_time_ticks: AtomicI64::new(0),
create_time_ticks: time_ticks,
message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)), message_id_counter: AtomicU64::new(((time_clock as u64) / 100).wrapping_shl(28) ^ next_u64_secure().wrapping_shr(36)),
remote_version: AtomicU64::new(0), remote_version: AtomicU64::new(0),
remote_protocol_version: AtomicU8::new(0), remote_protocol_version: AtomicU8::new(0),
@ -244,17 +246,15 @@ impl<SI: SystemInterface> Peer<SI> {
paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0)); paths.retain(|p| ((time_ticks - p.last_receive_time_ticks) < PEER_EXPIRATION_TIME) && (p.path.strong_count() > 0));
Self::prioritize_paths(&mut paths); Self::prioritize_paths(&mut paths);
} }
if (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed)) < PEER_EXPIRATION_TIME { (time_ticks - self.last_receive_time_ticks.load(Ordering::Relaxed).max(self.create_time_ticks)) < PEER_EXPIRATION_TIME
true
} else {
false
}
} }
/// Receive, decrypt, authenticate, and process an incoming packet from this peer. /// Receive, decrypt, authenticate, and process an incoming packet from this peer.
/// ///
/// If the packet comes in multiple fragments, the fragments slice should contain all /// If the packet comes in multiple fragments, the fragments slice should contain all
/// those fragments after the main packet header and first chunk. /// those fragments after the main packet header and first chunk.
///
/// This returns true if the packet decrypted and passed authentication.
pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) { pub(crate) async fn receive<PH: InnerProtocolInterface>(&self, node: &Node<SI>, si: &SI, ph: &PH, time_ticks: i64, source_path: &Arc<Path<SI>>, header: &PacketHeader, frag0: &PacketBuffer, fragments: &[Option<PooledPacketBuffer>]) {
if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) { if let Ok(packet_frag0_payload_bytes) = frag0.as_bytes_starting_at(packet_constants::VERB_INDEX) {
//let mut payload = unsafe { PacketBuffer::new_without_memzero() }; //let mut payload = unsafe { PacketBuffer::new_without_memzero() };
@ -286,8 +286,6 @@ impl<SI: SystemInterface> Peer<SI> {
} }
if let Ok(mut verb) = payload.u8_at(0) { if let Ok(mut verb) = payload.u8_at(0) {
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0; let extended_authentication = (verb & packet_constants::VERB_FLAG_EXTENDED_AUTHENTICATION) != 0;
if extended_authentication { if extended_authentication {
if payload.len() >= SHA512_HASH_SIZE { if payload.len() >= SHA512_HASH_SIZE {
@ -320,6 +318,8 @@ impl<SI: SystemInterface> Peer<SI> {
} }
} }
self.last_receive_time_ticks.store(time_ticks, Ordering::Relaxed);
let mut path_is_known = false; let mut path_is_known = false;
for p in self.paths.lock().iter_mut() { for p in self.paths.lock().iter_mut() {
if p.path_internal_instance_id == source_path.internal_instance_id { if p.path_internal_instance_id == source_path.internal_instance_id {
@ -609,6 +609,7 @@ impl<SI: SystemInterface> Peer<SI> {
match ok_header.in_re_verb { match ok_header.in_re_verb {
verbs::VL1_HELLO => { verbs::VL1_HELLO => {
// TODO // TODO
self.last_hello_reply_time_ticks.store(time_ticks, Ordering::Relaxed);
} }
verbs::VL1_WHOIS => {} verbs::VL1_WHOIS => {}
_ => { _ => {

View file

@ -247,6 +247,9 @@ pub const PATH_EXPIRATION_TIME: i64 = (PATH_KEEPALIVE_INTERVAL * 2) + 10000;
/// How often to send HELLOs to roots, which is more often than normal peers. /// How often to send HELLOs to roots, which is more often than normal peers.
pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2; pub const ROOT_HELLO_INTERVAL: i64 = PATH_KEEPALIVE_INTERVAL * 2;
/// How often to send HELLOs to roots when we are offline.
pub const ROOT_HELLO_SPAM_INTERVAL: i64 = 5000;
/// How often to send HELLOs to regular peers. /// How often to send HELLOs to regular peers.
pub const PEER_HELLO_INTERVAL_MAX: i64 = 300000; pub const PEER_HELLO_INTERVAL_MAX: i64 = 300000;

View file

@ -157,7 +157,7 @@ impl SystemInterface for ServiceImpl {
type LocalSocket = crate::service::LocalSocket; type LocalSocket = crate::service::LocalSocket;
type LocalInterface = crate::localinterface::LocalInterface; type LocalInterface = crate::localinterface::LocalInterface;
async fn event(&self, event: Event) { fn event(&self, event: Event) {
println!("{}", event.to_string()); println!("{}", event.to_string());
match event { match event {
_ => {} _ => {}