From bcaf42e07557f611a4609a8d69fb1ef6d29f1c95 Mon Sep 17 00:00:00 2001 From: Joseph Henry Date: Wed, 20 Jan 2016 13:38:14 -0800 Subject: [PATCH] Transfer speed increate + stability fixes --- netcon/NetconEthernetTap.cpp | 42 ++++++++++++++++++++---------------- netcon/NetconEthernetTap.hpp | 4 ++-- osdep/Phy.hpp | 2 +- service/OneService.cpp | 2 +- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/netcon/NetconEthernetTap.cpp b/netcon/NetconEthernetTap.cpp index c3132dd87..0c3d2e900 100644 --- a/netcon/NetconEthernetTap.cpp +++ b/netcon/NetconEthernetTap.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #include "NetconEthernetTap.hpp" @@ -304,13 +305,14 @@ void NetconEthernetTap::threadMain() // Connection prunning if (since_status >= STATUS_TMR_INTERVAL) { prev_status_time = now; - for(size_t i=0;i<_TcpConnections.size();++i) { if(!_TcpConnections[i]->sock) continue; int fd = _phy.getDescriptor(_TcpConnections[i]->sock); dwr(MSG_DEBUG," tap_thread(): tcp\\jobs = {%d, %d}\n", _TcpConnections.size(), jobmap.size()); - + // If there's anything on the RX buf, set to notify in case we stalled + if(_TcpConnections[i]->rxsz > 0) + _phy.setNotifyWritable(_TcpConnections[i]->sock, true); fcntl(fd, F_SETFL, O_NONBLOCK); unsigned char tmpbuf[BUF_SZ]; @@ -417,30 +419,31 @@ void NetconEthernetTap::phyOnUnixClose(PhySocket *sock,void **uptr) { closeConnection(sock); } -void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr) +void NetconEthernetTap::phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked) { - Mutex::Lock _l(_tcpconns_m); - Mutex::Lock _l2(_rx_buf_m); + if(!lwip_invoked) { + _tcpconns_m.lock(); + _rx_buf_m.lock(); + } TcpConnection *conn = getConnection(sock); - int len = conn->rxsz; - int n = _phy.streamSend(conn->sock, conn->rxbuf, len); + if(!conn->rxsz) + return; + int n = _phy.streamSend(conn->sock, conn->rxbuf, conn->rxsz); if(n > 0) { - if(n < len) { - dwr(MSG_ERROR,"\n phyOnUnixWritable(): unable to write entire \"block\" to stream\n"); - } - if(len-n) - memcpy(conn->rxbuf, conn->rxbuf+n, len-n); + if(conn->rxsz-n > 0) + memcpy(conn->rxbuf, conn->rxbuf+n, conn->rxsz-n); conn->rxsz -= n; float max = (float)DEFAULT_BUF_SZ; dwr(MSG_TRANSFER," <--- RX :: {TX: %.3f%%, RX: %.3f%%, sock=%x} :: %d bytes\n", (float)conn->txsz / max, (float)conn->rxsz / max, sock, n); lwipstack->_tcp_recved(conn->pcb, n); - if(conn->rxsz == 0){ - _phy.setNotifyWritable(conn->sock, false); // Nothing more to be notified about - } } else { - perror("\n"); - dwr(MSG_ERROR," phyOnUnixWritable(): errno = %d\n", errno); + dwr(MSG_ERROR," phyOnUnixWritable(): errno = %d, rxsz = %d\n", errno, conn->rxsz); + _phy.setNotifyWritable(conn->sock, false); + } + if(!lwip_invoked) { + _tcpconns_m.unlock(); + _rx_buf_m.unlock(); } } @@ -673,7 +676,6 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf * Larg *l = (Larg*)arg; int tot = 0; struct pbuf* q = p; - Mutex::Lock _l(l->tap->_tcpconns_m); if(!l->conn) { @@ -702,8 +704,10 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *PCB, struct pbuf * p = p->next; tot += len; } - if(tot) + if(tot) { + l->tap->phyOnUnixWritable(l->conn->sock, NULL, true); l->tap->_phy.setNotifyWritable(l->conn->sock, true); + } l->tap->lwipstack->_pbuf_free(q); return ERR_OK; } diff --git a/netcon/NetconEthernetTap.hpp b/netcon/NetconEthernetTap.hpp index f15f86712..2c79840bc 100644 --- a/netcon/NetconEthernetTap.hpp +++ b/netcon/NetconEthernetTap.hpp @@ -58,7 +58,7 @@ struct accept_st; #define APPLICATION_POLL_FREQ 2 #define ZT_LWIP_TCP_TIMER_INTERVAL 5 -#define STATUS_TMR_INTERVAL 500 // How often we check connection statuses (in ms) +#define STATUS_TMR_INTERVAL 250 // How often we check connection statuses (in ms) #define DEFAULT_BUF_SZ 1024 * 1024 * 2 #define DEFAULT_BUF_SOFTMAX DEFAULT_BUF_SZ / 2 @@ -405,7 +405,7 @@ private: /* * Notifies us that we can write to an application's socket */ - void phyOnUnixWritable(PhySocket *sock,void **uptr); + void phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked); /* * Returns a pointer to a TcpConnection associated with a given PhySocket diff --git a/osdep/Phy.hpp b/osdep/Phy.hpp index bdf20d13a..0f993c9f2 100644 --- a/osdep/Phy.hpp +++ b/osdep/Phy.hpp @@ -976,7 +976,7 @@ public: ZT_PHY_SOCKFD_TYPE sock = s->sock; // if closed, s->sock becomes invalid as s is no longer dereferencable if ((FD_ISSET(sock,&wfds))&&(FD_ISSET(sock,&_writefds))) { try { - _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr)); + _handler->phyOnUnixWritable((PhySocket *)&(*s),&(s->uptr),false); } catch ( ... ) {} } if (FD_ISSET(sock,&rfds)) { diff --git a/service/OneService.cpp b/service/OneService.cpp index 8039b6f79..593f58aaa 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -1120,7 +1120,7 @@ public: inline void phyOnUnixAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN) {} inline void phyOnUnixClose(PhySocket *sock,void **uptr) {} inline void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len) {} - inline void phyOnUnixWritable(PhySocket *sock,void **uptr) {} + inline void phyOnUnixWritable(PhySocket *sock,void **uptr,bool lwip_invoked) {} inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,void **nuptr,enum ZT_VirtualNetworkConfigOperation op,const ZT_VirtualNetworkConfig *nwc) {