You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
libsxmp/lib/sxmplv2.c

1491 lines
42 KiB
C

/*
* Secure X Message Passing Library v2 implementation.
* (sxmplv2) it supersedes all previous versions due to:
* - memory consumption
* - new features such as pulse emitting
* - performance optimization
*
* (c) Askele Group 2013-2015 <http://askele.com>
* (c) Alexander Vdolainen 2013-2015 <avdolainen@gmail.com>
*
* libsxmp is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* libsxmp is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/mman.h>
#include <fcntl.h>
#ifdef WIN32
#include <Winsock2.h>
#define EBADE 1
#define NETDB_SUCCESS 0
#else
#include <sys/select.h>
#include <netdb.h>
#include <unistd.h>
#include <uuid/uuid.h>
#endif
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <openssl/engine.h>
#include <tdata/usrtc.h>
#include <tdata/list.h>
#include <sexpr/sexp.h>
#include <sxmp/limits.h>
#include <sxmp/sxmp.h>
#include "internal.h"
/* Work bundle handed to a link thread: a receive buffer paired with the
 * link it is used for. */
typedef struct __sxmpl_bundle_type {
void *buf; /* I/O buffer; this file maps it with mmap() as 65536 bytes */
sxlink_t *conn; /* link served with this buffer */
} sxmplv2_bundle_t;
/* networking helpers */
#ifndef WIN32
/*
 * Resolve a host name with the reentrant resolver.
 *
 * hostname - name to resolve
 * buf      - scratch space handed to gethostbyname_r()
 * buf_len  - size of buf in bytes
 * rhp      - out: resolved entry on success, NULL otherwise
 *
 * Returns NETDB_SUCCESS on success and NO_ADDRESS on any failure
 * (bad arguments, out of memory, resolver error, host not found).
 *
 * On success *rhp refers to a heap-allocated struct hostent whose
 * string/address data lives in buf; releasing it with free() is left
 * to the caller.
 */
int __resolvehost(const char *hostname, char *buf, int buf_len,
                  struct hostent **rhp)
{
  struct hostent *hostbuf;
  struct hostent *hp = NULL;
  int herr = 0, hres;

  if(!rhp) return NO_ADDRESS;
  *rhp = NULL;
  if(!hostname || !buf || buf_len <= 0) return NO_ADDRESS;

  hostbuf = malloc(sizeof(struct hostent));
  if(!hostbuf) return NO_ADDRESS;

  hres = gethostbyname_r(hostname, hostbuf, buf, buf_len, &hp, &herr);
  /* a zero return with a NULL result means "no such host": that must not
   * be reported as success, and the scratch entry must not leak */
  if(hres || !hp) {
    free(hostbuf);
    return NO_ADDRESS;
  }

  *rhp = hp;
  return NETDB_SUCCESS;
}
#endif
/* Connection helper: accept and connect share the same sequence of
* operations, so the direction is selected with the defines below.
* Ugly, but it works. */
#define SSLX_ACCEPT 0
#define SSLX_CONNECT 1
/*
 * Drive the TLS handshake on a link until it completes or fails.
 *
 * co  - link whose co->ssl already has a socket attached
 * way - SSLX_ACCEPT (server side) or SSLX_CONNECT (client side)
 *
 * Returns the positive value of SSL_accept()/SSL_connect() on success,
 * 0 if select() reported nothing to wait for, and -1 on error. On SSL
 * level failures the link is flagged SXMP_CLOSED.
 */
static int __conn_negotiate(sxlink_t *co, int way)
{
  int rfd = SSL_get_fd(co->ssl), r;
  fd_set readset, writeset;
  int read_blocked = 0, read_blocked_on_write = 0;
  char em[3];

  /* short tag used in log messages */
  memset(em, 0, sizeof(char)*3);
  switch(way) {
  case SSLX_ACCEPT: strcpy(em, "AC"); break;
  case SSLX_CONNECT: strcpy(em, "CO"); break;
  default:
    sxlink_log(co, SXERROR_LOG, "[sxmplv2] Negotiate way (accept?connect?) is invalid.\n");
    return -1;
  }

 __retry:
  do {
  __try_again:
    ERR_clear_error();
    if(way == SSLX_ACCEPT) r = SSL_accept(co->ssl);
    else r = SSL_connect(co->ssl);

    switch(SSL_get_error(co->ssl, r)) {
    case SSL_ERROR_NONE:
      return r;
    case SSL_ERROR_WANT_READ:
      /* get prepared to select */
      read_blocked = 1;
      break;
    case SSL_ERROR_WANT_WRITE: /* here we blocked on write */
      read_blocked_on_write = 1;
      break;
    case SSL_ERROR_SYSCALL:
      if(errno == EAGAIN || errno == EINTR) goto __try_again;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)SSL syscall error.\n", em);
      goto __close_conn;
    case SSL_ERROR_SSL:
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)SSL error occured:%s. Connection will be closed.\n",
                 em, ERR_error_string(ERR_get_error(), NULL));
      goto __close_conn;
    case SSL_ERROR_ZERO_RETURN:
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)SSL connection is cleary closed.\n", em);
      /* fallthrough */
    default:
    __close_conn:
      ERR_free_strings();
      co->flags |= SXMP_CLOSED;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)Unknown error on %s (errno = %d)\n",
                 em, co->uuid, errno);
      ERR_remove_state(0);
      return -1;
    }
  } while(SSL_pending(co->ssl) && !read_blocked);

 __select_retry:
  if(read_blocked) {
    FD_ZERO(&readset);
    FD_SET(rfd, &readset);
    /* waits until something will be ready to read */
    r = select(rfd + 1, &readset, NULL, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_retry;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)Select (%d) on %s\n", em, errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    if(!r) {
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)Nothing to wait for\n", em);
      ERR_remove_state(0);
      return 0;
    }
    read_blocked = 0;
    if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
  }

  if(read_blocked_on_write) { /* we were blocked on write */
  __select_wretry:
    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_SET(rfd, &readset);
    FD_SET(rfd, &writeset);
    r = select(rfd + 1, &readset, &writeset, NULL, NULL);
    if(r < 0) {
      /* the fd sets are not meaningful after a failed select() */
      if(errno == EINTR || errno == EAGAIN) goto __select_wretry;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (%s)Select (%d) on %s\n", em, errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    read_blocked_on_write = 0;
    /* any readiness means the handshake call must be repeated; the count
     * returned by select() must never be handed back as a handshake result */
    if(r > 0) goto __retry;
  }

  /* nothing became ready: the handshake is not established */
  return 0;
}
/*
 * Read from the TLS connection of a link, waiting with select() when the
 * SSL layer reports it would block.
 *
 * co      - link to read from
 * buf     - destination buffer
 * buf_len - maximum number of bytes to read
 *
 * Returns the number of bytes delivered by SSL_read() (> 0), 0 when
 * select() reported nothing to wait for, and -1 on error or when the link
 * is flagged SXMP_CLOSED. SSL level failures flag the link SXMP_CLOSED.
 * The result may be shorter than buf_len; callers loop if they need more.
 */
static int __conn_read(sxlink_t *co, void *buf, size_t buf_len)
{
  int rfd = SSL_get_fd(co->ssl), r;
  fd_set readset, writeset;
  int ofcmode, read_blocked = 0, read_blocked_on_write = 0;

  /* First we make the socket nonblocking */
#ifndef WIN32
  ofcmode = fcntl(rfd, F_GETFL, 0);
  /* do not feed a failed F_GETFL result (-1) back into F_SETFL */
  if(ofcmode == -1 || fcntl(rfd, F_SETFL, ofcmode | O_NDELAY))
    fprintf(stderr, "[sxmplv2] (RD)Couldn't make socket nonblocking");
#endif

 __retry:
  do {
  __try_again:
    if(co->flags & SXMP_CLOSED) {
      return -1;
    }
    r = SSL_read(co->ssl, buf, (int)buf_len);

    switch(SSL_get_error(co->ssl, r)) {
    case SSL_ERROR_NONE:
      return r;
    case SSL_ERROR_WANT_READ:
      /* get prepared to select */
      read_blocked = 1;
      break;
    case SSL_ERROR_WANT_WRITE: /* here we blocked on write */
      read_blocked_on_write = 1;
      break;
    case SSL_ERROR_SYSCALL:
      if(errno == EAGAIN || errno == EINTR) goto __try_again;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL syscall error.\n");
      goto __close_conn;
    case SSL_ERROR_WANT_CONNECT:
    case SSL_ERROR_WANT_ACCEPT:
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL negotiation required. Trying again.\n");
      goto __try_again;
    case SSL_ERROR_SSL:
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL error occured. Connection will be closed.\n");
      goto __close_conn;
    case SSL_ERROR_ZERO_RETURN:
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)SSL connection is cleary closed.\n");
      /* fallthrough */
    default:
    __close_conn:
      ERR_free_strings();
      co->flags |= SXMP_CLOSED;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Unknown error on %s (errno = %d)\n", co->uuid, errno);
      ERR_remove_state(0);
      return -1;
    }
  } while(SSL_pending(co->ssl) && !read_blocked);

 __select_retry:
  if(read_blocked) {
    FD_ZERO(&readset);
    FD_SET(rfd, &readset);
    /* waits until something will be ready to read */
    r = select(rfd + 1, &readset, NULL, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_retry;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    if(!r) {
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Nothing to wait for\n");
      ERR_remove_state(0);
      return 0;
    }
    read_blocked = 0;
    if(r && FD_ISSET(rfd, &readset)) goto __retry; /* try to read again */
  }

  if(read_blocked_on_write) { /* we were blocked on write */
  __select_wretry:
    FD_ZERO(&readset);
    FD_ZERO(&writeset);
    FD_SET(rfd, &readset);
    FD_SET(rfd, &writeset);
    r = select(rfd + 1, &readset, &writeset, NULL, NULL);
    if(r < 0) {
      if(errno == EINTR || errno == EAGAIN) goto __select_wretry;
      sxlink_log(co, SXERROR_LOG, "[sxmplv2] (RD)Select (%d) on %s\n", errno, co->uuid);
      ERR_remove_state(0);
      return -1;
    }
    read_blocked_on_write = 0;
    /* the count returned by select() is not a byte count: always go back
     * to SSL_read() instead of returning it to the caller */
    if(r > 0) goto __retry;
  }

  /* nothing was read */
  return 0;
}
/*
 * Write a buffer to the TLS connection of a link, waiting with select()
 * when the SSL layer reports it would block.
 *
 * co      - link to write to
 * buf     - data to send
 * buf_len - number of bytes to send
 *
 * Returns the positive value of SSL_write() on success and -1 on error or
 * when the link is flagged SXMP_CLOSED. Any SSL level failure flags the
 * link SXMP_CLOSED.
 */
static int __conn_write(sxlink_t *co, void *buf, size_t buf_len)
{
  int r, e, rfd = SSL_get_fd(co->ssl);
  fd_set waitset;

 __retry:
  if(co->flags & SXMP_CLOSED) {
    return -1;
  }
  r = SSL_write(co->ssl, buf, (int)buf_len);
  e = SSL_get_error(co->ssl, r);

  switch(e) {
  case SSL_ERROR_NONE:
    return r;
  case SSL_ERROR_WANT_READ:
  case SSL_ERROR_WANT_WRITE:
    /* here we should block, on the direction the SSL layer asked for */
    FD_ZERO(&waitset);
    FD_SET(rfd, &waitset);
    if(e == SSL_ERROR_WANT_READ) r = select(rfd + 1, &waitset, NULL, NULL, NULL);
    else r = select(rfd + 1, NULL, &waitset, NULL, NULL);
    if(r > 0) goto __retry;
    if(r < 0 && (errno == EINTR || errno == EAGAIN)) goto __retry;
    /* select() failed for good: treat it as a broken connection */
    r = -1;
    goto __close_conn;
  case SSL_ERROR_SYSCALL:
    if(errno == EAGAIN || errno == EINTR) goto __retry;
    /* fallthrough */
  default:
  __close_conn:
    /* SSL_get_error() reported a failure, so even a zero return from
     * SSL_write() is an error and must not be passed up as success */
    ERR_free_strings();
    co->flags |= SXMP_CLOSED;
    sxlink_log(co, SXERROR_LOG, "[sxmplv2] (WR)Unknown error on %s (%d)\n", co->uuid, r);
    ERR_remove_state(0);
    return -1;
  }
}
/*
 * Send one message (head plus optional payload) over a link.
 *
 * When a payload is present, head and payload are copied into one
 * temporary buffer so they go out in a single write. The write itself is
 * serialized with co->sslinout[1].
 *
 * Returns SXE_SUCCESS on success, SXE_FAILED on invalid arguments,
 * SXE_ENOMEM if the temporary buffer cannot be allocated and SXE_ESSL if
 * the write failed or the link is closed.
 */
int _sxmpl_writemsg(sxlink_t *co, sxmsg_t *msg)
{
  sxmplv2_head_t *head;
  size_t total;
  int wr, r = SXE_SUCCESS;
  char *buf = NULL;

  if(!co || !msg) return SXE_FAILED;

  /* check message for validity */
  head = &msg->mhead;
  if(head->payload_length && !msg->payload) return SXE_FAILED;

  total = sizeof(sxmplv2_head_t) + head->payload_length;
  if(head->payload_length) {
    buf = malloc(total);
    if(!buf) return SXE_ENOMEM;
    memcpy(buf, head, sizeof(sxmplv2_head_t));
    memcpy(buf + sizeof(sxmplv2_head_t), msg->payload, head->payload_length);
  }

  /* write the head and payload if applicable */
  pthread_mutex_lock(&co->sslinout[1]);
  if(!buf) wr = __conn_write(co, head, total);
  else wr = __conn_write(co, buf, total);
  /* the result is kept in a signed int so a failure is actually seen */
  if(wr < 0) co->flags |= SXMP_CLOSED;
  pthread_mutex_unlock(&co->sslinout[1]);

  if(wr < 0 || (co->flags & SXMP_CLOSED)) r = SXE_ESSL;

  free(buf);
  return r;
}
/*
 * Send a message without a temporary copy: the head is written into the
 * sizeof(sxmplv2_head_t) bytes immediately preceding msg->payload, and
 * head plus payload are then sent as one contiguous write.
 *
 * NOTE(review): this relies on the caller having reserved that space in
 * front of the payload - confirm against the callers.
 *
 * Returns SXE_SUCCESS on success, SXE_FAILED on invalid arguments and
 * SXE_ESSL if the write failed or the link is closed.
 */
int _sxmpl_rapidwrite(sxlink_t *link, sxmsg_t *msg)
{
  char *buf;
  int wr, r = SXE_SUCCESS;

  /* validate before touching msg */
  if(!link || !msg || !msg->payload) return SXE_FAILED;

  buf = (char *)msg->payload - sizeof(sxmplv2_head_t);
  memcpy(buf, &msg->mhead, sizeof(sxmplv2_head_t));

  pthread_mutex_lock(&link->sslinout[1]);
  wr = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
  /* the result is kept in a signed int so a failure is actually seen */
  if(wr < 0) link->flags |= SXMP_CLOSED;
  pthread_mutex_unlock(&link->sslinout[1]);

  if(wr < 0 || (link->flags & SXMP_CLOSED)) r = SXE_ESSL;

  return r;
}
/*
 * Allocate a bundle for the given link together with a private anonymous
 * 65536 byte mapping used as its buffer.
 * Returns the new bundle, or NULL if either allocation fails.
 */
static sxmplv2_bundle_t *__sxmpl_bundle_create(sxlink_t *co)
{
  sxmplv2_bundle_t *bundle;
  void *area;

  bundle = calloc(1, sizeof(sxmplv2_bundle_t));
  if(bundle == NULL) return NULL;

  area = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  bundle->buf = area;
  if(area == MAP_FAILED) {
    free(bundle);
    return NULL;
  }

  bundle->conn = co;

  return bundle;
}
/* Unmap the 65536 byte buffer of a bundle and release the bundle itself. */
static void __sxmpl_bundle_destroy(sxmplv2_bundle_t *n)
{
  void *area = n->buf;

  munmap(area, 65536);
  free(n);
}
extern int ex_ssldata_index; /** < SSL ex-data index; sxmp_init() obtains it
* and the accept path stores the link pointer under it before the handshake */
/* defined elsewhere; the link allocator stores its result as the link uuid
 * string and releases it with free() */
extern char *__generate_uuid(void);
/* mutex array for the OpenSSL locking callback, sized by CRYPTO_num_locks() */
static pthread_mutex_t *lock_cs;
/* per-lock acquisition counters, incremented by the locking callback */
static long *lock_count;
/*
 * OpenSSL locking callback: locks or unlocks the mutex with index `type`
 * depending on whether CRYPTO_LOCK is set in `mode`, counting every
 * acquisition. `file` and `line` are not used.
 */
static void pthreads_locking_callback(int mode, int type, const char *file, int line)
{
  pthread_mutex_t *slot = &(lock_cs[type]);

  if(!(mode & CRYPTO_LOCK)) {
    pthread_mutex_unlock(slot);
    return;
  }

  pthread_mutex_lock(slot);
  lock_count[type]++;
}
/* OpenSSL thread-id callback: reports the calling thread as a numeric id. */
static void pthreads_thread_id(CRYPTO_THREADID *tid)
{
  unsigned long numeric_id;

#ifdef WIN32
  numeric_id = (unsigned long)GetCurrentThreadId();
#else
  numeric_id = (unsigned long)pthread_self();
#endif
  CRYPTO_THREADID_set_numeric(tid, numeric_id);
}
/*
 * Library-wide initialization: sets up OpenSSL, reserves the SSL ex-data
 * index used by this library and installs the pthread based locking and
 * thread-id callbacks.
 *
 * Returns 0 on success, ENOMEM if the lock tables cannot be allocated
 * (in that case no callbacks are installed).
 */
int sxmp_init(void)
{
  int i, nlocks;

  /* init SSL library */
  SSL_library_init();
  OpenSSL_add_all_algorithms();
  SSL_load_error_strings();
  ex_ssldata_index = SSL_get_ex_new_index(0, "__ssldata index", NULL, NULL, NULL);

  /* here we go - init all */
  nlocks = CRYPTO_num_locks();
  lock_cs = OPENSSL_malloc(nlocks * sizeof(pthread_mutex_t));
  lock_count = OPENSSL_malloc(nlocks * sizeof(long));
  if(!lock_cs || !lock_count) {
    /* never install callbacks that would index a NULL table */
    if(lock_cs) OPENSSL_free(lock_cs);
    if(lock_count) OPENSSL_free(lock_count);
    lock_cs = NULL;
    lock_count = NULL;
    return ENOMEM;
  }

  for(i = 0; i < nlocks; i++) {
    lock_count[i] = 0;
    pthread_mutex_init(&(lock_cs[i]), NULL);
  }

  CRYPTO_THREADID_set_callback(pthreads_thread_id);
  CRYPTO_set_locking_callback(pthreads_locking_callback);

  return 0;
}
/*
 * Library-wide teardown: removes the locking callback, destroys and frees
 * the lock tables and runs the OpenSSL cleanup calls.
 */
void sxmp_finalize(void)
{
  int idx = 0;

  CRYPTO_set_locking_callback(NULL);

  while(idx < CRYPTO_num_locks()) {
    pthread_mutex_destroy(&(lock_cs[idx]));
    idx++;
  }

  OPENSSL_free(lock_cs);
  OPENSSL_free(lock_count);

  ERR_free_strings();
  ENGINE_cleanup();
  CRYPTO_cleanup_all_ex_data();
  EVP_cleanup();
}
/*
 * Allocate the minimal part of a link: the zeroed link structure, its
 * message table, its session context (with a copy of addr when given)
 * and its uuid string.
 *
 * Returns the new link, or NULL with errno set to ENOMEM after releasing
 * whatever had been allocated.
 */
sxlink_t *__link_minimal_alloc(struct in_addr *addr)
{
  sxlink_t *link = calloc(1, sizeof(sxlink_t));

  if(link == NULL) goto __nomem;

  link->messages = calloc(MAX_MSGINPROCESS, sizeof(uintptr_t));
  if(link->messages == NULL) goto __nomem;

  link->pctx = calloc(1, sizeof(sxsession_ctx_t));
  if(link->pctx == NULL) goto __nomem;

  if(addr != NULL) {
    link->pctx->addr = malloc(sizeof(struct in_addr));
    if(link->pctx->addr == NULL) goto __nomem;
    memcpy(link->pctx->addr, addr, sizeof(struct in_addr));
  }

  link->uuid = __generate_uuid();
  if(link->uuid == NULL) goto __nomem;

  return link;

 __nomem:
  if(link != NULL) {
    if(link->pctx != NULL) {
      free(link->pctx->addr);
      free(link->pctx);
    }
    free(link->messages);
    free(link);
  }
  errno = ENOMEM;

  return NULL;
}
/*
 * Second allocation stage of a link, done once negotiation succeeded:
 * index allocators, channel and stream tables, mutexes and the list of
 * pending writes.
 *
 * Returns SXE_SUCCESS, or SXE_ENOMEM after undoing the partial work. On
 * failure the channel and stream table pointers are reset to NULL so the
 * link never carries pointers to freed memory.
 */
static int __link_second_alloc(sxlink_t *co)
{
  usrtc_node_init(&co->csnode, co);

  /* initialize index allocators */
  memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
  memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
  memset(&co->idx_streams, 0, sizeof(idx_allocator_t));
  if((idx_allocator_init(&co->idx_ch, MAX_CHANNELSOPENED, 0))) goto __fail;
  if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail;
  if((idx_allocator_init(&co->idx_streams, MAX_STREAMSOPENED, 0))) goto __fail;

  if(!(co->channels = malloc(sizeof(uintptr_t)*MAX_CHANNELSOPENED))) goto __fail;
  memset(co->channels, 0, sizeof(uintptr_t)*MAX_CHANNELSOPENED);

  if(!(co->streams = malloc(sizeof(uintptr_t)*MAX_STREAMSOPENED))) goto __fail;
  memset(co->streams, 0, sizeof(uintptr_t)*MAX_STREAMSOPENED);

  /* init mutexes */
  pthread_mutex_init(&co->idx_ch_lock, NULL);
  pthread_mutex_init(&co->idx_msg_lock, NULL);
  pthread_mutex_init(&co->write_pending_lock, NULL);
  pthread_mutex_init(&co->sslinout[0], NULL);
  pthread_mutex_init(&co->sslinout[1], NULL);
  pthread_mutex_init(&co->idx_streams_lock, NULL);

  /* init list */
  list_init_head(&co->write_pending);

  return SXE_SUCCESS;

 __fail:
  free(co->channels);
  co->channels = NULL;
  free(co->streams);
  co->streams = NULL;
  /* NOTE(review): allocators that were only zeroed (never initialized)
   * are destroyed here as well - confirm idx_allocator_destroy accepts that */
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);
  idx_allocator_destroy(&co->idx_streams);

  return SXE_ENOMEM;
}
/*
 * Release what the second allocation stage created: channel and stream
 * tables, the three index allocators and the link mutexes.
 */
static void __link_second_free(sxlink_t *co)
{
  /* tables (free(NULL) is a no-op) */
  free(co->channels);
  free(co->streams);

  /* index allocators */
  idx_allocator_destroy(&co->idx_msg);
  idx_allocator_destroy(&co->idx_ch);
  idx_allocator_destroy(&co->idx_streams);

  /* mutexes */
  pthread_mutex_destroy(&co->idx_ch_lock);
  pthread_mutex_destroy(&co->idx_msg_lock);
  pthread_mutex_destroy(&co->write_pending_lock);
  pthread_mutex_destroy(&co->sslinout[0]);
  pthread_mutex_destroy(&co->sslinout[1]);
  pthread_mutex_destroy(&co->idx_streams_lock);
}
/*
 * Release the minimal part of a link: session context (and its address
 * copy), message table, uuid string and the link itself. NULL is accepted.
 */
static void __link_minimal_free(sxlink_t *co)
{
  if(co == NULL) return;

  if(co->pctx != NULL) {
    free(co->pctx->addr);
    free(co->pctx);
  }
  free(co->messages);
  free(co->uuid);
  free(co);
}
/*
 * Close and free every stream still present in the stream table of the
 * link, using the close operation of each stream's descriptor.
 * Does nothing when the link has no stream table.
 */
static void __link_close_streams(sxlink_t *link)
{
  struct sxstream_opened *opened;
  int slot = 0;

  if(link->streams == NULL) return;

  while(slot < MAX_STREAMSOPENED) {
    opened = link->streams[slot];
    slot++;
    if(!opened) continue;

    opened->desc->ops->s_close(opened);
    free(opened);
  }
}
/*
 * Evaluate an s-expression as a call to one of the hub's RPC lists.
 *
 * link    - link the expression arrived on (handed to the RPC function)
 * sx      - parsed expression; must be a list whose first element is an
 *           atom naming the function
 * builtin - non-zero selects hub->stream_rpc, zero selects hub->system_rpc
 *
 * Returns the value of the RPC function, SXE_BADPROTO if the expression
 * has no usable function name, SXE_ENORPC if no such function is
 * registered and SXE_FAILED on invalid arguments.
 */
static int __eval_sysrpc(sxlink_t *link, sexp_t *sx, int builtin)
{
  sxl_rpclist_t *rpc_list;
  usrtc_node_t *node;
  sxl_rpc_t *rentry;
  char *rpcf;

  if(!link || !link->hub || !sx) return SXE_FAILED;

  if(builtin) rpc_list = link->hub->stream_rpc;
  else rpc_list = link->hub->system_rpc;
  if(!rpc_list) return SXE_ENORPC;

  if(sx->ty != SEXP_LIST || sx->list == NULL) return SXE_BADPROTO;
  /* the expression comes from the peer: a nested list in head position
   * carries no name and must not reach the lookup as a NULL key */
  if(sx->list->ty == SEXP_LIST || sx->list->val == NULL) return SXE_BADPROTO;
  rpcf = sx->list->val;

  /* find an appropriate function */
  node = usrtc_lookup(rpc_list->rpc_tree, rpcf);
  if(!node) return SXE_ENORPC;

  rentry = (sxl_rpc_t *)usrtc_node_getdata(node);
  if(!rentry || !rentry->rpcf) return SXE_ENORPC;

  /* call it */
  return rentry->rpcf((void *)link, sx);
}
/* Evaluate an expression with the builtin selector of __eval_sysrpc() off. */
static inline int __eval_syssexp(sxlink_t *co, sexp_t *sx)
{
  const int builtin = 0;

  return __eval_sysrpc(co, sx, builtin);
}
/* Evaluate an expression with the builtin selector of __eval_sysrpc() on. */
static inline int __eval_builtinsexp(sxlink_t *co, sexp_t *sx)
{
  const int builtin = 1;

  return __eval_sysrpc(co, sx, builtin);
}
/*
 * Use-count helpers for a link.
 * With _NO_SXMPMP defined they are plain, unlocked macros; note that the
 * first two expand with a trailing semicolon. Otherwise they are inline
 * functions that touch usecount under the hub's rwlock.
 */
#ifdef _NO_SXMPMP
#define _CONN_INUSE(lo) (lo)->usecount++;
#define _CONN_NOTINUSE(lo) (lo)->usecount--;
#define _CONN_UCOUNT(lo) (lo)->usecount
#else
/* increment the use count under the hub write lock */
static inline void _CONN_INUSE(sxlink_t *l) {
pthread_rwlock_wrlock(&l->hub->rwlock);
l->usecount++;
pthread_rwlock_unlock(&l->hub->rwlock);
}
/* decrement the use count under the hub write lock */
static inline void _CONN_NOTINUSE(sxlink_t *l) {
pthread_rwlock_wrlock(&l->hub->rwlock);
l->usecount--;
pthread_rwlock_unlock(&l->hub->rwlock);
}
/* read the use count under the hub read lock; the value may already be
 * stale when the caller looks at it */
static inline int _CONN_UCOUNT(sxlink_t *l) {
int r;
pthread_rwlock_rdlock(&l->hub->rwlock);
r = l->usecount;
pthread_rwlock_unlock(&l->hub->rwlock);
return r;
}
#endif
/*
 * Drop one reference to a link and tear the link down when it was the
 * last one.
 *
 * Every call first flushes the pending-write list: closed messages are
 * destroyed, the others are marked SXE_LINKERROR and their waiters woken.
 * The caller's use count is then released. Only the caller that brings
 * the count to zero frees in-flight messages, channels, the SSL object,
 * the socket, the streams and finally the link itself.
 */
static void __link_destroy(sxlink_t *l)
{
  int i = 0, fd, remaining;
  sxmsg_t *msg, *omsg;
  sxppmsg_t *ppm;
  list_node_t *iter, *siter;
  sxchnl_t *chan;
  sxhub_t *hub = l->hub;

  /* first we will unpin all messages and mark them as failed */
  if(l->pending_messages) {
    pthread_mutex_lock(&l->write_pending_lock);
    list_for_each_safe(&l->write_pending, iter, siter) {
      ppm = container_of(iter, sxppmsg_t, node);
      omsg = ppm->msg;

      /* ok, now we're able to remove it from list */
      list_del(&ppm->node);
      if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
        pthread_mutex_unlock(&omsg->wait);
        pthread_mutex_destroy(&omsg->wait);
        free(omsg);
      } else { /* wake up */
        omsg->mhead.opcode = SXE_LINKERROR;
        pthread_mutex_unlock(&omsg->wait);
      }
      free(ppm);
      l->pending_messages--;
    }
    pthread_mutex_unlock(&l->write_pending_lock);
  }

  /* free queue */
  ERR_remove_state(0);
  ERR_remove_thread_state(0);
  ERR_free_strings();

  /* update use count; decrement and test are one step, so that exactly
   * one caller observes zero and performs the teardown */
#ifdef _NO_SXMPMP
  remaining = --l->usecount;
#else
  pthread_rwlock_wrlock(&hub->rwlock);
  remaining = --l->usecount;
  pthread_rwlock_unlock(&hub->rwlock);
#endif
  if(remaining) return;

  /* ok, let's free the rest: go thru messages */
  pthread_mutex_lock(&l->idx_msg_lock);
  for(i = 0; i < MAX_MSGINPROCESS; i++) {
    msg = l->messages[i];
    if(!msg) continue;

    msg->mhead.opcode = SXE_LINKERROR;
    pthread_mutex_unlock(&msg->wait);
    pthread_mutex_destroy(&msg->wait);
    free(msg);
    l->messages[i] = NULL;
    idx_free(&l->idx_msg, i);
  }
  pthread_mutex_unlock(&l->idx_msg_lock);

  /* ok now we will free the channels; the table holds
   * MAX_CHANNELSOPENED slots, so walk exactly that many */
  pthread_mutex_lock(&l->idx_ch_lock);
  for(i = 0; i < MAX_CHANNELSOPENED; i++) {
    chan = l->channels[i];
    if(!chan) continue;

    idx_free(&l->idx_ch, i);
    free(chan);
    l->channels[i] = NULL;
  }
  pthread_mutex_unlock(&l->idx_ch_lock);

  if(hub->on_destroy) hub->on_destroy(l);
  if(l->pctx->login) free(l->pctx->login);
  if(l->pctx->passwd) free(l->pctx->passwd);

  SSL_set_shutdown(l->ssl, SSL_RECEIVED_SHUTDOWN | SSL_SENT_SHUTDOWN);
  fd = SSL_get_fd(l->ssl);
  SSL_free(l->ssl);
  l->ssl = NULL;
  ERR_remove_thread_state(0);
  ERR_remove_state(0);
  ERR_free_strings();

  close(fd);
  __link_close_streams(l);
  __link_second_free(l);
  __link_minimal_free(l);
}
static void *__sxmpl_thread(void *b)
{
sxmplv2_bundle_t *bun = (sxmplv2_bundle_t *)b;
sxlink_t *co = bun->conn;
void *buf = bun->buf;
char *bbuf = (char*)buf;
sxmplv2_head_t *mhead = (sxmplv2_head_t *)buf;
sxmsg_t *msg, *omsg;
sexp_t *sx;
sxchnl_t *channel;
list_node_t *iter, *siter;
sxppmsg_t *ppm;
pthread_t self = pthread_self();
struct timespec wtick;
int dispatch = 0, e;
size_t rd, wr, total_rd;
ulong_t mid = 0;
#ifdef _PERFPROFILE
struct timeval beg, end;
#endif
/* byte buffer is following head */
bbuf += sizeof(sxmplv2_head_t);
__wait_alive:
/* flag test - FIXME: make it atomic (it will works atomically on x86, btw on others not) */
if(!(co->flags & SXMP_ALIVE)) {
if(co->flags & SXMP_CLOSED) goto __finish;
else {
usleep(20);
goto __wait_alive;
}
}
/* check up a thread */
if(pthread_equal(self, co->thrd_poll[7])) /* dispatcher */
dispatch = 1;
/* update use count */
_CONN_INUSE(co);
/* the following logic : (except dispatcher)
* 1. check up pending write -> if exists write one and start again., otherwise go next
* 2. read from ssl connection (we will sleep if other already acquire the lock)
*/
while(1) {
__again:
if(co->flags & SXMP_CLOSED) goto __finish; /* go away if required asap */
/* works with pending messages */
if(co->pending_messages && !(co->flags & SXMP_CLOSED)) {
pthread_mutex_lock(&co->write_pending_lock);
list_for_each_safe(&co->write_pending, iter, siter) {
ppm = container_of(iter, sxppmsg_t, node);
omsg = ppm->msg;
if(_sxmpl_writemsg(co, omsg) != SXE_SUCCESS) {
pthread_mutex_unlock(&co->write_pending_lock);
goto __finish; /* write failed - finishing ... */
}
/* ok, now we're able to remove it from list */
list_del(&ppm->node);
if(omsg->mhead.attr & SXMSG_CLOSED) { /* message is closed - destroy it */
pthread_mutex_unlock(&omsg->wait);
pthread_mutex_destroy(&omsg->wait);
free(omsg);
}
free(ppm);
co->pending_messages--;
}
pthread_mutex_unlock(&co->write_pending_lock);
}
if(!dispatch) pthread_mutex_lock(&(co->sslinout[0]));
else { /* dispatch thread ticking every ət */
wtick.tv_sec = time(NULL) + 1;
e = pthread_mutex_timedlock(&(co->sslinout[0]), &wtick);
if(e == ETIMEDOUT) goto __again;
}
if(co->flags & SXMP_CLOSED) {
pthread_mutex_unlock(&(co->sslinout[0]));
goto __finish;
}
#ifdef _PERFPROFILE
gettimeofday(&beg, NULL);
#endif
memset(mhead, 0, sizeof(sxmplv2_head_t));
total_rd = 0;
while(total_rd != sizeof(sxmplv2_head_t)) {
total_rd += __conn_read(co, buf + total_rd, sizeof(sxmplv2_head_t) - total_rd);
if(total_rd == -1) {
co->flags |= SXMP_CLOSED;
break;
}
}
rd = total_rd;
#ifdef _PERFPROFILE
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
printf("connread(head) Seconds: %ld ", end.tv_sec - beg.tv_sec);
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
} else printf("connread(head) µS: %ld\n", end.tv_usec - beg.tv_usec);
#endif
if(co->flags & SXMP_CLOSED) {
pthread_mutex_unlock(&(co->sslinout[0]));
goto __finish; /* go away if required asap */
}
if(rd < 0) {
__sslproto_error:
co->flags |= SXMP_CLOSED;
pthread_mutex_unlock(&(co->sslinout[0]));
goto __finish;
} else {
/* check up if we can read or not */
if(mhead->payload_length) {
#ifdef _PERFPROFILE
gettimeofday(&beg, NULL);
#endif
total_rd = 0;
while(total_rd != mhead->payload_length) {
total_rd += __conn_read(co, bbuf + total_rd, mhead->payload_length - total_rd);
if(total_rd == -1) goto __sslproto_error;
}
rd = total_rd;
#ifdef _PERFPROFILE
gettimeofday(&end, NULL);
if((end.tv_sec - beg.tv_sec) > 0) {
printf("connread(payload) Seconds: %ld ", end.tv_sec - beg.tv_sec);
printf("µS: %ld\n", end.tv_usec + (1000000 - beg.tv_usec));
} else printf("connread(payload) µS: %ld\n", end.tv_usec - beg.tv_usec);
#endif
}
pthread_mutex_unlock(&(co->sslinout[0]));
/* take a message */
if(mhead->attr & SXMSG_PROTO) { /* protocol message i.e. channel open/close */
/* ok, check up the side */
if(mhead->attr & SXMSG_REPLYREQ) { /* means we're not initiators and we don't need to allocate a message */
if(mhead->attr & SXMSG_OPEN)
mhead->opcode = _channel_open(co, &mhead->reserve);
else mhead->opcode = _channel_close(co, mhead->reserve);
/* set flags */
mhead->payload_length = 0;
mhead->attr &= ~SXMSG_REPLYREQ;
pthread_mutex_lock(&(co->sslinout[1]));
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
pthread_mutex_unlock(&(co->sslinout[1]));
if(wr < 0) goto __finish;
} else { /* it's came back */
mid = (ulong_t)mhead->msgid;
/* reply came ... */
if(mid > (MAX_MSGINPROCESS - 1)) {
__inval_idx_nor:
sxlink_log(co, SXERROR_LOG, "[sxmplv2] Invalid index of the message (%lu).\n", mid);
goto __again;
}
msg = co->messages[(int)mid];
if(!msg) { goto __inval_idx_nor; }
/* ok now we'are copy data and unlock wait mutex */
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
pthread_mutex_unlock(&msg->wait);
}
} else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
/* FIXME: special sxmpv2.1 top layer messages - should be here */
if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */
sx = parse_sexp(bbuf, mhead->payload_length);
if(sx && co->hub->on_pulse) co->hub->on_pulse(co, sx);
if(sx) destroy_sexp(sx);
}
}
} else { /* regular messages */
if((mhead->attr & SXMSG_OPEN) && (mhead->attr & SXMSG_REPLYREQ)) { /* dialog initiation */
if(mhead->reserve >= MAX_CHANNELSOPENED) channel = NULL;
else channel = co->channels[mhead->reserve];
if(!channel) { /* ok, we'are failed */
mhead->opcode = SXE_NOSUCHCHAN;
__ret_regerr:
mhead->payload_length = 0;
mhead->attr &= ~SXMSG_REPLYREQ;
mhead->attr &= ~SXMSG_OPEN;
mhead->attr |= SXMSG_CLOSED;
pthread_mutex_lock(&(co->sslinout[1]));
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
pthread_mutex_unlock(&(co->sslinout[1]));
if(wr < 0) goto __finish;
else goto __again;
}
/* if message is busy - fails */
mid = mhead->msgid;
msg = co->messages[mid];
if(msg) { mhead->opcode = SXE_EBUSY; goto __ret_regerr; }
/* now we will take a deal */
if(!(msg = malloc(sizeof(sxmsg_t)))) {
mhead->opcode = SXE_ENOMEM; goto __ret_regerr;
} else {
/* set mutex and channel */
pthread_mutex_init(&msg->wait, NULL);
pthread_mutex_lock(&msg->wait);
msg->pch = channel;
/* copy header only */
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
if(mhead->payload_length) msg->payload = bbuf;
}
pthread_mutex_lock(&co->idx_msg_lock);
idx_reserve(&co->idx_msg, mid);
co->messages[mid] = msg;
pthread_mutex_unlock(&co->idx_msg_lock);
/* now we are able to process the message */
_message_process(msg);
} else if(mhead->attr & SXMSG_CLOSED) {
/* check for the message */
if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor;
mid = mhead->msgid;
pthread_mutex_lock(&co->idx_msg_lock);
msg = co->messages[mid];
if(!msg) {
pthread_mutex_unlock(&co->idx_msg_lock); goto __inval_idx_nor; }
/* message dialog is closed - remove this right now */
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
/* now just free it */
pthread_mutex_unlock(&msg->wait);
pthread_mutex_destroy(&msg->wait);
free(msg);
} else {
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
if(mhead->payload_length) {
msg->payload = malloc(mhead->payload_length);
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
else msg->mhead.opcode = SXE_ENOMEM;
}
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
}
} else if((!(mhead->attr & SXMSG_CLOSED) && !(mhead->attr & SXMSG_OPEN)) &&
(mhead->attr & SXMSG_REPLYREQ)) { /* ongoing dialog */
/* check for the message */
if(mhead->msgid >= MAX_MSGINPROCESS) goto __inval_idx_nor;
mid = mhead->msgid;
msg = co->messages[mid];
if(!msg) goto __inval_idx_nor;
if(msg->mhead.attr & SXMSG_TIMEDOUT) { /* nobody wait for it */
pthread_mutex_lock(&co->idx_msg_lock);
idx_free(&co->idx_msg, mid);
co->messages[mid] = NULL;
pthread_mutex_unlock(&co->idx_msg_lock);
/* now just free it */
pthread_mutex_destroy(&msg->wait);
free(msg);
/* we must reply */
mhead->opcode = SXE_ETIMEDOUT;
goto __ret_regerr;
} else {
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
if(mhead->payload_length) {
msg->payload = malloc(mhead->payload_length);
if(msg->payload) memcpy(msg->payload, bbuf, mhead->payload_length);
else {
mhead->opcode = msg->mhead.opcode = SXE_ENOMEM; /* we will return it to waitee */
msg->mhead.attr &= ~SXMSG_REPLYREQ; /* doesn't need to reply */
/* reply here now */
mhead->payload_length = 0;
mhead->attr &= ~SXMSG_REPLYREQ;
mhead->attr &= ~SXMSG_OPEN;
mhead->attr |= SXMSG_CLOSED;
pthread_mutex_lock(&(co->sslinout[1]));
wr = __conn_write(co, mhead, sizeof(sxmplv2_head_t));
pthread_mutex_unlock(&(co->sslinout[1]));
if(wr < 0) goto __finish;
}
}
pthread_mutex_unlock(&msg->wait); /* wake up thread waiting for */
}
} else { mhead->opcode = SXE_BADPROTO; goto __ret_regerr; }
}
}
}
__finish:
co->flags |= SXMP_CLOSED;
__link_destroy(co);
__sxmpl_bundle_destroy(b); /* destroy bundle */
return NULL;
}
/*
 * Accept a link on the master (server) side.
 *
 * hub  - hub the new link will belong to
 * sck  - connected socket; it is closed here on every failure path
 * addr - peer address, copied into the link's session context (may be NULL)
 *
 * Performs the TLS handshake, then runs the batch-mode negotiation: each
 * received s-expression is evaluated against the hub's system RPC list
 * and answered, until the link is switched to messaging mode. After that
 * the second allocation stage is done and the worker threads are started.
 *
 * Returns the ready link (errno set to SXE_SUCCESS), or NULL with errno
 * set to an SXE_* code.
 */
sxlink_t *sxlink_master_accept(sxhub_t *hub, int sck, struct in_addr *addr)
{
  void *buf = NULL;
  char *bbuf;
  sxlink_t *link = __link_minimal_alloc(addr);
  sxmsg_t *msg = NULL;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle = NULL;
  sexp_t *sx;
  int rd;
  int r = SXE_FAILED, i;

  if(!link) {
    /* keep the socket ownership rule of the other failure paths */
    close(sck);
    errno = SXE_ENOMEM;
    return NULL;
  } else link->hub = hub;

  /* ok, now we need to init ssl stuff */

  /* check up - do we need to initialize SSL context? */
  r = _sxhub_settls_ctx_s(hub); /* TODO: mark on first failed attempt */
  if(r != SXE_SUCCESS) goto __fail;

  /* now we will create an SSL connection */
  link->ssl = SSL_new(hub->ctx);
  if(!link->ssl) { r = SXE_ENOMEM; goto __fail; }
  else SSL_set_fd(link->ssl, sck); /* attach connected socket */

  /* set the context to verify ssl connection */
  SSL_set_ex_data(link->ssl, ex_ssldata_index, (void *)link);
  SSL_set_accept_state(link->ssl);
  if(__conn_negotiate(link, SSLX_ACCEPT) <= 0)
    { r = SXE_EPERM; goto __fail; } /* leak here ? */

  /* set connection to the batch mode */
  link->flags |= SXMP_BATCHMODE;

  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }

  /* allocate first message */
  if(!(msg = malloc(sizeof(sxmsg_t)))) { r = SXE_ENOMEM; goto __fail2; }
  else {
    memset(msg, 0, sizeof(sxmsg_t));
    link->messages[0] = msg;
  }

  bbuf = (char *)buf;
  bbuf += sizeof(sxmplv2_head_t);
  i = 0;

  while(link->flags & SXMP_BATCHMODE) {
    rd = __conn_read(link, buf, sizeof(sxmplv2_head_t));
    if(rd < 0 || (size_t)rd != sizeof(sxmplv2_head_t)) { r = SXE_LINKERROR; goto __fail3; }

    head = (sxmplv2_head_t *)buf;

    /* check for returns */
    if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; }
    else { /* opcode is fine */
      i++;
      /* if we're ready for messaging mode, turn off batch mode */
      if(link->flags & SXMP_MESSAGINGMODE) {
        link->flags &= ~SXMP_BATCHMODE;
        break;
      }
      /* if count too big - fail */
      if(i > MAX_SXMP_SYNC_ITERATIONS) { /* ugly */
        r = SXE_FAILED;
        goto __fail3;
      }
    }

    if(!head->payload_length) continue; /* pass the following check up */

    /* the length comes from the peer: it must fit the buffer together
     * with the terminating zero written below */
    if((size_t)head->payload_length >= 65536 - sizeof(sxmplv2_head_t)) {
      r = SXE_BADPROTO;
      goto __fail3;
    }

    rd = __conn_read(link, bbuf, head->payload_length);
    if(rd < 0) { r = SXE_LINKERROR; goto __fail3; }
    if((size_t)rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; }

    bbuf[rd] = '\0';
    sx = parse_sexp(bbuf, rd);
    /* report a real error code, not the stale success value */
    if(!sx) { r = SXE_BADPROTO; goto __fail3; }

    /* initialize message */
    msg->payload = bbuf;
    msg->mhead.payload_length = 0;

    /* deal with it */
    r = __eval_syssexp(link, sx);
    memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t));
    head->opcode = r;
    if(r != SXE_SUCCESS) { /* we finish */
      head->payload_length = 0;
      __conn_write(link, head, sizeof(sxmplv2_head_t));
      destroy_sexp(sx);
      goto __fail3;
    }

    rd = __conn_write(link, buf, sizeof(sxmplv2_head_t) + msg->mhead.payload_length);
    /* the expression is no longer needed, whatever the write result is */
    destroy_sexp(sx);
    if(rd < 0) { r = SXE_LINKERROR; goto __fail3; }
    if((size_t)rd != sizeof(sxmplv2_head_t) + msg->mhead.payload_length) goto __fail3;
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(link);
  if(r != SXE_SUCCESS) goto __fail3;

  /* free message; clear the pointer so the failure paths below cannot
   * free it a second time */
  link->messages[0] = NULL;
  free(msg);
  msg = NULL;

  /* and now we're need to create a thread poll */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  else {
    bundle->buf = buf;
    bundle->conn = link;
  }

  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    /* 0xdead marks "previous bundle was handed to a thread" */
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link);
    if(!bundle) goto __fail5;
    r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle); /* and here, alloc tls */
    if(r) goto __fail5;
    else {
      bundle = (void *)0xdead;
      pthread_detach(link->thrd_poll[i]);
    }
  }

  /* all is done, connection now ready */
  link->flags |= SXMP_ALIVE;

  r = SXE_SUCCESS;
  errno = r;

  /* free context for this thread */
  ERR_remove_state(0);

  return link;

 __fail5:
  r = SXE_ENOMEM;
  /* a bundle that no thread took over is still ours to release */
  if(bundle && bundle != (void *)0xdead) {
    if(bundle->buf == buf) free(bundle); /* buf itself is unmapped below */
    else __sxmpl_bundle_destroy(bundle);
  }
  /* bundles will be freed by the threads when SSL_read will fails. */
  /* NOTE(review): threads already started still reference the link and
   * the first buffer, both of which are released below - confirm how
   * those threads are expected to terminate */
 __fail4:
  __link_second_free(link);
 __fail3:
  if(hub->on_destroy) hub->on_destroy(link);
 __fail2:
  if(msg) free(msg);
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(link->ssl);
 __fail:
  if(link) {
    if(link->ssl) {
      ERR_remove_thread_state(0);
      ERR_remove_state(0);
      ERR_free_strings();
      SSL_free(link->ssl);
    }
    __link_minimal_free(link);
  }
  close(sck);
  errno = r;

  return NULL;
}
/* Synchronization states; sxlink_connect_at() starts its sync_state at
 * _SYNC_ON_CHANNELS. */
enum {
_SYNC_ON_CHANNELS = 0,
_SYNC_ON_VERSION = 1,
_SYNC_ON_STREAMS = 2,
};
/*
 * Connect to a host without attaching private data to the link.
 * Forwards all arguments to sxlink_connect_at() with a NULL priv pointer
 * and returns its result.
 */
sxlink_t *sxlink_connect(sxhub_t *hub, const char *host,
                         int port, const char *SSL_cert, const char *login,
                         const char *passwd)
{
  const void *no_priv = NULL;

  return sxlink_connect_at(hub, host, port, SSL_cert, login, passwd, no_priv);
}
/*
 * Open an outgoing link: resolve the host, connect a TCP socket, run the
 * SSL negotiation, authenticate with the given credentials, perform the
 * batch-mode synchronisation exchange and finally start the poller threads.
 *
 * hub      - hub the link belongs to (its SSL context and on_destroy
 *            callback are used); must not be NULL
 * host     - host name to connect to; must not be NULL
 * port     - TCP port, host byte order
 * SSL_cert - certificate passed to _sxhub_settls_ctx(); must not be NULL
 * login    - login name, NULL is sent as "nil"
 * passwd   - password, NULL is sent as "nil"
 * priv     - optional private pointer attached with sxlink_setpriv()
 *
 * Returns the link on success.  On failure returns NULL with errno set
 * to an SXE_* code (or to the opcode the peer answered with).
 */
sxlink_t *sxlink_connect_at(sxhub_t *hub, const char *host,
                            int port, const char *SSL_cert, const char *login,
                            const char *passwd, const void *priv)
{
  sxlink_t *link = __link_minimal_alloc(NULL);
  struct hostent *host_ = NULL;
  struct sockaddr_in addr;
  /* sck starts at -1 so the common exit knows whether there is a socket to close */
  int r = SXE_IGNORED, sck = -1, i, n, sync_state = _SYNC_ON_CHANNELS;
#ifdef WIN32
  WSADATA wsaData;
#endif
  char hostbuf[2048];
  void *buf = NULL;
  char *bbuf;
  sxmplv2_head_t *head;
  sxmplv2_bundle_t *bundle;
  sxmsg_t *msg = NULL;
  /* signed on purpose: the "< 0" failure checks below are dead code with size_t */
  ssize_t rd, wr;
  size_t ln;
  /* room for a payload behind the header, leaving one byte for the NUL */
  const size_t payload_cap = 65535 - sizeof(sxmplv2_head_t);
  sexp_t *sx;

  if(!hub || !host || !SSL_cert) goto __fail;
  if(!link) { r = SXE_ENOMEM; goto __fail; }

  link->hub = hub;

#ifdef WIN32
  WSAStartup(MAKEWORD(2, 2), &wsaData);
#endif

  /* check up ssl context */
  r = _sxhub_settls_ctx(hub, SSL_cert);
  if(r != SXE_SUCCESS) goto __fail;

  /* resolve host */
#ifdef WIN32
  host_ = gethostbyname(host);
  if(!host_) {
    r = SXE_FAILED;
    goto __fail;
  }
#else
  r = __resolvehost(host, hostbuf, 2048, &host_);
  if(r || !host_) {
    r = SXE_FAILED;
    goto __fail;
  }
#endif

  /* create a socket */
  sck = socket(PF_INET, SOCK_STREAM, 0);
  if(sck < 0) { r = SXE_FAILED; goto __fail; }

  /* try to connect it */
  memset(&addr, 0, sizeof(addr));
  addr.sin_family = AF_INET;
  addr.sin_port = htons(port);
  /* copy instead of casting: h_addr is a char pointer with no alignment guarantee */
  memcpy(&addr.sin_addr.s_addr, host_->h_addr, sizeof(addr.sin_addr.s_addr));
  r = connect(sck, (struct sockaddr*)&addr, sizeof(addr));
  if(r) {
    r = SXE_FAILED; /* couldn't connect to the desired host */
    goto __fail;    /* the socket is closed at __fail */
  }

  /* SSL handshake */
  link->ssl = SSL_new(hub->ctx);
  if(!link->ssl) { r = SXE_ENOMEM; goto __fail; }

  SSL_set_fd(link->ssl, sck); /* attach connected socket */
  SSL_set_connect_state(link->ssl);
  if(__conn_negotiate(link, SSLX_CONNECT) <= 0) {
    r = SXE_EPERM;
    goto __fail; /* socket and SSL object are both released at __fail */
  } /* if success we're ready to use established SSL channel */

  /* set connection to the batch mode */
  link->flags |= SXMP_BATCHMODE;

  /* allocate our first buffer */
  buf = mmap(NULL, 65536, PROT_READ|PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
  if(buf == MAP_FAILED) { r = SXE_ENOMEM; goto __fail2; }

  /* allocate first (zeroed) message */
  if(!(msg = calloc(1, sizeof(*msg)))) { r = SXE_ENOMEM; goto __fail2; }
  link->messages[0] = msg;

  head = (sxmplv2_head_t *)buf;
  bbuf = (char *)buf + sizeof(sxmplv2_head_t);

  /* set out data */
  if(priv) sxlink_setpriv(link, priv);

  /* authentification first both V2 and newer */
  /* form a message -- credentials */
  n = snprintf(bbuf, payload_cap, "(auth-set-credentials \"%s\" \"%s\")",
               login ? login : "nil", passwd ? passwd : "nil");
  /* a truncated message would make the write below run past the buffer */
  if(n < 0 || (size_t)n >= payload_cap) { r = SXE_IGNORED; goto __fail3; }
  ln = (size_t)n;

  head->opcode = SXE_SUCCESS;
  head->payload_length = ln;
  wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t));
  if(wr < 0) { r = SXE_LINKERROR; goto __fail3; }

  rd = __conn_read(link, head, sizeof(sxmplv2_head_t));
  if(rd < 0) { r = SXE_LINKERROR; goto __fail3; }
  if(head->opcode != SXE_SUCCESS) {
    r = head->opcode;
    goto __fail3;
  }

  /* since V2.1 syncronization should be done here */
  while(link->flags & SXMP_BATCHMODE) {
    head->opcode = SXE_SUCCESS;

    switch(sync_state) {
    case _SYNC_ON_CHANNELS: /* ok, get available channels */
      ln = (size_t)snprintf(bbuf, payload_cap, "(!@c>)");
      break;
    case _SYNC_ON_VERSION:
      ln = (size_t)snprintf(bbuf, payload_cap, "(!@v> %s)", V2_1_TPROT);
      break;
    case _SYNC_ON_STREAMS:
      ln = (size_t)snprintf(bbuf, payload_cap, "(!@s>)");
      break;
    default:
      /* NOTE(review): past the last stage nothing new is formed, so the
       * previous ln and buffer contents are sent again -- confirm whether
       * the peer is expected to have left batch mode before this point. */
      break;
    }

    /* write a message */
    head->payload_length = ln;
    wr = __conn_write(link, buf, ln + sizeof(sxmplv2_head_t));
    if(wr < 0) { r = SXE_LINKERROR; goto __fail3; }

    rd = __conn_read(link, head, sizeof(sxmplv2_head_t));
    if(rd < 0) { r = SXE_LINKERROR; goto __fail3; }
    /* report the peer's opcode instead of a stale (possibly successful) r */
    if(head->opcode != SXE_SUCCESS) { r = head->opcode; goto __fail3; }

    if(!head->payload_length) {
      sync_state++;
      continue;
    }

    /* the length comes from the peer: it must fit the buffer plus the NUL */
    if((size_t)head->payload_length > payload_cap) { r = SXE_BADPROTO; goto __fail3; }

    rd = __conn_read(link, bbuf, head->payload_length);
    if(rd < 0) { r = SXE_LINKERROR; goto __fail3; }

    /* perform a parsing of the desired message */
    bbuf[rd] = '\0';
    sx = parse_sexp(bbuf, (size_t)rd);
    if(!sx) { r = SXE_BADPROTO; goto __fail3; }

    r = __eval_syssexp(link, sx);
    if(!r) r = SXE_SUCCESS;
    destroy_sexp(sx);

    if(sync_state != _SYNC_ON_STREAMS) {
      /* write back */
      head->opcode = r;
      head->payload_length = 0;
      wr = __conn_write(link, head, sizeof(sxmplv2_head_t));
      if(wr < 0) { r = SXE_LINKERROR; goto __fail3; }
      if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail3; }
    }

    sync_state++;
  }

  /* if we're there - negotiation is done, going to init messaging mode */
  r = __link_second_alloc(link);
  if(r != SXE_SUCCESS) goto __fail3;

  /* free message; the pointer is cleared so the failure path won't free it twice */
  link->messages[0] = NULL;
  free(msg);
  msg = NULL;

  /* and now we need to create a thread pool; the first poller reuses buf */
  if(!(bundle = malloc(sizeof(sxmplv2_bundle_t)))) { r = SXE_ENOMEM; goto __fail4; }
  bundle->buf = buf;
  bundle->conn = link;

  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    /* 0xdead marks "previous bundle was handed to a thread, make a new one" */
    if(bundle == (void *)0xdead) bundle = __sxmpl_bundle_create(link);
    if(!bundle) goto __fail5;

    r = pthread_create(&link->thrd_poll[i], NULL, __sxmpl_thread, bundle);
    if(r) {
      /* the first bundle was malloc'ed here and no thread owns it */
      if(i == 0) free(bundle);
      goto __fail5;
    }

    pthread_detach(link->thrd_poll[i]);
    bundle = (void *)0xdead;
  }

  /* all is done, connection now ready */
  link->flags |= SXMP_ALIVE;

  return link;

 __fail5:
  r = SXE_ENOMEM;
  /* bundles will be freed by the threads when SSL_read will fails. */
  /* NOTE(review): threads started before the failure still reference link
   * and buf, both of which are released below -- confirm this is safe. */
 __fail4:
  __link_second_free(link);
 __fail3:
  if(hub->on_destroy) hub->on_destroy(link);
 __fail2:
  if(msg) {
    link->messages[0] = NULL;
    free(msg);
  }
  if(buf != MAP_FAILED) munmap(buf, 65536);
  SSL_shutdown(link->ssl);
  ERR_remove_thread_state(0);
  ERR_remove_state(0);
 __fail:
  if(sck >= 0) close(sck);
  if(link) {
    if(link->ssl) SSL_free(link->ssl);
    __link_minimal_free(link);
  }
  errno = r;

  return NULL;
}
#undef SSLX_ACCEPT
#undef SSLX_CONNECT
/*
 * Close a link: send the peer a header flagged SXMSG_LINK | SXMSG_CLOSED,
 * mark the link with SXMP_CLOSED and join every poller thread except the
 * calling one.
 *
 * co - link to close
 *
 * Returns SXE_SUCCESS, or SXE_FAILED when co is NULL.
 */
int sxlink_close(sxlink_t *co)
{
  sxmplv2_head_t mhead;
  pthread_t curr = pthread_self();
  int i;

  if(!co) return SXE_FAILED;

  /* setup header */
  memset(&mhead, 0, sizeof(sxmplv2_head_t));
  mhead.attr = SXMSG_LINK | SXMSG_CLOSED;

  /* best effort: the result of the write is deliberately not checked */
  pthread_mutex_lock(&(co->sslinout[1]));
  __conn_write(co, &mhead, sizeof(sxmplv2_head_t));
  pthread_mutex_unlock(&(co->sslinout[1]));

  /* we will not wait anything */
  co->flags |= SXMP_CLOSED;

  /* Walk the same number of slots the connect code fills.
   * NOTE(review): the pollers are detached right after creation and POSIX
   * leaves joining a detached thread undefined -- confirm the intent. */
  for(i = 0; i < MAX_SXMPLTHREADS; i++) {
    if(!pthread_equal(curr, co->thrd_poll[i])) pthread_join(co->thrd_poll[i], NULL);
  }

  return SXE_SUCCESS;
}