ok, new pipelining model seems working.

This commit is contained in:
Alexandre Snarskii
2015-07-11 20:34:20 +03:00
parent d169fdba7b
commit 6c466d76d2

View File

@@ -411,8 +411,9 @@ bgpq_read(struct bgpq_expander* b, int fd)
SX_DEBUG(debug_expander>2, "waiting for answer to %s, init %i '%.*s'\n",
req->request, off, off, response);
int ret = 0;
char* cres;
if (strchr(response, '\n'))
if ((cres=strchr(response, '\n'))!=NULL)
goto have;
repeat:
ret = read(fd, response+off, sizeof(response)-off);
@@ -426,16 +427,16 @@ repeat:
};
off += ret;
if (!strchr(response, '\n'))
if (!(cres=strchr(response, '\n')))
goto repeat;
have:
SX_DEBUG(debug_expander>5, "got response of %.*s\n", off, response);
if(response[0]=='A') {
char* eon, *c, *cres;
char* eon, *c;
unsigned long togot=strtoul(response+1,&eon,10);
char* recvbuffer=malloc(togot);
char* recvbuffer=malloc(togot+2);
int offset = 0;
memset(recvbuffer,0,togot);
memset(recvbuffer,0,togot+2);
if(!eon || *eon!='\n') {
sx_report(SX_ERROR,"A-code finished with wrong char '%c'(%s)\n",
@@ -443,19 +444,29 @@ have:
exit(1);
};
offset = off - ((eon+1) - response);
memcpy(recvbuffer, eon+1, off - ((eon+1) - response));
if (off - ((eon+1) - response) > togot) {
/* full response and more data is already in buffer */
memcpy(recvbuffer, eon+1, togot);
offset = togot;
memmove(response, eon+1+togot, off-((eon+1)-response)-togot);
off -= togot + ((eon+1)-response);
memset(response+off, 0, sizeof(response)-off);
} else {
/* response is not yet fully buffered */
memcpy(recvbuffer, eon+1, off - ((eon+1)-response));
offset = off - ((eon+1) - response);
memset(response, 0, sizeof(response));
off = 0;
};
SX_DEBUG(debug_expander>5,
"starting read with ready '%.*s', waiting for %lu\n",
offset, recvbuffer, togot-offset);
if (togot < offset) {
memmove(response, recvbuffer+togot, offset-togot);
off = offset-togot;
if (off > 0)
goto have3;
} else if (togot == offset) {
goto have2;
};
if (offset == togot)
goto reread2;
reread:
ret = read(fd, recvbuffer+offset, togot-offset);
@@ -471,14 +482,10 @@ reread:
"Read1: got '%.*s'\n", ret, recvbuffer+offset);
offset+=ret;
if(offset < togot) {
sx_report(SX_NOTICE, "expected %lu, got %lu\n",
togot, strlen(recvbuffer));
SX_DEBUG(debug_expander>5, "expected %lu, got %lu expanding %s",
togot, strlen(recvbuffer), req->request);
goto reread;
};
have2:
off=0;
memset(response, 0, sizeof(response));
reread2:
ret = read(fd, response+off, sizeof(response) - off);
if (ret < 0) {
@@ -509,11 +516,6 @@ have3:
c+=spn+1;
};
assert(c == recvbuffer+togot);
memmove(response, cres+1, off-((cres+1)-response));
off -= (cres+1)-response;
memset(response+off, 0, sizeof(response) - off);
SX_DEBUG(debug_expander>5,
"fixed response of %i, %.*s\n", off, off, response);
memset(recvbuffer,0,togot+2);
free(recvbuffer);
} else if(response[0]=='C') {
@@ -534,6 +536,11 @@ have3:
req->request);
exit(1);
};
memmove(response, cres+1, off-((cres+1)-response));
off -= (cres+1)-response;
memset(response+off, 0, sizeof(response) - off);
SX_DEBUG(debug_expander>5,
"fixed response of %i, %.*s\n", off, off, response);
STAILQ_REMOVE_HEAD(&b->rq, next);
b->piped--;
@@ -573,19 +580,18 @@ repeat:
sx_report(SX_FATAL, "EOF reading IRRd\n");
exit(1);
};
off += ret;
if (!strchr(response, '\n')) {
off += ret;
if (strchr(response, '\n') == NULL)
goto repeat;
};
SX_DEBUG(debug_expander>2,"expander: initially got %lu bytes, '%s'\n",
(unsigned long)strlen(response),response);
if(response[0]=='A') {
char* eon, *c;
long togot=strtoul(response+1,&eon,10);
char recvbuffer[togot+1];
int recvoff = 0;
char recvbuffer[togot+2];
int offset = 0;
if(eon && *eon!='\n') {
sx_report(SX_ERROR,"A-code finised with wrong char '%c' (%s)\n",
@@ -593,22 +599,38 @@ repeat:
exit(1);
};
if (off - ((eon+1)-response) > togot) {
memcpy(recvbuffer, eon+1, togot);
offset = togot;
memmove(response, eon+1+togot, off - ((eon+1)-response) - togot);
off -= togot + ((eon+1)-response);
memset(response+off, 0, sizeof(response)-off);
} else {
memcpy(recvbuffer, eon+1, off - ((eon+1)-response));
offset = off - ((eon+1) - response);
memset(response, 0, sizeof(response));
off = 0;
};
if (off > 0)
goto have3;
if (offset == togot)
goto reread2;
reread:
ret = read(fd, recvbuffer+recvoff, togot+1-recvoff);
ret = read(fd, recvbuffer+offset, togot-offset);
if (ret == 0) {
sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n");
} else if (ret < 0) {
sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n",
strerror(errno));
};
recvoff += ret;
if (recvoff < togot+1)
offset += ret;
if (offset < togot)
goto reread;
memset(response, 0, sizeof(response));
off = 0;
reread2:
ret = read(fd, response+off, sizeof(response)-off);
if (ret < 0) {
sx_report(SX_FATAL, "error reading IRRd: %s\n", strerror(errno));
exit(1);
@@ -616,9 +638,15 @@ reread:
sx_report(SX_FATAL, "eof reading IRRd\n");
exit(1);
};
off += ret;
have3:
if (!strchr(response, '\n'))
goto reread2;
SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n",
(unsigned long)strlen(recvbuffer),recvbuffer);
SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, "
"%.*sreturn code %.*s",
(unsigned long)strlen(recvbuffer), offset, recvbuffer, off,
response);
for(c=recvbuffer; c<recvbuffer+togot;) {
size_t spn=strcspn(c," \n");
@@ -627,6 +655,7 @@ reread:
if(callback) callback(c,udata,request);
c+=spn+1;
};
memset(recvbuffer, 0, togot+2);
} else if(response[0]=='C') {
/* no data */
} else if(response[0]=='D') {
@@ -722,7 +751,8 @@ bgpq_expand(struct bgpq_expander* b)
read(fd, ident, sizeof(ident));
};
fcntl(fd, F_SETFL, O_NONBLOCK|(fcntl(fd, F_GETFL)));
if (pipelining)
fcntl(fd, F_SETFL, O_NONBLOCK|(fcntl(fd, F_GETFL)));
for(mc=b->macroses;mc;mc=mc->next) {
if (!b->maxdepth) {