From 0d67fcee9215a149143118baab866f0f0451c5c9 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Thu, 16 Dec 2021 23:08:37 -0500 Subject: [PATCH] It compiles. --- allthethings/src/lib.rs | 13 ++ allthethings/src/protocol.rs | 22 ++- allthethings/src/replicator.rs | 321 +++++++++++++++++++++------------ allthethings/src/store.rs | 64 +++---- allthethings/src/varint.rs | 10 +- 5 files changed, 258 insertions(+), 172 deletions(-) diff --git a/allthethings/src/lib.rs b/allthethings/src/lib.rs index 69cae67a7..95a3651a0 100644 --- a/allthethings/src/lib.rs +++ b/allthethings/src/lib.rs @@ -1,3 +1,11 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + mod store; mod replicator; mod protocol; @@ -10,3 +18,8 @@ pub(crate) fn ms_since_epoch() -> u64 { pub(crate) fn ms_monotonic() -> u64 { std::time::Instant::now().elapsed().as_millis() as u64 } + +pub const IDENTITY_HASH_SIZE: usize = 48; + +pub use store::{Store, StoreObjectResult}; +pub use replicator::{Replicator, Config}; diff --git a/allthethings/src/protocol.rs b/allthethings/src/protocol.rs index a8be3b11f..077be4621 100644 --- a/allthethings/src/protocol.rs +++ b/allthethings/src/protocol.rs @@ -1,15 +1,17 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + pub(crate) const PROTOCOL_VERSION: u8 = 1; -/// No operation and no payload, sent as a heartbeat. -/// This is the only message type NOT followed by a message size varint. It's just one byte. pub(crate) const MESSAGE_TYPE_NOP: u8 = 0; - -/// An object either sent in response to a query or because it is new. -/// Payload is simply the object. The hash is not included as we compute it locally for security. -pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 1; - -/// Request one or more objects by identity hash with optional common prefix. -pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 2; +pub(crate) const MESSAGE_TYPE_HAVE_NEW_OBJECT: u8 = 1; +pub(crate) const MESSAGE_TYPE_OBJECT: u8 = 2; +pub(crate) const MESSAGE_TYPE_GET_OBJECTS: u8 = 3; /// HELLO message, which is all u8's and is packed and so can be parsed directly in place. /// This message is sent at the start of any connection by both sides. @@ -20,7 +22,7 @@ pub(crate) struct Hello { pub flags: [u8; 4], // u32, little endian pub clock: [u8; 8], // u64, little endian pub data_set_size: [u8; 8], // u64, little endian - pub domain_hash: [u8; 32], + pub domain_hash: [u8; 48], pub instance_id: [u8; 16], pub loopback_check_code_salt: [u8; 8], pub loopback_check_code: [u8; 16], diff --git a/allthethings/src/replicator.rs b/allthethings/src/replicator.rs index 1dc947ae6..e72b15eab 100644 --- a/allthethings/src/replicator.rs +++ b/allthethings/src/replicator.rs @@ -1,30 +1,55 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + use std::collections::HashMap; use std::convert::TryInto; -use std::future::Future; +use std::error::Error; use std::hash::{Hash, Hasher}; -use std::io::Read; +use std::marker::PhantomData; use std::mem::{size_of, transmute}; use std::sync::Arc; -use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; use getrandom::getrandom; -use sha2::{Digest, Sha256}; -use sha2::digest::{FixedOutput, Reset, Update}; +use sha2::{Digest, Sha384}; use smol::{Executor, Task, Timer}; -use smol::future; -use smol::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}; +use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use smol::lock::Mutex; use smol::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6, TcpListener, TcpStream, SocketAddr}; use smol::stream::StreamExt; -use crate::{ms_monotonic, ms_since_epoch, protocol}; -use crate::store::Store; +use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, protocol}; +use crate::store::{StoreObjectResult, Store}; use crate::varint; -const CONNECTION_TIMEOUT_SECONDS: u64 = 30; +const CONNECTION_TIMEOUT_SECONDS: u64 = 60; +const CONNECTION_SYNC_RESTART_TIMEOUT_SECONDS: u64 = 5; + +static mut XORSHIFT64_STATE: u64 = 0; + +/// Get a non-cryptographic random number. +fn xorshift64_random() -> u64 { + let mut x = unsafe { XORSHIFT64_STATE }; + x ^= x.wrapping_shl(13); + x ^= x.wrapping_shr(7); + x ^= x.wrapping_shl(17); + unsafe { XORSHIFT64_STATE = x }; + x +} pub struct Config { + /// Number of P2P connections desired. + pub target_link_count: usize, + + /// Maximum allowed size of an object. + pub max_object_size: usize, + /// TCP port to which this should bind. pub tcp_port: u16, @@ -41,39 +66,49 @@ struct ConnectionKey { impl Hash for ConnectionKey { #[inline(always)] fn hash(&self, state: &mut H) { - state.write(&self.instance_id); + self.instance_id.hash(state); self.ip.hash(state); } } -struct ReplicatorImpl { - instance_id: [u8; 16], - loopback_check_code_secret: [u8; 16], - domain_hash: [u8; 32], - store: Arc, - config: Config, - connections: Mutex)>>, +struct Connection { + remote_address: SocketAddr, + last_receive: Arc, + task: Task<()> } -pub struct Replicator { - state: Arc>, +struct ReplicatorImpl<'ex> { + executor: Arc>, + instance_id: [u8; 16], + loopback_check_code_secret: [u8; 16], + domain_hash: [u8; 48], + store: Arc, + config: Config, + connections: Mutex>, + announced_objects_requested: Mutex>, +} + +pub struct Replicator<'ex> { v4_listener_task: Option>, v6_listener_task: Option>, service_task: Task<()>, + _marker: PhantomData>, } -impl Replicator { +impl<'ex> Replicator<'ex> { /// Create a new replicator to replicate the contents of the provided store. - /// All async tasks, sockets, and connections will be dropped if the replicator is dropped. Use - /// the shutdown() method for a graceful shutdown. - pub async fn start(executor: Arc, store: Arc, config: Config) -> Result> { + /// All async tasks, sockets, and connections will be dropped if the replicator is dropped. + pub async fn start(executor: &Arc>, store: Arc, config: Config) -> Result, Box> { + let _ = unsafe { getrandom(&mut *(&mut XORSHIFT64_STATE as *mut u64).cast::<[u8; 8]>()) }; + let listener_v4 = TcpListener::bind(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, config.tcp_port)).await; let listener_v6 = TcpListener::bind(SocketAddrV6::new(Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)).await; if listener_v4.is_err() && listener_v6.is_err() { return Err(Box::new(listener_v4.unwrap_err())); } - let r = Arc::new(ReplicatorImpl:: { + let r = Arc::new(ReplicatorImpl::<'ex> { + executor: executor.clone(), instance_id: { let mut tmp = [0_u8; 16]; getrandom(&mut tmp).expect("getrandom failed"); @@ -85,98 +120,67 @@ impl Replicator { tmp }, domain_hash: { - let mut h = Sha256::new(); + let mut h = Sha384::new(); h.update(config.domain.as_bytes()); - h.finalize_fixed().into() + h.finalize().as_ref().try_into().unwrap() }, config, store, connections: Mutex::new(HashMap::new()), + announced_objects_requested: Mutex::new(HashMap::new()) }); - let (e0, e1) = (executor.clone(), executor.clone()); Ok(Self { - state: r, - v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.listener_task_main(listener_v4, e0)))), - v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.listener_task_main(listener_v6, e1)))), - service_task: executor.spawn(r.service_main(executor.clone())), + v4_listener_task: listener_v4.map_or(None, |listener_v4| Some(executor.spawn(r.clone().listener_task_main(listener_v4)))), + v6_listener_task: listener_v6.map_or(None, |listener_v6| Some(executor.spawn(r.clone().listener_task_main(listener_v6)))), + service_task: executor.spawn(r.service_main()), + _marker: PhantomData::default(), }) } - - pub async fn shutdown(self) { - // Get a joined future including our service task and one or both listeners. There is always - // at least one listener. If there are no listeners this is a bug and will panic. - let main_tasks = self.v4_listener_task.map_or_else(|| { - future::zip(self.service_task.cancel(), self.v6_listener_task.unwrap().cancel()) - }, |v4| { - self.v6_listener_task.map_or_else(|| { - future::zip(self.service_task.cancel(), v4.cancel()) - }, |v6| { - future::zip(self.service_task.cancel(), future::zip(v4.cancel(), v6.cancel())) - }) - }); - - // Just dropping all connections is fine. - self.state.connections.lock().await.clear(); - - // Then gracefully wait for the main tasks to finish. - let _ = main_tasks.await; - } } -impl ReplicatorImpl { - async fn service_main(&self, executor: Arc) { - let mut timer = smol::Timer::interval(Duration::from_secs(1)); - let mut to_close: Vec = Vec::new(); +unsafe impl<'ex> Send for Replicator<'ex> {} + +unsafe impl<'ex> Sync for Replicator<'ex> {} + +impl<'ex> ReplicatorImpl<'ex> { + async fn service_main(self: Arc>) { + let mut timer = smol::Timer::interval(Duration::from_secs(5)); loop { timer.next().await; - - let mut connections = self.connections.lock().await; - let now_mt = ms_monotonic(); - for cc in connections.iter_mut() { - let c = &(*cc.1).0; - if c.closed.load(Ordering::Relaxed) || (now_mt - c.last_receive_time.load(Ordering::Relaxed)) > CONNECTION_TIMEOUT_SECONDS { - to_close.push(cc.0.clone()); - } - } - - for tc in to_close.iter() { - let _ = connections.remove(tc); - } - to_close.clear(); - - drop(connections); + self.announced_objects_requested.lock().await.retain(|_, ts| now_mt.saturating_sub(*ts) < (CONNECTION_TIMEOUT_SECONDS * 1000)); + self.connections.lock().await.retain(|_, c| (now_mt.saturating_sub(c.last_receive.load(Ordering::Relaxed))) < (CONNECTION_TIMEOUT_SECONDS * 1000)); } } - async fn listener_task_main(&self, listener: TcpListener, executor: Arc) { + async fn listener_task_main(self: Arc>, listener: TcpListener) { loop { let stream = listener.accept().await; if stream.is_ok() { - let (mut stream, remote_address) = stream.unwrap(); - self.handle_new_connection(stream, remote_address, false, executor.clone()).await; + let (stream, remote_address) = stream.unwrap(); + self.handle_new_connection(stream, remote_address, false).await; } } } - async fn handle_new_connection(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc) { + async fn handle_new_connection(self: &Arc>, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool) { stream.set_nodelay(true); let mut loopback_check_code_salt = [0_u8; 8]; - getrandom(&mut tmp).expect("getrandom failed"); + getrandom(&mut loopback_check_code_salt).expect("getrandom failed"); - let mut h = Sha256::new(); + let mut h = Sha384::new(); h.update(&loopback_check_code_salt); h.update(&self.loopback_check_code_secret); - let loopback_check_code: [u8; 32] = h.finalize_fixed().into(); + let loopback_check_code: [u8; 48] = h.finalize().as_ref().try_into().unwrap(); let hello = protocol::Hello { hello_size: size_of::() as u8, protocol_version: protocol::PROTOCOL_VERSION, flags: [0_u8; 4], clock: ms_since_epoch().to_le_bytes(), - data_set_size: self.store.total_size().await.to_le_bytes(), + data_set_size: self.store.total_size().to_le_bytes(), domain_hash: self.domain_hash.clone(), instance_id: self.instance_id.clone(), loopback_check_code_salt, @@ -193,10 +197,10 @@ impl ReplicatorImpl { if hello.hello_size == size_of::() as u8 && hello.protocol_version == protocol::PROTOCOL_VERSION { // If this hash's first 16 bytes are equal to the one in the HELLO, this connection is // from this node and should be dropped. - let mut h = Sha256::new(); + let mut h = Sha384::new(); h.update(&hello.loopback_check_code_salt); h.update(&self.loopback_check_code_secret); - let loopback_if_equal: [u8; 32] = h.finalize_fixed().into(); + let loopback_if_equal: [u8; 48] = h.finalize().as_ref().try_into().unwrap(); if !loopback_if_equal[0..16].eq(&hello.loopback_check_code) { let k = ConnectionKey { @@ -206,7 +210,12 @@ impl ReplicatorImpl { let mut connections = self.connections.lock().await; let _ = connections.entry(k).or_insert_with(move || { stream.set_nodelay(false); - (remote_address.clone(), executor.spawn(self.connection_io_task_main(stream, remote_address, false, executor.clone()))) + let last_receive = Arc::new(AtomicU64::new(ms_monotonic())); + Connection { + remote_address, + last_receive: last_receive.clone(), + task: self.executor.spawn(self.clone().connection_io_task_main(stream, hello.instance_id, last_receive)) + } }); } } @@ -214,60 +223,134 @@ impl ReplicatorImpl { } } - async fn connection_sync_init_task_main(&self, writer: Arc>) { - let mut periodic_timer = Timer::interval(Duration::from_secs(1)); - loop { - let _ = periodic_timer.next().await; - } - } - - async fn connection_io_task_main(&self, mut stream: TcpStream, remote_address: SocketAddr, outgoing: bool, executor: Arc) { - let mut reader = BufReader::with_capacity(S::MAX_OBJECT_SIZE * 2, stream.clone()); + async fn connection_io_task_main(self: Arc>, stream: TcpStream, remote_instance_id: [u8; 16], last_receive: Arc) { + let mut reader = BufReader::with_capacity(65536, stream.clone()); let writer = Arc::new(Mutex::new(stream)); - let _sync_search_init_task = executor.spawn(self.connection_sync_init_task_main(writer.clone())); + let writer2 = writer.clone(); + let _sync_search_init_task = self.executor.spawn(async { + let writer = writer2; + let mut periodic_timer = Timer::interval(Duration::from_secs(1)); + loop { + let _ = periodic_timer.next().await; + } + }); - let mut tmp = [0_u8; 4096]; + let mut tmp_mem = Vec::new(); + tmp_mem.resize(self.config.max_object_size, 0); + let tmp = tmp_mem.as_mut_slice(); 'main_io_loop: loop { if reader.read_exact(&mut tmp[0..1]).await.is_err() { break 'main_io_loop; } let message_type = tmp[0]; - if message_type == protocol::MESSAGE_TYPE_NOP { - continue 'main_io_loop; - } - - let message_size = varint::async_read(&mut reader).await; - if message_size.is_err() { - break 'main_io_loop; - } - let mut message_size = message_size.unwrap(); - - if message_size > S::MAX_OBJECT_SIZE as u64 { - break 'main_io_loop; - } + last_receive.store(ms_monotonic(), Ordering::Relaxed); match message_type { + protocol::MESSAGE_TYPE_NOP => {}, + + protocol::MESSAGE_TYPE_HAVE_NEW_OBJECT => { + if reader.read_exact(&mut tmp[0..IDENTITY_HASH_SIZE]).await.is_err() { + break 'main_io_loop; + } + let identity_hash: [u8; 48] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap(); + let mut announced_objects_requested = self.announced_objects_requested.lock().await; + if !announced_objects_requested.contains_key(&identity_hash) && !self.store.have(&identity_hash) { + announced_objects_requested.insert(identity_hash.clone(), ms_monotonic()); + drop(announced_objects_requested); // release mutex + + tmp[0] = protocol::MESSAGE_TYPE_GET_OBJECTS; + tmp[1] = 0; + tmp[2] = varint::ONE; + tmp[3..(3 + IDENTITY_HASH_SIZE)].copy_from_slice(&identity_hash); + if !writer.lock().await.write_all(&tmp).await.is_err() { + break 'main_io_loop; + } + } + } + protocol::MESSAGE_TYPE_OBJECT => { + let object_size = varint::async_read(&mut reader).await; + if object_size.is_err() { + break 'main_io_loop; + } + let object_size = object_size.unwrap(); + if object_size > self.config.max_object_size as u64 { + break 'main_io_loop; + } + + let object = &mut tmp[0..(object_size as usize)]; + if reader.read_exact(object).await.is_err() { + break 'main_io_loop; + } + + let identity_hash: [u8; 48] = Sha384::digest(object.as_ref()).as_ref().try_into().unwrap(); + match self.store.put(&identity_hash, object) { + StoreObjectResult::Invalid => { + break 'main_io_loop; + }, + StoreObjectResult::Ok | StoreObjectResult::Duplicate => { + if self.announced_objects_requested.lock().await.remove(&identity_hash).is_some() { + // TODO: propagate rumor if we requested this object in response to a HAVE message. + } + }, + _ => { + let _ = self.announced_objects_requested.lock().await.remove(&identity_hash); + } + } }, + protocol::MESSAGE_TYPE_GET_OBJECTS => { + // Read common prefix if the requester is requesting a set of hashes with the same beginning. + // A common prefix length of zero means they're requesting by full hash. + if reader.read_exact(&mut tmp[0..1]).await.is_err() { + break 'main_io_loop; + } + let common_prefix_length = tmp[0] as usize; + if common_prefix_length >= IDENTITY_HASH_SIZE { + break 'main_io_loop; + } + if reader.read_exact(&mut tmp[0..common_prefix_length]).await.is_err() { + break 'main_io_loop; + } + + // Get the number of hashes being requested. + let hash_count = varint::async_read(&mut reader).await; + if hash_count.is_err() { + break 'main_io_loop; + } + let hash_count = hash_count.unwrap(); + + // Step through each suffix of the common prefix and send the object if found. + for _ in 0..hash_count { + if reader.read_exact(&mut tmp[common_prefix_length..IDENTITY_HASH_SIZE]).await.is_err() { + break 'main_io_loop; + } + let identity_hash: [u8; IDENTITY_HASH_SIZE] = (&tmp[0..IDENTITY_HASH_SIZE]).try_into().unwrap(); + let object = self.store.get(&identity_hash); + if object.is_some() { + let object2 = object.unwrap(); + let object = object2.as_slice(); + let mut w = writer.lock().await; + if varint::async_write(&mut *w, object.len() as u64).await.is_err() { + break 'main_io_loop; + } + if w.write_all(object).await.is_err() { + break 'main_io_loop; + } + } + } }, + _ => { - // Skip the bodies of unrecognized message types. - while message_size >= tmp.len() as u64 { - if reader.read_exact(&tmp).await.is_err() { - break 'main_io_loop; - } - message_size -= tmp.len() as u64; - } - if message_size > 0 { - if reader.read_exact(&mut tmp[0..(message_size as usize)]).await.is_err() { - break 'main_io_loop; - } - } + break 'main_io_loop; } } } } } + +unsafe impl<'ex> Send for ReplicatorImpl<'ex> {} + +unsafe impl<'ex> Sync for ReplicatorImpl<'ex> {} diff --git a/allthethings/src/store.rs b/allthethings/src/store.rs index bf61897b4..bc487e60d 100644 --- a/allthethings/src/store.rs +++ b/allthethings/src/store.rs @@ -1,68 +1,50 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at https://mozilla.org/MPL/2.0/. + * + * (c)2021 ZeroTier, Inc. + * https://www.zerotier.com/ + */ + use smol::net::SocketAddr; +use crate::IDENTITY_HASH_SIZE; + /// Result code from the put() method in Database. -pub enum PutObjectResult { +pub enum StoreObjectResult { /// Datum stored successfully. Ok, /// Datum is one we already have. Duplicate, /// Value is invalid. (this may result in dropping connections to peers, etc.) Invalid, + /// Value is not invalid but it was not added to the data store for some neutral reason. + Ignored, } /// Trait that must be implemented for the data store that is to be replicated. -/// -/// Each datum is identified by an identity hash, which is a cryptographic hash of HASH_SIZE -/// bytes of its value. The implementation assumes that's what it is, but the hash function -/// is not specified in this library. pub trait Store: Sync + Send { - /// The size in bytes of the identity hash. - const HASH_SIZE: usize; - - /// The maximum size of the objects supported by this store (and thus replication domain). - const MAX_OBJECT_SIZE: usize; - - /// Object type returned by get(), must implement AsRef<[u8]>. - type GetOutput: AsRef<[u8]>; - - /// Compute a hash of a data object using the hash associated with this store. - /// This returns the identity hash which can then be used as a key with get(), put(), etc. - fn hash(&self, object: &[u8]) -> [u8; Self::HASH_SIZE]; - /// Get the total size of this data set in objects. - async fn total_size(&self) -> u64; + fn total_size(&self) -> u64; - /// Get an object from the database, returning None if it is not found or there is an error. - async fn get(&self, identity_hash: &[u8; Self::HASH_SIZE]) -> Option; + /// Get an object from the database. + fn get(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option>; /// Store an entry in the database. - async fn put(&self, identity_hash: &[u8; Self::HASH_SIZE], object: &[u8]) -> PutObjectResult; + fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StoreObjectResult; + + /// Check if we have an object by its identity hash. + fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool; /// Count the number of identity hash keys in this range (inclusive) of identity hashes. /// This may return None if an error occurs, but should return 0 if the set is empty. - async fn count(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE]) -> Option; + fn count(&self, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option; /// Called when a connection to a remote node was successful. /// This is always called on successful outbound connect. - async fn save_remote_endpoint(&self, to_address: &SocketAddr); + fn save_remote_endpoint(&self, to_address: &SocketAddr); /// Get a remote endpoint to try. /// This can return endpoints in any order and is used to try to establish outbound links. - async fn get_remote_endpoint(&self) -> Option; - - /* - /// Execute a function for every hash/value in a range. - /// Iteration stops if the supplied function returns false. - async fn for_each_entry(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F) - where - F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF, - FF: Future; - - /// Execute a function for every hash in a range. - /// Iteration stops if the supplied function returns false. - async fn for_each_hash_key(&self, start: &[u8; Self::HASH_SIZE], end: &[u8; Self::HASH_SIZE], function: F) - where - F: Fn(&[u8; Self::HASH_SIZE], &[u8]) -> FF, - FF: Future; - */ + fn get_remote_endpoint(&self) -> Option; } diff --git a/allthethings/src/varint.rs b/allthethings/src/varint.rs index 8992d7ddd..c96500754 100644 --- a/allthethings/src/varint.rs +++ b/allthethings/src/varint.rs @@ -8,7 +8,13 @@ use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt}; -pub async fn async_write(w: &mut W, mut v: u64) -> std::io::Result<()> { +/// Byte that can be written for a zero varint. +pub const ZERO: u8 = 0x80; + +/// Byte that can be written for a varint of 1. +pub const ONE: u8 = 0x81; + +pub async fn async_write(w: &mut W, mut v: u64) -> std::io::Result<()> { let mut b = [0_u8; 10]; let mut i = 0; loop { @@ -25,7 +31,7 @@ pub async fn async_write(w: &mut W, mut v: u64) -> std::io::Resul w.write_all(&b[0..i]).await } -pub async fn async_read(r: &mut R) -> std::io::Result { +pub async fn async_read(r: &mut R) -> std::io::Result { let mut v = 0_u64; let mut buf = [0_u8; 1]; let mut pos = 0;