callbacks redesigned

This commit is contained in:
Alexandre Snarskii
2015-07-12 16:42:50 +03:00
parent 7ed2ea9174
commit aa043c2f7d

View File

@@ -26,25 +26,6 @@ int pipelining=1;
int expand_as23456=0;
int expand_special_asn=0;
struct limited_req {
int cdepth;
struct bgpq_expander* b;
int fd;
};
struct limited_req*
lreq_alloc(struct bgpq_expander* b, int cdepth, int fd)
{
struct limited_req* lr = malloc(sizeof(struct limited_req));
if (!lr)
return NULL;
memset(lr, 0, sizeof(struct limited_req));
lr->cdepth=cdepth;
lr->b=b;
lr->fd=fd;
return lr;
};
int
bgpq_expander_init(struct bgpq_expander* b, int af)
{
@@ -234,26 +215,26 @@ bgpq_expander_add_prefix_range(struct bgpq_expander* b, char* prefix)
};
int
bgpq_expanded_macro(char* as, void* udata, char* request)
bgpq_expanded_macro(char* as, struct bgpq_expander* ex,
struct bgpq_request* req)
{
struct bgpq_expander* ex=(struct bgpq_expander*)udata;
if(!ex) return 0;
bgpq_expander_add_as(ex,as);
return 1;
};
int bgpq_pipeline(struct bgpq_expander* b, int fd,
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...);
int bgpq_expand_irrd(struct bgpq_expander* b, int fd,
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...);
struct bgpq_request* bgpq_pipeline(struct bgpq_expander* b,
int (*callback)(char*, struct bgpq_expander* b, struct bgpq_request* req),
void* udata, char* fmt, ...);
int bgpq_expand_irrd(struct bgpq_expander* b,
int (*callback)(char*, struct bgpq_expander* b, struct bgpq_request* req),
void* udata, char* fmt, ...);
int
bgpq_expanded_macro_limit(char* as, void* udata, char* request)
bgpq_expanded_macro_limit(char* as, struct bgpq_expander* b,
struct bgpq_request* req)
{
struct limited_req* lr = (struct limited_req*)udata;
struct sx_slentry* already = lr->b->already;
if (!strncasecmp(as, "AS-", 3)) {
addasset:
struct sx_slentry* already = b->already;
if (!strncasecmp(as, "AS-", 3) || strchr(as, '-') || strchr(as, ':')) {
while(already) {
if (!strcasecmp(already->text, as)) {
SX_DEBUG(debug_expander>2,"%s is already expanding, ignore\n",
@@ -262,49 +243,41 @@ addasset:
};
already=already->next;
};
if(lr->cdepth + 1 < lr->b->maxdepth) {
struct limited_req* lr1 = lreq_alloc(lr->b,lr->cdepth+1,lr->fd);
bgpq_expander_add_already(lr->b,as);
if(b->cdepth + 1 < b->maxdepth && req->depth + 1 < b->maxdepth) {
bgpq_expander_add_already(b,as);
if (pipelining) {
bgpq_pipeline(lr->b,lr->fd,bgpq_expanded_macro_limit,lr1,
"!i%s\n",as);
struct bgpq_request* req1 = bgpq_pipeline(b,
bgpq_expanded_macro_limit, NULL, "!i%s\n", as);
req1->depth = req->depth+1;
} else {
bgpq_expand_irrd(lr->b,lr->fd,bgpq_expanded_macro_limit,lr1,
"!i%s\n",as);
b->cdepth++;
bgpq_expand_irrd(b, bgpq_expanded_macro_limit, NULL, "!i%s\n",
as);
b->cdepth--;
};
} else {
SX_DEBUG(debug_expander>2, "ignoring %s at depth %i\n", as,
lr->cdepth+1);
b->cdepth?(b->cdepth+1):(req->depth+1));
};
} else if(!strncasecmp(as, "AS", 2)) {
char* cc = strchr(as, ':');
if (cc) {
if(!strncasecmp(cc+1, "AS-", 3)) goto addasset;
SX_DEBUG(debug_expander,"Unexpected sub-as object '%s' in "
"response to %s\n", as, request);
if(bgpq_expander_add_as(b, as)) {
SX_DEBUG(debug_expander>2, ".. added asn %s\n", as);
} else {
if(bgpq_expander_add_as(lr->b, as)) {
SX_DEBUG(debug_expander>2, ".. added asn %s\n", as);
} else {
SX_DEBUG(debug_expander, ".. some error adding as %s (in "
"response to %s)\n", as, request);
};
SX_DEBUG(debug_expander, ".. some error adding as %s (in "
"response to %s)\n", as, req->request);
};
} else {
sx_report(SX_ERROR, "unexpected object '%s' in expanded_macro_limit "
"(in response to %s)\n", as, request);
"(in response to %s)\n", as, req->request);
};
return 1;
};
int
bgpq_expanded_prefix(char* as, void* udata, char* request)
bgpq_expanded_prefix(char* as, struct bgpq_expander* ex,
struct bgpq_request* req __attribute__((unused)))
{
struct bgpq_expander* ex=(struct bgpq_expander*)udata;
char* d = strchr(as, '^');
if(!ex) return 0;
if (!d)
bgpq_expander_add_prefix(ex,as);
else
@@ -313,10 +286,9 @@ bgpq_expanded_prefix(char* as, void* udata, char* request)
};
int
bgpq_expanded_v6prefix(char* prefix, void* udata, char* request)
bgpq_expanded_v6prefix(char* prefix, struct bgpq_expander* ex,
struct bgpq_request* req)
{
struct bgpq_expander* ex=(struct bgpq_expander*)udata;
if(!ex) return 0;
bgpq_expander_add_prefix(ex,prefix);
return 1;
};
@@ -324,8 +296,8 @@ bgpq_expanded_v6prefix(char* prefix, void* udata, char* request)
int bgpq_pipeline_dequeue(int fd, struct bgpq_expander* b);
static struct bgpq_request*
bgpq_request_alloc(char* request, int (*callback)(char*, void*, char*),
void* udata)
bgpq_request_alloc(char* request, int (*callback)(char*, struct bgpq_expander*,
struct bgpq_request*), void* udata)
{
struct bgpq_request* bp = malloc(sizeof(struct bgpq_request));
if (!bp)
@@ -339,9 +311,17 @@ bgpq_request_alloc(char* request, int (*callback)(char*, void*, char*),
return bp;
};
int
bgpq_pipeline(struct bgpq_expander* b, int fd,
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...)
static void
bgpq_request_free(struct bgpq_request* req)
{
if (req->request) free(req->request);
free(req);
};
struct bgpq_request*
bgpq_pipeline(struct bgpq_expander* b,
int (*callback)(char*, struct bgpq_expander*, struct bgpq_request*),
void* udata, char* fmt, ...)
{
char request[128];
int ret, rlen;
@@ -362,27 +342,27 @@ bgpq_pipeline(struct bgpq_expander* b, int fd,
exit(1);
};
if (STAILQ_EMPTY(&b->wq)) {
ret=write(fd, request, bp->size);
ret=write(b->fd, request, bp->size);
if (ret < 0)
sx_report(SX_FATAL, "Error writing request: %s\n", strerror(errno));
bp->offset=ret;
if (ret == bp->size) {
STAILQ_INSERT_TAIL(&b->rq, bp, next);
} else {
bp->offset=ret;
STAILQ_INSERT_TAIL(&b->wq, bp, next);
};
} else
STAILQ_INSERT_TAIL(&b->wq, bp, next);
return 0;
return bp;
};
static void
bgpq_write(struct bgpq_expander* b, int fd)
bgpq_write(struct bgpq_expander* b)
{
while(!STAILQ_EMPTY(&b->wq)) {
struct bgpq_request* req = STAILQ_FIRST(&b->wq);
int ret = write(fd, req->request+req->offset, req->size-req->offset);
int ret = write(b->fd, req->request+req->offset, req->size-req->offset);
if (ret < 0)
sx_report(SX_FATAL, "error writing data: %s\n", strerror(errno));
@@ -398,7 +378,7 @@ bgpq_write(struct bgpq_expander* b, int fd)
};
static int
bgpq_selread(struct bgpq_expander* b, int fd, char* buffer, int size)
bgpq_selread(struct bgpq_expander* b, char* buffer, int size)
{
fd_set rfd, wfd;
int ret;
@@ -406,12 +386,12 @@ bgpq_selread(struct bgpq_expander* b, int fd, char* buffer, int size)
repeat:
FD_ZERO(&rfd);
FD_SET(fd, &rfd);
FD_SET(b->fd, &rfd);
FD_ZERO(&wfd);
if (!STAILQ_EMPTY(&b->wq))
FD_SET(fd, &wfd);
FD_SET(b->fd, &wfd);
ret = select(fd+1, &rfd, &wfd, NULL, &timeout);
ret = select(b->fd+1, &rfd, &wfd, NULL, &timeout);
if (ret == 0)
sx_report(SX_FATAL, "select timeout\n");
else if (ret == -1 && errno == EINTR)
@@ -419,22 +399,22 @@ repeat:
else if (ret == -1)
sx_report(SX_FATAL, "select error %i: %s\n", errno, strerror(errno));
if (!STAILQ_EMPTY(&b->wq) && FD_ISSET(fd, &wfd))
bgpq_write(b, fd);
if (!STAILQ_EMPTY(&b->wq) && FD_ISSET(b->fd, &wfd))
bgpq_write(b);
if (FD_ISSET(fd, &rfd))
return read(fd, buffer, size);
if (FD_ISSET(b->fd, &rfd))
return read(b->fd, buffer, size);
goto repeat;
};
int
bgpq_read(struct bgpq_expander* b, int fd)
bgpq_read(struct bgpq_expander* b)
{
static char response[256];
static int off = 0;
if (!STAILQ_EMPTY(&b->wq))
bgpq_write(b, fd);
bgpq_write(b);
while(!STAILQ_EMPTY(&b->rq)) {
struct bgpq_request* req = STAILQ_FIRST(&b->rq);
@@ -446,7 +426,7 @@ bgpq_read(struct bgpq_expander* b, int fd)
if ((cres=strchr(response, '\n'))!=NULL)
goto have;
repeat:
ret = bgpq_selread(b, fd, response+off, sizeof(response)-off);
ret = bgpq_selread(b, response+off, sizeof(response)-off);
if (ret < 0) {
if (errno == EAGAIN)
goto repeat;
@@ -499,7 +479,7 @@ have:
goto reread2;
reread:
ret = bgpq_selread(b, fd, recvbuffer+offset, togot-offset);
ret = bgpq_selread(b, recvbuffer+offset, togot-offset);
if (ret < 0) {
if (errno == EAGAIN)
goto reread;
@@ -517,7 +497,7 @@ reread:
goto reread;
};
reread2:
ret = bgpq_selread(b, fd, response+off, sizeof(response) - off);
ret = bgpq_selread(b, response+off, sizeof(response) - off);
if (ret < 0) {
if (errno == EAGAIN)
goto reread2;
@@ -542,7 +522,7 @@ have3:
size_t spn=strcspn(c," \n");
if(spn) c[spn]=0;
if(c[0]==0) break;
req->callback(c, req->udata, req->request);
req->callback(c, b, req);
c+=spn+1;
};
assert(c == recvbuffer+togot);
@@ -574,26 +554,30 @@ have3:
STAILQ_REMOVE_HEAD(&b->rq, next);
b->piped--;
/* XXXXXX - free(req) */
bgpq_request_free(req);
};
return 0;
};
int
bgpq_expand_irrd(struct bgpq_expander* b, int fd,
int (*callback)(char*, void*,char*), void* udata, char* fmt, ...)
bgpq_expand_irrd(struct bgpq_expander* b,
int (*callback)(char*, struct bgpq_expander*, struct bgpq_request* ),
void* udata, char* fmt, ...)
{
char request[128], response[128];
va_list ap;
int ret, off = 0;
struct bgpq_request *req;
va_start(ap,fmt);
vsnprintf(request,sizeof(request),fmt,ap);
va_end(ap);
req = bgpq_request_alloc(request, callback, udata);
SX_DEBUG(debug_expander,"expander: sending '%s'\n", request);
ret=write(fd, request, strlen(request));
ret=write(b->fd, request, strlen(request));
if(ret!=strlen(request)) {
sx_report(SX_FATAL,"Partial write to IRRd, only %i bytes written: %s\n",
ret, strerror(errno));
@@ -602,7 +586,7 @@ bgpq_expand_irrd(struct bgpq_expander* b, int fd,
memset(response,0,sizeof(response));
repeat:
ret = bgpq_selread(b, fd, response+off, sizeof(response)-off);
ret = bgpq_selread(b, response+off, sizeof(response)-off);
if (ret < 0) {
sx_report(SX_ERROR, "Error reading IRRd: %s\n", strerror(errno));
exit(1);
@@ -648,7 +632,7 @@ repeat:
goto reread2;
reread:
ret = bgpq_selread(b, fd, recvbuffer+offset, togot-offset);
ret = bgpq_selread(b, recvbuffer+offset, togot-offset);
if (ret == 0) {
sx_report(SX_FATAL,"EOF from IRRd (expand,result)\n");
} else if (ret < 0) {
@@ -660,7 +644,7 @@ reread:
goto reread;
reread2:
ret = bgpq_selread(b, fd, response+off, sizeof(response)-off);
ret = bgpq_selread(b, response+off, sizeof(response)-off);
if (ret < 0) {
sx_report(SX_FATAL, "error reading IRRd: %s\n", strerror(errno));
exit(1);
@@ -682,7 +666,7 @@ have3:
size_t spn=strcspn(c," \n");
if(spn) c[spn]=0;
if(c[0]==0) break;
if(callback) callback(c,udata,request);
if(callback) callback(c, b, req);
c+=spn+1;
};
memset(recvbuffer, 0, togot+2);
@@ -698,6 +682,7 @@ have3:
sx_report(SX_ERROR,"Wrong reply: %s\n", response);
exit(0);
};
bgpq_request_free(req);
return 0;
};
@@ -753,6 +738,8 @@ bgpq_expand(struct bgpq_expander* b)
exit(1);
};
b->fd = fd;
if((ret=write(fd, "!!\n", 3))!=3) {
sx_report(SX_ERROR,"Partial write to IRRd: %i bytes, %s\n",
ret, strerror(errno));
@@ -786,20 +773,14 @@ bgpq_expand(struct bgpq_expander* b)
for(mc=b->macroses;mc;mc=mc->next) {
if (!b->maxdepth) {
bgpq_expand_irrd(b, fd,bgpq_expanded_macro,b,"!i%s,1\n",mc->text);
bgpq_expand_irrd(b, bgpq_expanded_macro, b, "!i%s,1\n", mc->text);
} else {
struct limited_req* lr = lreq_alloc(b, 0, fd);
bgpq_expander_add_already(b,mc->text);
if (!lr) {
sx_report(SX_FATAL, "Unable to allocate memory: %s\n",
strerror(errno));
exit(1);
};
if (pipelining) {
bgpq_pipeline(b,fd,bgpq_expanded_macro_limit,lr,"!i%s\n",
bgpq_pipeline(b, bgpq_expanded_macro_limit, NULL, "!i%s\n",
mc->text);
} else {
bgpq_expand_irrd(b, fd,bgpq_expanded_macro_limit,lr,"!i%s\n",
bgpq_expand_irrd(b, bgpq_expanded_macro_limit, NULL, "!i%s\n",
mc->text);
};
};
@@ -807,19 +788,19 @@ bgpq_expand(struct bgpq_expander* b)
if(pipelining) {
if(!STAILQ_EMPTY(&b->wq))
bgpq_write(b, fd);
bgpq_write(b);
if (!STAILQ_EMPTY(&b->rq))
bgpq_read(b, fd);
bgpq_read(b);
};
if(b->generation>=T_PREFIXLIST) {
unsigned i, j, k;
for(mc=b->rsets;mc;mc=mc->next) {
if(b->family==AF_INET) {
bgpq_expand_irrd(b,fd,bgpq_expanded_prefix,b,"!i%s,1\n",
bgpq_expand_irrd(b, bgpq_expanded_prefix, NULL, "!i%s,1\n",
mc->text);
} else {
bgpq_expand_irrd(b,fd,bgpq_expanded_v6prefix,b,"!i%s,1\n",
bgpq_expand_irrd(b, bgpq_expanded_v6prefix, NULL, "!i%s,1\n",
mc->text);
};
};
@@ -831,36 +812,34 @@ bgpq_expand(struct bgpq_expander* b)
if(b->family==AF_INET6) {
if(!pipelining) {
if(k>0)
bgpq_expand_irrd(b,fd,
bgpq_expanded_v6prefix,b,
"!6as%u.%u\r\n", k, i*8+j);
bgpq_expand_irrd(b, bgpq_expanded_v6prefix,
NULL, "!6as%u.%u\r\n", k, i*8+j);
else
bgpq_expand_irrd(b,fd,
bgpq_expanded_v6prefix,b,"!6as%u\r\n",
i*8+j);
bgpq_expand_irrd(b, bgpq_expanded_v6prefix,
NULL,"!6as%u\r\n", i*8+j);
} else {
if(k>0)
bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b,
"!6as%u.%u\r\n", k, i*8+j);
bgpq_pipeline(b, bgpq_expanded_v6prefix,
NULL, "!6as%u.%u\r\n", k, i*8+j);
else
bgpq_pipeline(b,fd,bgpq_expanded_v6prefix,b,
"!6as%u\r\n", i*8+j);
bgpq_pipeline(b,bgpq_expanded_v6prefix,
NULL, "!6as%u\r\n", i*8+j);
};
} else {
if(!pipelining) {
if(k>0)
bgpq_expand_irrd(b,fd,bgpq_expanded_prefix,
b,"!gas%u.%u\n", k, i*8+j);
bgpq_expand_irrd(b, bgpq_expanded_prefix,
NULL,"!gas%u.%u\n", k, i*8+j);
else
bgpq_expand_irrd(b,fd,bgpq_expanded_prefix,
b,"!gas%u\n", i*8+j);
bgpq_expand_irrd(b, bgpq_expanded_prefix,
NULL,"!gas%u\n", i*8+j);
} else {
if(k>0)
bgpq_pipeline(b,fd,bgpq_expanded_prefix,b,
"!gas%u.%u\n", k, i*8+j);
bgpq_pipeline(b, bgpq_expanded_prefix,
NULL, "!gas%u.%u\n", k, i*8+j);
else
bgpq_pipeline(b,fd,bgpq_expanded_prefix,b,
"!gas%u\n", i*8+j);
bgpq_pipeline(b, bgpq_expanded_prefix,
NULL, "!gas%u\n", i*8+j);
};
};
};
@@ -869,9 +848,9 @@ bgpq_expand(struct bgpq_expander* b)
};
if(pipelining) {
if(!STAILQ_EMPTY(&b->wq))
bgpq_write(b, fd);
bgpq_write(b);
if (!STAILQ_EMPTY(&b->rq))
bgpq_read(b, fd);
bgpq_read(b);
};
};