added v2_1 sync processing;

v0.5.xx
Alexander Vdolainen 9 years ago
parent 84a71e6c3f
commit 7bacf72d55

@ -33,6 +33,8 @@
#define MAX_SXMPLTHREADS 8 #define MAX_SXMPLTHREADS 8
#define MAX_MSGINPROCESS 1024
#define MAX_STREAMS_TYPES 32767 #define MAX_STREAMS_TYPES 32767
#define MAX_LINKS 32768 #define MAX_LINKS 32768

@ -30,6 +30,8 @@
#define SXMP_MAJOR 4 #define SXMP_MAJOR 4
#define SXMP_MINOR 2 #define SXMP_MINOR 2
#define V2_1_TPROT "v2_1"
typedef enum { typedef enum {
V2 = 0, V2 = 0,
V2_1, V2_1,

@ -160,7 +160,7 @@ sxchnl_t *sxchannel_open(sxlink_t *co, int type)
pthread_mutex_unlock(&co->idx_msg_lock); pthread_mutex_unlock(&co->idx_msg_lock);
if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __reterr2; } if(msgidx == IDX_INVAL) { r = SXE_MMESSAGES; goto __reterr2; }
else head->msgid = msgidx; else head->msgid = (uint16_t)msgidx;
/* now we're ready to write it */ /* now we're ready to write it */
r = _sxmpl_writemsg(co, msg); r = _sxmpl_writemsg(co, msg);

@ -163,6 +163,7 @@ static int __get_channels_list(void *cctx, sexp_t *sx)
if(co->cp_version == V2) { if(co->cp_version == V2) {
/* we're ready for messaging mode */ /* we're ready for messaging mode */
co->flags |= SXMP_MESSAGINGMODE; co->flags |= SXMP_MESSAGINGMODE;
co->flags &= ~SXMP_BATCHMODE;
} }
return SXE_SUCCESS; return SXE_SUCCESS;
@ -189,6 +190,7 @@ static int __get_streams(void *cctx, sexp_t *sx)
msg->mhead.payload_length = ulen + 1; msg->mhead.payload_length = ulen + 1;
/* and now we're ready to exit from batch mode */ /* and now we're ready to exit from batch mode */
link->flags |= SXMP_MESSAGINGMODE; link->flags |= SXMP_MESSAGINGMODE;
link->flags &= ~SXMP_BATCHMODE;
return SXE_SUCCESS; return SXE_SUCCESS;
} else { /* set all streams available */ } else { /* set all streams available */
@ -241,6 +243,7 @@ static int __get_streams(void *cctx, sexp_t *sx)
} }
link->flags |= SXMP_MESSAGINGMODE; link->flags |= SXMP_MESSAGINGMODE;
link->flags &= ~SXMP_BATCHMODE;
return SXE_SUCCESS; return SXE_SUCCESS;
} }
@ -397,6 +400,11 @@ static int __set_channels_list(void *cctx, sexp_t *sx)
sexp_t *isx, *iisx; sexp_t *isx, *iisx;
int id, r; int id, r;
/* determine how this function was called */
if(!strcmp(sx->list->val, "get-channels-list"))
co->cp_version = V2;
else co->cp_version = V2_1; /* last supported in this version */
SEXP_ITERATE_LIST(sx, isx, idx) { SEXP_ITERATE_LIST(sx, isx, idx) {
if(!idx) continue; if(!idx) continue;
@ -441,7 +449,7 @@ static int __my_version_ack(void *cctx, sexp_t *sx)
if(!idx) continue; if(!idx) continue;
if(idx > 2) return SXE_BADPROTO; if(idx > 2) return SXE_BADPROTO;
if(isx->ty == SEXP_LIST) return SXE_BADPROTO; if(isx->ty == SEXP_LIST) return SXE_BADPROTO;
if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; if(!strcmp(isx->val, V2_1_TPROT)) link->cp_version = V2_1;
else link->cp_version = V_UNKNOWN; else link->cp_version = V_UNKNOWN;
} }
@ -458,7 +466,7 @@ static int __my_version_set(void *cctx, sexp_t *sx)
if(!idx) continue; if(!idx) continue;
if(idx > 2) return SXE_BADPROTO; if(idx > 2) return SXE_BADPROTO;
if(isx->ty == SEXP_LIST) return SXE_BADPROTO; if(isx->ty == SEXP_LIST) return SXE_BADPROTO;
if(!strcmp(isx->val, "v2_1")) link->cp_version = V2_1; if(!strcmp(isx->val, V2_1_TPROT)) link->cp_version = V2_1;
else return SXE_FAILED; /* failed to set another version */ else return SXE_FAILED; /* failed to set another version */
} }

@ -417,8 +417,8 @@ sxlink_t *__link_minimal_alloc(struct in_addr *addr)
if(!co) { r = ENOMEM; goto __fail; } if(!co) { r = ENOMEM; goto __fail; }
else memset(co, 0, sizeof(sxlink_t)); else memset(co, 0, sizeof(sxlink_t));
if(!(co->messages = malloc(sizeof(uintptr_t)*1024))) { r = ENOMEM; goto __fail; } if(!(co->messages = malloc(sizeof(uintptr_t)*MAX_MSGINPROCESS))) { r = ENOMEM; goto __fail; }
else memset(co->messages, 0, sizeof(uintptr_t)*1024); else memset(co->messages, 0, sizeof(uintptr_t)*MAX_MSGINPROCESS);
if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; } if(!(co->pctx = malloc(sizeof(sxsession_ctx_t)))) { r = ENOMEM; goto __fail; }
else memset(co->pctx, 0, sizeof(sxsession_ctx_t)); else memset(co->pctx, 0, sizeof(sxsession_ctx_t));
@ -452,7 +452,7 @@ static int __link_second_alloc(sxlink_t *co)
memset(&co->idx_ch, 0, sizeof(idx_allocator_t)); memset(&co->idx_ch, 0, sizeof(idx_allocator_t));
memset(&co->idx_msg, 0, sizeof(idx_allocator_t)); memset(&co->idx_msg, 0, sizeof(idx_allocator_t));
if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail; if((idx_allocator_init(&co->idx_ch, 512, 0))) goto __fail;
if((idx_allocator_init(&co->idx_msg, 1024, 0))) goto __fail; if((idx_allocator_init(&co->idx_msg, MAX_MSGINPROCESS, 0))) goto __fail;
if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail; if(!(co->channels = malloc(sizeof(uintptr_t)*512))) goto __fail;
else memset(co->channels, 0, sizeof(uintptr_t)*512); else memset(co->channels, 0, sizeof(uintptr_t)*512);
@ -599,7 +599,7 @@ static void __link_destroy(sxlink_t *co)
if(!_CONN_UCOUNT(co)) { if(!_CONN_UCOUNT(co)) {
/* go thru messages */ /* go thru messages */
pthread_mutex_lock(&co->idx_msg_lock); pthread_mutex_lock(&co->idx_msg_lock);
for(i = 0; i < 1024; i++) { for(i = 0; i < MAX_MSGINPROCESS; i++) {
msg = co->messages[i]; msg = co->messages[i];
if(!msg) continue; if(!msg) continue;
else head = &msg->mhead; else head = &msg->mhead;
@ -731,6 +731,7 @@ static void *__sxmpl_thread(void *b)
#ifdef _PERFPROFILE #ifdef _PERFPROFILE
gettimeofday(&beg, NULL); gettimeofday(&beg, NULL);
#endif #endif
memset(mhead, 0, sizeof(sxmplv2_head_t));
rd = __conn_read(co, mhead, sizeof(sxmplv2_head_t)); rd = __conn_read(co, mhead, sizeof(sxmplv2_head_t));
#ifdef _PERFPROFILE #ifdef _PERFPROFILE
gettimeofday(&end, NULL); gettimeofday(&end, NULL);
@ -775,7 +776,7 @@ static void *__sxmpl_thread(void *b)
if(rd != mhead->payload_length) { if(rd != mhead->payload_length) {
mid = mhead->msgid; mid = mhead->msgid;
/* if we're need to do something */ /* if we're need to do something */
if(mhead->msgid >= 1024) { if(mhead->msgid >= MAX_MSGINPROCESS) {
mhead->opcode = SXE_INVALINDEX; mhead->opcode = SXE_INVALINDEX;
goto __return_error; goto __return_error;
} else { } else {
@ -817,17 +818,17 @@ static void *__sxmpl_thread(void *b)
pthread_mutex_unlock(&(co->sslinout[1])); pthread_mutex_unlock(&(co->sslinout[1]));
if(wr < 0) goto __finish; if(wr < 0) goto __finish;
} else { /* it's came back */ } else { /* it's came back */
mid = (ulong_t)mhead->msgid;
/* reply came ... */ /* reply came ... */
if(mhead->msgid >= 1024) { if(mid > (MAX_MSGINPROCESS - 1)) {
__inval_idx_nor: __inval_idx_nor:
fprintf(stderr, "[sxmplv2] Invalid index of the message.\n"); fprintf(stderr, "[sxmplv2] Invalid index of the message (%lu).\n", mid);
goto __again; goto __again;
} }
mid = mhead->msgid;
//hread_mutex_lock(&co->idx_msg_lock); msg = co->messages[(int)mid];
msg = co->messages[mid];
//hread_mutex_unlock(&co->idx_msg_lock); if(!msg) { goto __inval_idx_nor; }
if(!msg) goto __inval_idx_nor;
/* ok now we'are copy data and unlock wait mutex */ /* ok now we'are copy data and unlock wait mutex */
memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t)); memcpy(&msg->mhead, mhead, sizeof(sxmplv2_head_t));
@ -836,7 +837,7 @@ static void *__sxmpl_thread(void *b)
} else if(mhead->attr & SXMSG_LINK) { /* link layer messages */ } else if(mhead->attr & SXMSG_LINK) { /* link layer messages */
if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */ if(mhead->attr & SXMSG_CLOSED) goto __finish; /* close the link */
if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */ if(mhead->attr & SXMSG_PULSE) { /* it's a link pulse messages */
/* TODO: other service messages */ /* FIXME: special sxmpv2.1 top layer messages - should be here */
if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */ if(mhead->opcode == SXE_RAPIDMSG) { /* custom pulse */
sx = parse_sexp(bbuf, mhead->payload_length); sx = parse_sexp(bbuf, mhead->payload_length);
if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx); if(sx && co->ssys->on_pulse) co->ssys->on_pulse(co, sx);
@ -1075,6 +1076,7 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
if(rd == -1) { r = SXE_LINKERROR; goto __fail3; } if(rd == -1) { r = SXE_LINKERROR; goto __fail3; }
if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; } if(rd != head->payload_length) { r = SXE_LINKERROR; goto __fail3; }
bbuf[rd] = '\0'; bbuf[rd] = '\0';
sx = parse_sexp(bbuf, rd); sx = parse_sexp(bbuf, rd);
if(!sx) goto __fail3; if(!sx) goto __fail3;
@ -1085,6 +1087,7 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
r = __eval_syssexp(co, sx); r = __eval_syssexp(co, sx);
memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t)); memcpy(head, &msg->mhead, sizeof(sxmplv2_head_t));
head->opcode = r; head->opcode = r;
if(r != SXE_SUCCESS) { /* we finish */ if(r != SXE_SUCCESS) { /* we finish */
head->payload_length = 0; head->payload_length = 0;
__conn_write(co, head, sizeof(sxmplv2_head_t)); __conn_write(co, head, sizeof(sxmplv2_head_t));
@ -1167,6 +1170,12 @@ sxlink_t *sxlink_master_accept(sxhub_t *ssys, int sck, struct in_addr *addr)
return NULL; return NULL;
} }
enum {
_SYNC_ON_CHANNELS = 0,
_SYNC_ON_VERSION = 1,
_SYNC_ON_STREAMS = 2,
};
sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host, sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
int port, const char *SSL_cert, const char *login, int port, const char *SSL_cert, const char *login,
const char *passwd) const char *passwd)
@ -1174,7 +1183,7 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
sxlink_t *co = __link_minimal_alloc(NULL); sxlink_t *co = __link_minimal_alloc(NULL);
struct hostent *host_; struct hostent *host_;
struct sockaddr_in addr; struct sockaddr_in addr;
int r = SXE_SUCCESS, sck; int r = SXE_SUCCESS, sck, i, sync_state = _SYNC_ON_CHANNELS;
#ifdef WIN32 #ifdef WIN32
WSADATA wsaData; WSADATA wsaData;
#endif #endif
@ -1184,8 +1193,8 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
sxmplv2_head_t *head; sxmplv2_head_t *head;
sxmplv2_bundle_t *bundle; sxmplv2_bundle_t *bundle;
sxmsg_t *msg; sxmsg_t *msg;
size_t rd, wr; size_t rd, wr, ln;
int i; sexp_t *sx;
r = SXE_IGNORED; r = SXE_IGNORED;
if(!host || !SSL_cert) goto __fail; if(!host || !SSL_cert) goto __fail;
@ -1311,28 +1320,40 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
bbuf += sizeof(sxmplv2_head_t); bbuf += sizeof(sxmplv2_head_t);
head = (sxmplv2_head_t *)buf; head = (sxmplv2_head_t *)buf;
sexp_t *sx; /* authentification first both V2 and newer */
size_t ln; /* form a message -- credentials */
while(co->flags & SXMP_BATCHMODE) { ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")",
/* form a message -- credentials */ login ? login : "nil", passwd ? passwd : "nil");
ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(auth-set-credentials \"%s\" \"%s\")",
login ? login : "nil", passwd ? passwd : "nil"); head->opcode = SXE_SUCCESS;
head->payload_length = ln;
wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
if(wr < 0) goto __fail2;
rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
if(rd < 0) goto __fail2;
if(head->opcode != SXE_SUCCESS) {
r = head->opcode;
goto __fail2;
}
/* since V2.1 syncronization should be done here */
while(co->flags & SXMP_BATCHMODE) {
head->opcode = SXE_SUCCESS; head->opcode = SXE_SUCCESS;
head->payload_length = ln;
wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
if(wr < 0) goto __fail2;
rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); switch(sync_state) {
if(rd < 0) goto __fail2; case _SYNC_ON_CHANNELS: /* ok, get available channels */
if(head->opcode != SXE_SUCCESS) { ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@c>)");
r = head->opcode; break;
goto __fail2; case _SYNC_ON_VERSION:
ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@v> %s)", V2_1_TPROT);
break;
case _SYNC_ON_STREAMS:
ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(!@s>)");
break;
} }
/* ok, get available channels */ /* write a message */
head->opcode = SXE_SUCCESS;
ln = snprintf(bbuf, 65535 - sizeof(sxmplv2_head_t), "(get-channels-list)");
head->payload_length = ln; head->payload_length = ln;
wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t)); wr = __conn_write(co, buf, ln + sizeof(sxmplv2_head_t));
if(wr < 0) goto __fail2; if(wr < 0) goto __fail2;
@ -1340,7 +1361,10 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
rd = __conn_read(co, head, sizeof(sxmplv2_head_t)); rd = __conn_read(co, head, sizeof(sxmplv2_head_t));
if(rd < 0) goto __fail2; if(rd < 0) goto __fail2;
if(head->opcode != SXE_SUCCESS) goto __fail2; if(head->opcode != SXE_SUCCESS) goto __fail2;
if(!head->payload_length) goto __fail2; if(!head->payload_length) {
sync_state++;
continue;
}
rd = __conn_read(co, bbuf, head->payload_length); rd = __conn_read(co, bbuf, head->payload_length);
if(rd < 0) goto __fail2; if(rd < 0) goto __fail2;
@ -1352,13 +1376,15 @@ sxlink_t *sxlink_connect(sxhub_t *ssys, const char *host,
if(!r) r = SXE_SUCCESS; if(!r) r = SXE_SUCCESS;
destroy_sexp(sx); destroy_sexp(sx);
/* write back */ if(sync_state != _SYNC_ON_STREAMS) {
head->opcode = r; /* write back */
head->payload_length = 0; head->opcode = r;
wr = __conn_write(co, head, sizeof(sxmplv2_head_t)); head->payload_length = 0;
if(wr < 0) { wr = __conn_write(co, head, sizeof(sxmplv2_head_t));
r = SXE_LINKERROR; goto __fail2;} if(wr < 0) { r = SXE_LINKERROR; goto __fail2;}
if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;} if(r != SXE_SUCCESS) { r = SXE_LINKERROR; goto __fail2;}
}
sync_state++;
} }
/* if we're there - negotiation is done, going to init messaging mode */ /* if we're there - negotiation is done, going to init messaging mode */

Loading…
Cancel
Save