FDs transferred over listen socket + other fixes

This commit is contained in:
Joseph Henry 2015-12-12 01:28:59 -08:00
parent 761bb4fdd0
commit 99afc74021
3 changed files with 64 additions and 93 deletions

View file

@ -195,7 +195,6 @@ int get_new_fd(int oversock)
int send_cmd(int rpc_fd, char *cmd) int send_cmd(int rpc_fd, char *cmd)
{ {
//dwr(MSG_DEBUG, "\n---send_cmd[start]\n");
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging
#ifdef VERBOSE #ifdef VERBOSE
@ -223,7 +222,7 @@ int send_cmd(int rpc_fd, char *cmd)
#endif #endif
/* Combine command flag+payload with RPC metadata */ /* Combine command flag+payload with RPC metadata */
memcpy(&metabuf[IDX_PAYLOAD], cmd, PAYLOAD_SZ); memcpy(&metabuf[IDX_PAYLOAD], cmd, PAYLOAD_SZ);
usleep(10000); usleep(1000);
int n_write = write(rpc_fd, &metabuf, BUF_SZ); int n_write = write(rpc_fd, &metabuf, BUF_SZ);
if(n_write < 0){ if(n_write < 0){
dwr(MSG_DEBUG,"Error writing command to service (CMD = %d)\n", cmd[0]); dwr(MSG_DEBUG,"Error writing command to service (CMD = %d)\n", cmd[0]);
@ -235,14 +234,12 @@ int send_cmd(int rpc_fd, char *cmd)
if(n_write > 0) { if(n_write > 0) {
if(cmd[0]==RPC_SOCKET) { if(cmd[0]==RPC_SOCKET) {
dwr(MSG_DEBUG," waiting on get_new_fd()...\n");
ret = get_new_fd(fdret_sock); ret = get_new_fd(fdret_sock);
} }
if(cmd[0]==RPC_MAP) { if(cmd[0]==RPC_MAP) {
ret = n_write; ret = n_write;
} }
if(cmd[0]==RPC_MAP_REQ || cmd[0]==RPC_CONNECT || cmd[0]==RPC_BIND) { if(cmd[0]==RPC_MAP_REQ || cmd[0]==RPC_CONNECT || cmd[0]==RPC_BIND) {
dwr(MSG_DEBUG," waiting on get_retval()...\n");
ret = get_retval(); ret = get_retval();
} }
if(cmd[0]==RPC_LISTEN || cmd[0]==RPC_GETSOCKNAME) { if(cmd[0]==RPC_LISTEN || cmd[0]==RPC_GETSOCKNAME) {
@ -253,7 +250,6 @@ int send_cmd(int rpc_fd, char *cmd)
ret = -1; ret = -1;
} }
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
//dwr(MSG_DEBUG, "---send_cmd[end]\n\n");
return ret; return ret;
} }
@ -529,10 +525,8 @@ int socket(SOCKET_SIG)
memset(cmd, '\0', BUF_SZ); memset(cmd, '\0', BUF_SZ);
cmd[0] = RPC_MAP; cmd[0] = RPC_MAP;
memcpy(&cmd[1], &newfd, sizeof(newfd)); memcpy(&cmd[1], &newfd, sizeof(newfd));
/* send fd mapping and get confirmation */ /* send fd mapping and get confirmation */
dwr(MSG_DEBUG, "pre-send_cmd\n"); err = send_cmd(fdret_sock, cmd);
err = send_cmd(fdret_sock, cmd);
dwr(MSG_DEBUG, "post-send_cmd\n");
if(err > -1) { if(err > -1) {
errno = ERR_OK; errno = ERR_OK;
@ -611,7 +605,6 @@ int connect(CONNECT_SIG)
} }
/* Assemble and send RPC */ /* Assemble and send RPC */
int err;
char cmd[BUF_SZ]; char cmd[BUF_SZ];
memset(cmd, '\0', BUF_SZ); memset(cmd, '\0', BUF_SZ);
struct connect_st rpc_st; struct connect_st rpc_st;
@ -621,7 +614,6 @@ int connect(CONNECT_SIG)
memcpy(&rpc_st.__len, &__len, sizeof(socklen_t)); memcpy(&rpc_st.__len, &__len, sizeof(socklen_t));
cmd[0] = RPC_CONNECT; cmd[0] = RPC_CONNECT;
memcpy(&cmd[1], &rpc_st, sizeof(struct connect_st)); memcpy(&cmd[1], &rpc_st, sizeof(struct connect_st));
return send_cmd(fdret_sock, cmd); return send_cmd(fdret_sock, cmd);
} }
@ -672,7 +664,6 @@ int bind(BIND_SIG)
} }
#endif #endif
int err;
/* make sure we don't touch any standard outputs */ /* make sure we don't touch any standard outputs */
if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO)
return(realbind(sockfd, addr, addrlen)); return(realbind(sockfd, addr, addrlen));
@ -807,42 +798,31 @@ int accept(ACCEPT_SIG)
/* The following line is required for libuv/nodejs to accept connections properly, /* The following line is required for libuv/nodejs to accept connections properly,
however, this has the side effect of causing certain webservers to max out the CPU however, this has the side effect of causing certain webservers to max out the CPU
in an accept loop */ in an accept loop */
//fcntl(sockfd, F_SETFL, O_NONBLOCK); fcntl(sockfd, F_SETFL, O_NONBLOCK);
int new_conn_socket = get_new_fd(sockfd);
char c[1]; if(new_conn_socket > 0)
int new_conn_socket;
int n = read(sockfd, c, sizeof(c)); /* Read signal byte */
if(n > 0)
{ {
new_conn_socket = get_new_fd(fdret_sock); //new_conn_socket = get_new_fd(fdret_sock);
dwr(MSG_DEBUG, " accept(): RX: fd = (%d) over (%d)\n", new_conn_socket, fdret_sock); dwr(MSG_DEBUG, " accept(): RX: fd = (%d) over (%d)\n", new_conn_socket, fdret_sock);
if(new_conn_socket > 0) { /* Send our local-fd number back to service so it can complete its mapping table */
/* Send our local-fd number back to service so it can complete its mapping table */ memset(cmd, '\0', BUF_SZ);
memset(cmd, '\0', BUF_SZ); cmd[0] = RPC_MAP;
cmd[0] = RPC_MAP; memcpy(&cmd[1], &new_conn_socket, sizeof(new_conn_socket));
memcpy(&cmd[1], &new_conn_socket, sizeof(new_conn_socket));
dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket); dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket);
int n_write = send_cmd(fdret_sock, cmd); int n_write = send_cmd(fdret_sock, cmd);
if(n_write < 0) { if(n_write < 0) {
errno = ECONNABORTED; /* TODO: Closest match, service unreachable */ errno = ECONNABORTED;
handle_error("accept", "ECONNABORTED - Error sending perceived FD to service", -1); handle_error("accept", "ECONNABORTED - Error sending perceived FD to service", -1);
return -1;
}
errno = ERR_OK;
dwr(MSG_DEBUG,"*accept()=%d\n", new_conn_socket);
handle_error("accept", "", new_conn_socket);
return new_conn_socket; /* OK */
}
else {
errno = ECONNABORTED; /* TODO: Closest match, service unreachable */
handle_error("accept", "ECONNABORTED - Error receiving new FD from service", -1);
return -1; return -1;
} }
errno = ERR_OK;
dwr(MSG_DEBUG,"accept()=%d\n", new_conn_socket);
handle_error("accept", "", new_conn_socket);
return new_conn_socket; /* OK */
} }
errno = EAGAIN; /* necessary? */ errno = EAGAIN; /* necessary? */
handle_error("accept", "EAGAIN - Error reading signal byte from service", -1); handle_error("accept", "EAGAIN - Error reading signal byte from service", -1);
return -EAGAIN; return -EAGAIN;
@ -1024,8 +1004,7 @@ int getsockname(GETSOCKNAME_SIG)
dwr(MSG_ERROR, "getsockname(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR, "getsockname(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
return realgetsockname(sockfd, addr, addrlen); return realgetsockname(sockfd, addr, addrlen);
/* assemble command */ /* assemble command */
char cmd[BUF_SZ]; char cmd[BUF_SZ];
struct getsockname_st rpc_st; struct getsockname_st rpc_st;

View file

@ -832,46 +832,38 @@ err_t NetconEthernetTap::nc_accept(void *arg, struct tcp_pcb *newpcb, err_t err)
int listening_fd = tap->_phy.getDescriptor(conn->dataSock); int listening_fd = tap->_phy.getDescriptor(conn->dataSock);
if(conn) { if(conn) {
ZT_PHY_SOCKFD_TYPE fds[2]; ZT_PHY_SOCKFD_TYPE fds[2];
if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) { if(socketpair(PF_LOCAL, SOCK_STREAM, 0, fds) < 0) {
if(errno < 0) { if(errno < 0) {
l->tap->send_return_value(conn, -1, errno); l->tap->send_return_value(conn, -1, errno);
dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n"); dwr(MSG_ERROR, " nc_accept(): unable to create socketpair\n");
return ERR_MEM; return ERR_MEM;
}
} }
TcpConnection *new_tcp_conn = new TcpConnection(); }
new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn); TcpConnection *new_tcp_conn = new TcpConnection();
new_tcp_conn->rpcSock = conn->rpcSock; new_tcp_conn->dataSock = tap->_phy.wrapSocket(fds[0], new_tcp_conn);
new_tcp_conn->pcb = newpcb; new_tcp_conn->rpcSock = conn->rpcSock;
new_tcp_conn->their_fd = fds[1]; new_tcp_conn->pcb = newpcb;
tap->tcp_connections.push_back(new_tcp_conn); new_tcp_conn->their_fd = fds[1];
dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]); tap->tcp_connections.push_back(new_tcp_conn);
int n, send_fd = tap->_phy.getDescriptor(conn->rpcSock); dwr(MSG_DEBUG, " nc_accept(): socketpair = {%d, %d}\n", fds[0], fds[1]);
if((n = send(listening_fd, "z", 1, MSG_NOSIGNAL)) < 0) { int send_fd = tap->_phy.getDescriptor(conn->rpcSock);
dwr(MSG_ERROR, " nc_accept(): Error: [send(listening_fd,...) = MSG_NOSIGNAL].\n");
return -1; if(sock_fd_write(listening_fd, fds[1]) < 0){
} dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (listen_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]);
else if(n > 0) { return -1;
if(sock_fd_write(send_fd, fds[1]) > 0) { }
close(fds[1]); // close other end of socketpair else {
new_tcp_conn->pending = true; close(fds[1]); // close other end of socketpair
} new_tcp_conn->pending = true;
else { }
dwr(MSG_ERROR, " nc_accept(%d): unable to send fd to client\n", listening_fd);
}
}
else {
dwr(MSG_ERROR, " nc_accept(%d): error writing signal byte (send_fd = %d, perceived_fd = %d)\n", listening_fd, send_fd, fds[1]);
return -1;
}
tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn)); tap->lwipstack->_tcp_arg(newpcb, new Larg(tap, new_tcp_conn));
tap->lwipstack->_tcp_recv(newpcb, nc_recved); tap->lwipstack->_tcp_recv(newpcb, nc_recved);
tap->lwipstack->_tcp_err(newpcb, nc_err); tap->lwipstack->_tcp_err(newpcb, nc_err);
tap->lwipstack->_tcp_sent(newpcb, nc_sent); tap->lwipstack->_tcp_sent(newpcb, nc_sent);
tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1); tap->lwipstack->_tcp_poll(newpcb, nc_poll, 1);
tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections tcp_accepted(conn->pcb); // Let lwIP know that it can queue additional incoming connections
return ERR_OK; return ERR_OK;
} }
else { else {
dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd); dwr(MSG_ERROR, " nc_accept(%d): can't locate Connection object for PCB.\n", listening_fd);
@ -904,10 +896,10 @@ err_t NetconEthernetTap::nc_recved(void *arg, struct tcp_pcb *tpcb, struct pbuf
return ERR_OK; // ? return ERR_OK; // ?
} }
if(p == NULL) { if(p == NULL) {
if(l->conn) { if(l->conn && !l->conn->listening) {
dwr(MSG_INFO, " nc_recved(): closing connection\n"); dwr(MSG_INFO, " nc_recved(): closing connection\n");
l->tap->closeConnection(l->conn); // l->tap->closeConnection(l->conn);
return ERR_ABRT; return ERR_ABRT;
} }
else { else {
dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n"); dwr(MSG_ERROR, " nc_recved(): can't locate connection via (arg)\n");
@ -1092,11 +1084,11 @@ err_t NetconEthernetTap::nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err
void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload) void NetconEthernetTap::unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload)
{ {
unsigned char *buf = (unsigned char*)data; unsigned char *buf = (unsigned char*)data;
memcpy(&pid, &buf[IDX_PID], sizeof(pid_t)); memcpy(&pid, &buf[IDX_PID], sizeof(pid_t));
memcpy(&tid, &buf[IDX_TID], sizeof(pid_t)); memcpy(&tid, &buf[IDX_TID], sizeof(pid_t));
memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int)); memcpy(&rpc_count, &buf[IDX_COUNT], sizeof(int));
memcpy(timestamp, &buf[IDX_TIME], 20); memcpy(timestamp, &buf[IDX_TIME], 20);
memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char)); memcpy(&cmd, &buf[IDX_PAYLOAD], sizeof(char));
} }
/* /*
@ -1182,23 +1174,23 @@ void NetconEthernetTap::handle_getsockname(PhySocket *sock, void **uptr, struct
TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd); TcpConnection *conn = getConnectionByTheirFD(sock, getsockname_rpc->sockfd);
dwr(MSG_DEBUG, "handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd); dwr(MSG_DEBUG, "handle_getsockname(): sockfd = %d\n", getsockname_rpc->sockfd);
/* /*
int port = conn->addr->sin_port;
int ip = conn->addr->sin_addr.s_addr; int ip = conn->addr->sin_addr.s_addr;
unsigned char d[4]; unsigned char d[4];
d[0] = ip & 0xFF; d[0] = ip & 0xFF;
d[1] = (ip >> 8) & 0xFF; d[1] = (ip >> 8) & 0xFF;
d[2] = (ip >> 16) & 0xFF; d[2] = (ip >> 16) & 0xFF;
d[3] = (ip >> 24) & 0xFF; d[3] = (ip >> 24) & 0xFF;
int port = conn->addr->sin_port;
dwr(MSG_ERROR, " handle_getsockname(): returning address: %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], port); dwr(MSG_ERROR, " handle_getsockname(): returning address: %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], port);
*/ */
// Assemble address "command" to send to intercept // Assemble address "command" to send to intercept
char retmsg[sizeof(struct sockaddr)]; char retmsg[sizeof(struct sockaddr)];
memset(&retmsg, '\0', sizeof(retmsg)); memset(&retmsg, '\0', sizeof(retmsg));
dwr(MSG_ERROR, " handle_getsockname(): %d\n", sizeof(retmsg)); dwr(MSG_ERROR, " handle_getsockname(): %d\n", sizeof(retmsg));
memcpy(&retmsg, conn->addr, sizeof(struct sockaddr)); if(conn == NULL) return;
memcpy(&retmsg, conn->addr, 1);
// Get connection's RPC fd and send structure containing bound address // Get connection's RPC fd and send structure containing bound address
int fd = _phy.getDescriptor(conn->rpcSock); int fd = _phy.getDescriptor(conn->rpcSock);
write(fd, &retmsg, sizeof(struct sockaddr)); write(fd, &retmsg, sizeof(struct sockaddr));

View file

@ -171,17 +171,17 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd)
size = recvmsg (sock, &msg, 0); size = recvmsg (sock, &msg, 0);
if (size < 0) { if (size < 0) {
perror ("recvmsg"); perror ("recvmsg");
exit(1); return -1;
} }
cmsg = CMSG_FIRSTHDR(&msg); cmsg = CMSG_FIRSTHDR(&msg);
if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) { if (cmsg && cmsg->cmsg_len == CMSG_LEN(sizeof(int))) {
if (cmsg->cmsg_level != SOL_SOCKET) { if (cmsg->cmsg_level != SOL_SOCKET) {
fprintf (stderr, "invalid cmsg_level %d\n",cmsg->cmsg_level); fprintf (stderr, "invalid cmsg_level %d\n",cmsg->cmsg_level);
exit(1); return -1;
} }
if (cmsg->cmsg_type != SCM_RIGHTS) { if (cmsg->cmsg_type != SCM_RIGHTS) {
fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type); fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type);
exit(1); return -1;
} }
*fd = *((int *) CMSG_DATA(cmsg)); *fd = *((int *) CMSG_DATA(cmsg));
@ -190,7 +190,7 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd)
size = read (sock, buf, bufsize); size = read (sock, buf, bufsize);
if (size < 0) { if (size < 0) {
perror("read"); perror("read");
exit(1); return -1;
} }
} }
return size; return size;