Merge some final address changes (#1947)

* Implement base62 and replace in Identity etc.

* Plug baseXX into Address.

* Use just base24 for addresses, other cleanup.

* cleanup
This commit is contained in:
Adam Ierymenko 2023-03-30 14:36:43 -04:00 committed by GitHub
parent 0e5ee456d6
commit 3ee27eec14
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 331 additions and 1800 deletions

View file

@ -1,37 +0,0 @@
[package]
name = "syncwhole"
version = "0.1.0"
edition = "2021"
license = "MPL-2.0"
authors = ["Adam Ierymenko <adam.ierymenko@zerotier.com>"]
[profile.release]
opt-level = 3
lto = true
codegen-units = 1
panic = 'abort'
[lib]
name = "syncwhole"
path = "src/lib.rs"
doc = true
[[bin]]
name = "syncwhole_local_testnet"
path = "src/main.rs"
doc = false
required-features = ["include_sha2_lib"]
[dependencies]
tokio = { version = "^1", features = ["net", "rt", "parking_lot", "time", "io-std", "io-util", "sync", "rt-multi-thread"], default-features = false }
serde = { version = "^1", features = ["derive"], default-features = false }
serde_bytes = "^0"
rmp = "^0"
rmp-serde = "^1"
sha2 = { version = "^0", optional = true }
async-trait = "^0"
futures-core = "^0"
iblt = { version = "^0", path = "../iblt" }
[features]
include_sha2_lib = ["sha2"]

View file

@ -1 +0,0 @@
../rustfmt.toml

View file

@ -1,105 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use async_trait::async_trait;
/// Size of keys, which is the size of a 512-bit hash. This is a protocol constant.
pub const KEY_SIZE: usize = 64;
/// Result returned by DataStore::store().
pub enum StoreResult {
/// Entry was accepted.
Ok,
/// Entry was a duplicate of one we already have but was otherwise valid.
Duplicate,
/// Entry was valid but was ignored for an unspecified reason.
Ignored,
/// Entry was rejected as malformed or otherwise invalid (e.g. failed signature check).
Rejected,
}
/// API to be implemented by the data set we want to replicate.
///
/// Keys as used in this API are SHA512 hashes of values.
///
/// Range queries take an optional subset parameter. The format and interpretation of
/// this is entirely up to the implementer of DataStore. It could contain a time, a SQL
/// query, a set of certificates, anything. Its purpose is to select which items we want
/// from remote nodes so that we can replicate only a subset of a larger set of data.
/// Other nodes can also supply a subset to this one, so it's important that remote subset
/// values supplied to the local data store be handled correctly.
#[async_trait]
pub trait DataStore: Sync + Send {
/// Container for values returned by load().
///
/// Making this a trait defined type lets you use Arc<[u8]>, etc. as well as obvious
/// ones like Box<[u8]> and Vec<u8>.
type ValueRef: AsRef<[u8]> + Sync + Send + Clone;
/// Key hash size, always 64 for SHA512.
const KEY_SIZE: usize = KEY_SIZE;
/// Maximum size of a value in bytes.
const MAX_VALUE_SIZE: usize;
/// Get the subset that should be sent to remote nodes in queries.
async fn local_subset(&self) -> Option<Self::ValueRef>;
/// Get an item by identity hash key if it exists.
async fn load(&self, key: &[u8; KEY_SIZE]) -> Option<Self::ValueRef>;
/// Store an item in the data store and return its status.
///
/// Note that no time is supplied here. The data store must determine this in an implementation
/// dependent manner if this is a temporally subjective data store. It could be determined by
/// the wall clock, from the object itself, etc.
///
/// The key supplied here will always be the SHA512 hash of the value. There is no need to
/// re-compute and check the key, but the value must be validated.
///
/// Validation of the value and returning the appropriate StoreResult is important to the
/// operation of the synchronization algorithm:
///
/// StoreResult::Ok - Value was valid and was accepted and saved.
///
/// StoreResult::Duplicate - Value was valid but is a duplicate of one we already have.
///
/// StoreResult::Ignored - Value was valid but for some other reason was not saved.
///
/// StoreResult::Rejected - Value was not valid, causes link to peer to be dropped.
///
/// Rejected should only be returned if the value actually fails a validity check, signature
/// verification, proof of work check, or some other required criteria. Ignored must be
/// returned if the value is valid but is too old or was rejected for some other normal reason.
async fn store(&self, key: &[u8; KEY_SIZE], value: &[u8]) -> StoreResult;
/// Iterate through keys in a range.
///
/// Keys MUST be output in ascending binary sort order.
async fn keys<F: Send + FnMut(&[u8]) -> bool>(
&self,
subset: Option<&[u8]>,
range_start: &[u8; KEY_SIZE],
range_end: &[u8; KEY_SIZE],
f: F,
);
/// Iterate through values in a range.
///
/// Entries MUST be output in ascending binary sort order.
async fn values<F: Send + FnMut(&[u8], &[u8]) -> bool>(
&self,
subset: Option<&[u8]>,
range_start: &[u8; KEY_SIZE],
range_end: &[u8; KEY_SIZE],
f: F,
);
}

View file

@ -1,156 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::net::SocketAddr;
use std::sync::Arc;
#[cfg(feature = "include_sha2_lib")]
use sha2::digest::{Digest, FixedOutput};
use serde::{Deserialize, Serialize};
use crate::node::RemoteNodeInfo;
/// Configuration setttings for a syncwhole node.
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
pub struct Config {
/// A list of peer addresses to which we always want to stay connected.
/// The library will try to maintain connectivity to these regardless of connection limits.
pub anchors: Vec<SocketAddr>,
/// A list of other peer addresses that we can try in order to achieve desired_connection_count.
/// If this includes the anchors too there will be no effect since the anchors are tried first anyway.
pub peers: Vec<SocketAddr>,
/// The maximum number of TCP connections we should allow.
pub max_connection_count: usize,
/// The desired number of peering links.
pub desired_connection_count: usize,
/// Synchronization interval in milliseconds.
pub sync_interval: u64,
/// Connection inactivity timeout in milliseconds.
pub connection_timeout: u64,
/// An arbitrary name for this data set to avoid connecting to irrelevant nodes.
pub domain: String,
/// An optional name for this node to advertise to other nodes.
pub name: String,
/// An optional contact string for this node to advertise to other nodes.
/// Example: bighead@stanford.edu or https://www.piedpiper.com/
pub contact: String,
}
impl Default for Config {
fn default() -> Self {
Self {
anchors: Vec::new(),
peers: Vec::new(),
max_connection_count: 128,
desired_connection_count: 64,
sync_interval: 500,
connection_timeout: 500 * 10,
domain: String::new(),
name: String::new(),
contact: String::new(),
}
}
}
/// A trait that users of syncwhole implement to provide configuration information and listen for events.
pub trait Host: Sync + Send {
/// Get the current configuration for this node.
fn node_config(&self) -> Arc<Config>;
/// Test whether an inbound connection should be allowed from an address.
///
/// This is called on first incoming connection before any init is received. The authenticate()
/// method is called once init has been received and is another decision point. The default
/// implementation of this always returns true.
///
/// This is not called for outbound connections.
#[allow(unused_variables)]
fn allow(&self, remote_address: &SocketAddr) -> bool {
true
}
/// Compute HMAC-SHA512(secret, challenge).
///
/// A return of None indicates that the connection should be dropped. If authentication is
/// not enabled, the response should be computed using an all-zero secret key. This is
/// what the default implementation does, so if you don't want authentication there is no
/// need to override and implement this.
///
/// This actually gets called twice per link: once when Init is received to compute the
/// response, and once when InitResponse is received to verify the response to our challenge.
///
/// The default implementation authenticates with an all-zero key. Leave it this way if
/// you don't want authentication.
#[allow(unused_variables)]
fn authenticate(&self, info: &RemoteNodeInfo, challenge: &[u8]) -> Option<[u8; 64]> {
Some(Self::hmac_sha512(&[0_u8; 64], challenge))
}
/// Called when an attempt is made to connect to a remote address.
fn on_connect_attempt(&self, address: &SocketAddr);
/// Called when a connection has been successfully established.
///
/// Hosts are encouraged to learn endpoints when a successful outbound connection is made. Check the
/// inbound flag in the remote node info structure.
fn on_connect(&self, info: &RemoteNodeInfo);
/// Called when an open connection is closed for any reason.
fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String);
/// Fill a buffer with secure random bytes.
///
/// The implementer must call a secure random number generator or source to implement this.
fn get_secure_random(&self, buf: &mut [u8]);
/// Compute a SHA512 digest of the input.
///
/// Input can consist of one or more slices that will be processed in order.
///
/// If the feature "include_sha2_lib" is enabled a default implementation in terms of the
/// Rust sha2 crate is generated. Otherwise the implementer must supply their own
/// SHA512 function.
#[cfg(not(feature = "include_sha2_lib"))]
fn sha512(msg: &[&[u8]]) -> [u8; 64];
#[cfg(feature = "include_sha2_lib")]
fn sha512(msg: &[&[u8]]) -> [u8; 64] {
let mut h = sha2::Sha512::new();
for b in msg.iter() {
h.update(*b);
}
h.finalize_fixed().try_into().unwrap()
}
/// Compute HMAC-SHA512 using key and input.
///
/// Supplied key will always be 64 bytes in length.
///
/// The default implementation is HMAC implemented in terms of sha512() above. Specialize
/// to provide your own implementation.
fn hmac_sha512(key: &[u8], msg: &[u8]) -> [u8; 64] {
let mut opad = [0x5c_u8; 128];
let mut ipad = [0x36_u8; 128];
assert!(key.len() >= 64);
for i in 0..64 {
opad[i] ^= key[i];
}
for i in 0..64 {
ipad[i] ^= key[i];
}
Self::sha512(&[&opad, &Self::sha512(&[&ipad, msg])])
}
}

View file

@ -1,17 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
pub(crate) mod protocol;
pub(crate) mod utils;
pub(crate) mod varint;
pub mod datastore;
pub mod host;
pub mod node;
pub use async_trait;

View file

@ -1,196 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::BTreeMap;
use std::io::{stdout, Write};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Bound::Included;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use async_trait::async_trait;
use sha2::digest::Digest;
use sha2::Sha512;
use syncwhole::datastore::*;
use syncwhole::host::*;
use syncwhole::node::*;
use syncwhole::utils::*;
const TEST_NODE_COUNT: usize = 8;
const TEST_PORT_RANGE_START: u16 = 21384;
const TEST_STARTING_RECORDS_PER_NODE: usize = 16;
static mut RANDOM_CTR: u128 = 0;
fn get_random_bytes(mut buf: &mut [u8]) {
// This is only for testing and is not really secure.
let mut ctr = unsafe { RANDOM_CTR };
if ctr == 0 {
ctr = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() * (1 + Instant::now().elapsed().as_nanos());
}
while !buf.is_empty() {
let l = buf.len().min(64);
ctr = ctr.wrapping_add(1);
buf[..l].copy_from_slice(&Sha512::digest(&ctr.to_ne_bytes()).as_slice()[..l]);
buf = &mut buf[l..];
}
unsafe { RANDOM_CTR = ctr };
}
pub struct TestNodeHost {
pub name: String,
pub config: Config,
pub records: tokio::sync::RwLock<BTreeMap<[u8; 64], [u8; 64]>>,
}
impl TestNodeHost {
pub fn new_random(test_no: usize) -> Self {
let mut s = BTreeMap::new();
for _ in 0..TEST_STARTING_RECORDS_PER_NODE {
let mut v = [0_u8; 64];
get_random_bytes(&mut v);
let k = Self::sha512(&[&v]);
s.insert(k, v);
}
Self {
name: test_no.to_string(),
config: Config::default(),
records: tokio::sync::RwLock::new(s),
}
}
}
impl Host for TestNodeHost {
fn node_config(&self) -> Config {
self.config.clone()
}
fn on_connect_attempt(&self, _address: &SocketAddr) {
//println!("{:5}: connecting to {}", self.name, _address.to_string());
}
fn on_connect(&self, info: &RemoteNodeInfo) {
//println!("{:5}: connected to {} ({}, {})", self.name, info.remote_address.to_string(), info.node_name.as_ref().map_or("null", |s| s.as_str()), if info.inbound { "inbound" } else { "outbound" });
}
fn on_connection_closed(&self, info: &RemoteNodeInfo, reason: String) {
//println!("{:5}: closed connection to {}: {} ({}, {})", self.name, info.remote_address.to_string(), reason, if info.inbound { "inbound" } else { "outbound" }, if info.initialized { "initialized" } else { "not initialized" });
}
fn get_secure_random(&self, buf: &mut [u8]) {
// This is only for testing and is not really secure.
get_random_bytes(buf);
}
}
#[async_trait]
impl DataStore for TestNodeHost {
type ValueRef = [u8; 64];
const MAX_VALUE_SIZE: usize = 1024;
fn clock(&self) -> i64 {
ms_since_epoch()
}
fn domain(&self) -> &str {
"test"
}
async fn load(&self, key: &[u8; 64]) -> Option<Self::ValueRef> {
let records = self.records.read().await;
let value = records.get(key);
if value.is_some() {
Some(value.unwrap().clone())
} else {
None
}
}
async fn store(&self, key: &[u8; 64], value: &[u8]) -> StoreResult {
let value: [u8; 64] = value.try_into();
if value.is_ok() {
if self.records.write().await.insert(key.clone(), value).is_none() {
StoreResult::Ok
} else {
StoreResult::Duplicate
}
} else {
StoreResult::Rejected
}
}
async fn keys_under<F: Send + FnMut(&[u8]) -> bool>(&self, reference_time: i64, prefix: u64, prefix_bits: u32, f: F) {
let (start, end) = prefix_to_range(prefix, prefix_bits);
let records = self.records.read().await;
for (k, v) in records.range((Included(start), Included(end))) {
if !f(k) {
break;
}
}
}
}
fn main() {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
println!(
"Running syncwhole local self-test network with {} nodes starting at 127.0.0.1:{}",
TEST_NODE_COUNT, TEST_PORT_RANGE_START
);
println!();
println!("Starting nodes on 127.0.0.1...");
let mut nodes: Vec<Node<TestNodeHost, TestNodeHost>> = Vec::with_capacity(TEST_NODE_COUNT);
for port in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) {
let mut peers: Vec<SocketAddr> = Vec::with_capacity(TEST_NODE_COUNT);
for port2 in TEST_PORT_RANGE_START..(TEST_PORT_RANGE_START + (TEST_NODE_COUNT as u16)) {
if port != port2 {
peers.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port2)));
}
}
let mut th = TestNodeHost::new_random(port as usize);
th.config.anchors = peers;
th.config.name = port.to_string();
let nh = Arc::new(th);
//println!("Starting node on 127.0.0.1:{}...", port, nh.db.lock().unwrap().len());
nodes.push(
Node::new(nh.clone(), nh.clone(), SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
.await
.unwrap(),
);
}
print!("Waiting for all connections to be established...");
let _ = stdout().flush();
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let mut count = 0;
for n in nodes.iter() {
count += n.connection_count().await;
}
if count == (TEST_NODE_COUNT * (TEST_NODE_COUNT - 1)) {
println!(" {} connections up.", count);
break;
} else {
print!(".");
let _ = stdout().flush();
}
}
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
}

View file

@ -1,701 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::{HashMap, HashSet};
use std::io::IoSlice;
use std::mem::MaybeUninit;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::ops::Add;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use iblt::IBLT;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use crate::datastore::*;
use crate::host::Host;
use crate::protocol::*;
use crate::utils::*;
use crate::varint;
// Interval for announcing queued HaveRecords items, in milliseconds.
const ANNOUNCE_PERIOD: i64 = 100;
/// Information about a remote node to which we are connected.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RemoteNodeInfo {
/// Optional name advertised by remote node (arbitrary).
pub name: String,
/// Optional contact information advertised by remote node (arbitrary).
pub contact: String,
/// Actual remote endpoint address.
pub remote_address: SocketAddr,
/// Explicitly advertised remote addresses supplied by remote node (not necessarily verified).
pub explicit_addresses: Vec<SocketAddr>,
/// Time TCP connection was established (ms since epoch).
pub connect_time: i64,
/// Time TCP connection was estaablished (ms, monotonic).
pub connect_instant: i64,
/// True if this is an inbound TCP connection.
pub inbound: bool,
/// True if this connection has exchanged init messages successfully.
pub initialized: bool,
}
/// An instance of the syncwhole data set synchronization engine.
///
/// This holds a number of async tasks that are terminated or aborted if this object
/// is dropped. In other words this implements structured concurrency.
pub struct Node<D: DataStore + 'static, H: Host + 'static> {
internal: Arc<NodeInternal<D, H>>,
housekeeping_task: JoinHandle<()>,
announce_task: JoinHandle<()>,
listener_task: JoinHandle<()>,
}
impl<D: DataStore + 'static, H: Host + 'static> Node<D, H> {
pub async fn new(db: Arc<D>, host: Arc<H>, bind_address: SocketAddr) -> std::io::Result<Self> {
let listener = if bind_address.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}?;
configure_tcp_socket(&listener)?;
listener.bind(bind_address.clone())?;
let listener = listener.listen(1024)?;
let internal = Arc::new(NodeInternal::<D, H> {
anti_loopback_secret: {
let mut tmp = [0_u8; 64];
host.get_secure_random(&mut tmp);
tmp
},
datastore: db.clone(),
host: host.clone(),
connections: Mutex::new(HashMap::new()),
connecting_to: Mutex::new(HashSet::new()),
announce_queue: Mutex::new(HashMap::new()),
bind_address,
starting_instant: Instant::now(),
});
Ok(Self {
internal: internal.clone(),
housekeeping_task: tokio::spawn(internal.clone().housekeeping_task_main()),
announce_task: tokio::spawn(internal.clone().announce_task_main()),
listener_task: tokio::spawn(internal.listener_task_main(listener)),
})
}
#[inline(always)]
pub fn datastore(&self) -> &Arc<D> {
&self.internal.datastore
}
#[inline(always)]
pub fn host(&self) -> &Arc<H> {
&self.internal.host
}
/// Attempt to connect to an explicitly specified TCP endpoint.
///
/// Ok(true) is returned if a new connection was made. Ok(false) means there is already a connection
/// to the endpoint. An error is returned if the connection fails.
pub async fn connect(&self, address: &SocketAddr) -> std::io::Result<bool> {
if self.internal.connecting_to.lock().await.insert(address.clone()) {
self.internal
.connect(
address,
Instant::now().add(Duration::from_millis(self.internal.host.node_config().connection_timeout)),
)
.await
} else {
Ok(false)
}
}
/// Get a list of all open peer to peer connections.
pub async fn list_connections(&self) -> Vec<RemoteNodeInfo> {
let connections = self.internal.connections.lock().await;
let mut cl: Vec<RemoteNodeInfo> = Vec::with_capacity(connections.len());
for (_, c) in connections.iter() {
cl.push(c.info.lock().unwrap().clone());
}
cl
}
/// Get the number of open peer to peer connections.
pub async fn connection_count(&self) -> usize {
self.internal.connections.lock().await.len()
}
}
impl<D: DataStore + 'static, H: Host + 'static> Drop for Node<D, H> {
fn drop(&mut self) {
self.housekeeping_task.abort();
self.announce_task.abort();
self.listener_task.abort();
}
}
/********************************************************************************************************************/
fn configure_tcp_socket(socket: &TcpSocket) -> std::io::Result<()> {
let _ = socket.set_linger(None);
if socket.set_reuseport(true).is_ok() {
Ok(())
} else {
socket.set_reuseaddr(true)
}
}
fn decode_msgpack<'a, T: Deserialize<'a>>(b: &'a [u8]) -> std::io::Result<T> {
rmp_serde::from_slice(b).map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("invalid msgpack object: {}", e.to_string()),
)
})
}
pub struct NodeInternal<D: DataStore + 'static, H: Host + 'static> {
// Secret used to perform HMAC to detect and drop loopback connections to self.
anti_loopback_secret: [u8; 64],
// Outside code implementations of DataStore and Host traits.
datastore: Arc<D>,
host: Arc<H>,
// Connections and their task join handles, by remote endpoint address.
connections: Mutex<HashMap<SocketAddr, Arc<Connection>>>,
// Outgoing connections in progress.
connecting_to: Mutex<HashSet<SocketAddr>>,
// Records received since last announce and the endpoints that we know already have them.
announce_queue: Mutex<HashMap<[u8; KEY_SIZE], Vec<SocketAddr>>>,
// Local address to which this node is bound
bind_address: SocketAddr,
// Instant this node started.
starting_instant: Instant,
}
impl<D: DataStore + 'static, H: Host + 'static> NodeInternal<D, H> {
fn ms_monotonic(&self) -> i64 {
Instant::now().duration_since(self.starting_instant).as_millis() as i64
}
async fn housekeeping_task_main(self: Arc<Self>) {
let tasks = AsyncTaskReaper::new();
let mut sleep_for = Duration::from_millis(500);
loop {
tokio::time::sleep(sleep_for).await;
let config = self.host.node_config();
let mut connections = self.connections.lock().await;
let mut connecting_to = self.connecting_to.lock().await;
let now = self.ms_monotonic();
// Drop dead or timed out connections, and for live connections handle sending sync requests.
connections.retain(|_, c| {
if !c.closed.load(Ordering::Relaxed) {
let cc = c.clone();
if (now - c.last_receive_time.load(Ordering::Relaxed)) < (config.connection_timeout as i64) {
// TODO: sync init if not waiting for a sync response
true // keep connection
} else {
let _ = c.read_task.lock().unwrap().take().map(|j| j.abort());
let host = self.host.clone();
tasks.spawn(async move {
host.on_connection_closed(&*cc.info.lock().unwrap(), "timeout".to_string());
});
false // discard connection
}
} else {
let host = self.host.clone();
let cc = c.clone();
let j = c.read_task.lock().unwrap().take();
tasks.spawn(async move {
if j.is_some() {
let e = j.unwrap().await;
if e.is_ok() {
let e = e.unwrap();
host.on_connection_closed(
&*cc.info.lock().unwrap(),
e.map_or_else(|e| e.to_string(), |_| "unknown error".to_string()),
);
} else {
host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
}
} else {
host.on_connection_closed(&*cc.info.lock().unwrap(), "remote host closed connection".to_string());
}
});
false // discard connection
}
});
let connect_timeout_at = Instant::now().add(Duration::from_millis(config.connection_timeout));
// Always try to connect to anchor peers.
for sa in config.anchors.iter() {
if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
let self2 = self.clone();
let sa = sa.clone();
tasks.spawn(async move {
let _ = self2.connect(&sa, connect_timeout_at).await;
});
}
}
// Try to connect to more peers until desired connection count is reached.
let desired_connection_count = config.desired_connection_count.min(config.max_connection_count);
for sa in config.peers.iter() {
if (connections.len() + connecting_to.len()) >= desired_connection_count {
break;
}
if !connections.contains_key(sa) && connecting_to.insert(sa.clone()) {
let self2 = self.clone();
let sa = sa.clone();
tasks.spawn(async move {
let _ = self2.connect(&sa, connect_timeout_at).await;
});
}
}
sleep_for = Duration::from_millis(config.sync_interval.min(config.connection_timeout));
}
}
async fn announce_task_main(self: Arc<Self>) {
let sleep_for = Duration::from_millis(ANNOUNCE_PERIOD as u64);
let mut to_announce: Vec<([u8; KEY_SIZE], Vec<SocketAddr>)> = Vec::new();
let background_tasks = AsyncTaskReaper::new();
let announce_timeout = Duration::from_millis(self.host.node_config().connection_timeout);
loop {
tokio::time::sleep(sleep_for).await;
for (key, already_has) in self.announce_queue.lock().await.drain() {
to_announce.push((key, already_has));
}
let now = self.ms_monotonic();
let have_records_est_size = (to_announce.len() * KEY_SIZE) + 2;
let mut have_records: Vec<u8> = Vec::with_capacity(have_records_est_size);
for c in self.connections.lock().await.iter() {
if c.1.announce_new_records.load(Ordering::Relaxed) {
for (key, already_has) in to_announce.iter() {
if !already_has.contains(c.0) {
let _ = std::io::Write::write_all(&mut have_records, key);
}
}
if !have_records.is_empty() {
let c2 = c.1.clone();
background_tasks.spawn(async move {
// If the connection dies this will either fail or time out in 1s. Usually these execute instantly due to
// write buffering but a short timeout prevents them from building up too much.
let _ = tokio::time::timeout(
announce_timeout,
c2.send_msg(MessageType::HaveRecords, have_records.as_slice(), now),
);
});
have_records = Vec::with_capacity(have_records_est_size);
}
}
}
to_announce.clear();
}
}
async fn listener_task_main(self: Arc<Self>, listener: TcpListener) {
loop {
let socket = listener.accept().await;
if socket.is_ok() {
let (stream, address) = socket.unwrap();
if self.host.allow(&address) {
let config = self.host.node_config();
if self.connections.lock().await.len() < config.max_connection_count || config.anchors.contains(&address) {
Self::connection_start(&self, address, stream, true).await;
}
}
}
}
}
/// Internal connection method.
///
/// Note that this does not add the address to connecting_to. Instead it's done by the caller
/// to avoid races and similar things. It is removed from connecting_to once the connection
/// either succeeds or fails.
async fn connect(self: &Arc<Self>, address: &SocketAddr, deadline: Instant) -> std::io::Result<bool> {
let f = async {
let stream = if address.is_ipv4() {
TcpSocket::new_v4()
} else {
TcpSocket::new_v6()
}?;
configure_tcp_socket(&stream)?;
stream.bind(self.bind_address.clone())?;
let stream = tokio::time::timeout_at(deadline, stream.connect(address.clone()));
self.host.on_connect_attempt(address);
let stream = stream.await;
if stream.is_ok() {
Ok(self.connection_start(address.clone(), stream.unwrap()?, false).await)
} else {
Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "connect timed out"))
}
};
let r = f.await;
let _ = self.connecting_to.lock().await.remove(address);
r
}
/// Initialize and start a connection whether incoming or outgoing.
async fn connection_start(self: &Arc<Self>, address: SocketAddr, stream: TcpStream, inbound: bool) -> bool {
let mut ok = false;
let _ = self.connections.lock().await.entry(address.clone()).or_insert_with(|| {
ok = true;
let _ = stream.set_nodelay(false);
let (reader, writer) = stream.into_split();
let now = self.ms_monotonic();
let connection = Arc::new(Connection {
writer: Mutex::new(writer),
last_send_time: AtomicI64::new(now),
last_receive_time: AtomicI64::new(now),
info: std::sync::Mutex::new(RemoteNodeInfo {
name: String::new(),
contact: String::new(),
remote_address: address.clone(),
explicit_addresses: Vec::new(),
connect_time: ms_since_epoch(),
connect_instant: now,
inbound,
initialized: false,
}),
read_task: std::sync::Mutex::new(None),
announce_new_records: AtomicBool::new(false),
closed: AtomicBool::new(false),
});
let self2 = self.clone();
let c2 = connection.clone();
connection.read_task.lock().unwrap().replace(tokio::spawn(async move {
let result = self2.connection_io_task_main(&c2, address, reader).await;
c2.closed.store(true, Ordering::Relaxed);
result
}));
connection
});
ok
}
async fn connection_io_task_main(
self: Arc<Self>,
connection: &Arc<Connection>,
remote_address: SocketAddr,
mut reader: OwnedReadHalf,
) -> std::io::Result<()> {
const BUF_CHUNK_SIZE: usize = 4096;
const READ_BUF_INITIAL_SIZE: usize = 65536; // should be a multiple of BUF_CHUNK_SIZE
let mut write_buffer: Vec<u8> = Vec::with_capacity(BUF_CHUNK_SIZE);
let mut read_buffer: Vec<u8> = Vec::new();
read_buffer.resize(READ_BUF_INITIAL_SIZE, 0);
let config = self.host.node_config();
let mut anti_loopback_challenge_sent = [0_u8; 64];
let mut domain_challenge_sent = [0_u8; 64];
let mut auth_challenge_sent = [0_u8; 64];
self.host.get_secure_random(&mut anti_loopback_challenge_sent);
self.host.get_secure_random(&mut domain_challenge_sent);
self.host.get_secure_random(&mut auth_challenge_sent);
connection
.send_obj(
&mut write_buffer,
MessageType::Init,
&msg::Init {
anti_loopback_challenge: &anti_loopback_challenge_sent,
domain_challenge: &domain_challenge_sent,
auth_challenge: &auth_challenge_sent,
node_name: config.name.as_str(),
node_contact: config.contact.as_str(),
locally_bound_port: self.bind_address.port(),
explicit_ipv4: None,
explicit_ipv6: None,
},
self.ms_monotonic(),
)
.await?;
drop(config);
let max_message_size = ((D::MAX_VALUE_SIZE * 8) + (D::KEY_SIZE * 1024) + 65536) as u64; // sanity limit
let mut initialized = false;
let mut init_received = false;
let mut buffer_fill = 0_usize;
loop {
let message_type: MessageType;
let message_size: usize;
let header_size: usize;
let total_size: usize;
loop {
buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?;
if buffer_fill >= 2 {
// type and at least one byte of varint
let ms = varint::decode(&read_buffer.as_slice()[1..]);
if ms.1 > 0 {
// varint is all there and parsed correctly
if ms.0 > max_message_size {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "message too large"));
}
message_type = MessageType::from(*read_buffer.get(0).unwrap());
message_size = ms.0 as usize;
header_size = 1 + ms.1;
total_size = header_size + message_size;
if read_buffer.len() < total_size {
read_buffer.resize(((total_size / BUF_CHUNK_SIZE) + 1) * BUF_CHUNK_SIZE, 0);
}
while buffer_fill < total_size {
buffer_fill += reader.read(&mut read_buffer.as_mut_slice()[buffer_fill..]).await?;
}
break;
}
}
}
let message = &read_buffer.as_slice()[header_size..total_size];
let now = self.ms_monotonic();
connection.last_receive_time.store(now, Ordering::Relaxed);
match message_type {
MessageType::Nop => {}
MessageType::Init => {
if init_received {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init"));
}
init_received = true;
let msg: msg::Init = decode_msgpack(message)?;
let (anti_loopback_response, domain_challenge_response, auth_challenge_response) = {
let mut info = connection.info.lock().unwrap();
info.name = msg.node_name.to_string();
info.contact = msg.node_contact.to_string();
let _ = msg.explicit_ipv4.map(|pv4| {
info.explicit_addresses
.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::from(pv4.ip), pv4.port)));
});
let _ = msg.explicit_ipv6.map(|pv6| {
info.explicit_addresses
.push(SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::from(pv6.ip), pv6.port, 0, 0)));
});
let info = info.clone();
let auth_challenge_response = self.host.authenticate(&info, msg.auth_challenge);
if auth_challenge_response.is_none() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"authenticate() returned None, connection dropped",
));
}
let auth_challenge_response = auth_challenge_response.unwrap();
(
H::hmac_sha512(&self.anti_loopback_secret, msg.anti_loopback_challenge),
H::hmac_sha512(&H::sha512(&[self.host.node_config().domain.as_bytes()]), msg.domain_challenge),
auth_challenge_response,
)
};
connection
.send_obj(
&mut write_buffer,
MessageType::InitResponse,
&msg::InitResponse {
anti_loopback_response: &anti_loopback_response,
domain_response: &domain_challenge_response,
auth_response: &auth_challenge_response,
},
now,
)
.await?;
}
MessageType::InitResponse => {
let msg: msg::InitResponse = decode_msgpack(message)?;
let mut info = connection.info.lock().unwrap();
if info.initialized {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "duplicate init response"));
}
if msg
.anti_loopback_response
.eq(&H::hmac_sha512(&self.anti_loopback_secret, &anti_loopback_challenge_sent))
{
return Err(std::io::Error::new(std::io::ErrorKind::Other, "rejected connection to self"));
}
if !msg.domain_response.eq(&H::hmac_sha512(
&H::sha512(&[self.host.node_config().domain.as_bytes()]),
&domain_challenge_sent,
)) {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "domain mismatch"));
}
if !self
.host
.authenticate(&info, &auth_challenge_sent)
.map_or(false, |cr| msg.auth_response.eq(&cr))
{
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"challenge/response authentication failed",
));
}
initialized = true;
info.initialized = true;
let info = info.clone(); // also releases lock since info is replaced/destroyed
self.host.on_connect(&info);
}
// Handle messages other than INIT and INIT_RESPONSE after checking 'initialized' flag.
_ => {
if !initialized {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"init exchange must be completed before other messages are sent",
));
}
match message_type {
MessageType::HaveRecords => {}
MessageType::GetRecords => {}
MessageType::Record => {
let key = H::sha512(&[message]);
match self.datastore.store(&key, message).await {
StoreResult::Ok => {
let mut q = self.announce_queue.lock().await;
let ql = q.entry(key).or_insert_with(|| Vec::with_capacity(2));
if !ql.contains(&remote_address) {
ql.push(remote_address.clone());
}
}
StoreResult::Rejected => {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
format!("record rejected by data store: {}", to_hex_string(&key)),
));
}
_ => {}
}
}
MessageType::SyncRequest => {
let msg: msg::SyncRequest = decode_msgpack(message)?;
}
MessageType::Sync => {
let msg: msg::Sync = decode_msgpack(message)?;
}
_ => {}
}
}
}
read_buffer.copy_within(total_size..buffer_fill, 0);
buffer_fill -= total_size;
}
}
}
impl<D: DataStore + 'static, H: Host + 'static> Drop for NodeInternal<D, H> {
fn drop(&mut self) {
let _ = tokio::runtime::Handle::try_current().map_or_else(
|_| {
for (_, c) in self.connections.blocking_lock().drain() {
c.read_task.lock().unwrap().as_mut().map(|c| c.abort());
}
},
|h| {
let _ = h.block_on(async {
for (_, c) in self.connections.lock().await.drain() {
c.read_task.lock().unwrap().as_mut().map(|c| c.abort());
}
});
},
);
}
}
struct Connection {
writer: Mutex<OwnedWriteHalf>,
last_send_time: AtomicI64,
last_receive_time: AtomicI64,
info: std::sync::Mutex<RemoteNodeInfo>,
read_task: std::sync::Mutex<Option<JoinHandle<std::io::Result<()>>>>,
announce_new_records: AtomicBool,
closed: AtomicBool,
}
impl Connection {
async fn send_msg(&self, message_type: MessageType, data: &[u8], now: i64) -> std::io::Result<()> {
let mut header: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() };
header[0] = message_type as u8;
let header_size = 1 + varint::encode(&mut header[1..], data.len() as u64);
if self
.writer
.lock()
.await
.write_vectored(&[IoSlice::new(&header[0..header_size]), IoSlice::new(data)])
.await?
== (data.len() + header_size)
{
self.last_send_time.store(now, Ordering::Relaxed);
Ok(())
} else {
Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "write error"))
}
}
async fn send_obj<O: Serialize>(&self, write_buf: &mut Vec<u8>, message_type: MessageType, obj: &O, now: i64) -> std::io::Result<()> {
write_buf.clear();
if rmp_serde::encode::write_named(write_buf, obj).is_ok() {
self.send_msg(message_type, write_buf.as_slice(), now).await
} else {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"serialize failure (internal error)",
))
}
}
}

View file

@ -1,180 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
#[derive(Clone, Copy, Eq, PartialEq)]
#[repr(u8)]
pub enum MessageType {
/// No operation, payload ignored.
Nop = 0_u8,
/// msg::Init (msgpack)
Init = 1_u8,
/// msg::InitResponse (msgpack)
InitResponse = 2_u8,
/// <full record key>[<full record key>...]
HaveRecords = 3_u8,
/// <u8 length of each key prefix in bytes>[<key>...]
GetRecords = 4_u8,
/// <record>
Record = 5_u8,
/// msg::SyncRequest (msgpack)
SyncRequest = 6_u8,
/// msg::Sync (msgpack)
Sync = 7_u8,
}
const MESSAGE_TYPE_MAX: u8 = 7;
impl From<u8> for MessageType {
/// Get a type from a byte, returning the Nop type if the byte is out of range.
#[inline(always)]
fn from(b: u8) -> Self {
if b <= MESSAGE_TYPE_MAX {
unsafe { std::mem::transmute(b) }
} else {
Self::Nop
}
}
}
impl MessageType {
#[allow(unused)]
pub fn name(&self) -> &'static str {
match *self {
Self::Nop => "NOP",
Self::Init => "INIT",
Self::InitResponse => "INIT_RESPONSE",
Self::HaveRecords => "HAVE_RECORDS",
Self::GetRecords => "GET_RECORDS",
Self::Record => "RECORD",
Self::SyncRequest => "SYNC_REQUEST",
Self::Sync => "SYNC",
}
}
}
/// Msgpack serializable message types.
/// Some that are frequently transferred use shortened names to save bandwidth.
pub mod msg {
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct IPv4 {
pub ip: [u8; 4],
pub port: u16,
}
#[derive(Serialize, Deserialize)]
pub struct IPv6 {
pub ip: [u8; 16],
pub port: u16,
}
#[derive(Serialize, Deserialize)]
pub struct Init<'a> {
/// A random challenge to be hashed with a secret to detect and drop connections to self.
#[serde(with = "serde_bytes")]
pub anti_loopback_challenge: &'a [u8],
/// A random challenge for checking the data set domain.
#[serde(with = "serde_bytes")]
pub domain_challenge: &'a [u8],
/// A random challenge for login/authentication.
#[serde(with = "serde_bytes")]
pub auth_challenge: &'a [u8],
/// Optional name to advertise for this node.
pub node_name: &'a str,
/// Optional contact information for this node, such as a URL or an e-mail address.
pub node_contact: &'a str,
/// Port to which this node has locally bound.
/// This is used to try to auto-detect whether a NAT is in the way.
pub locally_bound_port: u16,
/// An IPv4 address where this node can be reached.
/// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used.
pub explicit_ipv4: Option<IPv4>,
/// An IPv6 address where this node can be reached.
/// If both explicit_ipv4 and explicit_ipv6 are omitted the physical source IP:port may be used.
pub explicit_ipv6: Option<IPv6>,
}
#[derive(Serialize, Deserialize)]
pub struct InitResponse<'a> {
/// HMAC-SHA512(local secret, anti_loopback_challenge) to detect and drop loops.
#[serde(with = "serde_bytes")]
pub anti_loopback_response: &'a [u8],
/// HMAC-SHA512(SHA512(domain), domain_challenge) to check that the data set domain matches.
#[serde(with = "serde_bytes")]
pub domain_response: &'a [u8],
/// HMAC-SHA512(secret, challenge) for authentication. (If auth is not enabled, an all-zero secret is used.)
#[serde(with = "serde_bytes")]
pub auth_response: &'a [u8],
}
#[derive(Serialize, Deserialize)]
pub struct SyncRequest<'a> {
/// Starting range to query, padded with zeroes if shorter than KEY_SIZE.
#[serde(with = "serde_bytes")]
#[serde(rename = "s")]
pub range_start: &'a [u8],
/// Ending range to query, padded with 0xff if shorter than KEY_SIZE.
#[serde(with = "serde_bytes")]
#[serde(rename = "e")]
pub range_end: &'a [u8],
/// Data-store-specific subset selector indicating what subset of items desired
#[serde(with = "serde_bytes")]
#[serde(rename = "q")]
pub subset: Option<&'a [u8]>,
}
#[derive(Serialize, Deserialize)]
pub struct Sync<'a> {
/// Starting range summarized, padded with zeroes if shorter than KEY_SIZE.
#[serde(with = "serde_bytes")]
#[serde(rename = "s")]
pub range_start: &'a [u8],
/// Ending range summarized, padded with 0xff if shorter than KEY_SIZE.
#[serde(with = "serde_bytes")]
#[serde(rename = "e")]
pub range_end: &'a [u8],
/// Data-store-specific subset selector indicating what subset of items were included
#[serde(with = "serde_bytes")]
#[serde(rename = "q")]
pub subset: Option<&'a [u8]>,
/// Number of buckets in IBLT
#[serde(rename = "b")]
pub iblt_buckets: usize,
/// Number of bytes in each IBLT item (key prefix)
#[serde(rename = "l")]
pub iblt_item_bytes: usize,
/// Set summary for keys under prefix within subset
#[serde(with = "serde_bytes")]
#[serde(rename = "i")]
pub iblt: &'a [u8],
}
}

View file

@ -1,129 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::task::JoinHandle;
/// Get the real time clock in milliseconds since Unix epoch.
pub fn ms_since_epoch() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
/// Encode a byte slice to a hexadecimal string.
pub fn to_hex_string(b: &[u8]) -> String {
const HEX_CHARS: [u8; 16] = [
b'0', b'1', b'2', b'3', b'4', b'5', b'6', b'7', b'8', b'9', b'a', b'b', b'c', b'd', b'e', b'f',
];
let mut s = String::new();
s.reserve(b.len() * 2);
for c in b {
let x = *c as usize;
s.push(HEX_CHARS[x >> 4] as char);
s.push(HEX_CHARS[x & 0xf] as char);
}
s
}
#[inline(always)]
pub fn xorshift64(mut x: u64) -> u64 {
x ^= x.wrapping_shl(13);
x ^= x.wrapping_shr(7);
x ^= x.wrapping_shl(17);
x
}
#[inline(always)]
pub fn splitmix64(mut x: u64) -> u64 {
x ^= x.wrapping_shr(30);
x = x.wrapping_mul(0xbf58476d1ce4e5b9);
x ^= x.wrapping_shr(27);
x = x.wrapping_mul(0x94d049bb133111eb);
x ^= x.wrapping_shr(31);
x
}
/*
#[inline(always)]
pub fn splitmix64_inverse(mut x: u64) -> u64 {
x ^= x.wrapping_shr(31) ^ x.wrapping_shr(62);
x = x.wrapping_mul(0x319642b2d24d8ec3);
x ^= x.wrapping_shr(27) ^ x.wrapping_shr(54);
x = x.wrapping_mul(0x96de1b173f119089);
x ^= x.wrapping_shr(30) ^ x.wrapping_shr(60);
x
}
*/
static mut RANDOM_STATE_0: u64 = 0;
static mut RANDOM_STATE_1: u64 = 0;
/// Get a non-cryptographic pseudorandom number.
pub fn random() -> u64 {
let (mut s0, mut s1) = unsafe { (RANDOM_STATE_0, RANDOM_STATE_1) };
if s0 == 0 {
s0 = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_nanos() as u64;
}
if s1 == 0 {
s1 = splitmix64(std::process::id() as u64);
}
let s1_new = xorshift64(s1);
s0 = splitmix64(s0.wrapping_add(s1));
s1 = s1_new;
unsafe {
RANDOM_STATE_0 = s0;
RANDOM_STATE_1 = s1;
};
s0
}
/// Wrapper for tokio::spawn() that aborts tasks not yet completed when it is dropped.
pub struct AsyncTaskReaper {
ctr: AtomicUsize,
handles: Arc<std::sync::Mutex<HashMap<usize, JoinHandle<()>>>>,
}
impl AsyncTaskReaper {
pub fn new() -> Self {
Self {
ctr: AtomicUsize::new(0),
handles: Arc::new(std::sync::Mutex::new(HashMap::new())),
}
}
/// Spawn a new task.
///
/// Note that currently any task output is ignored. This is for fire and forget
/// background tasks that you want to be collected on loss of scope.
pub fn spawn<F: Future + Send + 'static>(&self, future: F) {
let id = self.ctr.fetch_add(1, Ordering::Relaxed);
let handles = self.handles.clone();
self.handles.lock().unwrap().insert(
id,
tokio::spawn(async move {
let _ = future.await;
let _ = handles.lock().unwrap().remove(&id);
}),
);
}
}
impl Drop for AsyncTaskReaper {
fn drop(&mut self) {
for (_, h) in self.handles.lock().unwrap().iter() {
h.abort();
}
}
}

View file

@ -1,45 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2022 ZeroTier, Inc.
* https://www.zerotier.com/
*/
#[allow(unused)]
pub const VARINT_MAX_SIZE_BYTES: usize = 10;
pub fn encode(b: &mut [u8], mut v: u64) -> usize {
let mut i = 0;
loop {
if v > 0x7f {
b[i] = (v as u8) & 0x7f;
i += 1;
v = v.wrapping_shr(7);
} else {
b[i] = (v as u8) | 0x80;
i += 1;
break;
}
}
i
}
pub fn decode(b: &[u8]) -> (u64, usize) {
let mut v = 0_u64;
let mut pos = 0;
let mut l = 0;
let bl = b.len();
while l < bl {
let x = b[l];
l += 1;
if x <= 0x7f {
v |= (x as u64).wrapping_shl(pos);
pos += 7;
} else {
v |= ((x & 0x7f) as u64).wrapping_shl(pos);
return (v, l);
}
}
return (0, 0);
}

View file

@ -9,10 +9,8 @@ use std::str::FromStr;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use zerotier_utils::base24;
use zerotier_utils::error::InvalidParameterError;
use zerotier_utils::hex;
use zerotier_utils::memory;
use zerotier_utils::{base24, hex, memory};
/// A full (V2) ZeroTier address.
///
@ -102,7 +100,20 @@ impl Borrow<[u8; Self::SIZE_BYTES]> for Address {
impl ToString for Address {
#[inline(always)]
fn to_string(&self) -> String {
base24::encode(&self.0)
let mut s = String::with_capacity(96);
let mut x = 0;
for c in self.0.chunks(4) {
if !s.is_empty() {
if (x & 3) == 0 {
s.push('.');
} else {
s.push('-');
}
}
x += 1;
base24::encode_into(c, &mut s);
}
s
}
}
@ -111,7 +122,15 @@ impl FromStr for Address {
#[inline]
fn from_str(s: &str) -> Result<Self, Self::Err> {
base24::decode(s.as_bytes()).and_then(|b| Self::from_bytes(b.as_slice()))
let mut a = Self([0u8; Self::SIZE_BYTES]);
let mut f = 0;
for ss in s.split(&['-', '.']) {
if ss.len() > 0 {
base24::decode_into_slice(ss.as_bytes(), &mut a.0[f * 4..(f + 1) * 4])?;
f += 1;
}
}
return Ok(a);
}
}
@ -196,11 +215,19 @@ impl PartialAddress {
pub const MIN_SIZE_BYTES: usize = Self::LEGACY_SIZE_BYTES;
pub const MAX_SIZE_BYTES: usize = Address::SIZE_BYTES;
const fn is_valid_specificity(s: u16) -> bool {
match s {
5 | 16 | 32 | 48 => true,
_ => false,
}
}
/// Construct an address from a byte slice with its length determining specificity.
#[inline]
pub fn from_bytes(b: &[u8]) -> Result<Self, InvalidParameterError> {
if b.len() >= Self::MIN_SIZE_BYTES
&& b.len() <= Self::MAX_SIZE_BYTES
&& Self::is_valid_specificity(b.len() as u16)
&& b[0] != Address::RESERVED_PREFIX
&& b[..Self::LEGACY_SIZE_BYTES].iter().any(|i| *i != 0)
{
@ -250,13 +277,13 @@ impl PartialAddress {
#[inline(always)]
pub fn as_bytes(&self) -> &[u8] {
debug_assert!(self.specificity >= Self::MIN_SIZE_BYTES as u16);
debug_assert!(Self::is_valid_specificity(self.specificity));
&self.address.0[..self.specificity as usize]
}
#[inline(always)]
pub(crate) fn legacy_bytes(&self) -> &[u8; 5] {
debug_assert!(self.specificity >= Self::MIN_SIZE_BYTES as u16);
debug_assert!(Self::is_valid_specificity(self.specificity));
memory::array_range::<u8, { Address::SIZE_BYTES }, 0, { PartialAddress::LEGACY_SIZE_BYTES }>(&self.address.0)
}
@ -268,7 +295,7 @@ impl PartialAddress {
/// Returns true if this partial address matches a full length address up to this partial's specificity.
#[inline(always)]
pub fn matches(&self, k: &Address) -> bool {
debug_assert!(self.specificity >= Self::MIN_SIZE_BYTES as u16);
debug_assert!(Self::is_valid_specificity(self.specificity));
let l = self.specificity as usize;
self.address.0[..l].eq(&k.0[..l])
}
@ -276,7 +303,7 @@ impl PartialAddress {
/// Returns true if this partial address matches another up to the lower of the two addresses' specificities.
#[inline(always)]
pub fn matches_partial(&self, k: &PartialAddress) -> bool {
debug_assert!(self.specificity >= Self::MIN_SIZE_BYTES as u16);
debug_assert!(Self::is_valid_specificity(self.specificity));
let l = self.specificity.min(k.specificity) as usize;
self.address.0[..l].eq(&k.address.0[..l])
}
@ -360,7 +387,7 @@ impl PartialAddress {
/// This returns None if there is no match or if this partial matches more than one entry, in which
/// case it's ambiguous and may be unsafe to use. This should be prohibited at other levels of the
/// system but is checked for here as well.
#[inline]
#[inline(always)]
pub fn find_unique_match_mut<'a, T>(&self, map: &'a mut BTreeMap<PartialAddress, T>) -> Option<&'a mut T> {
// This not only saves some repetition but is in fact the only way to easily do this. The same code as
// find_unique_match() but with range_mut() doesn't compile because the second range_mut() would
@ -388,10 +415,25 @@ impl PartialOrd for PartialAddress {
impl ToString for PartialAddress {
fn to_string(&self) -> String {
debug_assert!(Self::is_valid_specificity(self.specificity));
if self.is_legacy() {
hex::to_string(&self.address.0[..Self::LEGACY_SIZE_BYTES])
} else {
base24::encode(self.as_bytes())
let mut s = String::with_capacity(96);
let mut i = 0;
while i < self.specificity {
let ii = i + 4;
if !s.is_empty() {
if (i & 15) == 0 {
s.push('.');
} else {
s.push('-');
}
}
base24::encode_into(&self.address.0[i as usize..ii as usize], &mut s);
i = ii;
}
s
}
}
}
@ -401,9 +443,23 @@ impl FromStr for PartialAddress {
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.len() == 10 {
Self::from_bytes(hex::from_string(s).as_slice())
return Self::from_bytes(hex::from_string(s).as_slice());
} else {
base24::decode(s.as_bytes()).and_then(|b| Self::from_bytes(b.as_slice()))
let mut a = Address([0u8; Address::SIZE_BYTES]);
let mut f = 0;
let mut specificity = 0;
for ss in s.split(&['-', '.']) {
if ss.len() > 0 {
base24::decode_into_slice(ss.as_bytes(), &mut a.0[f * 4..(f + 1) * 4])?;
f += 1;
specificity += 4;
}
}
if Self::is_valid_specificity(specificity) {
return Ok(Self { address: a, specificity });
} else {
return Err(InvalidParameterError("illegal specificity"));
}
}
}
}
@ -477,3 +533,42 @@ impl<'de> Deserialize<'de> for PartialAddress {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use zerotier_crypto::random;
#[test]
fn to_from_string() {
for _ in 0..64 {
let mut tmp = Address::new_uninitialized();
random::fill_bytes_secure(&mut tmp.0);
let s = tmp.to_string();
//println!("{}", s);
let tmp2 = Address::from_str(s.as_str()).unwrap();
assert!(tmp == tmp2);
}
}
#[test]
fn to_from_string_partial() {
let mut tmp = [0u8; Address::SIZE_BYTES];
for _ in 0..64 {
for s in [5, 16, 32, 48] {
random::fill_bytes_secure(&mut tmp);
if tmp[0] == Address::RESERVED_PREFIX {
tmp[0] = 1;
}
if tmp[1] == 0 {
tmp[1] = 1;
}
let partial = PartialAddress::from_bytes(&tmp[..s]).unwrap();
let s = partial.to_string();
//println!("{}", s);
let partial2 = PartialAddress::from_str(s.as_str()).unwrap();
assert!(partial == partial2);
}
}
}
}

View file

@ -17,7 +17,7 @@ use zerotier_crypto::secret::Secret;
use zerotier_crypto::typestate::Valid;
use zerotier_crypto::x25519::*;
use zerotier_utils::arrayvec::ArrayVec;
use zerotier_utils::base64;
use zerotier_utils::base62;
use zerotier_utils::buffer::{Buffer, OutOfBoundsError};
use zerotier_utils::error::InvalidFormatError;
use zerotier_utils::marshalable::{Marshalable, UnmarshalError};
@ -284,17 +284,17 @@ impl ToString for Identity {
let mut s = String::with_capacity(1024);
s.push_str(self.address.to_string().as_str());
s.push_str(":1:");
base64::encode_into(&self.x25519.ecdh, &mut s);
base62::encode_into(&self.x25519.ecdh, &mut s, 0);
s.push(':');
base64::encode_into(&self.x25519.eddsa, &mut s);
base62::encode_into(&self.x25519.eddsa, &mut s, 0);
s.push(':');
base64::encode_into(p384.ecdh.as_bytes(), &mut s);
base62::encode_into(p384.ecdh.as_bytes(), &mut s, 0);
s.push(':');
base64::encode_into(p384.ecdsa.as_bytes(), &mut s);
base62::encode_into(p384.ecdsa.as_bytes(), &mut s, 0);
s.push(':');
base64::encode_into(&p384.ed25519_self_signature, &mut s);
base62::encode_into(&p384.ed25519_self_signature, &mut s, 0);
s.push(':');
base64::encode_into(&p384.p384_self_signature, &mut s);
base62::encode_into(&p384.p384_self_signature, &mut s, 0);
s
} else {
format!(
@ -317,28 +317,16 @@ impl FromStr for Identity {
return Ok(Self {
address: Address::from_str(ss[0]).map_err(|_| InvalidFormatError)?,
x25519: X25519 {
ecdh: base64::decode(ss[2].as_bytes())
.map_err(|_| InvalidFormatError)?
.try_into()
.map_err(|_| InvalidFormatError)?,
eddsa: base64::decode(ss[3].as_bytes())
.map_err(|_| InvalidFormatError)?
.try_into()
.map_err(|_| InvalidFormatError)?,
ecdh: base62::decode(ss[2].as_bytes()).ok_or(InvalidFormatError)?,
eddsa: base62::decode(ss[3].as_bytes()).ok_or(InvalidFormatError)?,
},
p384: Some(P384 {
ecdh: P384PublicKey::from_bytes(base64::decode(ss[4].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice())
ecdh: P384PublicKey::from_bytes(&base62::decode::<P384_PUBLIC_KEY_SIZE>(ss[4].as_bytes()).ok_or(InvalidFormatError)?)
.ok_or(InvalidFormatError)?,
ecdsa: P384PublicKey::from_bytes(base64::decode(ss[5].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice())
ecdsa: P384PublicKey::from_bytes(&base62::decode::<P384_PUBLIC_KEY_SIZE>(ss[5].as_bytes()).ok_or(InvalidFormatError)?)
.ok_or(InvalidFormatError)?,
ed25519_self_signature: base64::decode(ss[6].as_bytes())
.map_err(|_| InvalidFormatError)?
.try_into()
.map_err(|_| InvalidFormatError)?,
p384_self_signature: base64::decode(ss[7].as_bytes())
.map_err(|_| InvalidFormatError)?
.try_into()
.map_err(|_| InvalidFormatError)?,
ed25519_self_signature: base62::decode(ss[6].as_bytes()).ok_or(InvalidFormatError)?,
p384_self_signature: base62::decode(ss[7].as_bytes()).ok_or(InvalidFormatError)?,
}),
});
} else if ss[1] == "0" && ss.len() >= 3 {
@ -495,13 +483,13 @@ impl ToString for IdentitySecret {
let mut s = self.public.to_string();
if let Some(p384) = self.p384.as_ref() {
s.push(':');
base64::encode_into(self.x25519.ecdh.secret_bytes().as_bytes(), &mut s);
base62::encode_into(self.x25519.ecdh.secret_bytes().as_bytes(), &mut s, 0);
s.push(':');
base64::encode_into(self.x25519.eddsa.secret_bytes().as_bytes(), &mut s);
base62::encode_into(self.x25519.eddsa.secret_bytes().as_bytes(), &mut s, 0);
s.push(':');
base64::encode_into(p384.ecdh.secret_key_bytes().as_bytes(), &mut s);
base62::encode_into(p384.ecdh.secret_key_bytes().as_bytes(), &mut s, 0);
s.push(':');
base64::encode_into(p384.ecdsa.secret_key_bytes().as_bytes(), &mut s);
base62::encode_into(p384.ecdsa.secret_key_bytes().as_bytes(), &mut s, 0);
} else {
s.push(':');
s.push_str(hex::to_string(self.x25519.ecdh.secret_bytes().as_bytes()).as_str());
@ -521,22 +509,22 @@ impl FromStr for IdentitySecret {
if ss[1] == "1" && ss.len() >= 12 && public.p384.is_some() {
let x25519_ecdh = X25519KeyPair::from_bytes(
&public.x25519.ecdh,
base64::decode(ss[8].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice(),
&base62::decode::<C25519_SECRET_KEY_SIZE>(ss[8].as_bytes()).ok_or(InvalidFormatError)?,
)
.ok_or(InvalidFormatError)?;
let x25519_eddsa = Ed25519KeyPair::from_bytes(
&public.x25519.ecdh,
base64::decode(ss[9].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice(),
&base62::decode::<C25519_SECRET_KEY_SIZE>(ss[9].as_bytes()).ok_or(InvalidFormatError)?,
)
.ok_or(InvalidFormatError)?;
let p384_ecdh = P384KeyPair::from_bytes(
public.p384.as_ref().unwrap().ecdh.as_bytes(),
base64::decode(ss[10].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice(),
&base62::decode::<P384_SECRET_KEY_SIZE>(ss[10].as_bytes()).ok_or(InvalidFormatError)?,
)
.ok_or(InvalidFormatError)?;
let p384_ecdsa = P384KeyPair::from_bytes(
public.p384.as_ref().unwrap().ecdh.as_bytes(),
base64::decode(ss[11].as_bytes()).map_err(|_| InvalidFormatError)?.as_slice(),
&base62::decode::<P384_SECRET_KEY_SIZE>(ss[11].as_bytes()).ok_or(InvalidFormatError)?,
)
.ok_or(InvalidFormatError)?;
return Ok(Self {

View file

@ -2,6 +2,7 @@
use std::collections::HashMap;
use std::hash::Hash;
use std::io::Write;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;
@ -22,7 +23,6 @@ use zerotier_crypto::typestate::{Valid, Verified};
use zerotier_utils::gate::IntervalGate;
use zerotier_utils::hex;
use zerotier_utils::marshalable::Marshalable;
use zerotier_utils::tokio::io::AsyncWriteExt;
/// A VL1 node on the ZeroTier global peer to peer network.
///

View file

@ -7,13 +7,12 @@ version = "0.1.0"
[features]
default = []
tokio = ["dep:tokio", "dep:futures-util"]
tokio = ["dep:tokio"]
[dependencies]
serde = { version = "^1", features = ["derive"], default-features = false }
serde_json = { version = "^1", features = ["std"], default-features = false }
tokio = { version = "^1", default-features = false, features = ["fs", "io-util", "io-std", "net", "process", "rt", "rt-multi-thread", "signal", "sync", "time"], optional = true }
futures-util = { version = "^0", optional = true }
[target."cfg(windows)".dependencies]
winapi = { version = "^0", features = ["handleapi", "ws2ipdef", "ws2tcpip"] }

View file

@ -11,7 +11,7 @@ use std::io::Write;
use crate::error::InvalidParameterError;
/// All unambiguous letters, thus easy to type on the alphabetic keyboards on phones without extra shift taps.
/// The letters 'l' and 'v' are skipped.
/// The letters 'l' and 'u' are skipped.
const BASE24_ALPHABET: [u8; 24] = [
b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'm', b'n', b'o', b'p', b'q', b'r', b's', b't', b'v', b'w', b'x', b'y', b'z',
];
@ -66,7 +66,7 @@ fn decode_up_to_u32(s: &[u8]) -> Result<u32, InvalidParameterError> {
}
/// Decode a base24 ASCII slice into bytes (no padding, length determines output length)
pub fn decode_into(s: &[u8], b: &mut Vec<u8>) -> Result<(), InvalidParameterError> {
pub fn decode_into<W: Write>(s: &[u8], b: &mut W) -> Result<(), InvalidParameterError> {
let mut s = s.as_ref();
while s.len() >= 7 {
@ -88,6 +88,11 @@ pub fn decode_into(s: &[u8], b: &mut Vec<u8>) -> Result<(), InvalidParameterErro
return Ok(());
}
#[inline]
pub fn decode_into_slice(s: &[u8], mut b: &mut [u8]) -> Result<(), InvalidParameterError> {
decode_into(s, &mut b)
}
pub fn encode(b: &[u8]) -> String {
let mut tmp = String::with_capacity(((b.len() / 4) * 7) + 2);
encode_into(b, &mut tmp);

187
utils/src/base62.rs Normal file
View file

@ -0,0 +1,187 @@
use std::io::Write;
use super::arrayvec::ArrayVec;
use super::memory;
const MAX_LENGTH_WORDS: usize = 128;
/// Encode a byte array into a base62 string.
///
/// The pad_output_to_length parameter outputs base62 zeroes at the end to ensure that the output
/// string is at least a given length. Set this to zero if you don't want to pad the output. This
/// has no effect on decoded output length.
pub fn encode_into(b: &[u8], s: &mut String, pad_output_to_length: usize) {
assert!(b.len() <= MAX_LENGTH_WORDS * 4);
let mut n: ArrayVec<u32, MAX_LENGTH_WORDS> = ArrayVec::new();
let mut i = 0;
let len_words = b.len() & usize::MAX.wrapping_shl(2);
while i < len_words {
n.push(u32::from_le(memory::load_raw(&b[i..])));
i += 4;
}
if i < b.len() {
let mut w = 0u32;
let mut shift = 0u32;
while i < b.len() {
w |= (b[i] as u32).wrapping_shl(shift);
i += 1;
shift += 8;
}
n.push(w);
}
let mut string_len = 0;
while !n.is_empty() {
s.push(b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"[big_div_rem::<MAX_LENGTH_WORDS, 62>(&mut n) as usize] as char);
string_len += 1;
}
while string_len < pad_output_to_length {
s.push('0');
string_len += 1;
}
}
/// Decode Base62 into a vector or other output.
///
/// Note that base62 doesn't have a way to know the output length. Decoding may be short if there were
/// trailing zeroes in the input. The output length parameter specifies the expected length of the
/// output, which will be zero padded if decoded data does not reach it. If decoded data exceeds this
/// length an error is returned.
pub fn decode_into<W: Write>(s: &[u8], b: &mut W, output_length: usize) -> std::io::Result<()> {
let mut n: ArrayVec<u32, MAX_LENGTH_WORDS> = ArrayVec::new();
for c in s.iter().rev() {
let mut c = *c as u32;
// 0..9, A..Z, or a..z
if c >= 48 && c <= 57 {
c -= 48;
} else if c >= 65 && c <= 90 {
c -= 65 - 10;
} else if c >= 97 && c <= 122 {
c -= 97 - (10 + 26);
} else {
return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid base62"));
}
big_mul::<MAX_LENGTH_WORDS, 62>(&mut n);
big_add(&mut n, c);
}
let mut bc = output_length;
for w in n.iter() {
if bc > 0 {
let l = bc.min(4);
b.write_all(&w.to_le_bytes()[..l])?;
bc -= l;
} else {
return Err(std::io::Error::new(std::io::ErrorKind::Other, "data too large"));
}
}
while bc > 0 {
b.write_all(&[0])?;
bc -= 1;
}
return Ok(());
}
#[inline]
pub fn decode_into_slice(s: &[u8], mut b: &mut [u8]) -> std::io::Result<()> {
let l = b.len();
decode_into(s, &mut b, l)
}
/// Decode into and return an array whose length is the desired output_length.
/// None is returned if there is an error.
#[inline]
pub fn decode<const L: usize>(s: &[u8]) -> Option<[u8; L]> {
let mut buf = [0u8; L];
let mut w = &mut buf[..];
if decode_into(s, &mut w, L).is_ok() {
Some(buf)
} else {
None
}
}
#[inline(always)]
fn big_div_rem<const C: usize, const D: u64>(n: &mut ArrayVec<u32, C>) -> u32 {
while let Some(&0) = n.last() {
n.pop();
}
let mut rem = 0;
for word in n.iter_mut().rev() {
let temp = (rem as u64).wrapping_shl(32) | (*word as u64);
let (a, b) = (temp / D, temp % D);
*word = a as u32;
rem = b as u32;
}
while let Some(&0) = n.last() {
n.pop();
}
rem
}
#[inline(always)]
fn big_add<const C: usize>(n: &mut ArrayVec<u32, C>, i: u32) {
let mut carry = i as u64;
for word in n.iter_mut() {
let res = (*word as u64).wrapping_add(carry);
*word = res as u32;
carry = res.wrapping_shr(32);
}
if carry > 0 {
n.push(carry as u32);
}
}
#[inline(always)]
fn big_mul<const C: usize, const M: u64>(n: &mut ArrayVec<u32, C>) {
while let Some(&0) = n.last() {
n.pop();
}
let mut carry = 0;
for word in n.iter_mut() {
let temp = (*word as u64).wrapping_mul(M).wrapping_add(carry);
*word = temp as u32;
carry = temp.wrapping_shr(32);
}
if carry != 0 {
n.push(carry as u32);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn div_rem() {
let mut n = ArrayVec::<u32, 4>::new();
n.push_slice(&[0xdeadbeef, 0xfeedfeed, 0xcafebabe, 0xf00dd00d]);
let rem = big_div_rem::<4, 63>(&mut n);
let nn = n.as_ref();
assert!(nn[0] == 0xaa23440b && nn[1] == 0xa696103c && nn[2] == 0x89513fea && nn[3] == 0x03cf7514 && rem == 58);
}
#[test]
fn encode_decode() {
let mut test = [0xff; 64];
for tl in 1..64 {
let test = &mut test[..tl];
test.fill(0xff);
let mut b = Vec::with_capacity(1024);
for _ in 0..10 {
let mut s = String::with_capacity(1024);
encode_into(&test, &mut s, 86);
b.clear();
//println!("{}", s);
assert!(decode_into(s.as_bytes(), &mut b, test.len()).is_ok());
assert_eq!(b.as_slice(), test);
for c in test.iter_mut() {
*c = crate::rand() as u8;
}
}
}
}
}

View file

@ -1,115 +0,0 @@
use crate::error::InvalidParameterError;
/// URL-safe base64 alphabet
const ALPHABET: [u8; 64] = *b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_";
const ALPHABET_INV: [u8; 256] = [
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 62, 255, 255, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 255, 255,
255, 255, 255, 255, 255, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 255, 255, 255, 255, 63,
255, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
255, 255, 255, 255, 255, 255, 255, 255, 255, 255,
];
pub fn encode_into(mut b: &[u8], s: &mut String) {
while b.len() >= 3 {
let bits = (b[0] as usize) | (b[1] as usize).wrapping_shl(8) | (b[2] as usize).wrapping_shl(16);
b = &b[3..];
let (i0, i1, i2, i3) = (bits & 63, bits.wrapping_shr(6) & 63, bits.wrapping_shr(12) & 63, bits.wrapping_shr(18));
s.push(ALPHABET[i0] as char);
s.push(ALPHABET[i1] as char);
s.push(ALPHABET[i2] as char);
s.push(ALPHABET[i3] as char);
}
if b.len() == 2 {
let bits = (b[0] as usize) | (b[1] as usize).wrapping_shl(8);
s.push(ALPHABET[bits & 63] as char);
s.push(ALPHABET[bits.wrapping_shr(6) & 63] as char);
s.push(ALPHABET[bits.wrapping_shr(12)] as char);
} else if b.len() == 1 {
let bits = b[0] as usize;
s.push(ALPHABET[bits & 63] as char);
s.push(ALPHABET[bits.wrapping_shr(6)] as char);
}
}
pub fn decode_into(mut s: &[u8], b: &mut Vec<u8>) -> Result<(), InvalidParameterError> {
while s.len() >= 4 {
let (i0, i1, i2, i3) = (
ALPHABET_INV[s[0] as usize],
ALPHABET_INV[s[1] as usize],
ALPHABET_INV[s[2] as usize],
ALPHABET_INV[s[3] as usize],
);
s = &s[4..];
if (i0 | i1 | i2 | i3) > 64 {
return Err(InvalidParameterError("invalid base64 string"));
}
let bits = (i0 as usize) | (i1 as usize).wrapping_shl(6) | (i2 as usize).wrapping_shl(12) | (i3 as usize).wrapping_shl(18);
b.push((bits & 0xff) as u8);
b.push((bits.wrapping_shr(8) & 0xff) as u8);
b.push((bits.wrapping_shr(16) & 0xff) as u8);
}
match s.len() {
1 => return Err(InvalidParameterError("invalid base64 string")),
2 => {
let (i0, i1) = (ALPHABET_INV[s[0] as usize], ALPHABET_INV[s[1] as usize]);
if (i0 | i1) > 64 {
return Err(InvalidParameterError("invalid base64 string"));
}
let bits = (i0 as usize) | (i1 as usize).wrapping_shl(6);
b.push((bits & 0xff) as u8);
}
3 => {
let (i0, i1, i2) = (ALPHABET_INV[s[0] as usize], ALPHABET_INV[s[1] as usize], ALPHABET_INV[s[2] as usize]);
if (i0 | i1 | i2) > 64 {
return Err(InvalidParameterError("invalid base64 string"));
}
let bits = (i0 as usize) | (i1 as usize).wrapping_shl(6) | (i2 as usize).wrapping_shl(12);
b.push((bits & 0xff) as u8);
b.push((bits.wrapping_shr(8) & 0xff) as u8);
}
_ => {}
}
Ok(())
}
pub fn encode(b: &[u8]) -> String {
let mut tmp = String::with_capacity(((b.len() / 3) * 4) + 3);
encode_into(b, &mut tmp);
tmp
}
pub fn decode(s: &[u8]) -> Result<Vec<u8>, InvalidParameterError> {
let mut tmp = Vec::with_capacity(((s.len() / 4) * 3) + 3);
decode_into(s, &mut tmp)?;
Ok(tmp)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn encode_decode() {
let mut tmp = [0xffu8; 256];
for _ in 0..7 {
let mut s = String::with_capacity(1024);
let mut v: Vec<u8> = Vec::with_capacity(256);
for i in 1..256 {
s.clear();
encode_into(&tmp[..i], &mut s);
//println!("{}", s);
v.clear();
decode_into(s.as_str().as_bytes(), &mut v).expect("decode error");
assert!(v.as_slice().eq(&tmp[..i]));
}
for b in tmp.iter_mut() {
*b -= 13;
}
}
}
}

View file

@ -1,62 +0,0 @@
use super::arrayvec::ArrayVec;
fn big_div_rem<const C: usize>(n: &mut ArrayVec<u32, C>, d: u32) -> u32 {
while let Some(&0) = n.last() {
n.pop();
}
let d = d as u64;
let mut rem = 0;
for word in n.iter_mut().rev() {
let temp = (rem as u64).wrapping_shl(32) | (*word as u64);
let (a, b) = (temp / d, temp % d);
*word = a as u32;
rem = b as u32;
}
while let Some(&0) = n.last() {
n.pop();
}
rem
}
fn big_add<const C: usize>(n: &mut ArrayVec<u32, C>, i: u32) {
debug_assert!(i <= (u32::MAX - 1));
debug_assert!(!n.is_empty());
debug_assert!(n.iter().any(|x| *x != 0));
let mut carry = false;
for word in n.iter_mut() {
(*word, carry) = word.overflowing_add(i.wrapping_add(carry as u32));
}
if carry {
n.push(1);
}
}
fn big_mul<const C: usize>(n: &mut ArrayVec<u32, C>, m: u32) {
while let Some(&0) = n.last() {
n.pop();
}
let m = m as u64;
let mut carry = 0;
for word in n.iter_mut() {
let temp = (*word as u64).wrapping_mul(m).wrapping_add(carry);
*word = (temp & 0xffffffff) as u32;
carry = temp.wrapping_shr(32);
}
if carry > 0 {
n.push(carry as u32);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn div_rem() {
let mut n = ArrayVec::<u32, 4>::new();
n.push_slice(&[0xdeadbeef, 0xfeedfeed, 0xcafebabe, 0xf00dd00d]);
let rem = big_div_rem(&mut n, 63);
let nn = n.as_ref();
assert!(nn[0] == 0xaa23440b && nn[1] == 0xa696103c && nn[2] == 0x89513fea && nn[3] == 0x03cf7514 && rem == 58);
}
}

View file

@ -8,7 +8,7 @@
pub mod arrayvec;
pub mod base24;
pub mod base64;
pub mod base62;
pub mod blob;
pub mod buffer;
pub mod cast;
@ -25,7 +25,6 @@ pub mod json;
pub mod marshalable;
pub mod memory;
pub mod pool;
pub mod proquint;
#[cfg(feature = "tokio")]
pub mod reaper;
pub mod ringbuffer;
@ -36,9 +35,6 @@ pub mod varint;
#[cfg(feature = "tokio")]
pub use tokio;
#[cfg(feature = "tokio")]
pub use futures_util;
/// Initial value that should be used for monotonic tick time variables.
pub const NEVER_HAPPENED_TICKS: i64 = i64::MIN;
@ -89,6 +85,11 @@ pub fn wait_for_process_abort() {
#[inline(never)]
pub extern "C" fn unlikely_branch() {}
#[cfg(unix)]
pub fn rand() -> u32 {
unsafe { (libc::rand() as u32) ^ (libc::rand() as u32).wrapping_shr(8) }
}
#[cfg(test)]
mod tests {
use super::ms_monotonic;