diff --git a/include/sxmp/limits.h b/include/sxmp/limits.h index 639c3b0..e532fb0 100644 --- a/include/sxmp/limits.h +++ b/include/sxmp/limits.h @@ -33,6 +33,8 @@ #define MAX_SXMPLTHREADS 8 +#define MAX_MSGINPROCESS 1024 + #define MAX_STREAMS_TYPES 32767 #define MAX_LINKS 32768 diff --git a/include/sxmp/version.h b/include/sxmp/version.h index f1147e0..ad0c53c 100644 --- a/include/sxmp/version.h +++ b/include/sxmp/version.h @@ -30,6 +30,8 @@ #define SXMP_MAJOR 4 #define SXMP_MINOR 2 +#define V2_1_TPROT "v2_1" + typedef enum { V2 = 0, V2_1, diff --git a/lib/channel.c b/lib/channel.c index 370e7d6..2c3e8f6 100644 --- a/lib/channel.c +++ b/lib/channel.c @@ -160,7 +160,7 @@ sxchnl_t *sxchannel_open(sxlink_t *co, int type) pthread_mutex_unlock(&co->idx_msg_lock); if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __reterr2; } - else head->msgid = msgidx; + else head->msgid = (uint16_t)msgidx; /* now we're ready to write it */ r = _sxmpl_writemsg(co, msg); diff --git a/lib/link.c b/lib/link.c index 661a72e..8f9d323 100644 --- a/lib/link.c +++ b/lib/link.c @@ -163,6 +163,7 @@ static int __get_channels_list(void *cctx, sexp_t *sx) if(co->cp_version == V2) { /* we're ready for messaging mode */ co->flags |= SXMP_MESSAGINGMODE; + co->flags &= ~SXMP_BATCHMODE; } return SXE_SUCCESS; @@ -189,6 +190,7 @@ static int __get_streams(void *cctx, sexp_t *sx) msg->mhead.payload_length = ulen + 1; /* and now we're ready to exit from batch mode */ link->flags |= SXMP_MESSAGINGMODE; + link->flags &= ~SXMP_BATCHMODE; return SXE_SUCCESS; } else { /* set all streams available */ @@ -241,6 +243,7 @@ static int __get_streams(void *cctx, sexp_t *sx) } link->flags |= SXMP_MESSAGINGMODE; + link->flags &= ~SXMP_BATCHMODE; return SXE_SUCCESS; } @@ -397,6 +400,11 @@ static int __set_channels_list(void *cctx, sexp_t *sx) sexp_t *isx, *iisx; int id, r; + /* determine how this function was called */ + if(!strcmp(sx->list->val, "get-channels-list")) + co->cp_version = V2; + else co->cp_version = V2_1; /* last supported in this version */ + SEXP_ITERATE_LIST(sx, isx, idx) { if(!idx) continue; @@ -441,7 +449,7 @@ static int __my_version_ack(void *cctx, sexp_t *sx) if(!idx) continue; if(idx > 2) return SXE_BADPROTO; if(isx->ty == SEXP_LIST) return SXE_BADPROTO; - if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; + if(!strcmp(isx->val, V2_1_TPROT)) link->cp_version = V2_1; else link->cp_version = V_UNKNOWN; } @@ -458,7 +466,7 @@ static int __my_version_set(void *cctx, sexp_t *sx) if(!idx) continue; if(idx > 2) return SXE_BADPROTO; if(isx->ty == SEXP_LIST) return SXE_BADPROTO; - if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; + if(!strcmp(isx->val, V2_1_TPROT)) link->cp_version = V2_1; else return SXE_FAILED; /* failed to set another version */ } diff --git a/lib/sxmplv2.c b/lib/sxmplv2.c index 8423d98..41c5cd9 100644 --- a/lib/sxmplv2.c +++ b/lib/sxmplv2.c @@ -417,8 +417,8 @@ sxlink_t *__link_minimal_alloc(struct in_addr *addr) if(!co) { r = ENOMEM; goto __fail; } else memset(co, 0, sizeof(sxlink_t)); - if(!(co->messages = malloc(sizeof(uintptr_t)*1024))) { r = ENOMEM; goto __fail; } - else memset(co->messages, 0, sizeof(uintptr_t)*1024); + if(!(co->messages = malloc(sizeof(uintptr_t)*MAX_MSGINPROCESS))) { r = ENOMEM; goto __fail; } + else memset(co->messages, 0, sizeof(uintptr_t)*MAX_MSGINPROCESS); if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; } else memset(co->pctx, 0, sizeof(sxsession_ctx_t)); @@ -452,7 +452,7 @@ static int __link_second_alloc(sxlink_t *co) memset(&co->idx_ch, 0, sizeof(idx_allocator_t)); memset(&co->idx_msg, 0, sizeof(idx_allocator_t)); if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; - if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail; + if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail; if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail; else memset(co->channels, 0, sizeof(uintptr_t)*512); @@ -599,7 +599,7 @@ static void __link_destroy(sxlink_t *co) if(!_CONN_UCOUNT(co)) { /* go thru messages */ pthread_mutex_lock(&co->idx_msg_lock); - for(i = 0; i < 1024; i++) { + for(i = 0; i < MAX_MSGINPROCESS; i++) { msg = co->messages[i]; if(!msg) continue; else head = &msg->mhead; @@ -731,6 +731,7 @@ static void *__sxmpl_thread(void *b) #ifdef _PERFPROFILE gettimeofday(&beg, NULL); #endif + memset(mhead, 0, sizeof(sxmplv2_head_t)); rd = __conn_read(co, mhead, sizeof(sxmplv2_head_t)); #ifdef _PERFPROFILE gettimeofday(&end, NULL); @@ -775,7 +776,7 @@ static void *__sxmpl_thread(void *b) if(rd != mhead->payload_length) { mid = mhead->msgid; /* if we're need to do something */ - if(mhead->msgid >= 1024) { + if(mhead->msgid >= MAX_MSGINPROCESS) { mhead->opcode = SXE_INVALINDEX; goto __return_error; } else { @@ -817,17 +818,17 @@ static void *__sxmpl_thread(void *b) pthread_mutex_unlock(&(co->sslinout[1])); if(wr < 0) goto __finish; } else { /* it's came back */ + mid = (ulong_t)mhead->msgid; /* reply came ... */ - if(mhead->msgid >= 1024) { + if(mid > (MAX_MSGINPROCESS - 1)) { __inval_idx_nor: - fprintf(stderr, "[sxmplv2] Invalid index of the message.\n"); + fprintf(stderr, "[sxmplv2] Invalid index of the message (%lu).\n", mid); goto __again; } - mid = mhead->msgid; - //hread_mutex_lock(&co->idx_msg_lock); - msg = co->messages[mid]; - //hread_mutex_unlock(&co->idx_msg_lock); - if(!msg) goto __inval_idx_nor; + + msg = co->messages[(int)mid]; + + if(!msg) { goto __inval_idx_nor; } /* ok now we'are copy data and unlock wait mutex */ memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); @@ -836,7 +837,7 @@ static void *__sxmpl_thread(void *b) } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */ if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */ if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ - /* TODO: other service messages */ + /* FIXME: special sxmpv2.1 top layer messages - should be here */ if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */ sx = parse_sexp(bbuf, mhead->payload_length); if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx); @@ -1075,6 +1076,7 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr) if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; } bbuf[rd] = '\0'; + sx = parse_sexp(bbuf, rd); if(!sx) goto __fail3; @@ -1085,6 +1087,7 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr) r = __eval_syssexp(co, sx); memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t)); head->opcode = r; + if(r != SXE_SUCCESS) { /* we finish */ head->payload_length = 0; __conn_write(co, head, sizeof(sxmplv2_head_t)); @@ -1167,6 +1170,12 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr) return NULL; } +enum { + _SYNC_ON_CHANNELS = 0, + _SYNC_ON_VERSION = 1, + _SYNC_ON_STREAMS = 2, +}; + sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, int port, const char *SSL_cert, const char *login, const char *passwd) @@ -1174,7 +1183,7 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, sxlink_t *co = __link_minimal_alloc(NULL); struct hostent *host_; struct sockaddr_in addr; - int r = SXE_SUCCESS, sck; + int r = SXE_SUCCESS, sck, i, sync_state = _SYNC_ON_CHANNELS; #ifdef WIN32 WSADATA wsaData; #endif @@ -1184,8 +1193,8 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, sxmplv2_head_t *head; sxmplv2_bundle_t *bundle; sxmsg_t *msg; - size_t rd, wr; - int i; + size_t rd, wr, ln; + sexp_t *sx; r = SXE_IGNORED; if(!host || !SSL_cert) goto __fail; @@ -1311,28 +1320,40 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, bbuf += sizeof(sxmplv2_head_t); head = (sxmplv2_head_t *)buf; - sexp_t *sx; - size_t ln; - while(co->flags & SXMP_BATCHMODE) { - /* form a message -- credentials */ - ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")", - login ? login : "nil", passwd ? passwd : "nil"); + /* authentification first both V2 and newer */ + /* form a message -- credentials */ + ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")", + login ? login : "nil", passwd ? passwd : "nil"); + + head->opcode = SXE_SUCCESS; + head->payload_length = ln; + wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); + if(wr < 0) goto __fail2; + + rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); + if(rd < 0) goto __fail2; + if(head->opcode != SXE_SUCCESS) { + r = head->opcode; + goto __fail2; + } + /* since V2.1 syncronization should be done here */ + while(co->flags & SXMP_BATCHMODE) { head->opcode = SXE_SUCCESS; - head->payload_length = ln; - wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); - if(wr < 0) goto __fail2; - rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); - if(rd < 0) goto __fail2; - if(head->opcode != SXE_SUCCESS) { - r = head->opcode; - goto __fail2; + switch(sync_state) { + case _SYNC_ON_CHANNELS: /* ok, get available channels */ + ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@c>)"); + break; + case _SYNC_ON_VERSION: + ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@v> %s)", V2_1_TPROT); + break; + case _SYNC_ON_STREAMS: + ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@s>)"); + break; } - /* ok, get available channels */ - head->opcode = SXE_SUCCESS; - ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(get-channels-list)"); + /* write a message */ head->payload_length = ln; wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); if(wr < 0) goto __fail2; @@ -1340,7 +1361,10 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); if(rd < 0) goto __fail2; if(head->opcode != SXE_SUCCESS) goto __fail2; - if(!head->payload_length) goto __fail2; + if(!head->payload_length) { + sync_state++; + continue; + } rd = __conn_read(co, bbuf, head->payload_length); if(rd < 0) goto __fail2; @@ -1352,13 +1376,15 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, if(!r) r = SXE_SUCCESS; destroy_sexp(sx); - /* write back */ - head->opcode = r; - head->payload_length = 0; - wr = __conn_write(co, head, sizeof(sxmplv2_head_t)); - if(wr < 0) { - r = SXE_LINKERROR; goto __fail2;} - if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;} + if(sync_state != _SYNC_ON_STREAMS) { + /* write back */ + head->opcode = r; + head->payload_length = 0; + wr = __conn_write(co, head, sizeof(sxmplv2_head_t)); + if(wr < 0) { r = SXE_LINKERROR; goto __fail2;} + if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;} + } + sync_state++; } /* if we're there - negotiation is done, going to init messaging mode */