/* * Secure X Message Passing Library v2 implementation. * (sxmplv2) it superseed all versions before due to the: * - memory consumption * - new features such as pulse emitting * - performance optimization * * (c) Askele Group 2013-2015 * (c) Alexander Vdolainen 2013-2015 * * libsxmp is free software: you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published * by the Free Software Foundation, either version 2.1 of the License, or * (at your option) any later version. * * libsxmp is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. * See the GNU Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with this program. If not, see ."; * */ #include #include #include #include #include #include #include #include #include #include #ifdef WIN32 #include #define EBADE 1 #define NETDB_SUCCESS 0 #else #include #include #include #include #endif #include #include #include #include #include #include #include #include #include "internal.h" typedef struct __sxmpl_bundle_type { void *buf; sxlink_t *conn; } sxmplv2_bundle_t; /* networking helpers */ #ifndef WIN32 int __resolvehost(const char *hostname, char *buf, int buf_len, struct hostent **rhp) { struct hostent *hostbuf ; struct hostent *hp = *rhp = NULL; int herr = 0, hres = 0; hostbuf = malloc(sizeof(struct hostent)); if(!hostbuf) return NO_ADDRESS; hres = gethostbyname_r(hostname, hostbuf, buf, buf_len, &hp, &herr); if(hres) return NO_ADDRESS; *rhp = hp; return NETDB_SUCCESS; } #endif static int __conn_read(sxlink_t *co, void *buf, size_t buf_len) { int rfd = SSL_get_fd(co->ssl), r; fd_set readset, writeset; int ofcmode, read_blocked = 0, read_blocked_on_write = 0; /* First we make the socket nonblocking */ #ifndef WIN32 ofcmode = fcntl(rfd, F_GETFL,0); ofcmode |= O_NDELAY; if(fcntl(rfd, F_SETFL, ofcmode)) sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Couldn't make socket nonblocking"); #endif __retry: do { __try_again: if(co->flags & SXMP_CLOSED) { return -1; } r = SSL_read(co->ssl, buf, (int)buf_len); switch(SSL_get_error (co->ssl, r)) { case SSL_ERROR_NONE: return r; break; case SSL_ERROR_WANT_READ: /* get prepare to select */ read_blocked = 1; break; case SSL_ERROR_WANT_WRITE: /* here we blocked on write */ read_blocked_on_write = 1; break; case SSL_ERROR_SYSCALL: if(errno == EAGAIN || errno == EINTR) goto __try_again; else { sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL syscall error.\n"); goto __close_conn; } break; case SSL_ERROR_WANT_CONNECT: case SSL_ERROR_WANT_ACCEPT: sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL negotiation required. Trying again.\n"); goto __try_again; break; case SSL_ERROR_SSL: sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL error occured. Connection will be closed.\n"); goto __close_conn; break; case SSL_ERROR_ZERO_RETURN: sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL connection is cleary closed.\n"); default: __close_conn: ERR_free_strings(); co->flags |= SXMP_CLOSED; sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Unknown error on %s (errno = %d)\n", co->uuid, errno); ERR_remove_state(0); return -1; } } while(SSL_pending(co->ssl) && !read_blocked); __select_retry: if(read_blocked) { FD_ZERO(&readset); FD_SET(rfd, &readset); /* waits until something will be ready to read */ r = select(rfd + 1, &readset, NULL, NULL, NULL); if(r < 0) { if(errno == EINTR || errno == EAGAIN) goto __select_retry; sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid); ERR_remove_state(0); return -1; } if(!r) { sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Nothing to wait for\n"); ERR_remove_state(0); return 0; } read_blocked = 0; if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */ } if(read_blocked_on_write) { /* we was blocked on write */ FD_ZERO(&readset); FD_ZERO(&writeset); FD_SET(rfd, &readset); FD_SET(rfd, &writeset); r = select(rfd + 1, &readset, &writeset, NULL, NULL); read_blocked_on_write = 0; if(r && FD_ISSET(rfd, &writeset)) goto __retry; } return r; } static int __conn_write(sxlink_t *co, void *buf, size_t buf_len) { int r, rfd = SSL_get_fd(co->ssl); fd_set writeset; __retry: if(co->flags & SXMP_CLOSED) { return -1; } r = SSL_write(co->ssl, buf, (int)buf_len); switch(SSL_get_error(co->ssl, r)) { case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: /* here we should block */ FD_ZERO(&writeset); FD_SET(rfd, &writeset); r = select(rfd + 1, NULL, &writeset, NULL, NULL); if(r && FD_ISSET(rfd, &writeset)) goto __retry; break; case SSL_ERROR_SYSCALL: if(errno == EAGAIN || errno == EINTR) goto __retry; else goto __close_conn; break; default: __close_conn: if(r < 0) { /* set closed flag */ ERR_free_strings(); co->flags |= SXMP_CLOSED; sxlink_log(co, SXERROR_LOG, "[sxmplv2] (WR)Unknown error on %s (%d)\n", co->uuid, r); ERR_remove_state(0); return -1; } else return r; } return r; } int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg) { sxmplv2_head_t *head; size_t rd; int r; char *buf = NULL; if(!co || !msg) return SXE_FAILED; /* check message for validity */ head = &msg->mhead; if(head->payload_length && !msg->payload) return SXE_FAILED; if(head->payload_length) { buf = malloc(sizeof(sxmplv2_head_t) + head->payload_length); memcpy(buf, head, sizeof(sxmplv2_head_t)); memcpy(buf + sizeof(sxmplv2_head_t), msg->payload, head->payload_length); } /* write the head and payload if applicable */ pthread_mutex_lock(&co->sslinout[1]); if(!buf) rd = __conn_write(co, head, sizeof(sxmplv2_head_t)); else rd = __conn_write(co, buf, sizeof(sxmplv2_head_t) + head->payload_length); if(rd < 0) { co->flags |= SXMP_CLOSED; r = SXE_ESSL; } pthread_mutex_unlock(&co->sslinout[1]); if(!(co->flags & SXMP_CLOSED)) r = SXE_SUCCESS; if(buf) free(buf); return r; } int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg) { char *buf = msg->payload - sizeof(sxmplv2_head_t); size_t rd; int r; if(!link || !msg) return SXE_FAILED; memcpy(buf, &msg->mhead, sizeof(sxmplv2_head_t)); pthread_mutex_lock(&link->sslinout[1]); rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length); if(rd < 0) { link->flags |= SXMP_CLOSED; r = SXE_ESSL; } pthread_mutex_unlock(&link->sslinout[1]); if(!(link->flags & SXMP_CLOSED)) r = SXE_SUCCESS; return r; } static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co) { sxmplv2_bundle_t *n = malloc(sizeof(sxmplv2_bundle_t)); if(!n) return NULL; else memset(n, 0, sizeof(sxmplv2_bundle_t)); n->buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if(n->buf == MAP_FAILED) { free(n); return NULL; } n->conn = co; return n; } static void __sxmpl_bundle_destroy(sxmplv2_bundle_t *n) { munmap(n->buf, 65536); free(n); return; } extern int ex_ssldata_index; /** < index used to work with additional data * provided to the special call during SSL handshake */ /* this function is an ugly implementation to get C string with uuid */ extern char *__generate_uuid(void); static pthread_mutex_t *lock_cs; static long *lock_count; static void pthreads_locking_callback(int mode, int type, const char *file, int line) { if (mode & CRYPTO_LOCK) { pthread_mutex_lock(&(lock_cs[type])); lock_count[type]++; } else { pthread_mutex_unlock(&(lock_cs[type])); } } static void pthreads_thread_id(CRYPTO_THREADID *tid) { #ifdef WIN32 CRYPTO_THREADID_set_numeric(tid, (unsigned long)GetCurrentThreadId()); #else CRYPTO_THREADID_set_numeric(tid, (unsigned long)pthread_self()); #endif } int sxmp_init(void) { int i; /* init SSL library */ SSL_library_init(); OpenSSL_add_all_algorithms(); SSL_load_error_strings(); ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL); /* here we go - init all */ lock_cs = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(pthread_mutex_t)); lock_count = OPENSSL_malloc(CRYPTO_num_locks() * sizeof(long)); for (i = 0; i < CRYPTO_num_locks(); i++) { lock_count[i] = 0; pthread_mutex_init(&(lock_cs[i]), NULL); } CRYPTO_THREADID_set_callback(pthreads_thread_id); CRYPTO_set_locking_callback(pthreads_locking_callback); return 0; } void sxmp_finalize(void) { int i; CRYPTO_set_locking_callback(NULL); for (i = 0; i < CRYPTO_num_locks(); i++) { pthread_mutex_destroy(&(lock_cs[i])); } OPENSSL_free(lock_cs); OPENSSL_free(lock_count); ERR_free_strings(); ENGINE_cleanup(); CRYPTO_cleanup_all_ex_data(); EVP_cleanup(); return; } sxlink_t *__link_minimal_alloc(struct in_addr *addr) { sxlink_t *co = malloc(sizeof(sxlink_t)); int r; if(!co) { r = ENOMEM; goto __fail; } else memset(co, 0, sizeof(sxlink_t)); if(!(co->messages = malloc(sizeof(uintptr_t)*MAX_MSGINPROCESS))) { r = ENOMEM; goto __fail; } else memset(co->messages, 0, sizeof(uintptr_t)*MAX_MSGINPROCESS); if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; } else memset(co->pctx, 0, sizeof(sxsession_ctx_t)); if(addr) { if(!(co->pctx->addr = malloc(sizeof(struct in_addr)))) { r = ENOMEM; goto __fail; } memcpy(co->pctx->addr, addr, sizeof(struct in_addr)); } if(!(co->uuid = __generate_uuid())) { r = ENOMEM; goto __fail; } return co; __fail: if(co) { if(co->pctx) { if(co->pctx->addr) free(co->pctx->addr); free(co->pctx); } if(co->messages) free(co->messages); free(co); } errno = r; return NULL; } static int __link_second_alloc(sxlink_t *co) { usrtc_node_init(&co->csnode, co); /* initialize index allocators */ memset(&co->idx_ch, 0, sizeof(idx_allocator_t)); memset(&co->idx_msg, 0, sizeof(idx_allocator_t)); memset(&co->idx_streams, 0, sizeof(idx_allocator_t)); if((idx_allocator_init(&co->idx_ch, MAX_CHANNELSOPENED, 0))) goto __fail; if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail; if((idx_allocator_init(&co->idx_streams, MAX_STREAMSOPENED, 0))) goto __fail; if(!(co->channels = malloc(sizeof(uintptr_t)*MAX_CHANNELSOPENED))) goto __fail; else memset(co->channels, 0, sizeof(uintptr_t)*MAX_CHANNELSOPENED); if(!(co->streams = malloc(sizeof(uintptr_t)*MAX_STREAMSOPENED))) goto __fail; else memset(co->streams, 0, sizeof(uintptr_t)*MAX_STREAMSOPENED); /* init mutexes */ pthread_mutex_init(&co->idx_ch_lock, NULL); pthread_mutex_init(&co->idx_msg_lock, NULL); pthread_mutex_init(&co->write_pending_lock, NULL); pthread_mutex_init(&co->sslinout[0], NULL); pthread_mutex_init(&co->sslinout[1], NULL); pthread_mutex_init(&co->idx_streams_lock, NULL); /* init list */ list_init_head(&co->write_pending); return SXE_SUCCESS; __fail: if(co->channels) free(co->channels); if(co->streams) free(co->streams); idx_allocator_destroy(&co->idx_msg); idx_allocator_destroy(&co->idx_ch); idx_allocator_destroy(&co->idx_streams); return SXE_ENOMEM; } static void __link_second_free(sxlink_t *co) { if(co->channels) free(co->channels); if(co->streams) free(co->streams); idx_allocator_destroy(&co->idx_msg); idx_allocator_destroy(&co->idx_ch); idx_allocator_destroy(&co->idx_streams); pthread_mutex_destroy(&co->idx_ch_lock); pthread_mutex_destroy(&co->idx_msg_lock); pthread_mutex_destroy(&co->write_pending_lock); pthread_mutex_destroy(&co->sslinout[0]); pthread_mutex_destroy(&co->sslinout[1]); pthread_mutex_destroy(&co->idx_streams_lock); return; } static void __link_minimal_free(sxlink_t *co) { if(co) { if(co->pctx) { if(co->pctx->addr) free(co->pctx->addr); free(co->pctx); } if(co->messages) free(co->messages); free(co->uuid); free(co); } return; } static void __link_close_streams(sxlink_t *link) { int i; struct sxstream_opened *stream; if(!link->streams) return; for(i = 0; i < MAX_STREAMSOPENED; i++) { if(link->streams[i]) { stream = link->streams[i]; stream->desc->ops->s_close(stream); free(stream); } } return; } static int __eval_sysrpc(sxlink_t *link, sexp_t *sx, int builtin) { sxl_rpclist_t *rpc_list; usrtc_node_t *node; sxl_rpc_t *rentry; char *rpcf; if(builtin) rpc_list = link->hub->stream_rpc; else rpc_list = link->hub->system_rpc; if(sx->ty == SEXP_LIST) rpcf = sx->list->val; else return SXE_BADPROTO; /* find an appropriate function */ node = usrtc_lookup(rpc_list->rpc_tree, rpcf); if(!node) return SXE_ENORPC; else rentry = (sxl_rpc_t *)usrtc_node_getdata(node); /* call it */ return rentry->rpcf((void *)link, sx); } static inline int __eval_syssexp(sxlink_t *co, sexp_t *sx) { return __eval_sysrpc(co, sx, 0); } static inline int __eval_builtinsexp(sxlink_t *co, sexp_t *sx) { return __eval_sysrpc(co, sx, 1); } #ifdef _NO_SXMPMP #define _CONN_INUSE(lo) (lo)->usecount++; #define _CONN_NOTINUSE(lo) (lo)->usecount--; #define _CONN_UCOUNT(lo) (lo)->usecount #else static inline void _CONN_INUSE(sxlink_t *l) { pthread_rwlock_wrlock(&l->hub->rwlock); l->usecount++; pthread_rwlock_unlock(&l->hub->rwlock); } static inline void _CONN_NOTINUSE(sxlink_t *l) { pthread_rwlock_wrlock(&l->hub->rwlock); l->usecount--; pthread_rwlock_unlock(&l->hub->rwlock); } static inline int _CONN_UCOUNT(sxlink_t *l) { int r; pthread_rwlock_rdlock(&l->hub->rwlock); r = l->usecount; pthread_rwlock_unlock(&l->hub->rwlock); return r; } #endif static void __link_destroy(sxlink_t *l) { int i = 0, fd; sxmsg_t *msg, *omsg; sxppmsg_t *ppm; list_node_t *iter, *siter; sxchnl_t *chan; sxmplv2_head_t *head; sxhub_t *hub = l->hub; /* first we will unpin all messages and mark it as errors on */ if(l->pending_messages) { pthread_mutex_lock(&l->write_pending_lock); list_for_each_safe(&l->write_pending, iter, siter) { ppm = container_of(iter, sxppmsg_t, node); omsg = ppm->msg; /* ok, now we're able to remove it from list */ list_del(&ppm->node); if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */ pthread_mutex_unlock(&omsg->wait); pthread_mutex_destroy(&omsg->wait); free(omsg); } else { /* wake up */ omsg->mhead.opcode = SXE_LINKERROR; pthread_mutex_unlock(&omsg->wait); } free(ppm); l->pending_messages--; } pthread_mutex_unlock(&l->write_pending_lock); } /* free queue */ ERR_remove_state(0); ERR_remove_thread_state(0); ERR_free_strings(); /* update use count */ _CONN_NOTINUSE(l); /* ok, let's free other if we can */ if(!_CONN_UCOUNT(l)) { /* go thru messages */ pthread_mutex_lock(&l->idx_msg_lock); for(i = 0; i < MAX_MSGINPROCESS; i++) { msg = l->messages[i]; if(!msg) continue; else head = &msg->mhead; head->opcode = SXE_LINKERROR; pthread_mutex_unlock(&msg->wait); pthread_mutex_destroy(&msg->wait); free(msg); l->messages[i] = NULL; idx_free(&l->idx_msg, i); } pthread_mutex_unlock(&l->idx_msg_lock); /* ok now we will free the channels */ pthread_mutex_lock(&l->idx_ch_lock); for(i = 0; i < 512; i++) { chan = l->channels[i]; if(!chan) continue; idx_free(&l->idx_ch, i); free(chan); } pthread_mutex_unlock(&l->idx_ch_lock); if(hub->on_destroy) hub->on_destroy(l); if(l->pctx->login) free(l->pctx->login); if(l->pctx->passwd) free(l->pctx->passwd); SSL_set_shutdown(l->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN); fd = SSL_get_fd(l->ssl); SSL_free(l->ssl); l->ssl = NULL; ERR_remove_thread_state(0); ERR_remove_state(0); ERR_free_strings(); close(fd); __link_close_streams(l); __link_second_free(l); __link_minimal_free(l); } return; } static void *__sxmpl_thread(void *b) { sxmplv2_bundle_t *bun = (sxmplv2_bundle_t *)b; sxlink_t *co = bun->conn; void *buf = bun->buf; char *bbuf = (char*)buf; sxmplv2_head_t *mhead = (sxmplv2_head_t *)buf; sxmsg_t *msg, *omsg; sexp_t *sx; sxchnl_t *channel; list_node_t *iter, *siter; sxppmsg_t *ppm; pthread_t self = pthread_self(); struct timespec wtick; int dispatch = 0, e; size_t rd, wr, total_rd; ulong_t mid = 0; #ifdef _PERFPROFILE struct timeval beg, end; #endif /* byte buffer is following head */ bbuf += sizeof(sxmplv2_head_t); __wait_alive: /* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */ if(!(co->flags & SXMP_ALIVE)) { if(co->flags & SXMP_CLOSED) goto __finish; else { usleep(20); goto __wait_alive; } } /* check up a thread */ if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */ dispatch = 1; /* update use count */ _CONN_INUSE(co); /* the following logic : (except dispatcher) * 1. check up pending write -> if exists write one and start again., otherwise go next * 2. read from ssl connection (we will sleep if other already acquire the lock) */ while(1) { __again: if(co->flags & SXMP_CLOSED) goto __finish; /* go away if required asap */ /* works with pending messages */ if(co->pending_messages && !(co->flags & SXMP_CLOSED)) { pthread_mutex_lock(&co->write_pending_lock); list_for_each_safe(&co->write_pending, iter, siter) { ppm = container_of(iter, sxppmsg_t, node); omsg = ppm->msg; if(_sxmpl_writemsg(co, omsg) != SXE_SUCCESS) { pthread_mutex_unlock(&co->write_pending_lock); goto __finish; /* write failed - finishing ... */ } /* ok, now we're able to remove it from list */ list_del(&ppm->node); if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */ pthread_mutex_unlock(&omsg->wait); pthread_mutex_destroy(&omsg->wait); free(omsg); } free(ppm); co->pending_messages--; } pthread_mutex_unlock(&co->write_pending_lock); } if(!dispatch) pthread_mutex_lock(&(co->sslinout[0])); else { /* dispatch thread ticking every ət */ wtick.tv_sec = time(NULL) + 1; e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick); if(e == ETIMEDOUT) goto __again; } if(co->flags & SXMP_CLOSED) { pthread_mutex_unlock(&(co->sslinout[0])); goto __finish; } #ifdef _PERFPROFILE gettimeofday(&beg, NULL); #endif memset(mhead, 0, sizeof(sxmplv2_head_t)); total_rd = 0; while(total_rd != sizeof(sxmplv2_head_t)) { total_rd += __conn_read(co, buf + total_rd, sizeof(sxmplv2_head_t) - total_rd); if(total_rd == -1) { co->flags |= SXMP_CLOSED; break; } } rd = total_rd; #ifdef _PERFPROFILE gettimeofday(&end, NULL); if((end.tv_sec - beg.tv_sec) > 0) { printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec); printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); } else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec); #endif if(co->flags & SXMP_CLOSED) { pthread_mutex_unlock(&(co->sslinout[0])); goto __finish; /* go away if required asap */ } if(rd < 0) { __sslproto_error: co->flags |= SXMP_CLOSED; pthread_mutex_unlock(&(co->sslinout[0])); goto __finish; } else { /* check up if we can read or not */ if(mhead->payload_length) { #ifdef _PERFPROFILE gettimeofday(&beg, NULL); #endif total_rd = 0; while(total_rd != mhead->payload_length) { total_rd += __conn_read(co, bbuf + total_rd, mhead->payload_length - total_rd); if(total_rd == -1) goto __sslproto_error; } rd = total_rd; #ifdef _PERFPROFILE gettimeofday(&end, NULL); if((end.tv_sec - beg.tv_sec) > 0) { printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec); printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec)); } else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec); #endif } pthread_mutex_unlock(&(co->sslinout[0])); /* take a message */ if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */ /* ok, check up the side */ if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */ if(mhead->attr & SXMSG_OPEN) mhead->opcode = _channel_open(co, &mhead->reserve); else mhead->opcode = _channel_close(co, mhead->reserve); /* set flags */ mhead->payload_length = 0; mhead->attr &= ~SXMSG_REPLYREQ; pthread_mutex_lock(&(co->sslinout[1])); wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t)); pthread_mutex_unlock(&(co->sslinout[1])); if(wr < 0) goto __finish; } else { /* it's came back */ mid = (ulong_t)mhead->msgid; /* reply came ... */ if(mid > (MAX_MSGINPROCESS - 1)) { __inval_idx_nor: sxlink_log(co, SXERROR_LOG, "[sxmplv2] Invalid index of the message (%lu).\n", mid); goto __again; } msg = co->messages[(int)mid]; if(!msg) { goto __inval_idx_nor; } /* ok now we'are copy data and unlock wait mutex */ memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); pthread_mutex_unlock(&msg->wait); } } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */ if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */ if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ /* FIXME: special sxmpv2.1 top layer messages - should be here */ if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */ sx = parse_sexp(bbuf, mhead->payload_length); if(sx && co->hub->on_pulse) co->hub->on_pulse(co, sx); if(sx) destroy_sexp(sx); } } } else { /* regular messages */ if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */ if(mhead->reserve >= MAX_CHANNELSOPENED) channel = NULL; else channel = co->channels[mhead->reserve]; if(!channel) { /* ok, we'are failed */ mhead->opcode = SXE_NOSUCHCHAN; __ret_regerr: mhead->payload_length = 0; mhead->attr &= ~SXMSG_REPLYREQ; mhead->attr &= ~SXMSG_OPEN; mhead->attr |= SXMSG_CLOSED; pthread_mutex_lock(&(co->sslinout[1])); wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t)); pthread_mutex_unlock(&(co->sslinout[1])); if(wr < 0) goto __finish; else goto __again; } /* if message is busy - fails */ mid = mhead->msgid; msg = co->messages[mid]; if(msg) { mhead->opcode = SXE_EBUSY; goto __ret_regerr; } /* now we will take a deal */ if(!(msg = malloc(sizeof(sxmsg_t)))) { mhead->opcode = SXE_ENOMEM; goto __ret_regerr; } else { /* set mutex and channel */ pthread_mutex_init(&msg->wait, NULL); pthread_mutex_lock(&msg->wait); msg->pch = channel; /* copy header only */ memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); if(mhead->payload_length) msg->payload = bbuf; } pthread_mutex_lock(&co->idx_msg_lock); idx_reserve(&co->idx_msg, mid); co->messages[mid] = msg; pthread_mutex_unlock(&co->idx_msg_lock); /* now we are able to process the message */ _message_process(msg); } else if(mhead->attr & SXMSG_CLOSED) { /* check for the message */ if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor; mid = mhead->msgid; pthread_mutex_lock(&co->idx_msg_lock); msg = co->messages[mid]; if(!msg) { pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; } /* message dialog is closed - remove this right now */ idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ /* now just free it */ pthread_mutex_unlock(&msg->wait); pthread_mutex_destroy(&msg->wait); free(msg); } else { memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); if(mhead->payload_length) { msg->payload = malloc(mhead->payload_length); if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); else msg->mhead.opcode = SXE_ENOMEM; } pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ } } else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) && (mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */ /* check for the message */ if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor; mid = mhead->msgid; msg = co->messages[mid]; if(!msg) goto __inval_idx_nor; if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */ pthread_mutex_lock(&co->idx_msg_lock); idx_free(&co->idx_msg, mid); co->messages[mid] = NULL; pthread_mutex_unlock(&co->idx_msg_lock); /* now just free it */ pthread_mutex_destroy(&msg->wait); free(msg); /* we must reply */ mhead->opcode = SXE_ETIMEDOUT; goto __ret_regerr; } else { memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); if(mhead->payload_length) { msg->payload = malloc(mhead->payload_length); if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length); else { mhead->opcode = msg->mhead.opcode = SXE_ENOMEM; /* we will return it to waitee */ msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */ /* reply here now */ mhead->payload_length = 0; mhead->attr &= ~SXMSG_REPLYREQ; mhead->attr &= ~SXMSG_OPEN; mhead->attr |= SXMSG_CLOSED; pthread_mutex_lock(&(co->sslinout[1])); wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t)); pthread_mutex_unlock(&(co->sslinout[1])); if(wr < 0) goto __finish; } } pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */ } } else { mhead->opcode = SXE_BADPROTO; goto __ret_regerr; } } } } __finish: co->flags |= SXMP_CLOSED; __link_destroy(co); __sxmpl_bundle_destroy(b); /* destroy bundle */ return NULL; } sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr) { void *buf = NULL; char *bbuf; sxlink_t *link = __link_minimal_alloc(addr); sxmsg_t *msg = NULL; sxmplv2_head_t *head; sxmplv2_bundle_t *bundle; sexp_t *sx; size_t rd; int r = SXE_FAILED, i; if(!link) { errno = SXE_ENOMEM; return NULL; } else link->hub = hub; /* ok, now we need to init ssl stuff */ /* check up - do we need to initialize SSL context? */ r = _sxhub_settls_ctx_s(hub); if(r != SXE_SUCCESS) goto __fail; /* now we will create an SSL connection */ link->ssl = SSL_new(hub->ctx); if(!link->ssl) { r = SXE_ENOMEM; goto __fail; } else SSL_set_fd(link->ssl, sck); /* attach connected socket */ /* set the context to verify ssl connection */ SSL_set_ex_data(link->ssl, ex_ssldata_index, (void *)link); SSL_set_accept_state(link->ssl); if(SSL_accept(link->ssl) == -1) { r = SXE_EPERM; goto __fail; } /* leak here ? */ /* set connection to the batch mode */ link->flags |= SXMP_BATCHMODE; /* allocate our first buffer */ buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; } /* allocate first message */ if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; } else { memset(msg, 0, sizeof(sxmsg_t)); link->messages[0] = msg; } bbuf = (char *)buf; bbuf += sizeof(sxmplv2_head_t); i = 0; while(link->flags & SXMP_BATCHMODE) { rd = __conn_read(link, buf, sizeof(sxmplv2_head_t)); if(rd == sizeof(sxmplv2_head_t)) { head = (sxmplv2_head_t *)buf; /* check for returns */ if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; } else { /* opcode is fine */ i++; /* if we're ready for messaging mode, turn off batch mode */ if(link->flags & SXMP_MESSAGINGMODE) { link->flags &= ~SXMP_BATCHMODE; break; } /* if count too big - fail */ if(i > MAX_SXMP_SYNC_ITERATIONS) { /* ugly */ r = SXE_FAILED; goto __fail3; } } if(!head->payload_length) continue; /* pass the following check up */ rd = __conn_read(link, bbuf, head->payload_length); if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; } bbuf[rd] = '\0'; sx = parse_sexp(bbuf, rd); if(!sx) goto __fail3; /* initialize message */ msg->payload = bbuf; msg->mhead.payload_length = 0; /* deal with it */ r = __eval_syssexp(link, sx); memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t)); head->opcode = r; if(r != SXE_SUCCESS) { /* we finish */ head->payload_length = 0; __conn_write(link, head, sizeof(sxmplv2_head_t)); destroy_sexp(sx); goto __fail3; } rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length); if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) { destroy_sexp(sx); goto __fail3; } destroy_sexp(sx); } else { r = SXE_LINKERROR; goto __fail3; } } /* if we're there - negotiation is done, going to init messaging mode */ r = __link_second_alloc(link); if(r != SXE_SUCCESS) goto __fail3; /* free message */ link->messages[0] = NULL; free(msg); /* and now we're need to create a thread poll */ if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; } else { bundle->buf = buf; bundle->conn = link; } for(i = 0; i < MAX_SXMPLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link); if(!bundle) goto __fail5; r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */ if(r) goto __fail5; else { bundle = (void *)0xdead; pthread_detach(link->thrd_poll[i]); } } /* all is done, connection now ready */ link->flags |= SXMP_ALIVE; r = SXE_SUCCESS; errno = r; /* free context for this thread */ ERR_remove_state(0); return link; __fail5: r = SXE_ENOMEM; /* bundles will be freed by the threads when SSL_read will fails. */ __fail4: __link_second_free(link); __fail3: if(hub->on_destroy) hub->on_destroy(link); __fail2: if(msg) free(msg); if(buf != MAP_FAILED) munmap(buf, 65536); SSL_shutdown(link->ssl); __fail: if(link) { if(link->ssl) { ERR_remove_thread_state(0); ERR_remove_state(0); ERR_free_strings(); SSL_free(link->ssl); } __link_minimal_free(link); } close(sck); errno = r; return NULL; } enum { _SYNC_ON_CHANNELS = 0, _SYNC_ON_VERSION = 1, _SYNC_ON_STREAMS = 2, }; sxlink_t *sxlink_connect(sxhub_t *hub, const char *host, int port, const char *SSL_cert, const char *login, const char *passwd) { return sxlink_connect_at(hub, host, port, SSL_cert, login, passwd, NULL); } sxlink_t *sxlink_connect_at(sxhub_t *hub, const char *host, int port, const char *SSL_cert, const char *login, const char *passwd, const void *priv) { sxlink_t *link = __link_minimal_alloc(NULL); struct hostent *host_; struct sockaddr_in addr; int r = SXE_SUCCESS, sck, i, sync_state = _SYNC_ON_CHANNELS; #ifdef WIN32 WSADATA wsaData; #endif char hostbuf[2048]; void *buf = NULL; char *bbuf; sxmplv2_head_t *head; sxmplv2_bundle_t *bundle; sxmsg_t *msg; size_t rd, wr, ln; sexp_t *sx; r = SXE_IGNORED; if(!host || !SSL_cert) goto __fail; if(!link) { r = SXE_ENOMEM; goto __fail; } link->hub = hub; #ifdef WIN32 WSAStartup(MAKEWORD(2, 2), &wsaData); #endif /* check up ssl context */ r = _sxhub_settls_ctx(hub, SSL_cert); if(r != SXE_SUCCESS) goto __fail; /* resolve host */ #ifdef WIN32 host_ = gethostbyname(host); if(!host_) { r = SXE_FAILED; goto __fail; } #else r = __resolvehost(host, hostbuf, 2048, &host_); if(r) { r = SXE_FAILED; goto __fail; } #endif /* create a socket */ sck = socket(PF_INET, SOCK_STREAM, 0); memset(&addr, 0, sizeof(addr)); /* try to connect it */ addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = *(uint32_t*)(host_->h_addr); r = connect(sck, (struct sockaddr*)&addr, sizeof(addr)); if(r) { close(sck); r = SXE_FAILED; /* couldn't connect to the desired host */ goto __fail; } /* SSL handshake */ link->ssl = SSL_new(hub->ctx); if(!link->ssl) { close(sck); r = SXE_ENOMEM; goto __fail; } SSL_set_fd(link->ssl, sck); /* attach connected socket */ SSL_set_connect_state(link->ssl); if(SSL_connect(link->ssl) == -1) { r = SXE_EPERM; /* shutdown connection */ goto __fail; } /* if success we're ready to use established SSL channel */ /* set connection to the batch mode */ link->flags |= SXMP_BATCHMODE; /* allocate our first buffer */ buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; } /* allocate first message */ if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; } else { memset(msg, 0, sizeof(sxmsg_t)); link->messages[0] = msg; } bbuf = (char *)buf; bbuf += sizeof(sxmplv2_head_t); head = (sxmplv2_head_t *)buf; /* set out data */ if(priv) sxlink_setpriv(link, priv); /* authentification first both V2 and newer */ /* form a message -- credentials */ ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")", login ? login : "nil", passwd ? passwd : "nil"); head->opcode = SXE_SUCCESS; head->payload_length = ln; wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t)); if(wr < 0) goto __fail3; rd = __conn_read(link, head, sizeof(sxmplv2_head_t)); if(rd < 0) goto __fail3; if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; } /* since V2.1 syncronization should be done here */ while(link->flags & SXMP_BATCHMODE) { head->opcode = SXE_SUCCESS; switch(sync_state) { case _SYNC_ON_CHANNELS: /* ok, get available channels */ ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@c>)"); break; case _SYNC_ON_VERSION: ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@v> %s)", V2_1_TPROT); break; case _SYNC_ON_STREAMS: ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@s>)"); break; } /* write a message */ head->payload_length = ln; wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t)); if(wr < 0) goto __fail3; rd = __conn_read(link, head, sizeof(sxmplv2_head_t)); if(rd < 0) goto __fail3; if(head->opcode != SXE_SUCCESS) goto __fail3; if(!head->payload_length) { sync_state++; continue; } rd = __conn_read(link, bbuf, head->payload_length); if(rd < 0) goto __fail3; /* perform a parsing of the desired message */ bbuf[rd] = '\0'; sx = parse_sexp(bbuf, rd); if(!sx) { r = SXE_BADPROTO; goto __fail3; } r = __eval_syssexp(link, sx); if(!r) r = SXE_SUCCESS; destroy_sexp(sx); if(sync_state != _SYNC_ON_STREAMS) { /* write back */ head->opcode = r; head->payload_length = 0; wr = __conn_write(link, head, sizeof(sxmplv2_head_t)); if(wr < 0) { r = SXE_LINKERROR; goto __fail3;} if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail3;} } sync_state++; } /* if we're there - negotiation is done, going to init messaging mode */ r = __link_second_alloc(link); if(r != SXE_SUCCESS) goto __fail3; /* free message */ link->messages[0] = NULL; free(msg); /* and now we're need to create a thread poll */ if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; } else { bundle->buf = buf; bundle->conn = link; } for(i = 0; i < MAX_SXMPLTHREADS; i++) { if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link); if(!bundle) goto __fail5; r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle); if(r) goto __fail5; else { pthread_detach(link->thrd_poll[i]); bundle = (void *)0xdead; } } /* all is done, connection now ready */ link->flags |= SXMP_ALIVE; return link; __fail5: r = SXE_ENOMEM; /* bundles will be freed by the threads when SSL_read will fails. */ __fail4: __link_second_free(link); __fail3: if(hub->on_destroy) hub->on_destroy(link); __fail2: if(buf != MAP_FAILED) munmap(buf, 65536); SSL_shutdown(link->ssl); ERR_remove_thread_state(0); ERR_remove_state(0); close(sck); __fail: if(link) { if(link->ssl) SSL_free(link->ssl); __link_minimal_free(link); } errno = r; return NULL; } int sxlink_close(sxlink_t *co) { sxmplv2_head_t mhead; pthread_t curr = pthread_self(); int i; memset(&mhead, 0, sizeof(sxmplv2_head_t)); /* setup header */ mhead.attr = SXMSG_LINK | SXMSG_CLOSED; pthread_mutex_lock(&(co->sslinout[1])); __conn_write(co, &mhead, sizeof(sxmplv2_head_t)); pthread_mutex_unlock(&(co->sslinout[1])); /* we will not wait anything */ co->flags |= SXMP_CLOSED; for(i = 0; i < 8; i++) { if(!pthread_equal(curr, co->thrd_poll[i])) pthread_join(co->thrd_poll[i], NULL); } return SXE_SUCCESS; }