Refactor send method a bit to hide some backward compatibility detail from outside code.

This commit is contained in:
Adam Ierymenko 2022-10-18 16:01:16 -04:00
parent 23e73bbdd1
commit f2028ce3a2
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
2 changed files with 135 additions and 135 deletions

View file

@ -1,6 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible;
use std::hash::Hash; use std::hash::Hash;
use std::io::Write; use std::io::Write;
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
@ -892,24 +893,22 @@ impl<HostSystemImpl: HostSystem + ?Sized> Node<HostSystemImpl> {
} }
/// Send a WHOIS query to the current best root. /// Send a WHOIS query to the current best root.
fn send_whois(&self, host_system: &HostSystemImpl, addresses: &[Address], time_ticks: i64) { fn send_whois(&self, host_system: &HostSystemImpl, mut addresses: &[Address], time_ticks: i64) {
debug_assert!(!addresses.is_empty()); debug_assert!(!addresses.is_empty());
if !addresses.is_empty() {
if let Some(root) = self.best_root() { if let Some(root) = self.best_root() {
let mut packet = host_system.get_buffer(); while !addresses.is_empty() {
packet.set_size(v1::HEADER_SIZE); if !root
let _ = packet.append_u8(verbs::VL1_WHOIS); .send(host_system, self, None, time_ticks, |packet| -> Result<(), Infallible> {
for a in addresses.iter() { assert!(packet.append_u8(verbs::VL1_WHOIS).is_ok());
if (packet.len() + ADDRESS_SIZE) > UDP_DEFAULT_MTU { while !addresses.is_empty() && (packet.len() + ADDRESS_SIZE) <= UDP_DEFAULT_MTU {
root.send(host_system, None, self, time_ticks, packet); assert!(packet.append_bytes_fixed(&addresses[0].to_bytes()).is_ok());
packet = host_system.get_buffer(); addresses = &addresses[1..];
packet.set_size(v1::HEADER_SIZE);
let _ = packet.append_u8(verbs::VL1_WHOIS);
} }
let _ = packet.append_bytes_fixed(&a.to_bytes()); Ok(())
} })
if packet.len() > (v1::HEADER_SIZE + 1) { .is_some()
root.send(host_system, None, self, time_ticks, packet); {
break;
} }
} }
} }

View file

@ -1,6 +1,7 @@
// (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md. // (c) 2020-2022 ZeroTier, Inc. -- currently propritery pending actual release and licensing. See LICENSE.md.
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::Infallible;
use std::hash::Hash; use std::hash::Hash;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering}; use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak}; use std::sync::{Arc, Mutex, RwLock, Weak};
@ -259,18 +260,23 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
} }
} }
/// Send a packet to this peer, returning true on (potential) success. /// Send a packet to this peer.
/// ///
/// This will go directly if there is an active path, or otherwise indirectly /// This sets up a buffer and then invokes the supplied function to actually populate its contents.
/// via a root or some other route. /// It's structured this way to handle both V1 and V2 format packets and the need to set them up
pub(crate) fn send( /// differently while hiding that from higher level code.
///
/// The builder function must append the verb (with any verb flags) and packet payload. If it returns
/// an error, the error is returned immediately and the send is aborted. None is returned if the send
/// function itself fails for some reason such as no paths being available.
pub fn send<R, E, BuilderFunction: FnOnce(&mut PacketBuffer) -> Result<R, E>>(
&self, &self,
host_system: &HostSystemImpl, host_system: &HostSystemImpl,
path: Option<&Arc<Path<HostSystemImpl>>>,
node: &Node<HostSystemImpl>, node: &Node<HostSystemImpl>,
path: Option<&Arc<Path<HostSystemImpl>>>,
time_ticks: i64, time_ticks: i64,
mut packet: PooledPacketBuffer, builder_function: BuilderFunction,
) -> bool { ) -> Option<Result<R, E>> {
let mut _path_arc = None; let mut _path_arc = None;
let path = if let Some(path) = path { let path = if let Some(path) = path {
path path
@ -279,12 +285,24 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
if let Some(path) = _path_arc.as_ref() { if let Some(path) = _path_arc.as_ref() {
path path
} else { } else {
return false; return None;
} }
}; };
let max_fragment_size = path.endpoint.max_fragment_size(); let max_fragment_size = path.endpoint.max_fragment_size();
let mut packet = host_system.get_buffer();
if !self.identity.p384.is_some() {
// For the V1 protocol, leave room for for the header in the buffer.
packet.set_size(v1::HEADER_SIZE);
}
let r = builder_function(packet.as_mut());
if r.is_ok() {
if self.identity.p384.is_some() {
todo!() // TODO: ZSSP / V2 protocol
} else {
if self.remote_node_info.read().unwrap().remote_protocol_version >= 11 { if self.remote_node_info.read().unwrap().remote_protocol_version >= 11 {
let flags_cipher_hops = if packet.len() > max_fragment_size { let flags_cipher_hops = if packet.len() > max_fragment_size {
v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV v1::HEADER_FLAG_FRAGMENTED | v1::CIPHER_AES_GMAC_SIV
@ -299,14 +317,11 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
node.identity.address, node.identity.address,
flags_cipher_hops, flags_cipher_hops,
)); ));
let tag = if let Ok(payload) = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE) { let payload = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE).unwrap();
aes_gmac_siv.encrypt_first_pass(payload); aes_gmac_siv.encrypt_first_pass(payload);
aes_gmac_siv.encrypt_first_pass_finish(); aes_gmac_siv.encrypt_first_pass_finish();
aes_gmac_siv.encrypt_second_pass_in_place(payload); aes_gmac_siv.encrypt_second_pass_in_place(payload);
aes_gmac_siv.encrypt_second_pass_finish() let tag = aes_gmac_siv.encrypt_second_pass_finish();
} else {
return false;
};
let header = packet.struct_mut_at::<v1::PacketHeader>(0).unwrap(); let header = packet.struct_mut_at::<v1::PacketHeader>(0).unwrap();
header.id.copy_from_slice(&tag[0..8]); header.id.copy_from_slice(&tag[0..8]);
@ -335,14 +350,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
packet_len, packet_len,
); );
let tag = if let Ok(payload) = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE) { let payload = packet.as_bytes_starting_at_mut(v1::HEADER_SIZE).unwrap();
salsa.crypt_in_place(payload); salsa.crypt_in_place(payload);
poly1305::compute(&poly1305_otk, payload) let tag = poly1305::compute(&poly1305_otk, payload);
} else {
return false;
};
packet.as_bytes_mut()[v1::MAC_FIELD_INDEX..(v1::MAC_FIELD_INDEX + 8)].copy_from_slice(&tag[0..8]); packet.as_bytes_mut()[v1::MAC_FIELD_INDEX..(v1::MAC_FIELD_INDEX + 8)].copy_from_slice(&tag[0..8]);
} }
}
self.v1_proto_internal_send( self.v1_proto_internal_send(
host_system, host_system,
@ -354,8 +368,9 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
); );
self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed); self.last_send_time_ticks.store(time_ticks, Ordering::Relaxed);
}
return true; return Some(r);
} }
/// Send a HELLO to this peer. /// Send a HELLO to this peer.
@ -594,9 +609,12 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
); );
} }
let mut packet = host_system.get_buffer(); self.send(
packet.set_size(v1::HEADER_SIZE); host_system,
{ node,
Some(source_path),
time_ticks,
|packet| -> Result<(), Infallible> {
let f: &mut ( let f: &mut (
v1::message_component_structs::OkHeader, v1::message_component_structs::OkHeader,
v1::message_component_structs::OkHelloFixedHeaderFields, v1::message_component_structs::OkHelloFixedHeaderFields,
@ -609,9 +627,10 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
f.1.version_major = VERSION_MAJOR; f.1.version_major = VERSION_MAJOR;
f.1.version_minor = VERSION_MINOR; f.1.version_minor = VERSION_MINOR;
f.1.version_revision = VERSION_REVISION.to_be_bytes(); f.1.version_revision = VERSION_REVISION.to_be_bytes();
} Ok(())
},
);
self.send(host_system, Some(source_path), node, time_ticks, packet);
return PacketHandlerResult::Ok; return PacketHandlerResult::Ok;
} }
} }
@ -762,39 +781,25 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if node.this_node_is_root() || inner.should_communicate_with(&self.identity) { if node.this_node_is_root() || inner.should_communicate_with(&self.identity) {
let init_packet = |packet: &mut PacketBuffer| {
packet.set_size(v1::HEADER_SIZE);
let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = verbs::VL1_OK;
f.in_re_verb = verbs::VL1_WHOIS;
f.in_re_message_id = message_id.to_ne_bytes();
};
let mut packet = host_system.get_buffer();
init_packet(&mut packet);
let mut addresses = payload.as_bytes(); let mut addresses = payload.as_bytes();
loop { while addresses.len() >= ADDRESS_SIZE {
if addresses.len() >= ADDRESS_SIZE { if !self
.send(host_system, node, None, time_ticks, |packet| {
while addresses.len() >= ADDRESS_SIZE && (packet.len() + Identity::MAX_MARSHAL_SIZE) <= UDP_DEFAULT_MTU {
if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) { if let Some(zt_address) = Address::from_bytes(&addresses[..ADDRESS_SIZE]) {
if let Some(peer) = node.peer(zt_address) { if let Some(peer) = node.peer(zt_address) {
if (packet.capacity() - packet.len()) < Identity::MAX_MARSHAL_SIZE { peer.identity.write_public(packet, self.identity.p384.is_none())?;
self.send(host_system, None, node, time_ticks, packet);
packet = host_system.get_buffer();
init_packet(&mut packet);
}
if !peer.identity.write_public(packet.as_mut(), self.identity.p384.is_none()).is_ok() {
break;
}
} }
} }
addresses = &addresses[ADDRESS_SIZE..]; addresses = &addresses[ADDRESS_SIZE..];
} else { }
Ok(())
})
.map_or(false, |r: std::io::Result<()>| r.is_ok())
{
break; break;
} }
} }
self.send(host_system, None, node, time_ticks, packet);
} }
return PacketHandlerResult::Ok; return PacketHandlerResult::Ok;
} }
@ -822,17 +827,13 @@ impl<HostSystemImpl: HostSystem + ?Sized> Peer<HostSystemImpl> {
payload: &PacketBuffer, payload: &PacketBuffer,
) -> PacketHandlerResult { ) -> PacketHandlerResult {
if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) { if inner.should_communicate_with(&self.identity) || node.is_peer_root(self) {
let mut packet = host_system.get_buffer(); self.send(host_system, node, None, time_ticks, |packet| {
packet.set_size(v1::HEADER_SIZE);
{
let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap(); let mut f: &mut v1::message_component_structs::OkHeader = packet.append_struct_get_mut().unwrap();
f.verb = verbs::VL1_OK; f.verb = verbs::VL1_OK;
f.in_re_verb = verbs::VL1_ECHO; f.in_re_verb = verbs::VL1_ECHO;
f.in_re_message_id = message_id.to_ne_bytes(); f.in_re_message_id = message_id.to_ne_bytes();
} packet.append_bytes(payload.as_bytes())
if packet.append_bytes(payload.as_bytes()).is_ok() { });
self.send(host_system, None, node, time_ticks, packet);
}
} else { } else {
debug_event!( debug_event!(
host_system, host_system,