From 6c466d76d2abf1e14e7c95fd770e1614e9b43f4b Mon Sep 17 00:00:00 2001 From: Alexandre Snarskii Date: Sat, 11 Jul 2015 20:34:20 +0300 Subject: [PATCH] ok, new pipelining model seems working. --- bgpq_expander.c | 106 +++++++++++++++++++++++++++++++----------------- 1 file changed, 68 insertions(+), 38 deletions(-) diff --git a/bgpq_expander.c b/bgpq_expander.c index 5377257..56dcd61 100644 --- a/bgpq_expander.c +++ b/bgpq_expander.c @@ -411,8 +411,9 @@ bgpq_read(struct bgpq_expander* b, int fd) SX_DEBUG(debug_expander>2, "waiting for answer to %s, init %i '%.*s'\n", req->request, off, off, response); int ret = 0; + char* cres; - if (strchr(response, '\n')) + if ((cres=strchr(response, '\n'))!=NULL) goto have; repeat: ret = read(fd, response+off, sizeof(response)-off); @@ -426,16 +427,16 @@ repeat: }; off += ret; - if (!strchr(response, '\n')) + if (!(cres=strchr(response, '\n'))) goto repeat; have: SX_DEBUG(debug_expander>5, "got response of %.*s\n", off, response); if(response[0]=='A') { - char* eon, *c, *cres; + char* eon, *c; unsigned long togot=strtoul(response+1,&eon,10); - char* recvbuffer=malloc(togot); + char* recvbuffer=malloc(togot+2); int offset = 0; - memset(recvbuffer,0,togot); + memset(recvbuffer,0,togot+2); if(!eon || *eon!='\n') { sx_report(SX_ERROR,"A-code finished with wrong char '%c'(%s)\n", @@ -443,19 +444,29 @@ have: exit(1); }; - offset = off - ((eon+1) - response); - memcpy(recvbuffer, eon+1, off - ((eon+1) - response)); + if (off - ((eon+1) - response) > togot) { + /* full response and more data is already in buffer */ + memcpy(recvbuffer, eon+1, togot); + offset = togot; + memmove(response, eon+1+togot, off-((eon+1)-response)-togot); + off -= togot + ((eon+1)-response); + memset(response+off, 0, sizeof(response)-off); + } else { + /* response is not yet fully buffered */ + memcpy(recvbuffer, eon+1, off - ((eon+1)-response)); + offset = off - ((eon+1) - response); + memset(response, 0, sizeof(response)); + off = 0; + }; SX_DEBUG(debug_expander>5, "starting read with ready '%.*s', waiting for %lu\n", offset, recvbuffer, togot-offset); - if (togot < offset) { - memmove(response, recvbuffer+togot, offset-togot); - off = offset-togot; + + if (off > 0) goto have3; - } else if (togot == offset) { - goto have2; - }; + if (offset == togot) + goto reread2; reread: ret = read(fd, recvbuffer+offset, togot-offset); @@ -471,14 +482,10 @@ reread: "Read1: got '%.*s'\n", ret, recvbuffer+offset); offset+=ret; if(offset < togot) { - sx_report(SX_NOTICE, "expected %lu, got %lu\n", - togot, strlen(recvbuffer)); + SX_DEBUG(debug_expander>5, "expected %lu, got %lu expanding %s", + togot, strlen(recvbuffer), req->request); goto reread; }; - -have2: - off=0; - memset(response, 0, sizeof(response)); reread2: ret = read(fd, response+off, sizeof(response) - off); if (ret < 0) { @@ -509,11 +516,6 @@ have3: c+=spn+1; }; assert(c == recvbuffer+togot); - memmove(response, cres+1, off-((cres+1)-response)); - off -= (cres+1)-response; - memset(response+off, 0, sizeof(response) - off); - SX_DEBUG(debug_expander>5, - "fixed response of %i, %.*s\n", off, off, response); memset(recvbuffer,0,togot+2); free(recvbuffer); } else if(response[0]=='C') { @@ -534,6 +536,11 @@ have3: req->request); exit(1); }; + memmove(response, cres+1, off-((cres+1)-response)); + off -= (cres+1)-response; + memset(response+off, 0, sizeof(response) - off); + SX_DEBUG(debug_expander>5, + "fixed response of %i, %.*s\n", off, off, response); STAILQ_REMOVE_HEAD(&b->rq, next); b->piped--; @@ -573,19 +580,18 @@ repeat: sx_report(SX_FATAL, "EOF reading IRRd\n"); exit(1); }; + off += ret; - if (!strchr(response, '\n')) { - off += ret; + if (strchr(response, '\n') == NULL) goto repeat; - }; SX_DEBUG(debug_expander>2,"expander: initially got %lu bytes, '%s'\n", (unsigned long)strlen(response),response); if(response[0]=='A') { char* eon, *c; long togot=strtoul(response+1,&eon,10); - char recvbuffer[togot+1]; - int recvoff = 0; + char recvbuffer[togot+2]; + int offset = 0; if(eon && *eon!='\n') { sx_report(SX_ERROR,"A-code finised with wrong char '%c' (%s)\n", @@ -593,22 +599,38 @@ repeat: exit(1); }; + if (off - ((eon+1)-response) > togot) { + memcpy(recvbuffer, eon+1, togot); + offset = togot; + memmove(response, eon+1+togot, off - ((eon+1)-response) - togot); + off -= togot + ((eon+1)-response); + memset(response+off, 0, sizeof(response)-off); + } else { + memcpy(recvbuffer, eon+1, off - ((eon+1)-response)); + offset = off - ((eon+1) - response); + memset(response, 0, sizeof(response)); + off = 0; + }; + + if (off > 0) + goto have3; + if (offset == togot) + goto reread2; + reread: - ret = read(fd, recvbuffer+recvoff, togot+1-recvoff); + ret = read(fd, recvbuffer+offset, togot-offset); if (ret == 0) { sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n"); } else if (ret < 0) { sx_report(SX_FATAL,"Error reading IRRd: %s (expand,result)\n", strerror(errno)); }; - recvoff += ret; - if (recvoff < togot+1) + offset += ret; + if (offset < togot) goto reread; - memset(response, 0, sizeof(response)); - off = 0; +reread2: ret = read(fd, response+off, sizeof(response)-off); - if (ret < 0) { sx_report(SX_FATAL, "error reading IRRd: %s\n", strerror(errno)); exit(1); @@ -616,9 +638,15 @@ reread: sx_report(SX_FATAL, "eof reading IRRd\n"); exit(1); }; + off += ret; +have3: + if (!strchr(response, '\n')) + goto reread2; - SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n", - (unsigned long)strlen(recvbuffer),recvbuffer); + SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, " + "%.*sreturn code %.*s", + (unsigned long)strlen(recvbuffer), offset, recvbuffer, off, + response); for(c=recvbuffer; cmacroses;mc;mc=mc->next) { if (!b->maxdepth) {