Move sync library out of this repo.

Adam Ierymenko 2022-01-14 17:15:01 -05:00
parent 99611f8781
commit a956dbd4be
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
11 changed files with 0 additions and 1214 deletions


@@ -1,2 +0,0 @@
/target
Cargo.lock


@@ -1,13 +0,0 @@
[package]
name = "allthethings"
version = "0.1.0"
edition = "2021"
[dependencies]
smol = { version = "^1", features = [] }
zerotier-core-crypto = { path = "../zerotier-core-crypto" }
socket2 = "^0"
serde = { version = "^1", features = ["derive"], default-features = false }
rmp-serde = "^0"
serde_json = { version = "^1", features = ["std"], default-features = false }
parking_lot = "^0"


@@ -1,36 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct Config {
/// Maximum allowed size of a protocol message.
pub max_message_size: usize,
/// TCP port to which this should bind.
pub tcp_port: u16,
/// Connection timeout in seconds.
pub io_timeout: u64,
/// A name for this replicated data set. This is just used to prevent linking to peers replicating different data.
pub domain: String,
}
impl Default for Config {
fn default() -> Self {
Self {
max_message_size: 1024 * 256, // 256KiB
tcp_port: 19993,
io_timeout: 300, // 5 minutes
domain: String::new(),
}
}
}
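// Hedged example (not part of the original file): because Config derives Deserialize and is
// marked #[serde(default)], a partial JSON document fills any omitted fields from the Default
// impl above. serde_json is already listed as a dependency in Cargo.toml.
#[cfg(test)]
mod config_example {
    use super::Config;

    #[test]
    fn partial_json_falls_back_to_defaults() {
        let cfg: Config = serde_json::from_str(r#"{"domain":"test"}"#).unwrap();
        assert_eq!(cfg.domain, "test");
        assert_eq!(cfg.tcp_port, 19993); // omitted field, filled in by Default
    }
}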


@@ -1,258 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::alloc::{alloc_zeroed, dealloc, Layout};
use std::mem::size_of;
use std::ptr::slice_from_raw_parts;
use crate::IDENTITY_HASH_SIZE;
// The number of indexing sub-hashes to use, must be <= IDENTITY_HASH_SIZE / 8
const KEY_MAPPING_ITERATIONS: usize = 5;
#[cfg(not(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64")))]
#[inline(always)]
fn read_unaligned_u64(i: *const u64) -> u64 {
let mut tmp = 0_u64;
unsafe { std::ptr::copy_nonoverlapping(i.cast::<u8>(), (&mut tmp as *mut u64).cast(), 8) };
tmp
}
#[cfg(any(target_arch = "x86", target_arch = "x86_64", target_arch = "aarch64", target_arch = "powerpc64"))]
#[inline(always)]
// read_unaligned() is a plain load on these targets while avoiding the UB of dereferencing a possibly unaligned pointer.
fn read_unaligned_u64(i: *const u64) -> u64 { unsafe { i.read_unaligned() } }
#[inline(always)]
fn xorshift64(mut x: u64) -> u64 {
x ^= x.wrapping_shl(13);
x ^= x.wrapping_shr(7);
x ^= x.wrapping_shl(17);
x
}
#[repr(C, packed)]
struct IBLTEntry {
key_sum: [u64; IDENTITY_HASH_SIZE / 8],
check_hash_sum: u64,
count: i64,
}
/// An IBLT (invertible bloom lookup table) specialized for reconciling sets of identity hashes.
///
/// This makes some careful use of unsafe since it's heavily optimized; IBLT computation is a CPU
/// bottleneck when replicating large dynamic data sets.
pub struct IBLT {
map: *mut IBLTEntry,
buckets: usize
}
impl Drop for IBLT {
fn drop(&mut self) {
unsafe { dealloc(self.map.cast(), Layout::from_size_align(size_of::<IBLTEntry>() * self.buckets, 8).unwrap()) };
}
}
impl IBLTEntry {
#[inline(always)]
fn is_singular(&self) -> bool {
if self.count == 1 || self.count == -1 {
u64::from_le(self.key_sum[0]).wrapping_add(xorshift64(u64::from_le(self.key_sum[1]))) == u64::from_le(self.check_hash_sum)
} else {
false
}
}
}
impl IBLT {
/// Construct a new IBLT with a given capacity.
pub fn new(buckets: usize) -> Self {
assert!(buckets > 0 && buckets <= u32::MAX as usize);
Self {
map: unsafe { alloc_zeroed(Layout::from_size_align(size_of::<IBLTEntry>() * buckets, 8).unwrap()).cast() },
buckets,
}
}
#[inline(always)]
pub fn buckets(&self) -> usize { self.buckets }
#[inline(always)]
pub fn as_bytes(&self) -> &[u8] { unsafe { &*slice_from_raw_parts(self.map.cast::<u8>(), size_of::<IBLTEntry>() * self.buckets) } }
fn ins_rem(&mut self, key: &[u8; IDENTITY_HASH_SIZE], delta: i64) {
let key = key.as_ptr().cast::<u64>();
let (k0, k1, k2, k3, k4, k5) = (read_unaligned_u64(key.wrapping_add(0)), read_unaligned_u64(key.wrapping_add(1)), read_unaligned_u64(key.wrapping_add(2)), read_unaligned_u64(key.wrapping_add(3)), read_unaligned_u64(key.wrapping_add(4)), read_unaligned_u64(key.wrapping_add(5)));
let check_hash = u64::from_le(k0).wrapping_add(xorshift64(u64::from_le(k1))).to_le();
for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS {
let b = unsafe { &mut *self.map.wrapping_add((u64::from_le(read_unaligned_u64(key.wrapping_add(mapping_sub_hash))) as usize) % self.buckets) };
b.key_sum[0] ^= k0;
b.key_sum[1] ^= k1;
b.key_sum[2] ^= k2;
b.key_sum[3] ^= k3;
b.key_sum[4] ^= k4;
b.key_sum[5] ^= k5;
b.check_hash_sum ^= check_hash;
b.count = i64::from_le(b.count).wrapping_add(delta).to_le();
}
}
#[inline(always)]
pub fn insert(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, 1); }
#[inline(always)]
pub fn remove(&mut self, key: &[u8; IDENTITY_HASH_SIZE]) { self.ins_rem(key, -1); }
/// Subtract another IBLT from this one to compute set difference.
/// The other may be supplied either as a raw byte array or as another IBLT (both implement
/// AsRef<[u8]>), and it must have the same number of buckets.
pub fn subtract<O: AsRef<[u8]>>(&mut self, other: &O) {
let other_slice = other.as_ref();
if (other_slice.len() / size_of::<IBLTEntry>()) == self.buckets {
let other_map: *const IBLTEntry = other_slice.as_ptr().cast();
for i in 0..self.buckets {
let self_b = unsafe { &mut *self.map.wrapping_add(i) };
let other_b = unsafe { &*other_map.wrapping_add(i) };
self_b.key_sum[0] ^= other_b.key_sum[0];
self_b.key_sum[1] ^= other_b.key_sum[1];
self_b.key_sum[2] ^= other_b.key_sum[2];
self_b.key_sum[3] ^= other_b.key_sum[3];
self_b.key_sum[4] ^= other_b.key_sum[4];
self_b.key_sum[5] ^= other_b.key_sum[5];
self_b.check_hash_sum ^= other_b.check_hash_sum;
self_b.count = i64::from_le(self_b.count).wrapping_sub(i64::from_le(other_b.count)).to_le();
}
}
}
/// Extract every enumerable value from this IBLT.
///
/// This consumes the IBLT instance since listing requires destructive modification
/// of the digest data.
pub fn list<F: FnMut(&[u8; IDENTITY_HASH_SIZE]) -> bool>(self, mut f: F) {
let buckets = self.buckets;
let mut singular_buckets: Vec<u32> = Vec::with_capacity(buckets + 2);
for i in 0..buckets {
unsafe {
if (&*self.map.wrapping_add(i)).is_singular() {
singular_buckets.push(i as u32);
}
}
}
let mut key = [0_u64; IDENTITY_HASH_SIZE / 8];
let mut bucket_ptr = 0;
while bucket_ptr < singular_buckets.len() {
let b = unsafe { &*self.map.wrapping_add(*singular_buckets.get_unchecked(bucket_ptr) as usize) };
bucket_ptr += 1;
if b.is_singular() {
key[0] = b.key_sum[0];
key[1] = b.key_sum[1];
key[2] = b.key_sum[2];
key[3] = b.key_sum[3];
key[4] = b.key_sum[4];
key[5] = b.key_sum[5];
if f(unsafe { &*key.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() }) {
let check_hash = u64::from_le(key[0]).wrapping_add(xorshift64(u64::from_le(key[1]))).to_le();
for mapping_sub_hash in 0..KEY_MAPPING_ITERATIONS {
let bi = (u64::from_le(unsafe { *key.get_unchecked(mapping_sub_hash) }) as usize) % buckets;
let b = unsafe { &mut *self.map.wrapping_add(bi) };
b.key_sum[0] ^= key[0];
b.key_sum[1] ^= key[1];
b.key_sum[2] ^= key[2];
b.key_sum[3] ^= key[3];
b.key_sum[4] ^= key[4];
b.key_sum[5] ^= key[5];
b.check_hash_sum ^= check_hash;
b.count = i64::from_le(b.count).wrapping_sub(1).to_le();
if b.is_singular() {
singular_buckets.push(bi as u32);
}
}
} else {
break;
}
}
}
}
}
impl AsRef<[u8]> for IBLT {
/// Get this IBLT in raw byte array form.
#[inline(always)]
fn as_ref(&self) -> &[u8] { self.as_bytes() }
}
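// Hedged usage sketch (names here are hypothetical; the tests below exercise the same flow with
// real data): each peer inserts the identity hashes it holds, one peer subtracts the other's
// IBLT of equal bucket count received as raw bytes, and list() then yields hashes present on one
// side but not the other.
//
//   let mut local = IBLT::new(2048);
//   for h in local_hashes.iter() { local.insert(h); }  // local_hashes: &[[u8; IDENTITY_HASH_SIZE]]
//   local.subtract(&remote_iblt_bytes);                // remote_iblt_bytes: &[u8] received from a peer
//   local.list(|h| { want.push(*h); true });           // collect the set difference into `want`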
#[cfg(test)]
mod tests {
use zerotier_core_crypto::hash::SHA384;
use crate::iblt::*;
#[test]
fn compiler_behavior() {
// A number of things above like unrolled key XORing must be changed if this size is changed.
assert_eq!(IDENTITY_HASH_SIZE, 48);
assert!(KEY_MAPPING_ITERATIONS <= (IDENTITY_HASH_SIZE / 8) && (IDENTITY_HASH_SIZE % 8) == 0);
// Make sure this packed struct is actually packed.
assert_eq!(size_of::<IBLTEntry>(), IDENTITY_HASH_SIZE + 8 + 8);
}
#[allow(unused_variables)]
#[test]
fn insert_and_list() {
for _ in 0..10 {
for expected_cnt in 0..768 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut t = IBLT::new(2048);
for i in 0..expected_cnt {
let k = SHA384::hash(&((i + random_u64) as u64).to_le_bytes());
t.insert(&k);
}
let mut cnt = 0;
t.list(|k| {
cnt += 1;
true
});
assert_eq!(cnt, expected_cnt);
}
}
}
#[allow(unused_variables)]
#[test]
fn set_reconciliation() {
for _ in 0..10 {
let random_u64 = zerotier_core_crypto::random::xorshift64_random();
let mut alice = IBLT::new(2048);
let mut bob = IBLT::new(2048);
let mut alice_total = 0_i32;
let mut bob_total = 0_i32;
for i in 0..1500 {
let k = SHA384::hash(&((i ^ random_u64) as u64).to_le_bytes());
if (k[0] & 1) == 1 {
alice.insert(&k);
alice_total += 1;
}
if (k[0] & 3) == 2 {
bob.insert(&k);
bob_total += 1;
}
}
alice.subtract(&bob);
let mut diff_total = 0_i32;
alice.list(|k| {
diff_total += 1;
true
});
// This is a probabilistic process so we tolerate a little bit of failure. The idea is that each
// pass reconciles more and more differences.
assert!(((alice_total + bob_total) - diff_total).abs() <= 128);
}
}
}


@@ -1,42 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
mod store;
mod protocol;
mod varint;
mod memorystore;
mod iblt;
mod config;
mod link;
mod node;
pub fn ms_since_epoch() -> u64 {
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_millis() as u64
}
pub fn ms_monotonic() -> u64 {
// Measure against a fixed process-start instant; Instant::now().elapsed() on its own is always ~0.
// (Uses std::sync::OnceLock, available in Rust 1.70+.)
static START: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
START.get_or_init(std::time::Instant::now).elapsed().as_millis() as u64
}
#[inline(always)]
pub(crate) async fn io_timeout<T, F: smol::future::Future<Output = smol::io::Result<T>>>(d: std::time::Duration, f: F) -> smol::io::Result<T> {
smol::future::or(f, async {
let _ = smol::Timer::after(d).await;
Err(smol::io::Error::new(smol::io::ErrorKind::TimedOut, "I/O timeout"))
}).await
}
/// SHA384 is the hash currently used. Others could be supported in the future.
/// If this size changes check iblt.rs for a few things that must be changed. This
/// is checked in "cargo test."
pub const IDENTITY_HASH_SIZE: usize = 48;
pub use config::Config;
pub use store::{Store, StorePutResult};
pub use memorystore::MemoryStore;
pub use node::Node;


@@ -1,346 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::Duration;
use serde::{Deserialize, Serialize};
use smol::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use smol::lock::Mutex;
use smol::net::TcpStream;
use zerotier_core_crypto::gmac::GMACStream;
use zerotier_core_crypto::hash::SHA384;
use zerotier_core_crypto::kbkdf::zt_kbkdf_hmac_sha384;
use zerotier_core_crypto::p521::{P521KeyPair, P521PublicKey};
use zerotier_core_crypto::secret::Secret;
use crate::{Config, IDENTITY_HASH_SIZE, io_timeout, Store, varint};
use crate::protocol::*;
struct Output {
stream: BufWriter<TcpStream>,
gmac: Option<GMACStream>,
}
pub(crate) struct Link<'e, S: Store + 'static> {
pub connect_time: u64,
io_timeout: Duration,
node_secret: &'e P521KeyPair,
config: &'e Config,
store: &'e S,
remote_node_id: parking_lot::Mutex<Option<[u8; 48]>>,
reader: Mutex<BufReader<TcpStream>>,
writer: Mutex<Output>,
keepalive_period: u64,
last_send_time: AtomicU64,
max_message_size: usize,
authenticated: AtomicBool,
}
#[inline(always)]
fn decode_msgpack<'de, T: Deserialize<'de>>(data: &'de [u8]) -> smol::io::Result<T> {
rmp_serde::from_read_ref(data).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid msgpack data"))
}
#[inline(always)]
fn next_id_hash_in_slice(bytes: &[u8]) -> smol::io::Result<&[u8; IDENTITY_HASH_SIZE]> {
if bytes.len() >= IDENTITY_HASH_SIZE {
Ok(unsafe { &*bytes.as_ptr().cast::<[u8; IDENTITY_HASH_SIZE]>() })
} else {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid identity hash"))
}
}
impl<'e, S: Store + 'static> Link<'e, S> {
pub fn new(stream: TcpStream, node_secret: &'e P521KeyPair, config: &'e Config, store: &'e S) -> Self {
let _ = stream.set_nodelay(false);
let max_message_size = HELLO_SIZE_MAX.max(config.max_message_size);
let now_monotonic = store.monotonic_clock();
Self {
connect_time: now_monotonic,
io_timeout: Duration::from_secs(config.io_timeout),
node_secret,
config,
store,
remote_node_id: parking_lot::Mutex::new(None),
reader: Mutex::new(BufReader::with_capacity(65536, stream.clone())),
writer: Mutex::new(Output {
stream: BufWriter::with_capacity(max_message_size + 16, stream),
gmac: None
}),
keepalive_period: (config.io_timeout * 1000) / 2,
last_send_time: AtomicU64::new(now_monotonic),
max_message_size,
authenticated: AtomicBool::new(false),
}
}
/// Get the remote node ID, which is SHA384(its long-term public keys).
/// Returns None if the remote node has not yet responded with HelloAck and been verified.
pub fn remote_node_id(&self) -> Option<[u8; 48]> { self.remote_node_id.lock().clone() }
pub(crate) async fn do_periodic_tasks(&self, now_monotonic: u64) -> smol::io::Result<()> {
if now_monotonic.saturating_sub(self.last_send_time.load(Ordering::Relaxed)) >= self.keepalive_period && self.authenticated.load(Ordering::Relaxed) {
let timeout = Duration::from_secs(1);
let mut writer = self.writer.lock().await;
io_timeout(timeout, writer.stream.write_all(&[MESSAGE_TYPE_KEEPALIVE])).await?;
io_timeout(timeout, writer.stream.flush()).await?;
self.last_send_time.store(now_monotonic, Ordering::Relaxed);
}
Ok(())
}
async fn write_message(&self, message_type: u8, message: &[&[u8]]) -> smol::io::Result<()> {
let mut writer = self.writer.lock().await;
if writer.gmac.is_some() {
let mut mac = [0_u8; 16]; // zero-initialized; gmac.finish() fills this below
let mt = [message_type];
let gmac = writer.gmac.as_mut().unwrap();
gmac.init_for_next_message();
gmac.update(&mt);
let mut total_length = 0_usize;
for m in message.iter() {
total_length += (*m).len();
gmac.update(*m);
}
gmac.finish(&mut mac);
io_timeout(self.io_timeout, writer.stream.write_all(&mt)).await?;
io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, total_length as u64)).await?;
for m in message.iter() {
io_timeout(self.io_timeout, writer.stream.write_all(*m)).await?;
}
io_timeout(self.io_timeout, writer.stream.write_all(&mac)).await?;
io_timeout(self.io_timeout, writer.stream.flush()).await
} else {
Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "link negotiation is not complete"))
}
}
async fn write_message_msgpack<T: Serialize>(&self, serialize_buf: &mut Vec<u8>, message_type: u8, message: &T) -> smol::io::Result<()> {
serialize_buf.clear();
rmp_serde::encode::write(serialize_buf, message).map_err(|_| smol::io::Error::new(smol::io::ErrorKind::InvalidData, "msgpack encode failure"))?;
self.write_message(message_type, &[serialize_buf.as_slice()]).await
}
pub(crate) async fn io_main(&self) -> smol::io::Result<()> {
// Reader is held here for the duration of the link's I/O loop.
let mut reader_mg = self.reader.lock().await;
let reader = &mut *reader_mg;
let mut read_buf: Vec<u8> = Vec::new();
read_buf.resize(self.max_message_size, 0);
let mut tmp_buf: Vec<u8> = Vec::with_capacity(4096);
// (1) Send Hello and save the nonce and the hash of the raw Hello message for later HelloAck HMAC check.
let mut outgoing_nonce = [0_u8; HELLO_NONCE_SIZE];
zerotier_core_crypto::random::fill_bytes_secure(&mut outgoing_nonce);
let ephemeral_secret = P521KeyPair::generate(true).unwrap();
let sent_hello_hash = {
tmp_buf.clear();
let _ = rmp_serde::encode::write(&mut tmp_buf, &Hello {
protocol_version: PROTOCOL_VERSION,
flags: 0,
clock: self.store.clock(),
domain: self.config.domain.as_str(),
nonce: &outgoing_nonce,
p521_ecdh_ephemeral_key: ephemeral_secret.public_key_bytes(),
p521_ecdh_node_key: self.node_secret.public_key_bytes(),
}).unwrap();
let mut writer = self.writer.lock().await;
io_timeout(self.io_timeout, varint::async_write(&mut writer.stream, tmp_buf.len() as u64)).await?;
io_timeout(self.io_timeout, writer.stream.write_all(tmp_buf.as_slice())).await?;
io_timeout(self.io_timeout, writer.stream.flush()).await?;
drop(writer);
SHA384::hash(tmp_buf.as_slice())
};
self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed);
// (2) Read other side's HELLO and send ACK. Also do key agreement, initialize GMAC, etc.
let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize;
if message_size > HELLO_SIZE_MAX {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let (mut gmac_receive, ack_key, remote_node_id) = {
let hello_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(self.io_timeout, reader.read_exact(hello_buf)).await?;
let received_hello_hash = SHA384::hash(hello_buf); // for ACK generation
let hello: Hello = decode_msgpack(hello_buf)?;
if hello.nonce.len() < HELLO_NONCE_SIZE || hello.protocol_version != PROTOCOL_VERSION {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid HELLO parameters"));
}
let remote_node_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_node_key);
let remote_ephemeral_public_key = P521PublicKey::from_bytes(hello.p521_ecdh_ephemeral_key);
if remote_node_public_key.is_none() || remote_ephemeral_public_key.is_none() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid public key in HELLO"));
}
let node_shared_key = self.node_secret.agree(&remote_node_public_key.unwrap());
let ephemeral_shared_key = ephemeral_secret.agree(&remote_ephemeral_public_key.unwrap());
if node_shared_key.is_none() || ephemeral_shared_key.is_none() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "key agreement failed"));
}
let shared_base_key = Secret(SHA384::hmac(&SHA384::hash(ephemeral_shared_key.unwrap().as_bytes()), node_shared_key.unwrap().as_bytes()));
let shared_gmac_base_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_GMAC, 0, 0);
let gmac_receive_key = Secret(SHA384::hmac(&hello.nonce[0..48], &shared_gmac_base_key.0));
let gmac_send_key = Secret(SHA384::hmac(&outgoing_nonce[0..48], &shared_gmac_base_key.0));
let gmac_receive = GMACStream::new(&gmac_receive_key.0[0..32], &hello.nonce[48..64]);
self.writer.lock().await.gmac.replace(GMACStream::new(&gmac_send_key.0[0..32], &outgoing_nonce[48..64]));
let shared_ack_key = zt_kbkdf_hmac_sha384(shared_base_key.as_bytes(), KBKDF_LABEL_HELLO_ACK_HMAC, 0, 0);
let ack_hmac = SHA384::hmac(shared_ack_key.as_bytes(), &received_hello_hash);
self.write_message_msgpack(&mut tmp_buf, MESSAGE_TYPE_HELLO_ACK, &HelloAck {
ack: &ack_hmac,
clock_echo: hello.clock
}).await?;
(gmac_receive, shared_ack_key, SHA384::hash(hello.p521_ecdh_node_key))
};
self.last_send_time.store(self.store.monotonic_clock(), Ordering::Relaxed);
// Done with ephemeral secret key, so forget it.
drop(ephemeral_secret);
// (3) Start primary I/O loop and initially listen for HelloAck to confirm the other side's node identity.
let mut received_mac_buf = [0_u8; 16];
let mut expected_mac_buf = [0_u8; 16];
let mut message_type_buf = [0_u8; 1];
let mut authenticated = false;
loop {
io_timeout(self.io_timeout, reader.read_exact(&mut message_type_buf)).await?;
if message_type_buf[0] != MESSAGE_TYPE_KEEPALIVE {
let message_size = io_timeout(self.io_timeout, varint::async_read(reader)).await? as usize;
if message_size > self.max_message_size {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message too large"));
}
let message_buf = &mut read_buf.as_mut_slice()[0..message_size];
io_timeout(self.io_timeout, reader.read_exact(message_buf)).await?;
io_timeout(self.io_timeout, reader.read_exact(&mut received_mac_buf)).await?;
gmac_receive.init_for_next_message();
gmac_receive.update(&message_type_buf);
gmac_receive.update(message_buf);
gmac_receive.finish(&mut expected_mac_buf);
if !received_mac_buf.eq(&expected_mac_buf) {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message authentication failed"));
}
if authenticated {
match message_type_buf[0] {
MESSAGE_TYPE_OBJECTS => self.do_objects(message_buf).await?,
MESSAGE_TYPE_HAVE_OBJECTS => self.do_have_objects(&mut tmp_buf, message_buf).await?,
MESSAGE_TYPE_WANT_OBJECTS => self.do_want_objects(message_buf).await?,
MESSAGE_TYPE_STATE => self.do_sync_request(decode_msgpack(message_buf)?).await?,
MESSAGE_TYPE_IBLT_SYNC_DIGEST => self.do_iblt_sync_digest(decode_msgpack(message_buf)?).await?,
_ => {},
}
} else {
if message_type_buf[0] == MESSAGE_TYPE_HELLO_ACK {
let hello_ack: HelloAck = decode_msgpack(message_buf)?;
let expected_ack_hmac = SHA384::hmac(ack_key.as_bytes(), &sent_hello_hash);
if !hello_ack.ack.eq(&expected_ack_hmac) {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "session authentication failed"));
}
authenticated = true;
let _ = self.remote_node_id.lock().replace(remote_node_id.clone());
self.authenticated.store(true, Ordering::Relaxed);
} else {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "message received before session authenticated"));
}
}
}
}
}
async fn do_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> {
while !msg.is_empty() {
let obj_size = varint::async_read(&mut msg).await? as usize;
// The final object in a message may exactly fill the remainder, so only a size larger than what remains is invalid.
if obj_size > msg.len() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete"));
}
let obj = &msg[0..obj_size];
msg = &msg[obj_size..];
match self.store.put(&SHA384::hash(obj), obj) {
crate::StorePutResult::Invalid => {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "invalid object in stream"));
},
_ => {}
}
}
Ok(())
}
async fn do_have_objects(&self, tmp_buf: &mut Vec<u8>, mut msg: &[u8]) -> smol::io::Result<()> {
tmp_buf.clear();
let _ = varint::write(tmp_buf, self.store.clock());
let empty_tmp_buf_size = tmp_buf.len();
while !msg.is_empty() {
let id_hash = next_id_hash_in_slice(msg)?;
msg = &msg[IDENTITY_HASH_SIZE..];
if !self.store.have(id_hash) {
tmp_buf.extend_from_slice(id_hash); // infallible append; avoids needing std::io::Write in scope for write_all()
}
}
if tmp_buf.len() != empty_tmp_buf_size {
self.write_message(MESSAGE_TYPE_WANT_OBJECTS, &[tmp_buf.as_slice()]).await
} else {
Ok(())
}
}
async fn do_want_objects(&self, mut msg: &[u8]) -> smol::io::Result<()> {
let ref_time = varint::read(&mut msg);
if ref_time.is_err() {
return Err(smol::io::Error::new(smol::io::ErrorKind::InvalidData, "object incomplete"));
}
let ref_time = ref_time.unwrap().0;
let mut objects: Vec<S::Object> = Vec::with_capacity((msg.len() / IDENTITY_HASH_SIZE) + 1);
while !msg.is_empty() {
let id_hash = next_id_hash_in_slice(msg)?;
msg = &msg[IDENTITY_HASH_SIZE..];
let _ = self.store.get(ref_time, id_hash).map(|obj| objects.push(obj));
}
if !objects.is_empty() {
let mut sizes: Vec<varint::Encoded> = Vec::with_capacity(objects.len());
let mut slices: Vec<&[u8]> = Vec::with_capacity(objects.len() * 2);
for o in objects.iter() {
sizes.push(varint::Encoded::from(o.as_ref().len() as u64));
}
for i in 0..objects.len() {
slices.push(sizes.get(i).unwrap().as_ref());
slices.push(objects.get(i).unwrap().as_ref());
}
self.write_message(MESSAGE_TYPE_OBJECTS, slices.as_slice()).await
} else {
Ok(())
}
}
// Stub: choosing a sync response strategy (see the State docs in protocol.rs) is not implemented here.
async fn do_sync_request(&self, _sr: State) -> smol::io::Result<()> {
Ok(())
}
// Stub: IBLT-based reconciliation of the advertised range is not implemented here.
async fn do_iblt_sync_digest(&self, _sd: IBLTSyncDigest<'_>) -> smol::io::Result<()> {
Ok(())
}
}


@@ -1,99 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::Bound::Included;
use std::collections::BTreeMap;
use std::sync::Arc;
use parking_lot::Mutex;
use crate::{IDENTITY_HASH_SIZE, ms_monotonic, ms_since_epoch, Store, StorePutResult};
/// A Store that stores all objects in memory, mostly for testing.
pub struct MemoryStore(Mutex<BTreeMap<[u8; IDENTITY_HASH_SIZE], (Arc<[u8]>, u64)>>);
impl MemoryStore {
pub fn new() -> Self { Self(Mutex::new(BTreeMap::new())) }
}
impl Store for MemoryStore {
type Object = Arc<[u8]>;
#[inline(always)]
fn clock(&self) -> u64 { ms_since_epoch() }
#[inline(always)]
fn monotonic_clock(&self) -> u64 { ms_monotonic() }
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object> {
self.0.lock().get(identity_hash).and_then(|o| {
if (*o).1 <= reference_time {
Some((*o).0.clone())
} else {
None
}
})
}
fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult {
let mut result = StorePutResult::Duplicate;
let _ = self.0.lock().entry(identity_hash.clone()).or_insert_with(|| {
result = StorePutResult::Ok;
(object.to_vec().into(), ms_since_epoch())
});
result
}
fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool {
self.0.lock().contains_key(identity_hash)
}
fn total_count(&self, reference_time: u64) -> Option<u64> {
let mut tc = 0_u64;
for e in self.0.lock().iter() {
tc += ((*e.1).1 <= reference_time) as u64;
}
Some(tc)
}
fn for_each<F: FnMut(&[u8], &Arc<[u8]>) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) {
let mut tmp: Vec<([u8; IDENTITY_HASH_SIZE], Arc<[u8]>)> = Vec::with_capacity(1024);
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
if (*e.1).1 <= reference_time {
tmp.push((e.0.clone(), (*e.1).0.clone()));
}
}
for e in tmp.iter() {
if !f(&(*e).0, &(*e).1) {
break;
}
}
}
fn for_each_identity_hash<F: FnMut(&[u8]) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], mut f: F) {
let mut tmp: Vec<[u8; IDENTITY_HASH_SIZE]> = Vec::with_capacity(1024);
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
if (*e.1).1 <= reference_time {
tmp.push(e.0.clone());
}
}
for e in tmp.iter() {
if !f(e) {
break;
}
}
}
fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64> {
let mut tc = 0_u64;
for e in self.0.lock().range((Included(*start), Included(*end))).into_iter() {
tc += ((*e.1).1 <= reference_time) as u64;
}
Some(tc)
}
}


@@ -1,170 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::{Arc, Weak};
use std::time::Duration;
use smol::{Executor, Task, Timer};
use smol::lock::Mutex;
use smol::net::SocketAddr;
use zerotier_core_crypto::p521::P521KeyPair;
use crate::{Config, Store};
use crate::link::Link;
struct NodeIntl<'e, S: Store + 'static> {
config: &'e Config,
secret: &'e P521KeyPair,
store: &'e S,
executor: &'e Executor<'e>,
connections: Mutex<HashMap<SocketAddr, (Weak<Link<'e, S>>, Task<()>)>>
}
pub struct Node<'e, S: Store + 'static> {
daemon_tasks: Vec<Task<()>>,
intl: Weak<NodeIntl<'e, S>>
}
impl<'e, S: Store + 'static> Node<'e, S> {
pub fn new(config: &'e Config, secret: &'e P521KeyPair, store: &'e S, executor: &'e Executor<'e>) -> Result<Self, Box<dyn Error>> {
let listener_v4 = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v4| {
let _ = v4.set_reuse_address(true);
let _ = v4.bind(&socket2::SockAddr::from(std::net::SocketAddrV4::new(std::net::Ipv4Addr::UNSPECIFIED, config.tcp_port)))?;
let _ = v4.listen(64);
Ok(v4)
});
let listener_v6 = socket2::Socket::new(socket2::Domain::IPV6, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).and_then(|v6| {
let _ = v6.set_only_v6(true);
let _ = v6.set_reuse_address(true);
let _ = v6.bind(&socket2::SockAddr::from(std::net::SocketAddrV6::new(std::net::Ipv6Addr::UNSPECIFIED, config.tcp_port, 0, 0)))?;
let _ = v6.listen(64);
Ok(v6)
});
if listener_v4.is_err() && listener_v6.is_err() {
return Err(Box::new(listener_v4.unwrap_err()));
}
let ni = Arc::new(NodeIntl {
config,
secret,
store,
executor,
connections: Mutex::new(HashMap::with_capacity(64)),
});
let mut n = Self {
daemon_tasks: Vec::with_capacity(3),
intl: Arc::downgrade(&ni)
};
if listener_v4.is_ok() {
let listener_v4 = listener_v4.unwrap();
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v4)).unwrap()).await }));
}
if listener_v6.is_ok() {
let listener_v6 = listener_v6.unwrap();
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.tcp_listener_main(smol::net::TcpListener::try_from(std::net::TcpListener::from(listener_v6)).unwrap()).await }));
}
let ni2 = ni.clone();
n.daemon_tasks.push(executor.spawn(async move { ni2.background_task_main().await }));
Ok(n)
}
}
impl<'e, S: Store + 'static> NodeIntl<'e, S> {
async fn background_task_main(&self) {
let io_timeout_ms = self.config.io_timeout * 1000;
let delay = Duration::from_secs(10);
loop {
Timer::after(delay).await;
let mut connections = self.connections.lock().await;
let (done_sender, done_receiver) = smol::channel::bounded::<()>(16);
let done_sender = Arc::new(done_sender);
let to_erase: Arc<Mutex<Vec<SocketAddr>>> = Arc::new(Mutex::new(Vec::new()));
let mut tasks: Vec<Task<()>> = Vec::with_capacity(connections.len());
// Search for connections that are dead, have timed out during negotiation, or
// that are duplicates of another connection to the same remote node.
let have_node_ids: Arc<Mutex<HashSet<[u8; 48]>>> = Arc::new(Mutex::new(HashSet::with_capacity(connections.len())));
let now_monotonic = self.store.monotonic_clock();
for c in connections.iter() {
let l = c.1.0.upgrade();
if l.is_some() {
let l = l.unwrap();
let remote_node_id = l.remote_node_id();
if remote_node_id.is_some() {
let remote_node_id = remote_node_id.unwrap();
if !have_node_ids.lock().await.contains(&remote_node_id) {
let a = c.0.clone();
let hn = have_node_ids.clone();
let te = to_erase.clone();
let ds = done_sender.clone();
tasks.push(self.executor.spawn(async move {
if l.do_periodic_tasks(now_monotonic).await.is_ok() {
if !hn.lock().await.insert(remote_node_id) {
// This is a redundant link to the same remote node.
te.lock().await.push(a);
}
} else {
// A fatal error occurred while servicing the connection.
te.lock().await.push(a);
}
let _ = ds.send(()).await;
}));
} else {
// This is a redundant link to the same remote node.
to_erase.lock().await.push(c.0.clone());
}
} else if now_monotonic.saturating_sub(l.connect_time) > io_timeout_ms {
// Link negotiation timed out if we aren't connected yet.
to_erase.lock().await.push(c.0.clone());
}
} else {
// Connection is closed and has released its internally held Arc<>.
to_erase.lock().await.push(c.0.clone());
}
}
// Wait for a message on the channel from each task indicating that it is complete.
for _ in 0..tasks.len() {
let _ = done_receiver.recv().await;
}
// Close and erase all connections slated for cleanup.
for e in to_erase.lock().await.iter() {
let _ = connections.remove(e);
}
}
}
async fn tcp_listener_main(&self, listener: smol::net::TcpListener) {
loop {
let c = listener.accept().await;
if c.is_ok() {
let (connection, remote_address) = c.unwrap();
let l = Arc::new(Link::<'e, S>::new(connection, self.secret, self.config, self.store));
self.connections.lock().await.insert(remote_address.clone(), (Arc::downgrade(&l), self.executor.spawn(async move {
let _ = l.io_main().await;
// Arc<Link> is now released, causing Weak<Link> to go null and then causing this
// entry to be removed from the connection map on the next background task sweep.
})));
} else {
break;
}
}
}
}
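// Hedged wiring sketch (illustrative only; these names and the use of smol::block_on are not part
// of this file): build a Config, a long-term P-521 key pair, and a Store, hand them to Node::new
// along with an Executor, then drive that executor.
//
//   let config = Config::default();
//   let secret = P521KeyPair::generate(true).unwrap();
//   let store = MemoryStore::new();
//   let executor = Executor::new();
//   let _node = Node::new(&config, &secret, &store, &executor).expect("listener bind failed");
//   smol::block_on(executor.run(smol::future::pending::<()>()));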


@@ -1,146 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
/*
* Wire protocol notes:
*
* Messages are prefixed by a type byte followed by a message size in the form
* of a variable length integer (varint). Each message is followed by a
* 16-byte GMAC message authentication code.
*
* HELLO is an exception. It's sent on connect, is prefixed only by a varint
* size, and is not followed by a MAC. Instead HelloAck is sent after it
* containing a full HMAC ACK for key negotiation.
*
* GMAC is keyed using a KBKDF-derived key from a shared key currently made
* with HKDF as HMAC(SHA384(ephemeral key), node key). The first 32 bytes of
* the derived key are the GMAC key, while the GMAC nonce is a 96-bit counter
* (seeded from the HELLO nonces) that is incremented (as little-endian) for
* each message sent. The increment should wrap at 2^96, and the connection
* should close after no more than 2^96 messages, but that's a crazy long
* session anyway.
*
* The wire protocol is only authenticated to prevent network level attacks.
* Data is not encrypted since this is typically used to replicate a public
* "well known" data set and encryption would add needless overhead.
*/
use serde::{Deserialize, Serialize};
/// KBKDF label for the HMAC in HelloAck.
pub const KBKDF_LABEL_HELLO_ACK_HMAC: u8 = b'A';
/// KBKDF label for GMAC key derived from main key.
pub const KBKDF_LABEL_GMAC: u8 = b'G';
/// Sanity limit on the size of HELLO.
pub const HELLO_SIZE_MAX: usize = 4096;
/// Size of nonce sent with HELLO.
pub const HELLO_NONCE_SIZE: usize = 64;
/// Overall protocol version.
pub const PROTOCOL_VERSION: u16 = 1;
/// No operation, no payload, sent without size or MAC.
/// This is a special single byte message used for connection keepalive.
pub const MESSAGE_TYPE_KEEPALIVE: u8 = 0;
/// Acknowledgement of HELLO.
pub const MESSAGE_TYPE_HELLO_ACK: u8 = 1;
/// A series of objects with each prefixed by a varint size.
pub const MESSAGE_TYPE_OBJECTS: u8 = 2;
/// A series of identity hashes concatenated together advertising objects we have.
pub const MESSAGE_TYPE_HAVE_OBJECTS: u8 = 3;
/// A series of identity hashes concatenated together of objects being requested.
pub const MESSAGE_TYPE_WANT_OBJECTS: u8 = 4;
/// Report state, requesting possible sync response.
pub const MESSAGE_TYPE_STATE: u8 = 5;
/// IBLT sync digest, payload is IBLTSyncDigest.
pub const MESSAGE_TYPE_IBLT_SYNC_DIGEST: u8 = 6;
/// Initial message sent by both sides on TCP connection establishment.
/// This is sent with no type or message authentication code and is only
/// sent on connect. It is prefixed by a varint size.
#[derive(Deserialize, Serialize)]
pub struct Hello<'a> {
/// Local value of PROTOCOL_VERSION.
pub protocol_version: u16,
/// Flags, currently unused and always zero.
pub flags: u64,
/// Local clock in milliseconds since Unix epoch.
pub clock: u64,
/// The data set name ("domain") to which this node belongs.
pub domain: &'a str,
/// Random nonce, must be at least 64 bytes in length.
pub nonce: &'a [u8],
/// Random ephemeral ECDH session key.
pub p521_ecdh_ephemeral_key: &'a [u8],
/// Long-lived node-identifying ECDH public key.
pub p521_ecdh_node_key: &'a [u8],
}
/// Sent in response to Hello and contains an acknowledgement HMAC for the shared key.
#[derive(Deserialize, Serialize)]
pub struct HelloAck<'a> {
/// HMAC-SHA384(KBKDF(ack key, KBKDF_LABEL_HELLO_ACK_HMAC), SHA384(original raw Hello))
pub ack: &'a [u8],
/// Value of clock in original hello, for measuring latency.
pub clock_echo: u64,
}
/// Report the state of the sender's data set.
///
/// The peer may respond in one of several ways:
///
/// (1) It may send an IBLTSyncDigest over a range of identity hashes of its
/// choice so that the requesting node may compute a difference and request
/// objects it does not have.
///
/// (2) It may send HAVE_OBJECTS with a simple list of objects.
///
/// (3) It may simply send a batch of objects.
///
/// (4) It may not respond at all.
///
/// Which option is chosen is up to the responding node and should be chosen
/// via a heuristic to maximize sync efficiency and minimize sync time.
///
/// A central assumption is that identity hashes are uniformly distributed
/// since they are cryptographic hashes (currently SHA-384). This allows a
/// simple calculation to be made with the sending node's total count to
/// estimate set difference density across the entire hash range.
#[derive(Deserialize, Serialize)]
pub struct State {
/// Total number of hashes in the entire data set.
pub total_count: u64,
/// Our clock to use as a reference time for filtering the data set (if applicable).
pub reference_time: u64,
}
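// Informal worked example of the uniformity assumption above (all numbers hypothetical): if a
// peer reports total_count = 1,000,000 and we hold 995,000 objects over the full hash range,
// the expected difference is on the order of 5,000 hashes spread evenly, so an IBLT digest
// covering 1/32 of the range only needs to resolve roughly 5,000 / 32 ≈ 156 entries, which
// bounds the bucket count required for that sub-range.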
/// An IBLT digest of identity hashes over a range.
#[derive(Deserialize, Serialize)]
pub struct IBLTSyncDigest<'a> {
/// Start of range. Right-pad with zeroes if too short.
pub range_start: &'a [u8],
/// End of range. Right-pad with zeroes if too short.
pub range_end: &'a [u8],
/// IBLT digest of hashes in this range.
pub iblt: &'a [u8],
/// Number of hashes in this range.
pub count: u64,
/// Total number of hashes in entire data set.
pub total_count: u64,
/// Reference time from the requesting State message, or 0 if this digest is being sent unsolicited.
pub reference_time: u64,
}


@@ -1,57 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use crate::IDENTITY_HASH_SIZE;
/// Result returned by Store::put().
pub enum StorePutResult {
/// Datum stored successfully.
Ok,
/// Datum is one we already have.
Duplicate,
/// Value is invalid. (this may result in dropping connections to peers, etc.)
Invalid,
/// Value is not invalid but it was not added to the data store for some neutral reason.
Ignored,
}
/// Trait that must be implemented by the data store that is to be replicated.
pub trait Store: Sync + Send {
/// Type returned by get(), which can be anything that contains a byte slice.
type Object: AsRef<[u8]> + Send;
/// Get the current wall time in milliseconds since Unix epoch.
fn clock(&self) -> u64;
/// Get the number of milliseconds that have elapsed since some arbitrary event in the past (like system boot).
fn monotonic_clock(&self) -> u64;
/// Get an object from the database.
fn get(&self, reference_time: u64, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> Option<Self::Object>;
/// Store an entry in the database.
fn put(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE], object: &[u8]) -> StorePutResult;
/// Check if we have an object by its identity hash.
fn have(&self, identity_hash: &[u8; IDENTITY_HASH_SIZE]) -> bool;
/// Get the total count of objects.
fn total_count(&self, reference_time: u64) -> Option<u64>;
/// Iterate over a range of identity hashes and values.
/// This calls the supplied function for each object. If the function returns false iteration stops.
fn for_each<F: FnMut(&[u8], &Self::Object) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F);
/// Iterate over a range of identity hashes.
/// This calls the supplied function for each hash. If the function returns false iteration stops.
fn for_each_identity_hash<F: FnMut(&[u8]) -> bool>(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE], f: F);
/// Count the number of identity hashes within the given inclusive range.
/// This may return None if an error occurs, but should return 0 if the set is empty.
fn count(&self, reference_time: u64, start: &[u8; IDENTITY_HASH_SIZE], end: &[u8; IDENTITY_HASH_SIZE]) -> Option<u64>;
}
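// For a reference implementation of this trait see MemoryStore (memorystore.rs), which keeps
// everything in an in-memory BTreeMap and is intended mostly for testing.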


@@ -1,45 +0,0 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*
* (c)2021 ZeroTier, Inc.
* https://www.zerotier.com/
*/
use smol::io::{AsyncReadExt, AsyncWrite, AsyncRead, AsyncWriteExt};
pub use zerotier_core_crypto::varint::*;
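/// Write a variable-length integer: 7 bits per byte, least significant group first, with the
/// high bit clear on continuation bytes and set on the final byte (the inverse of LEB128's
/// continuation convention).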
pub async fn async_write<W: AsyncWrite + Unpin>(w: &mut W, mut v: u64) -> smol::io::Result<()> {
let mut b = [0_u8; 10];
let mut i = 0;
loop {
if v > 0x7f {
b[i] = (v as u8) & 0x7f;
i += 1;
v = v.wrapping_shr(7);
} else {
b[i] = (v as u8) | 0x80;
i += 1;
break;
}
}
w.write_all(&b[0..i]).await
}
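/// Read a variable-length integer in the format written by async_write above: bytes with the
/// high bit clear continue the value, and a byte with the high bit set terminates it.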
pub async fn async_read<R: AsyncRead + Unpin>(r: &mut R) -> smol::io::Result<u64> {
let mut v = 0_u64;
let mut buf = [0_u8; 1];
let mut pos = 0;
loop {
let _ = r.read_exact(&mut buf).await?;
let b = buf[0];
if b <= 0x7f {
v |= (b as u64).wrapping_shl(pos);
pos += 7;
} else {
v |= ((b & 0x7f) as u64).wrapping_shl(pos);
return Ok(v);
}
}
}