This commit is contained in:
Adam Ierymenko 2020-01-20 21:05:29 -08:00
parent 73b23f1b16
commit 0c58901469
No known key found for this signature in database
GPG key ID: C8877CF2D7A5D7F3
7 changed files with 160 additions and 149 deletions

View file

@ -57,8 +57,8 @@ Commands:
set [option] [value] Get or set a service config option set [option] [value] Get or set a service config option
phy <IP/bits> blacklist <boolean> Set or clear blacklist for CIDR phy <IP/bits> blacklist <boolean> Set or clear blacklist for CIDR
phy <IP/bits> trust <path ID/0> Set or clear trusted path ID for CIDR phy <IP/bits> trust <path ID/0> Set or clear trusted path ID for CIDR
* port <port> Set primary port for P2P links port <port> Set primary port for P2P links
* secondaryport <port/0> Set secondary P2P port (0 disables) secondaryport <port/0> Set secondary P2P port (0 disables)
portsearch <boolean> Enable/disable port search on startup portsearch <boolean> Enable/disable port search on startup
portmapping <boolean> Enable/disable use of uPnP/NAT-PMP portmapping <boolean> Enable/disable use of uPnP/NAT-PMP
identity <command> [args] Identity management commands identity <command> [args] Identity management commands
@ -73,8 +73,7 @@ This is typically run from launchd (Mac), systemd or init (Linux), etc.
If 'set' is followed by a 16-digit hex number it will get/set network config If 'set' is followed by a 16-digit hex number it will get/set network config
options. Otherwise it will get/set service options. Run with no arguments to options. Otherwise it will get/set service options. Run with no arguments to
see all options. Settings with a '*' alongside require a service restart. see all options.
A few rarely used options require manual editing of local.conf and restart.
An identity can be specified as a file or directly. This is auto-detected. An identity can be specified as a file or directly. This is auto-detected.

View file

@ -37,7 +37,6 @@ func Status(basePath, authToken string, args []string, jsonOutput bool) {
fmt.Printf("\tports:\tprimary: %d secondary: %d\n", status.Config.Settings.PrimaryPort, status.Config.Settings.SecondaryPort) fmt.Printf("\tports:\tprimary: %d secondary: %d\n", status.Config.Settings.PrimaryPort, status.Config.Settings.SecondaryPort)
fmt.Printf("\tport search:\t%s\n", enabledDisabled(status.Config.Settings.PortSearch)) fmt.Printf("\tport search:\t%s\n", enabledDisabled(status.Config.Settings.PortSearch))
fmt.Printf("\tport mapping (uPnP/NAT-PMP):\t%s\n", enabledDisabled(status.Config.Settings.PortMapping)) fmt.Printf("\tport mapping (uPnP/NAT-PMP):\t%s\n", enabledDisabled(status.Config.Settings.PortMapping))
fmt.Printf("\tmultipath mode:\t%d\n", status.Config.Settings.MuiltipathMode)
fmt.Printf("\tblacklisted interface prefixes:\t") fmt.Printf("\tblacklisted interface prefixes:\t")
for i, bl := range status.Config.Settings.InterfacePrefixBlacklist { for i, bl := range status.Config.Settings.InterfacePrefixBlacklist {
if i > 0 { if i > 0 {

View file

@ -71,6 +71,7 @@ struct ZT_GoNodeThread
std::string ip; std::string ip;
int port; int port;
int af; int af;
bool primary;
std::atomic<bool> run; std::atomic<bool> run;
std::thread thr; std::thread thr;
}; };
@ -219,17 +220,15 @@ static int ZT_GoNode_WirePacketSendFunction(
unsigned int len, unsigned int len,
unsigned int ipTTL) unsigned int ipTTL)
{ {
if ((localSocket != -1)&&(localSocket != ZT_INVALID_SOCKET)) { if (localSocket > 0) {
doUdpSend((ZT_SOCKET)localSocket,addr,data,len,ipTTL); doUdpSend((ZT_SOCKET)localSocket,addr,data,len,ipTTL);
} else { } else {
ZT_GoNode *const gn = reinterpret_cast<ZT_GoNode *>(uptr); ZT_GoNode *const gn = reinterpret_cast<ZT_GoNode *>(uptr);
std::set<std::string> ipsSentFrom;
std::lock_guard<std::mutex> l(gn->threads_l); std::lock_guard<std::mutex> l(gn->threads_l);
for(auto t=gn->threads.begin();t!=gn->threads.end();++t) { for(auto t=gn->threads.begin();t!=gn->threads.end();++t) {
if (t->second.af == addr->ss_family) { if ((t->second.af == addr->ss_family)&&(t->second.primary)) {
if (ipsSentFrom.insert(t->second.ip).second) {
doUdpSend(t->first,addr,data,len,ipTTL); doUdpSend(t->first,addr,data,len,ipTTL);
} break;
} }
} }
} }
@ -444,7 +443,7 @@ static void setCommonUdpSocketSettings(ZT_SOCKET udpSock,const char *dev)
#endif #endif
} }
extern "C" int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char *ip,const int port) extern "C" int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char *ip,const int port,const int primary)
{ {
if (strchr(ip,':')) { if (strchr(ip,':')) {
struct sockaddr_in6 in6; struct sockaddr_in6 in6;
@ -474,6 +473,7 @@ extern "C" int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char
gnt.ip = ip; gnt.ip = ip;
gnt.port = port; gnt.port = port;
gnt.af = AF_INET6; gnt.af = AF_INET6;
gnt.primary = (primary != 0);
gnt.run = true; gnt.run = true;
gnt.thr = std::thread([udpSock,gn,&gnt] { gnt.thr = std::thread([udpSock,gn,&gnt] {
struct sockaddr_in6 in6; struct sockaddr_in6 in6;
@ -519,6 +519,7 @@ extern "C" int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char
gnt.ip = ip; gnt.ip = ip;
gnt.port = port; gnt.port = port;
gnt.af = AF_INET6; gnt.af = AF_INET6;
gnt.primary = (primary != 0);
gnt.run = true; gnt.run = true;
gnt.thr = std::thread([udpSock,gn,&gnt] { gnt.thr = std::thread([udpSock,gn,&gnt] {
struct sockaddr_in in4; struct sockaddr_in in4;

View file

@ -49,7 +49,7 @@ void ZT_GoNode_delete(ZT_GoNode *gn);
ZT_Node *ZT_GoNode_getNode(ZT_GoNode *gn); ZT_Node *ZT_GoNode_getNode(ZT_GoNode *gn);
/* This can be called more than once to start multiple listener threads */ /* This can be called more than once to start multiple listener threads */
int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char *ip,int port); int ZT_GoNode_phyStartListen(ZT_GoNode *gn,const char *dev,const char *ip,int port,int primary);
/* Close all listener threads for a given local IP and port */ /* Close all listener threads for a given local IP and port */
int ZT_GoNode_phyStopListen(ZT_GoNode *gn,const char *dev,const char *ip,int port); int ZT_GoNode_phyStopListen(ZT_GoNode *gn,const char *dev,const char *ip,int port);

View file

@ -182,7 +182,7 @@ func apiSetStandardHeaders(out http.ResponseWriter) {
h.Set("Pragma", "no-cache") h.Set("Pragma", "no-cache")
t := time.Now().UTC() t := time.Now().UTC()
h.Set("Date", t.Format(time.RFC1123)) h.Set("Date", t.Format(time.RFC1123))
h.Set("X-ZT-Clock", strconv.FormatInt(t.UnixNano() / int64(1000000), 10)) h.Set("X-ZT-Clock", strconv.FormatInt(t.UnixNano()/int64(1000000), 10))
} }
func apiSendObj(out http.ResponseWriter, req *http.Request, httpStatusCode int, obj interface{}) error { func apiSendObj(out http.ResponseWriter, req *http.Request, httpStatusCode int, obj interface{}) error {

View file

@ -60,9 +60,6 @@ type LocalConfigSettings struct {
// LogSizeMax is the maximum size of the log in kilobytes or 0 for no limit and -1 to disable logging // LogSizeMax is the maximum size of the log in kilobytes or 0 for no limit and -1 to disable logging
LogSizeMax int `json:"logSizeMax"` LogSizeMax int `json:"logSizeMax"`
// MultipathMode sets the multipath link aggregation mode
MuiltipathMode int `json:"multipathMode"`
// IP/port to bind for TCP access to control API (disabled if null) // IP/port to bind for TCP access to control API (disabled if null)
APITCPBindAddress *InetAddress `json:"apiTCPBindAddress,omitempty"` APITCPBindAddress *InetAddress `json:"apiTCPBindAddress,omitempty"`
@ -85,17 +82,21 @@ type LocalConfig struct {
Network map[NetworkID]NetworkLocalSettings `json:"network,omitempty"` Network map[NetworkID]NetworkLocalSettings `json:"network,omitempty"`
// LocalConfigSettings contains other local settings for this node // LocalConfigSettings contains other local settings for this node
Settings LocalConfigSettings `json:"settings,omitempty"` Settings LocalConfigSettings `json:"settings"`
initialized bool
} }
// Read this local config from a file, initializing to defaults if the file does not exist. // Read this local config from a file, initializing to defaults if the file does not exist.
func (lc *LocalConfig) Read(p string, saveDefaultsIfNotExist bool, isTotallyNewNode bool) error { func (lc *LocalConfig) Read(p string, saveDefaultsIfNotExist bool, isTotallyNewNode bool) error {
if lc.Physical == nil { // Initialize defaults, which may be replaced if we read a file from disk.
if !lc.initialized {
lc.initialized = true
lc.Physical = make(map[string]LocalConfigPhysicalPathConfiguration) lc.Physical = make(map[string]LocalConfigPhysicalPathConfiguration)
lc.Virtual = make(map[Address]LocalConfigVirtualAddressConfiguration) lc.Virtual = make(map[Address]LocalConfigVirtualAddressConfiguration)
lc.Network = make(map[NetworkID]NetworkLocalSettings) lc.Network = make(map[NetworkID]NetworkLocalSettings)
// LocalConfig default settings
if isTotallyNewNode { if isTotallyNewNode {
lc.Settings.PrimaryPort = 893 lc.Settings.PrimaryPort = 893
} else { } else {
@ -105,7 +106,9 @@ func (lc *LocalConfig) Read(p string, saveDefaultsIfNotExist bool, isTotallyNewN
lc.Settings.PortSearch = true lc.Settings.PortSearch = true
lc.Settings.PortMapping = true lc.Settings.PortMapping = true
lc.Settings.LogSizeMax = 128 lc.Settings.LogSizeMax = 128
lc.Settings.MuiltipathMode = 0 if !isTotallyNewNode {
lc.Settings.APITCPBindAddress = NewInetAddressFromString("127.0.0.1/9993")
}
switch runtime.GOOS { switch runtime.GOOS {
case "windows": case "windows":
lc.Settings.InterfacePrefixBlacklist = []string{"loopback"} lc.Settings.InterfacePrefixBlacklist = []string{"loopback"}

View file

@ -86,25 +86,25 @@ var (
type Node struct { type Node struct {
networks map[NetworkID]*Network networks map[NetworkID]*Network
networksByMAC map[MAC]*Network // locked by networksLock networksByMAC map[MAC]*Network // locked by networksLock
networksLock sync.RWMutex
interfaceAddresses map[string]net.IP // physical external IPs on the machine interfaceAddresses map[string]net.IP // physical external IPs on the machine
interfaceAddressesLock sync.Mutex
basePath string basePath string
peersPath string peersPath string
networksPath string networksPath string
localConfigPath string localConfigPath string
localConfig LocalConfig localConfig LocalConfig
localConfigLock sync.RWMutex localConfigLock sync.RWMutex
networksLock sync.RWMutex
interfaceAddressesLock sync.Mutex
logW *sizeLimitWriter logW *sizeLimitWriter
log *log.Logger log *log.Logger
gn *C.ZT_GoNode gn *C.ZT_GoNode
zn *C.ZT_Node zn *C.ZT_Node
id *Identity id *Identity
apiServer *http.Server namedSocketApiServer *http.Server
tcpApiServer *http.Server tcpApiServer *http.Server
online uint32 online uint32
running uint32 running uint32
runLock sync.Mutex runWaitGroup sync.WaitGroup
} }
// NewNode creates and initializes a new instance of the ZeroTier node service // NewNode creates and initializes a new instance of the ZeroTier node service
@ -113,6 +113,8 @@ func NewNode(basePath string) (n *Node, err error) {
n.networks = make(map[NetworkID]*Network) n.networks = make(map[NetworkID]*Network)
n.networksByMAC = make(map[MAC]*Network) n.networksByMAC = make(map[MAC]*Network)
n.interfaceAddresses = make(map[string]net.IP) n.interfaceAddresses = make(map[string]net.IP)
n.online = 0
n.running = 1
_ = os.MkdirAll(basePath, 0755) _ = os.MkdirAll(basePath, 0755)
if _, err = os.Stat(basePath); err != nil { if _, err = os.Stat(basePath); err != nil {
@ -199,6 +201,12 @@ func NewNode(basePath string) (n *Node, err error) {
return nil, errors.New("unable to bind to primary port") return nil, errors.New("unable to bind to primary port")
} }
n.namedSocketApiServer, n.tcpApiServer, err = createAPIServer(basePath, n)
if err != nil {
n.log.Printf("FATAL: unable to start API server: %s", err.Error())
return nil, err
}
nodesByUserPtrLock.Lock() nodesByUserPtrLock.Lock()
nodesByUserPtr[uintptr(unsafe.Pointer(n))] = n nodesByUserPtr[uintptr(unsafe.Pointer(n))] = n
nodesByUserPtrLock.Unlock() nodesByUserPtrLock.Unlock()
@ -215,12 +223,9 @@ func NewNode(basePath string) (n *Node, err error) {
} }
n.zn = (*C.ZT_Node)(C.ZT_GoNode_getNode(n.gn)) n.zn = (*C.ZT_Node)(C.ZT_GoNode_getNode(n.gn))
var ns C.ZT_NodeStatus n.id, err = newIdentityFromCIdentity(C.ZT_Node_identity(unsafe.Pointer(n.zn)))
C.ZT_Node_status(unsafe.Pointer(n.zn), &ns)
idString := C.GoString(ns.secretIdentity)
n.id, err = NewIdentityFromString(idString)
if err != nil { if err != nil {
n.log.Printf("FATAL: node's identity does not seem valid (%s)", string(idString)) n.log.Printf("FATAL: error obtaining node's identity")
nodesByUserPtrLock.Lock() nodesByUserPtrLock.Lock()
delete(nodesByUserPtr, uintptr(unsafe.Pointer(n))) delete(nodesByUserPtr, uintptr(unsafe.Pointer(n)))
nodesByUserPtrLock.Unlock() nodesByUserPtrLock.Unlock()
@ -228,30 +233,20 @@ func NewNode(basePath string) (n *Node, err error) {
return nil, err return nil, err
} }
n.apiServer, n.tcpApiServer, err = createAPIServer(basePath, n) n.runWaitGroup.Add(1)
if err != nil {
n.log.Printf("FATAL: unable to start API server: %s", err.Error())
nodesByUserPtrLock.Lock()
delete(nodesByUserPtr, uintptr(unsafe.Pointer(n)))
nodesByUserPtrLock.Unlock()
C.ZT_GoNode_delete(n.gn)
return nil, err
}
n.online = 0
n.running = 1
n.runLock.Lock() // used to block Close() until below gorountine exits
go func() { go func() {
defer n.runWaitGroup.Done()
var previousExplicitExternalAddresses []ExternalAddress // empty at first so these will be configured
lastMaintenanceRun := int64(0) lastMaintenanceRun := int64(0)
var previousExplicitExternalAddresses []ExternalAddress
var portsA [3]int
for atomic.LoadUint32(&n.running) != 0 {
time.Sleep(1 * time.Second)
now := TimeMs() for atomic.LoadUint32(&n.running) != 0 {
if (now - lastMaintenanceRun) >= 30000 { time.Sleep(500 * time.Millisecond)
lastMaintenanceRun = now
nowS := time.Now().Unix()
if (nowS - lastMaintenanceRun) >= 30 {
lastMaintenanceRun = nowS
//////////////////////////////////////////////////////////////////////
n.localConfigLock.RLock() n.localConfigLock.RLock()
// Get local physical interface addresses, excluding blacklisted and ZeroTier-created interfaces // Get local physical interface addresses, excluding blacklisted and ZeroTier-created interfaces
@ -280,26 +275,29 @@ func NewNode(basePath string) (n *Node, err error) {
n.networksLock.RUnlock() n.networksLock.RUnlock()
} }
// Open or close locally bound UDP ports for each local interface address.
// This opens ports if they are not already open and then closes ports if
// they are open but no longer seem to exist.
interfaceAddressesChanged := false interfaceAddressesChanged := false
ports := portsA[:0] ports := make([]int, 0, 2)
if n.localConfig.Settings.PrimaryPort > 0 && n.localConfig.Settings.PrimaryPort < 65536 { if n.localConfig.Settings.PrimaryPort > 0 && n.localConfig.Settings.PrimaryPort < 65536 {
ports = append(ports, n.localConfig.Settings.PrimaryPort) ports = append(ports, n.localConfig.Settings.PrimaryPort)
} }
if n.localConfig.Settings.SecondaryPort > 0 && n.localConfig.Settings.SecondaryPort < 65536 { if n.localConfig.Settings.SecondaryPort > 0 && n.localConfig.Settings.SecondaryPort < 65536 {
ports = append(ports, n.localConfig.Settings.SecondaryPort) ports = append(ports, n.localConfig.Settings.SecondaryPort)
} }
// Open or close locally bound UDP ports for each local interface address.
// This opens ports if they are not already open and then closes ports if
// they are open but no longer seem to exist.
n.interfaceAddressesLock.Lock() n.interfaceAddressesLock.Lock()
for astr, ipn := range interfaceAddresses { for astr, ipn := range interfaceAddresses {
if _, alreadyKnown := n.interfaceAddresses[astr]; !alreadyKnown { if _, alreadyKnown := n.interfaceAddresses[astr]; !alreadyKnown {
interfaceAddressesChanged = true interfaceAddressesChanged = true
ipCstr := C.CString(ipn.String()) ipCstr := C.CString(ipn.String())
for _, p := range ports { for pn, p := range ports {
n.log.Printf("UDP binding to port %d on interface %s", p, astr) n.log.Printf("UDP binding to port %d on interface %s", p, astr)
C.ZT_GoNode_phyStartListen(n.gn, nil, ipCstr, C.int(p)) primary := C.int(0)
if pn == 0 {
primary = 1
}
C.ZT_GoNode_phyStartListen(n.gn, nil, ipCstr, C.int(p), primary)
} }
C.free(unsafe.Pointer(ipCstr)) C.free(unsafe.Pointer(ipCstr))
} }
@ -318,8 +316,11 @@ func NewNode(basePath string) (n *Node, err error) {
n.interfaceAddresses = interfaceAddresses n.interfaceAddresses = interfaceAddresses
n.interfaceAddressesLock.Unlock() n.interfaceAddressesLock.Unlock()
// Update node's understanding of our interface addressaes if they've changed // Update node's understanding of our interface addresses if they've changed
if interfaceAddressesChanged || reflect.DeepEqual(n.localConfig.Settings.ExplicitAddresses, previousExplicitExternalAddresses) { if interfaceAddressesChanged || reflect.DeepEqual(n.localConfig.Settings.ExplicitAddresses, previousExplicitExternalAddresses) {
previousExplicitExternalAddresses = n.localConfig.Settings.ExplicitAddresses
// Consolidate explicit and detected interface addresses, removing duplicates.
externalAddresses := make(map[[3]uint64]*ExternalAddress) externalAddresses := make(map[[3]uint64]*ExternalAddress)
for _, ip := range interfaceAddresses { for _, ip := range interfaceAddresses {
for _, p := range ports { for _, p := range ports {
@ -336,14 +337,16 @@ func NewNode(basePath string) (n *Node, err error) {
for _, a := range n.localConfig.Settings.ExplicitAddresses { for _, a := range n.localConfig.Settings.ExplicitAddresses {
externalAddresses[a.key()] = &a externalAddresses[a.key()] = &a
} }
if len(externalAddresses) > 0 { if len(externalAddresses) > 0 {
cAddrs := make([]C.ZT_InterfaceAddress, len(externalAddresses)) cAddrs := make([]C.ZT_InterfaceAddress, len(externalAddresses))
cAddrCount := 0 cAddrCount := 0
for _, a := range externalAddresses { for _, a := range externalAddresses {
makeSockaddrStorage(a.IP, a.Port, &(cAddrs[cAddrCount].address)) makeSockaddrStorage(a.IP, a.Port, &(cAddrs[cAddrCount].address))
cAddrs[cAddrCount].permanent = 0
if a.Permanent { if a.Permanent {
cAddrs[cAddrCount].permanent = 1 cAddrs[cAddrCount].permanent = 1
} else {
cAddrs[cAddrCount].permanent = 0
} }
cAddrCount++ cAddrCount++
} }
@ -359,9 +362,9 @@ func NewNode(basePath string) (n *Node, err error) {
} }
n.localConfigLock.RUnlock() n.localConfigLock.RUnlock()
//////////////////////////////////////////////////////////////////////
} }
} }
n.runLock.Unlock() // signal Close() that maintenance goroutine is done
}() }()
return n, nil return n, nil
@ -370,8 +373,8 @@ func NewNode(basePath string) (n *Node, err error) {
// Close closes this Node and frees its underlying C++ Node structures // Close closes this Node and frees its underlying C++ Node structures
func (n *Node) Close() { func (n *Node) Close() {
if atomic.SwapUint32(&n.running, 0) != 0 { if atomic.SwapUint32(&n.running, 0) != 0 {
if n.apiServer != nil { if n.namedSocketApiServer != nil {
_ = n.apiServer.Close() _ = n.namedSocketApiServer.Close()
} }
if n.tcpApiServer != nil { if n.tcpApiServer != nil {
_ = n.tcpApiServer.Close() _ = n.tcpApiServer.Close()
@ -379,8 +382,7 @@ func (n *Node) Close() {
C.ZT_GoNode_delete(n.gn) C.ZT_GoNode_delete(n.gn)
n.runLock.Lock() // wait for maintenance gorountine to die n.runWaitGroup.Wait()
n.runLock.Unlock()
nodesByUserPtrLock.Lock() nodesByUserPtrLock.Lock()
delete(nodesByUserPtr, uintptr(unsafe.Pointer(n))) delete(nodesByUserPtr, uintptr(unsafe.Pointer(n)))
@ -610,6 +612,8 @@ func (n *Node) makeStateObjectPath(objType int, id [2]uint64) (string, bool) {
case C.ZT_STATE_OBJECT_IDENTITY_SECRET: case C.ZT_STATE_OBJECT_IDENTITY_SECRET:
fp = path.Join(n.basePath, "identity.secret") fp = path.Join(n.basePath, "identity.secret")
secret = true secret = true
case C.ZT_STATE_OBJECT_LOCATOR:
fp = path.Join(n.basePath, "locator")
case C.ZT_STATE_OBJECT_PEER: case C.ZT_STATE_OBJECT_PEER:
fp = path.Join(n.basePath, "peers.d") fp = path.Join(n.basePath, "peers.d")
_ = os.Mkdir(fp, 0700) _ = os.Mkdir(fp, 0700)
@ -664,8 +668,8 @@ func (n *Node) handleTrace(traceMessage string) {
} }
} }
func (n *Node) handleUserMessage(origin *Identity, messageTypeID uint64, data []byte) { // func (n *Node) handleUserMessage(origin *Identity, messageTypeID uint64, data []byte) {
} // }
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
@ -728,18 +732,27 @@ func goPathLookupFunc(gn unsafe.Pointer, _ C.uint64_t, _ int, identity, familyP,
//export goStateObjectPutFunc //export goStateObjectPutFunc
func goStateObjectPutFunc(gn unsafe.Pointer, objType C.int, id, data unsafe.Pointer, len C.int) { func goStateObjectPutFunc(gn unsafe.Pointer, objType C.int, id, data unsafe.Pointer, len C.int) {
go func() { id2 := *((*[2]uint64)(id))
var data2 []byte
if len > 0 {
data2 = C.GoBytes(data, len)
}
nodesByUserPtrLock.RLock() nodesByUserPtrLock.RLock()
node := nodesByUserPtr[uintptr(gn)] node := nodesByUserPtr[uintptr(gn)]
nodesByUserPtrLock.RUnlock() nodesByUserPtrLock.RUnlock()
if node == nil { if node == nil {
return return
} }
node.runWaitGroup.Add(1)
go func() {
if len < 0 { if len < 0 {
node.stateObjectDelete(int(objType), *((*[2]uint64)(id))) node.stateObjectDelete(int(objType), id2)
} else { } else {
node.stateObjectPut(int(objType), *((*[2]uint64)(id)), C.GoBytes(data, len)) node.stateObjectPut(int(objType), id2, data2)
} }
node.runWaitGroup.Done()
}() }()
} }
@ -754,7 +767,7 @@ func goStateObjectGetFunc(gn unsafe.Pointer, objType C.int, id, dataP unsafe.Poi
*((*uintptr)(dataP)) = 0 *((*uintptr)(dataP)) = 0
tmp, found := node.stateObjectGet(int(objType), *((*[2]uint64)(id))) tmp, found := node.stateObjectGet(int(objType), *((*[2]uint64)(id)))
if found && len(tmp) > 0 { if found && len(tmp) > 0 {
cData := C.malloc(C.ulong(len(tmp))) cData := C.malloc(C.ulong(len(tmp))) // GoGlue sends free() to the core as the free function
if uintptr(cData) == 0 { if uintptr(cData) == 0 {
return -1 return -1
} }
@ -766,7 +779,6 @@ func goStateObjectGetFunc(gn unsafe.Pointer, objType C.int, id, dataP unsafe.Poi
//export goVirtualNetworkConfigFunc //export goVirtualNetworkConfigFunc
func goVirtualNetworkConfigFunc(gn, _ unsafe.Pointer, nwid C.uint64_t, op C.int, conf unsafe.Pointer) { func goVirtualNetworkConfigFunc(gn, _ unsafe.Pointer, nwid C.uint64_t, op C.int, conf unsafe.Pointer) {
go func() {
nodesByUserPtrLock.RLock() nodesByUserPtrLock.RLock()
node := nodesByUserPtr[uintptr(gn)] node := nodesByUserPtr[uintptr(gn)]
nodesByUserPtrLock.RUnlock() nodesByUserPtrLock.RUnlock()
@ -817,21 +829,25 @@ func goVirtualNetworkConfigFunc(gn, _ unsafe.Pointer, nwid C.uint64_t, op C.int,
}) })
} }
} }
node.runWaitGroup.Add(1)
go func() {
network.updateConfig(&nc, nil) network.updateConfig(&nc, nil)
} node.runWaitGroup.Done()
}
}() }()
}
}
} }
//export goZtEvent //export goZtEvent
func goZtEvent(gn unsafe.Pointer, eventType C.int, data unsafe.Pointer) { func goZtEvent(gn unsafe.Pointer, eventType C.int, data unsafe.Pointer) {
go func() {
nodesByUserPtrLock.RLock() nodesByUserPtrLock.RLock()
node := nodesByUserPtr[uintptr(gn)] node := nodesByUserPtr[uintptr(gn)]
nodesByUserPtrLock.RUnlock() nodesByUserPtrLock.RUnlock()
if node == nil { if node == nil {
return return
} }
switch eventType { switch eventType {
case C.ZT_EVENT_OFFLINE: case C.ZT_EVENT_OFFLINE:
atomic.StoreUint32(&node.online, 0) atomic.StoreUint32(&node.online, 0)
@ -839,12 +855,5 @@ func goZtEvent(gn unsafe.Pointer, eventType C.int, data unsafe.Pointer) {
atomic.StoreUint32(&node.online, 1) atomic.StoreUint32(&node.online, 1)
case C.ZT_EVENT_TRACE: case C.ZT_EVENT_TRACE:
node.handleTrace(C.GoString((*C.char)(data))) node.handleTrace(C.GoString((*C.char)(data)))
case C.ZT_EVENT_USER_MESSAGE:
um := (*C.ZT_UserMessage)(data)
id, err := newIdentityFromCIdentity(unsafe.Pointer(um.id))
if err != nil {
node.handleUserMessage(id, uint64(um.typeId), C.GoBytes(um.data, C.int(um.length)))
} }
}
}()
} }