From a9a370587780d7d0d0e68e850c8e64f4ded419b0 Mon Sep 17 00:00:00 2001 From: Adam Ierymenko Date: Tue, 28 Apr 2015 12:43:10 -0700 Subject: [PATCH] TCP tunneling implementation -- not tested yet and no initiation yet. --- service/OneService.cpp | 320 +++++++++++++++++++++++++++-------------- 1 file changed, 210 insertions(+), 110 deletions(-) diff --git a/service/OneService.cpp b/service/OneService.cpp index a04d46dff..554d59bf6 100644 --- a/service/OneService.cpp +++ b/service/OneService.cpp @@ -113,17 +113,20 @@ static const struct http_parser_settings HTTP_PARSER_SETTINGS = { ShttpOnMessageComplete }; -struct HttpConnection +struct TcpConnection { - bool server; - bool writing; + enum { + TCP_HTTP_INCOMING, + TCP_HTTP_OUTGOING, // not currently used + TCP_TUNNEL_OUTGOING // fale-SSL outgoing tunnel -- HTTP-related fields are not used + } type; + bool shouldKeepAlive; OneServiceImpl *parent; PhySocket *sock; InetAddress from; http_parser parser; unsigned long messageSize; - unsigned long writePtr; uint64_t lastActivity; std::string currentHeaderField; @@ -132,7 +135,9 @@ struct HttpConnection std::string url; std::string status; std::map< std::string,std::string > headers; - std::string body; // also doubles as send queue for writes out to the socket + std::string body; + + std::string writeBuf; }; class OneServiceImpl : public OneService @@ -281,8 +286,8 @@ public: } try { - while (!_httpConnections.empty()) - _phy.close(_httpConnections.begin()->first); + while (!_tcpConections.empty()) + _phy.close(_tcpConections.begin()->first); } catch ( ... ) {} { @@ -336,13 +341,13 @@ public: ZT1_ResultCode rc = _node->processWirePacket( OSUtils::now(), (const struct sockaddr_storage *)from, // Phy<> uses sockaddr_storage, so it'll always be that big - 0, + 0, // desperation == 0, direct UDP data, len, &_nextBackgroundTaskDeadline); if (ZT1_ResultCode_isFatal(rc)) { char tmp[256]; - Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket(%d)",(int)rc); + Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); Mutex::Lock _l(_termReason_m); _termReason = ONE_UNRECOVERABLE_ERROR; _fatalErrorMessage = tmp; @@ -352,65 +357,162 @@ public: inline void phyOnTcpConnect(PhySocket *sock,void **uptr,bool success) { - // TODO: outgoing HTTP connection success/failure + if (!success) + return; + + // Outgoing connections are right now only tunnel connections + TcpConnection *tc = &(_tcpConections[sock]); + tc->type = TcpConnection::TCP_TUNNEL_OUTGOING; + tc->shouldKeepAlive = true; // unused + tc->parent = this; + tc->sock = sock; + // from and parser are not used + tc->messageSize = 0; // unused + tc->lastActivity = OSUtils::now(); + // HTTP stuff is not used + tc->writeBuf = ""; + *uptr = (void *)tc; + + // Send "hello" message + tc->writeBuf.push_back((char)0x17); + tc->writeBuf.push_back((char)0x03); + tc->writeBuf.push_back((char)0x03); // fake TLS 1.2 header + tc->writeBuf.push_back((char)0x00); + tc->writeBuf.push_back((char)0x04); // mlen == 4 + tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MAJOR); + tc->writeBuf.push_back((char)ZEROTIER_ONE_VERSION_MINOR); + tc->writeBuf.push_back((char)((ZEROTIER_ONE_VERSION_REVISION >> 8) & 0xff)); + tc->writeBuf.push_back((char)(ZEROTIER_ONE_VERSION_REVISION & 0xff)); + _phy.tcpSetNotifyWritable(sock,true); } inline void phyOnTcpAccept(PhySocket *sockL,PhySocket *sockN,void **uptrL,void **uptrN,const struct sockaddr *from) { - HttpConnection *htc = &(_httpConnections[sockN]); - htc->server = true; - htc->writing = false; - htc->shouldKeepAlive = true; - htc->parent = this; - htc->sock = sockN; - htc->from = from; - http_parser_init(&(htc->parser),HTTP_REQUEST); - htc->parser.data = (void *)htc; - htc->messageSize = 0; - htc->writePtr = 0; - htc->lastActivity = OSUtils::now(); - htc->currentHeaderField = ""; - htc->currentHeaderValue = ""; - htc->url = ""; - htc->status = ""; - htc->headers.clear(); - htc->body = ""; - *uptrN = (void *)htc; + // Incoming connections are TCP HTTP requests + TcpConnection *tc = &(_tcpConections[sockN]); + tc->type = TcpConnection::TCP_HTTP_INCOMING; + tc->shouldKeepAlive = true; + tc->parent = this; + tc->sock = sockN; + tc->from = from; + http_parser_init(&(tc->parser),HTTP_REQUEST); + tc->parser.data = (void *)tc; + tc->messageSize = 0; + tc->lastActivity = OSUtils::now(); + tc->currentHeaderField = ""; + tc->currentHeaderValue = ""; + tc->url = ""; + tc->status = ""; + tc->headers.clear(); + tc->body = ""; + tc->writeBuf = ""; + *uptrN = (void *)tc; } inline void phyOnTcpClose(PhySocket *sock,void **uptr) { - _httpConnections.erase(sock); + _tcpConections.erase(sock); } inline void phyOnTcpData(PhySocket *sock,void **uptr,void *data,unsigned long len) { - HttpConnection *htc = reinterpret_cast(*uptr); - http_parser_execute(&(htc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len); - if ((htc->parser.upgrade)||(htc->parser.http_errno != HPE_OK)) - _phy.close(sock); + TcpConnection *tc = reinterpret_cast(*uptr); + switch(tc->type) { + case TcpConnection::TCP_HTTP_INCOMING: + case TcpConnection::TCP_HTTP_OUTGOING: + http_parser_execute(&(tc->parser),&HTTP_PARSER_SETTINGS,(const char *)data,len); + if ((tc->parser.upgrade)||(tc->parser.http_errno != HPE_OK)) { + _phy.close(sock); + return; + } + break; + case TcpConnection::TCP_TUNNEL_OUTGOING: + tc->body.append((const char *)data,len); + if (tc->body.length() > 65535) { + // sanity limit -- a message will never be this big since mlen is 16-bit + _phy.close(sock); + return; + } else if (tc->body.length() >= 5) { + const char *data = tc->body.data(); + const unsigned long mlen = ( ((((unsigned long)data[3]) & 0xff) << 8) | (((unsigned long)data[4]) & 0xff) ); + if (tc->body.length() >= (mlen + 5)) { + InetAddress from; + + unsigned long plen = mlen; // payload length, modified if there's an IP header + data += 5; + if (mlen == 4) { + // Hello message, which isn't sent by proxy and would be ignored by client + } else if (mlen) { + // Messages should contain IPv4 or IPv6 source IP address data + switch(data[0]) { + case 4: // IPv4 + if (plen >= 7) { + from.set((const void *)(data + 1),4,((((unsigned int)data[5]) & 0xff) << 8) | (((unsigned int)data[6]) & 0xff)); + data += 7; // type + 4 byte IP + 2 byte port + plen -= 7; + } + break; + case 6: // IPv6 + if (plen >= 19) { + from.set((const void *)(data + 1),16,((((unsigned int)data[17]) & 0xff) << 8) | (((unsigned int)data[18]) & 0xff)); + data += 19; // type + 16 byte IP + 2 byte port + plen -= 19; + } + break; + case 0: // none/omitted + break; + default: // invalid + _phy.close(sock); + return; + } + if (!from) { // missing IP header + _phy.close(sock); + return; + } + } + + ZT1_ResultCode rc = _node->processWirePacket( + OSUtils::now(), + (const struct sockaddr_storage *)&from, // Phy<> uses sockaddr_storage, so it'll always be that big + 1, // desperation == 1, TCP tunnel proxy + data, + plen, + &_nextBackgroundTaskDeadline); + if (ZT1_ResultCode_isFatal(rc)) { + char tmp[256]; + Utils::snprintf(tmp,sizeof(tmp),"fatal error code from processWirePacket: %d",(int)rc); + Mutex::Lock _l(_termReason_m); + _termReason = ONE_UNRECOVERABLE_ERROR; + _fatalErrorMessage = tmp; + this->terminate(); + _phy.close(sock); + return; + } + + if (tc->body.length() > (mlen + 5)) + tc->body = tc->body.substr(mlen + 5); + else tc->body = ""; + } + } + break; + } } inline void phyOnTcpWritable(PhySocket *sock,void **uptr) { - HttpConnection *htc = reinterpret_cast(*uptr); - long sent = _phy.tcpSend(sock,htc->body.data() + htc->writePtr,(unsigned long)htc->body.length() - htc->writePtr,true); - if (sent < 0) { - return; // close handler will have been called, so everything's dead - } else { - htc->lastActivity = OSUtils::now(); - htc->writePtr += sent; - if (htc->writePtr >= htc->body.length()) { - _phy.tcpSetNotifyWritable(sock,false); - if (htc->shouldKeepAlive) { - htc->writing = false; - htc->writePtr = 0; - htc->body = ""; - } else { - _phy.close(sock); // will call close handler to delete from _httpConnections - } + TcpConnection *tc = reinterpret_cast(*uptr); + if (tc->writeBuf.length()) { + long sent = _phy.tcpSend(sock,tc->writeBuf.data(),tc->writeBuf.length(),true); + if (sent > 0) { + tc->lastActivity = OSUtils::now(); + if (sent == tc->writeBuf.length()) { + tc->writeBuf = ""; + _phy.tcpSetNotifyWritable(sock,false); + if (!tc->shouldKeepAlive) + _phy.close(sock); // will call close handler to delete from _tcpConections + } else tc->writeBuf = tc->writeBuf.substr(sent); } - } + } else _phy.tcpSetNotifyWritable(sock,false); // sanity check... shouldn't happen } inline int nodeVirtualNetworkConfigFunction(uint64_t nwid,enum ZT1_VirtualNetworkConfigOperation op,const ZT1_VirtualNetworkConfig *nwc) @@ -586,7 +688,7 @@ public: _node->processVirtualNetworkFrame(OSUtils::now(),nwid,from.toInt(),to.toInt(),etherType,vlanId,data,len,&_nextBackgroundTaskDeadline); } - inline void onHttpRequestToServer(HttpConnection *htc) + inline void onHttpRequestToServer(TcpConnection *tc) { char tmpn[256]; std::string data; @@ -595,7 +697,7 @@ public: try { if (_controlPlane) - scode = _controlPlane->handleRequest(htc->from,htc->parser.method,htc->url,htc->headers,htc->body,data,contentType); + scode = _controlPlane->handleRequest(tc->from,tc->parser.method,tc->url,tc->headers,tc->body,data,contentType); else scode = 500; } catch ( ... ) { scode = 500; @@ -615,26 +717,24 @@ public: } Utils::snprintf(tmpn,sizeof(tmpn),"HTTP/1.1 %.3u %s\r\nCache-Control: no-cache\r\nPragma: no-cache\r\n",scode,scodestr); - htc->body.assign(tmpn); - htc->body.append("Content-Type: "); - htc->body.append(contentType); + tc->writeBuf.assign(tmpn); + tc->writeBuf.append("Content-Type: "); + tc->writeBuf.append(contentType); Utils::snprintf(tmpn,sizeof(tmpn),"\r\nContent-Length: %lu\r\n",(unsigned long)data.length()); - htc->body.append(tmpn); - if (!htc->shouldKeepAlive) - htc->body.append("Connection: close\r\n"); - htc->body.append("\r\n"); - if (htc->parser.method != HTTP_HEAD) - htc->body.append(data); + tc->writeBuf.append(tmpn); + if (!tc->shouldKeepAlive) + tc->writeBuf.append("Connection: close\r\n"); + tc->writeBuf.append("\r\n"); + if (tc->parser.method != HTTP_HEAD) + tc->writeBuf.append(data); - htc->writing = true; - htc->writePtr = 0; - _phy.tcpSetNotifyWritable(htc->sock,true); + _phy.tcpSetNotifyWritable(tc->sock,true); } - inline void onHttpResponseFromClient(HttpConnection *htc) + inline void onHttpResponseFromClient(TcpConnection *tc) { - if (!htc->shouldKeepAlive) - _phy.close(htc->sock); // will call close handler, which deletes from _httpConnections + if (!tc->shouldKeepAlive) + _phy.close(tc->sock); // will call close handler, which deletes from _tcpConections } private: @@ -671,7 +771,7 @@ private: std::map< uint64_t,std::vector > _tapAssignedIps; // ZeroTier assigned IPs, not user or dhcp assigned Mutex _taps_m; - std::map< PhySocket *,HttpConnection > _httpConnections; // no mutex for this since it's done in the main loop thread only + std::map< PhySocket *,TcpConnection > _tcpConections; // no mutex for this since it's done in the main loop thread only ReasonForTermination _termReason; std::string _fatalErrorMessage; @@ -699,83 +799,83 @@ static void StapFrameHandler(void *uptr,uint64_t nwid,const MAC &from,const MAC static int ShttpOnMessageBegin(http_parser *parser) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->currentHeaderField = ""; - htc->currentHeaderValue = ""; - htc->messageSize = 0; - htc->url = ""; - htc->status = ""; - htc->headers.clear(); - htc->body = ""; + TcpConnection *tc = reinterpret_cast(parser->data); + tc->currentHeaderField = ""; + tc->currentHeaderValue = ""; + tc->messageSize = 0; + tc->url = ""; + tc->status = ""; + tc->headers.clear(); + tc->body = ""; return 0; } static int ShttpOnUrl(http_parser *parser,const char *ptr,size_t length) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->messageSize += (unsigned long)length; - if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) + TcpConnection *tc = reinterpret_cast(parser->data); + tc->messageSize += (unsigned long)length; + if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) return -1; - htc->url.append(ptr,length); + tc->url.append(ptr,length); return 0; } static int ShttpOnStatus(http_parser *parser,const char *ptr,size_t length) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->messageSize += (unsigned long)length; - if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) + TcpConnection *tc = reinterpret_cast(parser->data); + tc->messageSize += (unsigned long)length; + if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) return -1; - htc->status.append(ptr,length); + tc->status.append(ptr,length); return 0; } static int ShttpOnHeaderField(http_parser *parser,const char *ptr,size_t length) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->messageSize += (unsigned long)length; - if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) + TcpConnection *tc = reinterpret_cast(parser->data); + tc->messageSize += (unsigned long)length; + if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) return -1; - if ((htc->currentHeaderField.length())&&(htc->currentHeaderValue.length())) { - htc->headers[htc->currentHeaderField] = htc->currentHeaderValue; - htc->currentHeaderField = ""; - htc->currentHeaderValue = ""; + if ((tc->currentHeaderField.length())&&(tc->currentHeaderValue.length())) { + tc->headers[tc->currentHeaderField] = tc->currentHeaderValue; + tc->currentHeaderField = ""; + tc->currentHeaderValue = ""; } for(size_t i=0;icurrentHeaderField.push_back(OSUtils::toLower(ptr[i])); + tc->currentHeaderField.push_back(OSUtils::toLower(ptr[i])); return 0; } static int ShttpOnValue(http_parser *parser,const char *ptr,size_t length) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->messageSize += (unsigned long)length; - if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) + TcpConnection *tc = reinterpret_cast(parser->data); + tc->messageSize += (unsigned long)length; + if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) return -1; - htc->currentHeaderValue.append(ptr,length); + tc->currentHeaderValue.append(ptr,length); return 0; } static int ShttpOnHeadersComplete(http_parser *parser) { - HttpConnection *htc = reinterpret_cast(parser->data); - if ((htc->currentHeaderField.length())&&(htc->currentHeaderValue.length())) - htc->headers[htc->currentHeaderField] = htc->currentHeaderValue; + TcpConnection *tc = reinterpret_cast(parser->data); + if ((tc->currentHeaderField.length())&&(tc->currentHeaderValue.length())) + tc->headers[tc->currentHeaderField] = tc->currentHeaderValue; return 0; } static int ShttpOnBody(http_parser *parser,const char *ptr,size_t length) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->messageSize += (unsigned long)length; - if (htc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) + TcpConnection *tc = reinterpret_cast(parser->data); + tc->messageSize += (unsigned long)length; + if (tc->messageSize > ZT_MAX_HTTP_MESSAGE_SIZE) return -1; - htc->body.append(ptr,length); + tc->body.append(ptr,length); return 0; } static int ShttpOnMessageComplete(http_parser *parser) { - HttpConnection *htc = reinterpret_cast(parser->data); - htc->shouldKeepAlive = (http_should_keep_alive(parser) != 0); - htc->lastActivity = OSUtils::now(); - if (htc->server) { - htc->parent->onHttpRequestToServer(htc); + TcpConnection *tc = reinterpret_cast(parser->data); + tc->shouldKeepAlive = (http_should_keep_alive(parser) != 0); + tc->lastActivity = OSUtils::now(); + if (tc->type == TcpConnection::TCP_HTTP_INCOMING) { + tc->parent->onHttpRequestToServer(tc); } else { - htc->parent->onHttpResponseFromClient(htc); + tc->parent->onHttpResponseFromClient(tc); } return 0; }