From 9eb665209b414ca215d4c93cb0e30f92eaeef524 Mon Sep 17 00:00:00 2001 From: Alexandre Snarskii Date: Tue, 23 Jun 2015 14:56:05 +0300 Subject: [PATCH] pipelining now counts buffer size. --- bgpq3.h | 3 +++ bgpq_expander.c | 46 +++++++++++++++++++++++++++++++++++++--------- sx_maxsockbuf.c | 2 +- 3 files changed, 41 insertions(+), 10 deletions(-) diff --git a/bgpq3.h b/bgpq3.h index b7aca0f..596ae8a 100644 --- a/bgpq3.h +++ b/bgpq3.h @@ -23,6 +23,7 @@ typedef enum { struct bgpq_prequest { struct bgpq_prequest* next; char request[128]; + int size; int (*callback)(char*, void*); void *udata; }; @@ -46,6 +47,8 @@ struct bgpq_expander { char* match; char* server; unsigned maxlen; + int socksize; + int qsize; }; diff --git a/bgpq_expander.c b/bgpq_expander.c index 28e0984..f6f09fc 100644 --- a/bgpq_expander.c +++ b/bgpq_expander.c @@ -31,7 +31,7 @@ bgpq_expander_init(struct bgpq_expander* b, int af) if(!b) return 0; memset(b,0,sizeof(struct bgpq_expander)); - + b->tree=sx_radix_tree_new(af); if(!b->tree) goto fixups; @@ -258,9 +258,11 @@ bgpq_pipeline_dequeue_ripe(FILE* f, struct bgpq_expander* b) /* end of object */ struct bgpq_prequest* p=b->firstpipe; b->firstpipe=b->firstpipe->next; + b->qsize-=p->size; free(p); b->piped--; if(!b->piped) { + b->lastpipe=NULL; return 0; }; }; @@ -362,12 +364,14 @@ bgpq_expand_ripe(FILE* f, int (*callback)(char*, void*), void* udata, return 0; }; +int bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b); + int bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata, char* fmt, ...) { char request[128]; - int ret; + int ret, rlen; struct bgpq_prequest* bp=NULL; struct bgpq_expander* d=(struct bgpq_expander*)udata; va_list ap; @@ -375,7 +379,19 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata, vsnprintf(request,sizeof(request),fmt,ap); va_end(ap); - SX_DEBUG(debug_expander,"expander: sending '%s'\n", request); + rlen=strlen(request); + if(rlen+d->qsize >= d->socksize) { + SX_DEBUG(debug_expander, "looks like socket buffer shortage, " + "queued %i of %i, dequeueing\n", d->qsize, d->socksize); + if (d->family==AF_INET6) { + bgpq_pipeline_dequeue_ripe(f, d); + } else { + bgpq_pipeline_dequeue(f, d); + }; + }; + + SX_DEBUG(debug_expander,"expander: sending '%s' (queued %i of %i)\n", + request, d->qsize, d->socksize); bp=malloc(sizeof(struct bgpq_prequest)); if(!bp) { @@ -387,7 +403,7 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata, ret=fwrite(request,1,strlen(request),f); - if(ret!=strlen(request)) { + if(ret!=rlen) { sx_report(SX_FATAL,"Partial write to radb, only %i bytes written: %s\n", ret,strerror(errno)); exit(1); @@ -396,6 +412,8 @@ bgpq_pipeline(FILE* f, int (*callback)(char*, void*), void* udata, strlcpy(bp->request,request,sizeof(bp->request)); bp->callback=callback; bp->udata=udata; + bp->size=rlen; + d->qsize+=rlen; if(d->lastpipe) { d->lastpipe->next=bp; @@ -484,9 +502,10 @@ bgpq_pipeline_dequeue(FILE* f, struct bgpq_expander* b) pipe=b->firstpipe; b->firstpipe=b->firstpipe->next; b->piped--; + b->qsize-=pipe->size; free(pipe); - }; + b->lastpipe=NULL; return 0; }; @@ -544,7 +563,7 @@ bgpq_expand_radb(FILE* f, int (*callback)(char*, void*), void* udata, }; SX_DEBUG(debug_expander>2,"expander: final reply of %lu bytes, '%s'\n", (unsigned long)strlen(recvbuffer),recvbuffer); - + for(c=recvbuffer; c0) { + SX_DEBUG(debug_expander, "Acquired sendbuf of %i bytes\n", err); + b->socksize=err; + } else { + shutdown(fd, SHUT_RDWR); + close(fd); + fd=-1; + continue; + }; f=fdopen(fd,"a+"); if(!f) { shutdown(fd,SHUT_RDWR); @@ -628,7 +656,7 @@ bgpq_expand(struct bgpq_expander* b) " error: %s\n", b->server, strerror(errno)); exit(1); }; - + if((ret=fwrite("!!\n",1,3,f))!=3) { sx_report(SX_ERROR,"Partial fwrite to radb: %i bytes, %s\n", ret, strerror(errno)); @@ -719,7 +747,7 @@ bgpq_expand(struct bgpq_expander* b) }; }; }; - + fwrite("!q\n",1,3,f); fclose(f); return 1; diff --git a/sx_maxsockbuf.c b/sx_maxsockbuf.c index 3cccb44..d9f4f75 100644 --- a/sx_maxsockbuf.c +++ b/sx_maxsockbuf.c @@ -85,5 +85,5 @@ sx_maxsockbuf(int s, int dir) voptval, iterations); */ }; - return 0; + return voptval; };