Stateless RPC rework

This commit is contained in:
Joseph Henry 2016-01-11 10:12:59 -08:00
parent ff9317365a
commit 3e65ecb93d
10 changed files with 698 additions and 975 deletions

View file

@ -49,6 +49,14 @@
#undef TCP_MSS #undef TCP_MSS
#define TCP_MSS 1460 #define TCP_MSS 1460
/**
* MEMP_NUM_REASSDATA: the number of IP packets simultaneously queued for
* reassembly (whole packets, not fragments!)
*/
// #undef MEMP_NUM_REASSDATA
//#define MEMP_NUM_REASSDATA 64
/* /*
The TCP window size can be adjusted by changing the define TCP_WND. However, The TCP window size can be adjusted by changing the define TCP_WND. However,
do keep in mind that this should be at least twice the size of TCP_MSS (thus do keep in mind that this should be at least twice the size of TCP_MSS (thus

View file

@ -58,40 +58,6 @@
#include "RPC.h" #include "RPC.h"
#include "common.inc.c" #include "common.inc.c"
/* Global Declarations */
static int (*realconnect)(CONNECT_SIG);
static int (*realbind)(BIND_SIG);
static int (*realaccept)(ACCEPT_SIG);
static int (*reallisten)(LISTEN_SIG);
static int (*realsocket)(SOCKET_SIG);
static int (*realsetsockopt)(SETSOCKOPT_SIG);
static int (*realgetsockopt)(GETSOCKOPT_SIG);
static int (*realaccept4)(ACCEPT4_SIG);
static long (*realsyscall)(SYSCALL_SIG);
static int (*realclose)(CLOSE_SIG);
static int (*realclone)(CLONE_SIG);
static int (*realdup2)(DUP2_SIG);
static int (*realdup3)(DUP3_SIG);
static int (*realgetsockname)(GETSOCKNAME_SIG);
/* Exported Function Prototypes */
void my_init(void);
int connect(CONNECT_SIG);
int bind(BIND_SIG);
int accept(ACCEPT_SIG);
int listen(LISTEN_SIG);
int socket(SOCKET_SIG);
int setsockopt(SETSOCKOPT_SIG);
int getsockopt(GETSOCKOPT_SIG);
int accept4(ACCEPT4_SIG);
long syscall(SYSCALL_SIG);
int close(CLOSE_SIG);
int clone(CLONE_SIG);
int dup2(DUP2_SIG);
int dup3(DUP3_SIG);
int getsockname(GETSOCKNAME_SIG);
static int init_service_connection();
static void load_symbols(void); static void load_symbols(void);
static void set_up_intercept(); static void set_up_intercept();
@ -99,44 +65,33 @@ static void set_up_intercept();
------------------- Intercept<--->Service Comm mechanisms ---------------------- ------------------- Intercept<--->Service Comm mechanisms ----------------------
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
static int rpcfd = -1; /* used for fd-transfers */
static int thispid = -1; static int thispid = -1;
static int instance_count = 0; char *network_pathname;
static int connected_to_service() {
return rpcfd == -1 ? 0 : 1;
}
/* Check whether the socket is mapped to the service or not. We /* Check whether the socket is mapped to the service or not. We
need to know if this is a regular AF_LOCAL socket or an end of a socketpair need to know if this is a regular AF_LOCAL socket or an end of a socketpair
that the service uses. We don't want to keep state in the intercept, so that the service uses. We don't want to keep state in the intercept, so
we simply ask the service via an RPC */ we simply ask the service via an RPC */
static int is_mapped_to_service(int sockfd)
static int connected_to_service(int sockfd)
{ {
if(rpcfd < 0) dwr(MSG_DEBUG_EXTRA,"connected_to_service():\n");
return 0; /* no connection obviously implies no mapping */ socklen_t len;
dwr(MSG_DEBUG,"is_mapped_to_service()\n"); struct sockaddr_storage addr;
return rpc_send_command(RPC_MAP_REQ, rpcfd, &sockfd, sizeof(sockfd)); len = sizeof addr;
struct sockaddr_un * addr_un;
getpeername(sockfd, (struct sockaddr*)&addr, &len);
if (addr.ss_family == AF_LOCAL || addr.ss_family == AF_LOCAL) {
addr_un = (struct sockaddr_un*)&addr;
if(strcmp(addr_un->sun_path, network_pathname) == 0) {
dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Yes, %s\n", addr_un->sun_path);
return 1;
}
}
dwr(MSG_DEBUG_EXTRA,"connected_to_service(): Not connected to service\n");
return 0;
} }
/* Sets up the connection pipes and sockets to the service */
static int init_service_connection()
{
const char *network_id;
char rpcname[1024];
network_id = getenv("ZT_NC_NETWORK");
/* Do noting if not configured (sanity check -- should never get here in this case) */
if (!network_id){
fprintf(stderr, "init_service_connection(): ZT_NC_NETWORK not set.\n");
exit(0);
}
if((rpcfd < 0 && instance_count==0) || thispid != getpid())
rpc_mutex_init();
strncpy(rpcname,network_id,sizeof(rpcname));
instance_count++;
return rpc_join(rpcname);
}
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
------------------------ ctors and dtors (and friends) ------------------------ ------------------------ ctors and dtors (and friends) ------------------------
@ -154,7 +109,6 @@ static void load_symbols(void)
dwr(MSG_DEBUG,"detected duplicate call to global constructor (pid=%d).\n", thispid); dwr(MSG_DEBUG,"detected duplicate call to global constructor (pid=%d).\n", thispid);
} }
thispid = getpid(); thispid = getpid();
realconnect = dlsym(RTLD_NEXT, "connect"); realconnect = dlsym(RTLD_NEXT, "connect");
realbind = dlsym(RTLD_NEXT, "bind"); realbind = dlsym(RTLD_NEXT, "bind");
realaccept = dlsym(RTLD_NEXT, "accept"); realaccept = dlsym(RTLD_NEXT, "accept");
@ -179,9 +133,12 @@ static void _init(void) { set_up_intercept(); }
/* get symbols and initialize mutexes */ /* get symbols and initialize mutexes */
static void set_up_intercept() static void set_up_intercept()
{ {
network_pathname = getenv("ZT_NC_NETWORK");
dwr(MSG_DEBUG,"Connecting to service at: %s\n", network_pathname);
if (!getenv("ZT_NC_NETWORK")) if (!getenv("ZT_NC_NETWORK"))
return; return;
/* Hook/intercept Posix net API symbols */ /* Hook/intercept Posix net API symbols */
rpc_mutex_init();
load_symbols(); load_symbols();
} }
@ -193,11 +150,10 @@ static void set_up_intercept()
int setsockopt(SETSOCKOPT_SIG) int setsockopt(SETSOCKOPT_SIG)
{ {
if(realsetsockopt == NULL){ if(realsetsockopt == NULL){
dwr(MSG_ERROR, "setsockopt(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"setsockopt(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"setsockopt(%d)\n", socket); dwr(MSG_DEBUG,"setsockopt(%d)\n", socket);
/* return(realsetsockopt(socket, level, option_name, option_value, option_len)); */ /* return(realsetsockopt(socket, level, option_name, option_value, option_len)); */
if(level == SOL_IPV6 && option_name == IPV6_V6ONLY) if(level == SOL_IPV6 && option_name == IPV6_V6ONLY)
return 0; return 0;
@ -209,9 +165,8 @@ int setsockopt(SETSOCKOPT_SIG)
if(socket == STDIN_FILENO || socket == STDOUT_FILENO || socket == STDERR_FILENO) if(socket == STDIN_FILENO || socket == STDOUT_FILENO || socket == STDERR_FILENO)
return(realsetsockopt(socket, level, option_name, option_value, option_len)); return(realsetsockopt(socket, level, option_name, option_value, option_len));
int err = realsetsockopt(socket, level, option_name, option_value, option_len); int err = realsetsockopt(socket, level, option_name, option_value, option_len);
if(err < 0){ if(err < 0)
perror("setsockopt():\n"); perror("setsockopt():\n");
}
return 0; return 0;
} }
@ -223,19 +178,13 @@ int setsockopt(SETSOCKOPT_SIG)
int getsockopt(GETSOCKOPT_SIG) int getsockopt(GETSOCKOPT_SIG)
{ {
if(realgetsockopt == NULL){ if(realgetsockopt == NULL){
dwr(MSG_ERROR, "getsockopt(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"getsockopt(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"getsockopt(%d)\n", sockfd); dwr(MSG_DEBUG,"getsockopt(%d)\n", sockfd);
if(!connected_to_service(sockfd)) {
if(is_mapped_to_service(sockfd) <= 0) { // First, check if the service manages this
return realgetsockopt(sockfd, level, optname, optval, optlen); return realgetsockopt(sockfd, level, optname, optval, optlen);
} }
//int err = realgetsockopt(sockfd, level, optname, optval, optlen);
/* TODO: this condition will need a little more intelligence later on
-- we will need to know if this fd is a local we are spoofing, or a true local */
if(optname == SO_TYPE) { if(optname == SO_TYPE) {
int* val = (int*)optval; int* val = (int*)optval;
*val = 2; *val = 2;
@ -244,7 +193,6 @@ int getsockopt(GETSOCKOPT_SIG)
return 0; return 0;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
----------------------------------- socket() ----------------------------------- ----------------------------------- socket() -----------------------------------
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
@ -255,9 +203,7 @@ int socket(SOCKET_SIG)
{ {
if(realsocket == NULL) if(realsocket == NULL)
set_up_intercept(); set_up_intercept();
dwr(MSG_DEBUG,"socket():\n"); dwr(MSG_DEBUG,"socket():\n");
int newfd = -1;
/* Check that type makes sense */ /* Check that type makes sense */
int flags = socket_type & ~SOCK_TYPE_MASK; int flags = socket_type & ~SOCK_TYPE_MASK;
if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) { if (flags & ~(SOCK_CLOEXEC | SOCK_NONBLOCK)) {
@ -275,7 +221,6 @@ int socket(SOCKET_SIG)
return -1; return -1;
} }
/* TODO: detect ENFILE condition */ /* TODO: detect ENFILE condition */
if(socket_family == AF_LOCAL if(socket_family == AF_LOCAL
|| socket_family == AF_NETLINK || socket_family == AF_NETLINK
|| socket_family == AF_UNIX) { || socket_family == AF_UNIX) {
@ -283,35 +228,14 @@ int socket(SOCKET_SIG)
dwr(MSG_DEBUG,"realsocket() = %d\n", err); dwr(MSG_DEBUG,"realsocket() = %d\n", err);
return err; return err;
} }
rpcfd = !connected_to_service() ? init_service_connection() : rpcfd;
if(rpcfd < 0) {
dwr(MSG_DEBUG,"BAD service connection. exiting.\n");
exit(-1);
}
/* Assemble and send RPC */ /* Assemble and send RPC */
struct socket_st rpc_st; struct socket_st rpc_st;
rpc_st.socket_family = socket_family; rpc_st.socket_family = socket_family;
rpc_st.socket_type = socket_type; rpc_st.socket_type = socket_type;
rpc_st.protocol = protocol; rpc_st.protocol = protocol;
rpc_st.__tid = syscall(SYS_gettid); rpc_st.__tid = syscall(SYS_gettid);
/* -1 is passed since we we're generating the new socket in this call */
newfd = rpc_send_command(RPC_SOCKET, rpcfd, &rpc_st, sizeof(struct socket_st)); return rpc_send_command(RPC_SOCKET, -1, &rpc_st, sizeof(struct socket_st));
if(newfd > 0)
{
dwr(MSG_DEBUG,"sending fd = %d to Service over (%d)\n", newfd, rpcfd);
/* send our local-fd number back to service so
it can complete its mapping table entry */
/* send fd mapping and get confirmation */
if(rpc_send_command(RPC_MAP, rpcfd, &newfd, sizeof(newfd)) > -1) {
errno = ERR_OK;
dwr(MSG_DEBUG, "RXd fd confirmation. Mapped!\n");
return newfd; /* Mapping complete, everything is OK */
}
}
dwr(MSG_DEBUG,"Error while receiving new fd.\n");
return -1;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
@ -323,13 +247,12 @@ int socket(SOCKET_SIG)
int connect(CONNECT_SIG) int connect(CONNECT_SIG)
{ {
if(realconnect == NULL){ if(realconnect == NULL){
dwr(MSG_ERROR, "connect(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"connect(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"connect(%d):\n", __fd); dwr(MSG_DEBUG,"connect(%d):\n", __fd);
struct sockaddr_in *connaddr; struct sockaddr_in *connaddr;
connaddr = (struct sockaddr_in *) __addr; connaddr = (struct sockaddr_in *) __addr;
/* Check that this is a valid fd */ /* Check that this is a valid fd */
if(fcntl(__fd, F_GETFD) < 0) { if(fcntl(__fd, F_GETFD) < 0) {
errno = EBADF; errno = EBADF;
@ -347,8 +270,6 @@ int connect(CONNECT_SIG)
errno = EAFNOSUPPORT; errno = EAFNOSUPPORT;
return -1; return -1;
} }
/* FIXME: Check that address is in user space, return EFAULT ? */
/* make sure we don't touch any standard outputs */ /* make sure we don't touch any standard outputs */
if(__fd == STDIN_FILENO || __fd == STDOUT_FILENO || __fd == STDERR_FILENO) if(__fd == STDIN_FILENO || __fd == STDOUT_FILENO || __fd == STDERR_FILENO)
return(realconnect(__fd, __addr, __len)); return(realconnect(__fd, __addr, __len));
@ -358,18 +279,15 @@ int connect(CONNECT_SIG)
|| connaddr->sin_family == AF_NETLINK || connaddr->sin_family == AF_NETLINK
|| connaddr->sin_family == AF_UNIX)) { || connaddr->sin_family == AF_UNIX)) {
int err = realconnect(__fd, __addr, __len); int err = realconnect(__fd, __addr, __len);
//perror("connect():");
return err; return err;
} }
/* Assemble and send RPC */ /* Assemble and send RPC */
struct connect_st rpc_st; struct connect_st rpc_st;
rpc_st.__tid = syscall(SYS_gettid); rpc_st.__tid = syscall(SYS_gettid);
rpc_st.__fd = __fd; rpc_st.__fd = __fd;
memcpy(&rpc_st.__addr, __addr, sizeof(struct sockaddr_storage)); memcpy(&rpc_st.__addr, __addr, sizeof(struct sockaddr_storage));
memcpy(&rpc_st.__len, &__len, sizeof(socklen_t)); memcpy(&rpc_st.__len, &__len, sizeof(socklen_t));
return rpc_send_command(RPC_CONNECT, __fd, &rpc_st, sizeof(struct connect_st));
return rpc_send_command(RPC_CONNECT, rpcfd, &rpc_st, sizeof(struct connect_st));
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
@ -381,7 +299,7 @@ int connect(CONNECT_SIG)
int bind(BIND_SIG) int bind(BIND_SIG)
{ {
if(realbind == NULL){ if(realbind == NULL){
dwr(MSG_ERROR, "bind(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"bind(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"bind(%d):\n", sockfd); dwr(MSG_DEBUG,"bind(%d):\n", sockfd);
@ -397,11 +315,9 @@ int bind(BIND_SIG)
errno = ENOTSOCK; errno = ENOTSOCK;
return -1; return -1;
} }
/* make sure we don't touch any standard outputs */ /* make sure we don't touch any standard outputs */
if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO)
return(realbind(sockfd, addr, addrlen)); return(realbind(sockfd, addr, addrlen));
/* If local, just use normal syscall */ /* If local, just use normal syscall */
struct sockaddr_in *connaddr; struct sockaddr_in *connaddr;
connaddr = (struct sockaddr_in *)addr; connaddr = (struct sockaddr_in *)addr;
@ -413,7 +329,6 @@ int bind(BIND_SIG)
dwr(MSG_DEBUG,"realbind, err = %d\n", err); dwr(MSG_DEBUG,"realbind, err = %d\n", err);
return err; return err;
} }
int port = connaddr->sin_port; int port = connaddr->sin_port;
int ip = connaddr->sin_addr.s_addr; int ip = connaddr->sin_addr.s_addr;
unsigned char d[4]; unsigned char d[4];
@ -421,28 +336,25 @@ int bind(BIND_SIG)
d[1] = (ip >> 8) & 0xFF; d[1] = (ip >> 8) & 0xFF;
d[2] = (ip >> 16) & 0xFF; d[2] = (ip >> 16) & 0xFF;
d[3] = (ip >> 24) & 0xFF; d[3] = (ip >> 24) & 0xFF;
dwr(MSG_DEBUG, "bind(): %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], ntohs(port)); dwr(MSG_DEBUG,"bind(): %d.%d.%d.%d: %d\n", d[0],d[1],d[2],d[3], ntohs(port));
/* Assemble and send RPC */ /* Assemble and send RPC */
struct bind_st rpc_st; struct bind_st rpc_st;
rpc_st.sockfd = sockfd; rpc_st.sockfd = sockfd;
rpc_st.__tid = syscall(SYS_gettid); rpc_st.__tid = syscall(SYS_gettid);
memcpy(&rpc_st.addr, addr, sizeof(struct sockaddr_storage)); memcpy(&rpc_st.addr, addr, sizeof(struct sockaddr_storage));
memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t)); memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t));
return rpc_send_command(RPC_BIND, sockfd, &rpc_st, sizeof(struct bind_st));
return rpc_send_command(RPC_BIND, rpcfd, &rpc_st, sizeof(struct bind_st));
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
----------------------------------- accept4() ---------------------------------- ----------------------------------- accept4() ----------------------------------
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
/* int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags */ /* int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags */
int accept4(ACCEPT4_SIG) int accept4(ACCEPT4_SIG)
{ {
if(realaccept4 == NULL){ if(realaccept4 == NULL){
dwr(MSG_ERROR, "accept4(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"accept4(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"accept4(%d):\n", sockfd); dwr(MSG_DEBUG,"accept4(%d):\n", sockfd);
@ -450,8 +362,7 @@ int accept4(ACCEPT4_SIG)
fcntl(sockfd, F_SETFL, FD_CLOEXEC); fcntl(sockfd, F_SETFL, FD_CLOEXEC);
if ((flags & SOCK_NONBLOCK)) if ((flags & SOCK_NONBLOCK))
fcntl(sockfd, F_SETFL, O_NONBLOCK); fcntl(sockfd, F_SETFL, O_NONBLOCK);
int newfd = accept(sockfd, addr, addrlen); return accept(sockfd, addr, addrlen);
return newfd;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
@ -463,7 +374,7 @@ int accept4(ACCEPT4_SIG)
int accept(ACCEPT_SIG) int accept(ACCEPT_SIG)
{ {
if(realaccept == NULL){ if(realaccept == NULL){
dwr(MSG_ERROR, "accept(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"accept(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"accept(%d):\n", sockfd); dwr(MSG_DEBUG,"accept(%d):\n", sockfd);
@ -502,39 +413,27 @@ int accept(ACCEPT_SIG)
dwr(MSG_DEBUG,"EINVAL\n"); dwr(MSG_DEBUG,"EINVAL\n");
return -1; return -1;
} }
/* redirect calls for standard I/O descriptors to kernel */ /* redirect calls for standard I/O descriptors to kernel */
if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO){ if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO){
dwr(MSG_DEBUG,"realaccept():\n"); dwr(MSG_DEBUG,"realaccept():\n");
return(realaccept(sockfd, addr, addrlen)); return(realaccept(sockfd, addr, addrlen));
} }
if(addr) if(addr)
addr->sa_family = AF_INET; addr->sa_family = AF_INET;
/* TODO: also get address info */
/* The following line is required for libuv/nodejs to accept connections properly, /* The following line is required for libuv/nodejs to accept connections properly,
however, this has the side effect of causing certain webservers to max out the CPU however, this has the side effect of causing certain webservers to max out the CPU
in an accept loop */ in an accept loop */
//fcntl(sockfd, F_SETFL, SOCK_NONBLOCK); //fcntl(sockfd, F_SETFL, SOCK_NONBLOCK);
int new_conn_socket = get_new_fd(sockfd); int new_fd = get_new_fd(sockfd);
if(new_fd > 0) {
if(new_conn_socket > 0)
{
dwr(MSG_DEBUG, "accept(): RX: fd = (%d) over (%d)\n", new_conn_socket, rpcfd);
/* Send our local-fd number back to service so it can complete its mapping table */
dwr(MSG_DEBUG, "accept(): sending perceived fd (%d) to service.\n", new_conn_socket);
rpc_send_command(RPC_MAP, rpcfd, &new_conn_socket, sizeof(new_conn_socket));
dwr(MSG_DEBUG,"accept()=%d\n", new_conn_socket);
errno = ERR_OK; errno = ERR_OK;
return new_conn_socket; /* OK */ return new_fd;
} }
dwr(MSG_DEBUG, "accept(): EAGAIN - Error reading signal byte from service");
errno = EAGAIN; errno = EAGAIN;
return -EAGAIN; return -EAGAIN;
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
------------------------------------- listen()---------------------------------- ------------------------------------- listen()----------------------------------
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
@ -543,7 +442,7 @@ int accept(ACCEPT_SIG)
int listen(LISTEN_SIG) int listen(LISTEN_SIG)
{ {
if(reallisten == NULL){ if(reallisten == NULL){
dwr(MSG_ERROR, "listen(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"listen(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"listen(%d):\n", sockfd); dwr(MSG_DEBUG,"listen(%d):\n", sockfd);
@ -565,25 +464,19 @@ int listen(LISTEN_SIG)
errno = EOPNOTSUPP; errno = EOPNOTSUPP;
return -1; return -1;
} }
/* make sure we don't touch any standard outputs */ /* make sure we don't touch any standard outputs */
if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO) if(sockfd == STDIN_FILENO || sockfd == STDOUT_FILENO || sockfd == STDERR_FILENO)
return(reallisten(sockfd, backlog)); return(reallisten(sockfd, backlog));
if(is_mapped_to_service(sockfd) < 0) { if(!connected_to_service(sockfd)) {
/* We now know this socket is not one of our socketpairs */ reallisten(sockfd, backlog);
int err = reallisten(sockfd, backlog);
dwr(MSG_DEBUG,"reallisten()=%d\n", err);
return err;
} }
/* Assemble and send RPC */ /* Assemble and send RPC */
struct listen_st rpc_st; struct listen_st rpc_st;
rpc_st.sockfd = sockfd; rpc_st.sockfd = sockfd;
rpc_st.backlog = backlog; rpc_st.backlog = backlog;
rpc_st.__tid = syscall(SYS_gettid); rpc_st.__tid = syscall(SYS_gettid);
return rpc_send_command(RPC_LISTEN, sockfd, &rpc_st, sizeof(struct listen_st));
return rpc_send_command(RPC_LISTEN, rpcfd, &rpc_st, sizeof(struct listen_st));
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
@ -594,12 +487,12 @@ int listen(LISTEN_SIG)
int clone(CLONE_SIG) int clone(CLONE_SIG)
{ {
if(realclone == NULL){ if(realclone == NULL){
dwr(MSG_ERROR, "clone(): SYMBOL NOT FOUND.\n"); dwr(MSG_ERROR,"clone(): SYMBOL NOT FOUND.\n");
return -1; return -1;
} }
dwr(MSG_DEBUG,"clone()\n"); dwr(MSG_DEBUG,"clone()\n");
int err = realclone(fn, child_stack, flags, arg); int err = realclone(fn, child_stack, flags, arg);
init_service_connection(); set_up_intercept();
return err; return err;
} }
@ -612,64 +505,26 @@ int close(CLOSE_SIG)
{ {
dwr(MSG_DEBUG, "close(%d)\n", fd); dwr(MSG_DEBUG, "close(%d)\n", fd);
if(realclose == NULL) if(realclose == NULL)
init_service_connection(); set_up_intercept();
if(fd == rpcfd) return realclose(fd);
return -1; /* TODO: Ignore request to shut down our rpc fd, this is *almost always* safe */
if(fd != STDIN_FILENO && fd != STDOUT_FILENO && fd != STDERR_FILENO)
return realclose(fd);
return -1;
}
/*------------------------------------------------------------------------------
-------------------------------------- dup2() ----------------------------------
------------------------------------------------------------------------------*/
/* int oldfd, int newfd */
int dup2(DUP2_SIG)
{
if(realdup2 == NULL){
dwr(MSG_ERROR, "dup2(): SYMBOL NOT FOUND.\n");
return -1;
}
dwr(MSG_DEBUG,"dup2(%d, %d)\n", oldfd, newfd);
if(oldfd == rpcfd) {
dwr(MSG_DEBUG,"client application attempted to dup2 RPC socket (%d). This is not allowed.\n", oldfd);
errno = EBADF;
return -1;
}
return realdup2(oldfd, newfd);
}
/*------------------------------------------------------------------------------
-------------------------------------- dup3() ----------------------------------
------------------------------------------------------------------------------*/
/* int oldfd, int newfd, int flags */
int dup3(DUP3_SIG)
{
if(realdup3 == NULL){
dwr(MSG_ERROR, "dup3(): SYMBOL NOT FOUND.\n");
return -1;
}
dwr(MSG_DEBUG,"dup3(%d, %d, %d)\n", oldfd, newfd, flags);
return realdup3(oldfd, newfd, flags);
} }
/*------------------------------------------------------------------------------ /*------------------------------------------------------------------------------
-------------------------------- getsockname() --------------------------------- -------------------------------- getsockname() ---------------------------------
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
/* define GETSOCKNAME_SIG int sockfd, struct sockaddr *addr, socklen_t *addrlen */ /* int sockfd, struct sockaddr *addr, socklen_t *addrlen */
int getsockname(GETSOCKNAME_SIG) int getsockname(GETSOCKNAME_SIG)
{ {
if (realgetsockname == NULL) { if (realgetsockname == NULL) {
dwr(MSG_ERROR, "getsockname(): SYMBOL NOT FOUND. \n"); dwr(MSG_ERROR,"getsockname(): SYMBOL NOT FOUND. \n");
return -1; return -1;
} }
dwr(MSG_DEBUG, "getsockname(%d)\n", sockfd); dwr(MSG_DEBUG,"getsockname(%d)\n", sockfd);
if(!is_mapped_to_service(sockfd)) if(connected_to_service(sockfd) == 0) {
dwr(MSG_DEBUG,"getsockname(): not used by service\n");
return realgetsockname(sockfd, addr, addrlen); return realgetsockname(sockfd, addr, addrlen);
}
/* This is kind of a hack as it stands -- assumes sockaddr is sockaddr_in /* This is kind of a hack as it stands -- assumes sockaddr is sockaddr_in
* and is an IPv4 address. */ * and is an IPv4 address. */
@ -678,12 +533,15 @@ int getsockname(GETSOCKNAME_SIG)
rpc_st.sockfd = sockfd; rpc_st.sockfd = sockfd;
memcpy(&rpc_st.addr, addr, *addrlen); memcpy(&rpc_st.addr, addr, *addrlen);
memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t)); memcpy(&rpc_st.addrlen, &addrlen, sizeof(socklen_t));
rpc_send_command(RPC_GETSOCKNAME, rpcfd, &rpc_st, sizeof(struct getsockname_st)); int rpcfd = rpc_send_command(RPC_GETSOCKNAME, sockfd, &rpc_st, sizeof(struct getsockname_st));
/* read address info from service */ /* read address info from service */
char addrbuf[sizeof(struct sockaddr_storage)]; char addrbuf[sizeof(struct sockaddr_storage)];
memset(&addrbuf, 0, sizeof(struct sockaddr_storage)); memset(&addrbuf, 0, sizeof(struct sockaddr_storage));
read(rpcfd, &addrbuf, sizeof(struct sockaddr_storage));
if(rpcfd > -1)
if(read(rpcfd, &addrbuf, sizeof(struct sockaddr_storage)) > 0)
close(rpcfd);
struct sockaddr_storage sock_storage; struct sockaddr_storage sock_storage;
memcpy(&sock_storage, addrbuf, sizeof(struct sockaddr_storage)); memcpy(&sock_storage, addrbuf, sizeof(struct sockaddr_storage));
*addrlen = sizeof(struct sockaddr_in); *addrlen = sizeof(struct sockaddr_in);
@ -697,9 +555,7 @@ int getsockname(GETSOCKNAME_SIG)
------------------------------------------------------------------------------*/ ------------------------------------------------------------------------------*/
long syscall(SYSCALL_SIG){ long syscall(SYSCALL_SIG){
dwr(MSG_DEBUG_EXTRA,"syscall(%u, ...):\n", number);
//dwr(MSG_DEBUG_EXTRA,"syscall(%u, ...):\n", number);
va_list ap; va_list ap;
uintptr_t a,b,c,d,e,f; uintptr_t a,b,c,d,e,f;
va_start(ap, number); va_start(ap, number);

View file

@ -53,4 +53,35 @@
#define DUP2_SIG int oldfd, int newfd #define DUP2_SIG int oldfd, int newfd
#define DUP3_SIG int oldfd, int newfd, int flags #define DUP3_SIG int oldfd, int newfd, int flags
void my_init(void);
int connect(CONNECT_SIG);
int bind(BIND_SIG);
int accept(ACCEPT_SIG);
int listen(LISTEN_SIG);
int socket(SOCKET_SIG);
int setsockopt(SETSOCKOPT_SIG);
int getsockopt(GETSOCKOPT_SIG);
int accept4(ACCEPT4_SIG);
long syscall(SYSCALL_SIG);
int close(CLOSE_SIG);
int clone(CLONE_SIG);
int dup2(DUP2_SIG);
int dup3(DUP3_SIG);
int getsockname(GETSOCKNAME_SIG);
static int (*realconnect)(CONNECT_SIG);
static int (*realbind)(BIND_SIG);
static int (*realaccept)(ACCEPT_SIG);
static int (*reallisten)(LISTEN_SIG);
static int (*realsocket)(SOCKET_SIG);
static int (*realsetsockopt)(SETSOCKOPT_SIG);
static int (*realgetsockopt)(GETSOCKOPT_SIG);
static int (*realaccept4)(ACCEPT4_SIG);
static long (*realsyscall)(SYSCALL_SIG);
static int (*realclose)(CLOSE_SIG);
static int (*realclone)(CLONE_SIG);
static int (*realdup2)(DUP2_SIG);
static int (*realdup3)(DUP3_SIG);
static int (*realgetsockname)(GETSOCKNAME_SIG);
#endif #endif

File diff suppressed because it is too large Load diff

View file

@ -33,7 +33,9 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include <utility>
#include <stdexcept> #include <stdexcept>
#include <stdint.h>
#include "../node/Constants.hpp" #include "../node/Constants.hpp"
#include "../node/MulticastGroup.hpp" #include "../node/MulticastGroup.hpp"
@ -110,18 +112,17 @@ private:
static err_t nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err); static err_t nc_connected(void *arg, struct tcp_pcb *tpcb, err_t err);
// RPC handlers (from NetconIntercept) // RPC handlers (from NetconIntercept)
void unload_rpc(void *data, pid_t &pid, pid_t &tid, int &rpc_count, char (timestamp[20]), char &cmd, void* &payload); void unload_rpc(void *data, pid_t &pid, pid_t &tid,
int &rpc_count, char (timestamp[20]), char (magic[sizeof(uint64_t)]), char &cmd, void* &payload);
void handle_getsockname(PhySocket *sock, void **uptr, struct getsockname_st *getsockname_rpc); void handle_getsockname(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct getsockname_st *getsockname_rpc);
void handle_bind(PhySocket *sock, void **uptr, struct bind_st *bind_rpc); void handle_bind(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct bind_st *bind_rpc);
void handle_listen(PhySocket *sock, void **uptr, struct listen_st *listen_rpc); void handle_listen(PhySocket *sock, PhySocket *rpcsock, void **uptr, struct listen_st *listen_rpc);
void handle_map_request(PhySocket *sock, void **uptr, unsigned char* buf);
void handle_retval(PhySocket *sock, void **uptr, int rpc_count, int newfd);
TcpConnection * handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc); TcpConnection * handle_socket(PhySocket *sock, void **uptr, struct socket_st* socket_rpc);
void handle_connect(PhySocket *sock, void **uptr, struct connect_st* connect_rpc); void handle_connect(PhySocket *sock, PhySocket *rpcsock, TcpConnection *conn, struct connect_st* connect_rpc);
void handle_write(TcpConnection *conn); void handle_write(TcpConnection *conn);
int send_return_value(TcpConnection *conn, int retval, int _errno); int send_return_value(PhySocket *sock, int retval, int _errno);
int send_return_value(int fd, int retval, int _errno); int send_return_value(int fd, int retval, int _errno);
void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len); void phyOnDatagram(PhySocket *sock,void **uptr,const struct sockaddr *from,void *data,unsigned long len);
@ -135,6 +136,9 @@ private:
void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len); void phyOnUnixData(PhySocket *sock,void **uptr,void *data,unsigned long len);
void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable); void phyOnFileDescriptorActivity(PhySocket *sock,void **uptr,bool readable,bool writable);
TcpConnection *getConnection(PhySocket *sock);
void closeConnection(PhySocket *sock);
ip_addr_t convert_ip(struct sockaddr_in * addr) ip_addr_t convert_ip(struct sockaddr_in * addr)
{ {
ip_addr_t conn_addr; ip_addr_t conn_addr;
@ -147,23 +151,16 @@ private:
return conn_addr; return conn_addr;
} }
// Client helpers
TcpConnection *getConnectionByTheirFD(PhySocket *sock, int fd);
void closeConnection(TcpConnection *conn);
void closeAll();
void closeClient(PhySocket *sock);
void compact_dump();
void dump();
void die(int exret);
Phy<NetconEthernetTap *> _phy; Phy<NetconEthernetTap *> _phy;
PhySocket *_unixListenSocket; PhySocket *_unixListenSocket;
std::vector<TcpConnection*> tcp_connections; std::vector<TcpConnection*> tcp_connections;
std::vector<PhySocket*> rpc_sockets;
std::map<PhySocket*, pid_t> pidmap; std::map<PhySocket*, pid_t> pidmap;
pid_t rpc_counter;
std::map<uint64_t, std::pair<PhySocket*, void*> > jobmap;
std::map<uint64_t, PhySocket*> sockmap;
pid_t rpc_counter;
netif interface; netif interface;
MAC _mac; MAC _mac;

View file

@ -5,6 +5,9 @@
#include <errno.h> #include <errno.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <fcntl.h>
#include <stdint.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <strings.h> #include <strings.h>
#include "RPC.h" #include "RPC.h"
@ -14,18 +17,32 @@
static int instance_count; static int instance_count;
static int rpc_count; static int rpc_count;
static pthread_mutex_t lock;
static pthread_mutex_t lock;
void rpc_mutex_init() { void rpc_mutex_init() {
if(pthread_mutex_init(&lock, NULL) != 0) { if(pthread_mutex_init(&lock, NULL) != 0) {
fprintf(stderr, "error while initializing service call mutex\n"); fprintf(stderr, "error while initializing service call mutex\n");
} }
} }
void rpc_mutex_destroy() { void rpc_mutex_destroy() {
pthread_mutex_destroy(&lock); pthread_mutex_destroy(&lock);
} }
/*
* Reads a new file descriptor from the service
*/
int get_new_fd(int sock)
{
char buf[BUF_SZ];
int newfd;
ssize_t size = sock_fd_read(sock, buf, sizeof(buf), &newfd);
if(size > 0){
return newfd;
}
fprintf(stderr, "get_new_fd(): Error, unable to read fd over (%d)\n", sock);
return -1;
}
/* /*
* Reads a return value from the service and sets errno (if applicable) * Reads a return value from the service and sets errno (if applicable)
*/ */
@ -46,21 +63,6 @@ int get_retval(int rpc_sock)
return -1; return -1;
} }
/*
* Reads a new file descriptor from the service
*/
int get_new_fd(int sock)
{
char buf[BUF_SZ];
int newfd;
ssize_t size = sock_fd_read(sock, buf, sizeof(buf), &newfd);
if(size > 0){
return newfd;
}
fprintf(stderr, "get_new_fd(): Error, unable to read fd over (%d)\n", sock);
return -1;
}
int rpc_join(const char * sockname) int rpc_join(const char * sockname)
{ {
struct sockaddr_un addr; struct sockaddr_un addr;
@ -81,9 +83,9 @@ int rpc_join(const char * sockname)
sleep(1); sleep(1);
} }
else { else {
int newfd = dup2(sock, RPC_FD-instance_count); //int newfd = dup2(sock, RPC_FD-instance_count);
close(sock); //close(sock);
return newfd; return sock;
} }
attempts++; attempts++;
} }
@ -93,14 +95,26 @@ int rpc_join(const char * sockname)
/* /*
* Send a command to the service * Send a command to the service
*/ */
int rpc_send_command(int cmd, int rpc_sock, void *data, int len) int rpc_send_command(int cmd, int forfd, void *data, int len)
{ {
char cmdbuf[BUF_SZ];
cmdbuf[0] = cmd;
memcpy(&cmdbuf[1], data, len);
pthread_mutex_lock(&lock); pthread_mutex_lock(&lock);
char metabuf[BUF_SZ]; // portion of buffer which contains RPC metadata for debugging char padding[] = {0, 1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89};
char cmdbuf[BUF_SZ], magic[TOKEN_SIZE], metabuf[BUF_SZ];
memcpy(magic+MAGIC_SIZE, padding, TOKEN_SIZE);
uint64_t magic_num;
// ephemeral RPC socket used only for this command
int rpc_sock = rpc_join("/root/dev/ztest/nc_e5cd7a9e1c3511dd");
// Generate token
int fdrand = open("/dev/urandom", O_RDONLY);
read(fdrand, &magic, MAGIC_SIZE);
memcpy(&magic_num, magic, MAGIC_SIZE);
cmdbuf[CMD_ID_IDX] = cmd;
memcpy(&cmdbuf[MAGIC_IDX], &magic_num, MAGIC_SIZE);
memcpy(&cmdbuf[STRUCT_IDX], data, len);
// Format: [sig_byte] + [cmd_id] + [magic] + [meta] + [payload]
#ifdef VERBOSE #ifdef VERBOSE
/* /*
#define IDX_PID 0 #define IDX_PID 0
@ -119,43 +133,52 @@ int rpc_send_command(int cmd, int rpc_sock, void *data, int len)
time_t timestamp; time_t timestamp;
timestamp = time(NULL); timestamp = time(NULL);
strftime(timestring, sizeof(timestring), "%H:%M:%S", localtime(&timestamp)); strftime(timestring, sizeof(timestring), "%H:%M:%S", localtime(&timestamp));
metabuf[IDX_SIGNAL_BYTE] = 'R';
memcpy(&metabuf[IDX_PID], &pid, sizeof(pid_t) ); /* pid */ memcpy(&metabuf[IDX_PID], &pid, sizeof(pid_t) ); /* pid */
memcpy(&metabuf[IDX_TID], &tid, sizeof(pid_t) ); /* tid */ memcpy(&metabuf[IDX_TID], &tid, sizeof(pid_t) ); /* tid */
memcpy(&metabuf[IDX_COUNT], &rpc_count, sizeof(rpc_count) ); /* rpc_count */ memcpy(&metabuf[IDX_COUNT], &rpc_count, sizeof(rpc_count) ); /* rpc_count */
memcpy(&metabuf[IDX_TIME], &timestring, 20 ); /* timestamp */ memcpy(&metabuf[IDX_TIME], &timestring, 20 ); /* timestamp */
#endif #endif
/* Combine command flag+payload with RPC metadata */ /* Combine command flag+payload with RPC metadata */
memcpy(&metabuf[IDX_PAYLOAD], cmdbuf, len); memcpy(&metabuf[IDX_PAYLOAD], cmdbuf, len + 1 + MAGIC_SIZE);
// Write RPC
int n_write = write(rpc_sock, &metabuf, BUF_SZ); int n_write = write(rpc_sock, &metabuf, BUF_SZ);
if(n_write < 0) { if(n_write < 0) {
fprintf(stderr, "Error writing command to service (CMD = %d)\n", cmdbuf[0]); fprintf(stderr, "Error writing command to service (CMD = %d)\n", cmdbuf[CMD_ID_IDX]);
errno = 0; errno = 0;
} }
// Write token to corresponding data stream
if(n_write > 0 && forfd > -1){
usleep(5000);
int w = send(forfd, &magic, TOKEN_SIZE, 0);
}
// Process response from service
int ret = ERR_OK; int ret = ERR_OK;
if(n_write > 0) { if(n_write > 0) {
if(cmdbuf[0]==RPC_SOCKET) { if(cmdbuf[CMD_ID_IDX]==RPC_SOCKET) {
ret = get_new_fd(rpc_sock); pthread_mutex_unlock(&lock);
return rpc_sock; // Used as new socket
} }
if(cmdbuf[0]==RPC_MAP_REQ if(cmdbuf[CMD_ID_IDX]==RPC_CONNECT
|| cmdbuf[0]==RPC_CONNECT || cmdbuf[CMD_ID_IDX]==RPC_BIND
|| cmdbuf[0]==RPC_BIND || cmdbuf[CMD_ID_IDX]==RPC_LISTEN) {
|| cmdbuf[0]==RPC_LISTEN
|| cmdbuf[0]==RPC_MAP) {
ret = get_retval(rpc_sock); ret = get_retval(rpc_sock);
} }
if(cmdbuf[0]==RPC_GETSOCKNAME) { if(cmdbuf[CMD_ID_IDX]==RPC_GETSOCKNAME) {
ret = n_write; pthread_mutex_unlock(&lock);
return rpc_sock; // Don't close rpc here, we'll use it to read getsockopt_st
} }
} }
else { else
ret = -1; ret = -1;
} close(rpc_sock); // We're done with this RPC socket, close it (if type-R)
pthread_mutex_unlock(&lock); pthread_mutex_unlock(&lock);
return ret; return ret;
} }
/* /*
* Send file descriptor * Send file descriptor
*/ */
@ -166,21 +189,17 @@ ssize_t sock_fd_write(int sock, int fd)
struct iovec iov; struct iovec iov;
char buf = '\0'; char buf = '\0';
int buflen = 1; int buflen = 1;
union { union {
struct cmsghdr cmsghdr; struct cmsghdr cmsghdr;
char control[CMSG_SPACE(sizeof (int))]; char control[CMSG_SPACE(sizeof (int))];
} cmsgu; } cmsgu;
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
iov.iov_base = &buf; iov.iov_base = &buf;
iov.iov_len = buflen; iov.iov_len = buflen;
msg.msg_name = NULL; msg.msg_name = NULL;
msg.msg_namelen = 0; msg.msg_namelen = 0;
msg.msg_iov = &iov; msg.msg_iov = &iov;
msg.msg_iovlen = 1; msg.msg_iovlen = 1;
if (fd != -1) { if (fd != -1) {
msg.msg_control = cmsgu.control; msg.msg_control = cmsgu.control;
msg.msg_controllen = sizeof(cmsgu.control); msg.msg_controllen = sizeof(cmsgu.control);
@ -193,13 +212,11 @@ ssize_t sock_fd_write(int sock, int fd)
msg.msg_control = NULL; msg.msg_control = NULL;
msg.msg_controllen = 0; msg.msg_controllen = 0;
} }
size = sendmsg(sock, &msg, 0); size = sendmsg(sock, &msg, 0);
if (size < 0) if (size < 0)
perror ("sendmsg"); perror ("sendmsg");
return size; return size;
} }
/* /*
* Read a file descriptor * Read a file descriptor
*/ */
@ -214,10 +231,8 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd)
char control[CMSG_SPACE(sizeof (int))]; char control[CMSG_SPACE(sizeof (int))];
} cmsgu; } cmsgu;
struct cmsghdr *cmsg; struct cmsghdr *cmsg;
iov.iov_base = buf; iov.iov_base = buf;
iov.iov_len = bufsize; iov.iov_len = bufsize;
msg.msg_name = NULL; msg.msg_name = NULL;
msg.msg_namelen = 0; msg.msg_namelen = 0;
msg.msg_iov = &iov; msg.msg_iov = &iov;
@ -239,7 +254,6 @@ ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd)
fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type); fprintf (stderr, "invalid cmsg_type %d\n",cmsg->cmsg_type);
return -1; return -1;
} }
*fd = *((int *) CMSG_DATA(cmsg)); *fd = *((int *) CMSG_DATA(cmsg));
} else *fd = -1; } else *fd = -1;
} else { } else {

View file

@ -1,12 +1,25 @@
#ifndef __RPCLIB_H_ #ifndef __RPCLIB_H_
#define __RPCLIB_H_ #define __RPCLIB_H_
#define IDX_PID 0 #include <stdint.h>
#define IDX_TID sizeof(pid_t)
#define MAGIC_SIZE sizeof(uint64_t)
#define MAGIC_PADDING_SIZE 12
#define TOKEN_SIZE MAGIC_SIZE+MAGIC_PADDING_SIZE
// 1st section
#define IDX_SIGNAL_BYTE 0
#define IDX_PID 1
#define IDX_TID sizeof(pid_t) + 1
#define IDX_COUNT IDX_TID + sizeof(pid_t) #define IDX_COUNT IDX_TID + sizeof(pid_t)
#define IDX_TIME IDX_COUNT + sizeof(int) #define IDX_TIME IDX_COUNT + sizeof(int)
#define IDX_PAYLOAD IDX_TIME + 20 /* 20 being the length of the timestamp string */ #define IDX_PAYLOAD IDX_TIME + 20 /* 20 being the length of the timestamp string */
// 2nd section
#define CMD_ID_IDX 0
#define MAGIC_IDX 1
#define STRUCT_IDX MAGIC_IDX+MAGIC_SIZE
#define BUF_SZ 256 #define BUF_SZ 256
#define PAYLOAD_SZ 223 /* BUF_SZ-IDX_PAYLOAD */ #define PAYLOAD_SZ 223 /* BUF_SZ-IDX_PAYLOAD */
@ -37,17 +50,18 @@
extern "C" { extern "C" {
#endif #endif
int get_retval(int);
int rpc_join(const char * sockname);
int rpc_send_command(int cmd, int forfd, void *data, int len);
int get_new_fd(int sock);
ssize_t sock_fd_write(int sock, int fd);
ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd);
void rpc_mutex_destroy(); void rpc_mutex_destroy();
void rpc_mutex_init(); void rpc_mutex_init();
int get_retval(int);
int get_new_fd(int);
int rpc_join(const char * sockname);
int rpc_send_command(int cmd, int rpc_sock, void *data, int len);
ssize_t sock_fd_write(int sock, int fd);
ssize_t sock_fd_read(int sock, void *buf, ssize_t bufsize, int *fd);
/* Structures used for sending commands via RPC mechanism */ /* Structures used for sending commands via RPC mechanism */

View file

@ -42,7 +42,7 @@
#ifndef _COMMON_H #ifndef _COMMON_H
#define _COMMON_H 1 #define _COMMON_H 1
#define DEBUG_LEVEL 0 #define DEBUG_LEVEL 3
#define MSG_WARNING 4 #define MSG_WARNING 4
#define MSG_ERROR 1 // Errors #define MSG_ERROR 1 // Errors

View file

View file

@ -309,7 +309,7 @@ public:
if ((long)fd > _nfds) if ((long)fd > _nfds)
_nfds = (long)fd; _nfds = (long)fd;
FD_SET(fd,&_readfds); FD_SET(fd,&_readfds);
sws.type = ZT_PHY_SOCKET_FD; sws.type = ZT_PHY_SOCKET_UNIX_IN; /* TODO: Type was changed to allow for CBs with new RPC model */
sws.sock = fd; sws.sock = fd;
sws.uptr = uptr; sws.uptr = uptr;
memset(&(sws.saddr),0,sizeof(struct sockaddr_storage)); memset(&(sws.saddr),0,sizeof(struct sockaddr_storage));