Logo Search packages:      
Sourcecode: netams version File versions  Download package

s_monitor.c

/*************************************************************************
***   Authentication, authorization, accounting + firewalling package
***   Copyright 1998-2002 Anton Vinokurov <anton@netams.com>
***   Copyright 2002-2008 NeTAMS Development Team
***   This code is GPL v3
***   For latest version and more info, visit this project web page
***   located at http://www.netams.com
***
*************************************************************************/
/* $Id: s_monitor.c,v 1.106 2009-08-01 09:23:55 anton Exp $ */

#include "netams.h"
#include "netflow.h"

static int initialized=0;

#define MON_TYPES_NUM   5

#define MONITOR_DELAY   60      //delay between load into sql

#define MON_NONE   0
#define MON_FILE   1
#define MON_STORAGE      2
#define MON_NETFLOW      3
#define MON_FILE_XML 4

const char  *xml_start_str = "<?xml version=\"1.0\" encoding=\"utf-8\"?>";
const char  *xml_root_start_str = "<netams>";
const char  *xml_root_close_str = "</netams>";

/////////////////////////////////////////////////////////////////////////
typedef struct Monitor {
      unsigned long long total;     //total items
      unsigned long commit_num;     //number of commits

      //MON_STORAGE and MON_FILE
      u_char storage_no;
      char *filename;
      union {
            FILE *fd;
            //MON_NETFLOW
            struct sockaddr_in *dest;
      }; 
      Monitor *next;
} Monitor;
/////////////////////////////////////////////////////////////////////////
int   cShowMonitor      (struct cli_def *cli, const char *cmd, char **argv, int argc);
int   cRotateMonitor    (struct cli_def *cli, const char *cmd, char **argv, int argc);
void  xmlopen(Monitor *);
void  xmlclose(Monitor *);

/////////////////////////////////////////////////////////////////////////
//defined commands
static const  struct CMD_DB cmd_db[] = {
{ 2, 0, 0, "show",      PRIVILEGE_UNPRIVILEGED, MODE_EXEC, NULL,                "shows various system parameters" },
{ 0, 2, 0, "monitor",   PRIVILEGE_UNPRIVILEGED, MODE_EXEC, cShowMonitor,      "monitors statistics" },
{ 4, 0, 0, "rotate",    PRIVILEGE_UNPRIVILEGED, MODE_EXEC, NULL,        "rotate files and logs" },
{ 0, 4, 0, "monitor",   PRIVILEGE_UNPRIVILEGED, MODE_EXEC, cRotateMonitor,          NULL },
{ 9, 0, 1, "monitor",   PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, NULL,           "look and store specified unit activity" },
{ 0, 9, 9, "unit",      PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceProcessCfg,   NULL },
{ 10, 9, 9, "to", PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, NULL,                 NULL },
{ 0, 10, 10, "file",    PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceProcessCfg,   NULL },
{ 0, 10, 10, "xmlfile", PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceProcessCfg,   NULL },
{ 0, 10, 10, "storage", PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceProcessCfg,   NULL },
{ 0, 10, 10, "netflow", PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceProcessCfg,   NULL },
{ 0, 0, 0, "start",     PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceStart,            NULL },
{ 0, 0, 0, "stop",      PRIVILEGE_UNPRIVILEGED, MODE_MONITOR, cServiceStop,             NULL },
{ -1, 0, 0, NULL,  0, 0, NULL, NULL }
};
/////////////////////////////////////////////////////////////////////////
class Service_Monitor: public Service_Monitor_Interface {
      public:
            unsigned long long total;       //total items
            unsigned long storage_time;                     //last time saving data
            unsigned long netflow_time;
            char *buf;

            //MON_NETFLOW
            int sock;
            IPStat5Msg *message;

            Monitor *monitors[MON_TYPES_NUM];
            pthread_mutex_t lock; //to protect from possible race between different data-sources

            Service_Monitor();
            ~Service_Monitor();

            void ShowCfg(struct cli_def *cli, u_char flags);
            int ProcessCfg(struct cli_def *cli, char **argv, int argc, u_char no_flag);
            void Cancel();
                        
            u_char SendNetflow(Monitor *m);

            //interface calls
            int aMonitor(NetUnit *u, Flow *flow);
};
//////////////////////////////////////////////////////////////////////////////////////////
Service* InitMonitorService() {
        if(!initialized) {
                InitCliCommands(cmd_db);
                initialized = 1;
        }
        return (Service*)new Service_Monitor();
}
//////////////////////////////////////////////////////////////////////////////////////////
Service_Monitor::Service_Monitor():Service_Monitor_Interface() {
      //prepare buffer
      //
      buf=(char*)aMalloc(1024);
      
      total=0;
      message=NULL;
      storage_time=netflow_time=0;
      
      bzero(monitors, MON_TYPES_NUM*sizeof(void*));
      
      netams_mutex_init(&lock, NULL);
}

Service_Monitor::~Service_Monitor() {
      Monitor *m, *tmp;

      //sweep service from units
      netams_rwlock_rdlock(&Units->rwlock);
      for(NetUnit *u=(NetUnit*)Units->root; u!=NULL; u=(NetUnit*)u->next)     
            if(u->monitors) ELIST_REMOVE(u->monitors, this);
      netams_rwlock_unlock(&Units->rwlock);
      
      //clear MON_FILE
      for(m=monitors[MON_FILE];m!=NULL;m=tmp) {
            tmp=m->next;
            aFree(m->filename);
            aFree(m);
      }
      
      //clear MON_FILE_XML
      for(m=monitors[MON_FILE_XML];m!=NULL;m=tmp) {
            tmp=m->next;
            aFree(m->filename);
            aFree(m);
      }
      
      //clear MON_STORAGE
      for(m=monitors[MON_STORAGE];m!=NULL;m=tmp) {
            tmp=m->next;
            if(m->filename) aFree(m->filename);
            aFree(m);
      }

      //clear MON_NETFLOW
      for(m=monitors[MON_NETFLOW];m!=NULL;m=tmp) {
            tmp=m->next;
            if(m->dest) aFree(m->dest);
            aFree(m);
      }
      if(message) {
            aFree(message);
      }

      if(buf) aFree(buf);
      netams_mutex_destroy(&lock);
}
//////////////////////////////////////////////////////////////////////////////////////////
int Service_Monitor::ProcessCfg(struct cli_def *cli, char **argv, int argc, u_char no_flag){
      if(argc<3) return CLI_OK;

      if (STREQ(argv[1], "to")) {
            Monitor *m, *p=NULL;
            if (STRARG(argv[2], "storage")) {
                  Service *st;
                  u_short storage_no;

                  storage_no=strtol(argv[3], NULL, 10);
                  st = Services->getService(SERVICE_STORAGE, storage_no);
                  if(!st) 
                        cli_error(cli, "no such service storage:%u", storage_no);
                  else if(!((Service_Storage_Interface*)st)->isAccepted(ST_CONN_MONITOR)) 
                        cli_error(cli, "this service storage %u does not accept monitoring data",storage_no);
                  
                  for(m=monitors[MON_STORAGE]; m!=NULL; m=m->next) {
                        if(m->storage_no==storage_no) break;
                        p=m;
                  }

                  if (no_flag) {
                        if(!m) return CLI_OK;
                        if(st) ((Service_Storage_Interface*)st)->SaveFile(m->filename,ST_CONN_MONITOR);
                        cli_error(cli, "switch off monitoring to storage %u", storage_no);
                        //remove from list
                        if(monitors[MON_STORAGE]==m) 
                              monitors[MON_STORAGE]=m->next;
                        else 
                              p->next=m->next;
                        if(m->filename) aFree(m->filename);
                        if(m->fd) fclose(m->fd);
                        aFree(m);
                  } else {
                        if(m) return CLI_OK;
                        m=(Monitor*)aMalloc(sizeof(Monitor));
                        m->storage_no=storage_no;
                        print_to_string(&m->filename,"monitor.%u.%u",instance,storage_no);
                        m->fd=fopen(m->filename,"at");
                        if(!m->fd) {
                              cli_error(cli, "Can't open temporary file %s : %s",
                                    m->filename,strerror(errno));
                        }
                        //put in list
                        if(monitors[MON_STORAGE]==NULL)
                              monitors[MON_STORAGE]=m;
                        else 
                              p->next=m;
                        cli_error(cli, "monitoring to storage %u (%p)", m->storage_no, st);
                  }     
            } else if (STRARG(argv[2], "file")) {
                  char *filename=argv[3];
                                    
                  for(m=monitors[MON_FILE]; m!=NULL; m=m->next) {
                        if(STREQ(m->filename, filename)) break;
                        p=m;
                  }

            if (no_flag) {
                        if(!m) return CLI_OK;
                        cli_error(cli, "switch off monitoring to file %s", m->filename);
                        //remove from list
                        if(monitors[MON_FILE]==m)
                              monitors[MON_FILE]=m->next;
                        else
                              p->next=m->next;

                        if(m->fd) fclose(m->fd);
                        aFree(m->filename);
                        aFree(m);
            } else {
                        if(m) return CLI_OK;
                        FILE *f=fopen(filename, "at");
                        if(!f) {
                              cli_error(cli, "unable to open monitor output file %s", filename);
                              return CLI_OK;
                        }
                        m=(Monitor*)aMalloc(sizeof(Monitor));
                        m->filename=set_string(filename);
                              m->fd=f;
                        setlinebuf(m->fd);
                        //put in list
                  if(monitors[MON_FILE]==NULL)
                              monitors[MON_FILE]=m;
                        else
                              p->next=m;
                        cli_error(cli, "monitoring to file %s", m->filename);
             }
        } else if (STRARG(argv[2], "xmlfile")) {
                  char *filename=argv[3];
                                    
                  for(m=monitors[MON_FILE_XML]; m!=NULL; m=m->next) {
                        if(STREQ(m->filename, filename)) break;
                        p=m;
                  }

            if (no_flag) {
                        if(!m) return CLI_OK;
                        cli_error(cli, "switch off monitoring to xmlfile %s", m->filename);
                        //remove from list
                        if(monitors[MON_FILE_XML]==m)
                              monitors[MON_FILE_XML]=m->next;
                        else
                              p->next=m->next;

                        if(m->fd) {
                            xmlclose(m);
                            fclose(m->fd);
                        }
                        aFree(m->filename);
                        aFree(m);
            } else {
                        if(m) return CLI_OK;
                        FILE *f=fopen(filename, "at");
                        if(!f) {
                              cli_error(cli, "unable to open monitor output xmlfile %s", filename);
                              return CLI_OK;
                        }
                        fclose(f);
                        f=fopen(filename, "r+t");
                        if(!f) {
                              cli_error(cli, "unable to open monitor output xmlfile %s", filename);
                              return CLI_OK;
                        }
                        fseek(f, 0, SEEK_END);
                        m=(Monitor*)aMalloc(sizeof(Monitor));
                        m->filename=set_string(filename);
                        m->fd=f;
                        setlinebuf(m->fd);
                        xmlopen(m);
                        //put in list
                        if(monitors[MON_FILE_XML]==NULL)
                            monitors[MON_FILE_XML]=m;
                        else
                            p->next=m;
                        cli_error(cli, "monitoring to xmlfile %s", m->filename);
             }
            } else if(STRARG(argv[2], "netflow")) {
                  in_addr addr;
                  u_short port;

                  inet_aton(argv[3], &addr);
                  port=htons(strtol(argv[4], NULL, 10));
                  if(port==0) {
                        cli_error(cli, "Wrong port to send netflow");
                        return CLI_OK;
                  }
                  
                  for(m=monitors[MON_NETFLOW]; m!=NULL; m=m->next) {
                        if(m->dest->sin_addr.s_addr == addr.s_addr
                              && m->dest->sin_port == port) break;
                        p=m;
                  }

                  
                  if (no_flag) {
                        if(!m) return CLI_OK;
                        SendNetflow(m);
                        cli_error(cli, "switch off monitoring to netflow");
                        //remove from list
                        if(monitors[MON_NETFLOW]==m)
                              monitors[MON_NETFLOW]=m->next;
                        else
                              p->next=m->next;
                        aFree(m->dest);
                        aFree(m);
                        //remove supply
                        if(monitors[MON_NETFLOW]==NULL) {
                              aFree(message);
                              message=NULL;
                              shutdown(sock, SHUT_RDWR);
                              close(sock);
                        }
                  } else {
                        if(m) return CLI_OK;
                        //prepare suuply
                        if(!message) {
                              message=(IPStat5Msg*)aMalloc(sizeof(IPStat5Msg));
                              message->header.version=htons(FLOW_VERSION_5);
                              message->header.count=0;
                              message->header.unix_secs=time(NULL);; //see PR 65
                              message->header.sysUpTime=0; //see PR 65
                              message->header.engine_id=0;
                              message->header.engine_type=htons(31);
                              message->header.flow_sequence=htonl(total);

                              sock=socket(AF_INET, SOCK_DGRAM, 0);
                              int status=bigsockbuf(sock, SO_SNDBUF, FT_SO_RCV_BUFSIZE);
                              if (status<0) {
                                    aLog(D_WARN, "failed to setsockopt send buffer: %s", strerror(errno));
                              } else {
                                    aDebug(DEBUG_MONITOR, "send bufer set to %u\n", status);
                              }
                        }
                        m=(Monitor*)aMalloc(sizeof(Monitor));
                        m->dest=(struct sockaddr_in*)aMalloc(sizeof(struct sockaddr_in));
                        m->dest->sin_family = AF_INET;
                        m->dest->sin_addr.s_addr = addr.s_addr;
                        m->dest->sin_port = port;
                        //put in list
                        if(monitors[MON_NETFLOW]==NULL)
                              monitors[MON_NETFLOW]=m;
                        else
                              p->next=m;
                        cli_error(cli, "monitoring to netflow %s:%u", argv[3], ntohs(port));
                  }
            }
      } else if (STRARG(argv[1], "unit")) {
            NetUnit *u;
            u_char i=2;

            u = aParseUnit(argv, &i);
        if (u) { 
            if (!no_flag) {
                  ELIST_ADD(u->monitors, this); 
                  cli_error(cli, "monitoring unit %s (%06X)", u->name?u->name:"<\?\?>", u->id);
        } else {
                  ELIST_REMOVE(u->monitors, this);
                  cli_error(cli, "not monitoring unit %s (%06X)", u->name?u->name:"<\?\?>", u->id);
        }
     } else
      return CLI_ERROR;
      }
      return CLI_OK;
}     
//////////////////////////////////////////////////////////////////////////////////////////
int cShowMonitor(struct cli_def *cli, const char *cmd, char **argv, int argc){
      Monitor *m;
      Service *s=NULL;

      while((s=Services->getServiceNextByType(SERVICE_MONITOR,s))) {
            Service_Monitor *cfg=(Service_Monitor*)s;

            cli_print(cli, "service monitor %u",s->instance);
            for(m=cfg->monitors[MON_FILE]; m!=NULL; m=m->next) {
                  cli_print(cli, "monitor to file %s", m->filename);
                  cli_print(cli, "\t%llu flows", m->total);
            }
            for(m=cfg->monitors[MON_FILE_XML]; m!=NULL; m=m->next) {
                  cli_print(cli, "monitor to xmlfile %s", m->filename);
                  cli_print(cli, "\t%llu flows", m->total);
            }
            for(m=cfg->monitors[MON_STORAGE]; m!=NULL; m=m->next) {
                  cli_print(cli, "monitor to storage %u", m->storage_no);
                  cli_print(cli, "\t%llu flows in %lu commits", m->total, m->commit_num);
            }
            for(m=cfg->monitors[MON_NETFLOW]; m!=NULL; m=m->next) {
                  char buf[32];
                  inet_ntop(AF_INET, &(m->dest->sin_addr), buf, 32);
                  cli_print(cli, "monitor to netflow %s %u", buf, ntohs(m->dest->sin_port));
                  cli_print(cli, "\t%llu flows in %lu packets", m->total, m->commit_num);
            }

            if (!(cfg->monitors[MON_FILE] || cfg->monitors[MON_STORAGE]
                    || cfg->monitors[MON_NETFLOW] || cfg->monitors[MON_FILE_XML])) { 
                  cli_print(cli, "Monitoring is off");
                  continue;
            }
                  
            cli_bufprint(cli, "Units: ");
            netams_rwlock_rdlock(&Units->rwlock);
            for(NetUnit *u=(NetUnit*)Units->root; u!=NULL; u=(NetUnit*)u->next) {
                  if(u->monitors && ELIST_SEARCH(u->monitors, s))
                        cli_bufprint(cli, "%s(%06X)", u->name?u->name:"", u->id);
            }
            netams_rwlock_unlock(&Units->rwlock);
            cli_bufprint(cli, "\n");
        cli_print(cli, "Packets monitored: %llu",cfg->total);
      }
      return CLI_OK;
}

int cRotateMonitor(struct cli_def *cli, const char *cmd, char **argv, int argc) {
      char *arg;

      if ((arg = STRARG(argv[1], "monitor"))) {
            u_char instance=strtol(arg, NULL, 10);
            Service *s=Services->getService(SERVICE_MONITOR, instance);
            if(!s) {
                  cli_error(cli, "Service monitor:%u not exist.",instance);
                  return CLI_OK;
            }
            Service_Monitor *cfg=(Service_Monitor*)s;
            if(cfg->monitors[MON_FILE]==NULL && cfg->monitors[MON_FILE_XML]==NULL) {
                  cli_error(cli, "Service monitor:%u not monitoring to file",instance);
                  return CLI_OK;
            }
            char filename[64],mytime[32];
            time_t t;
            time(&t);
            strftime(mytime,32,"%Y-%m-%d_%H:%M",localtime(&t));
            
            netams_mutex_lock(&cfg->lock);
            for(Monitor *m=cfg->monitors[MON_FILE]; m!=NULL; m=m->next) {
                  cli_error(cli, "Rotating monitor file %s for service %s:%u", m->filename,s->getName(),s->instance);
                  aLog(D_INFO, "Rotating monitor file %s for service %s:%u\n", m->filename,s->getName(),s->instance);
                  fclose(m->fd);
                  sprintf(filename,"%s.%s",m->filename,mytime);
                  rename(m->filename,filename);
                  m->fd=fopen(m->filename, "at");
            }
            for(Monitor *m=cfg->monitors[MON_FILE_XML]; m!=NULL; m=m->next) {
                  cli_error(cli, "Rotating monitor xmlfile %s for service %s:%u", m->filename,s->getName(),s->instance);
                  aLog(D_INFO, "Rotating monitor xmlfile %s for service %s:%u\n", m->filename,s->getName(),s->instance);
                  xmlclose(m);
                  fclose(m->fd);
                  sprintf(filename,"%s.%s",m->filename,mytime);
                  rename(m->filename,filename);
                  m->fd=fopen(m->filename, "at");
                  fclose(m->fd);
                  m->fd=fopen(m->filename, "r+t");
                  xmlopen(m);
            }
            netams_mutex_unlock(&cfg->lock);
      } else
            return CLI_ERROR;
      return CLI_OK;
}
//////////////////////////////////////////////////////////////////////////////////////////
void Service_Monitor::ShowCfg(struct cli_def *cli, u_char flags){
      Monitor *m;

      for(m=monitors[MON_FILE]; m!=NULL; m=m->next)
            cli_print(cli, "monitor to file %s", m->filename);
      for(m=monitors[MON_FILE_XML]; m!=NULL; m=m->next)
            cli_print(cli, "monitor to xmlfile %s", m->filename);
      for(m=monitors[MON_STORAGE]; m!=NULL; m=m->next)      
            cli_print(cli, "monitor to storage %u", m->storage_no);
      for(m=monitors[MON_NETFLOW]; m!=NULL; m=m->next) {    
            char buf[32];
            inet_ntop(AF_INET, &(m->dest->sin_addr), buf, 32);
            cli_print(cli, "monitor to netflow %s %u", buf, ntohs(m->dest->sin_port));
      }
      netams_rwlock_rdlock(&Units->rwlock);
      for(NetUnit *u=(NetUnit*)Units->root; u!=NULL; u=(NetUnit*)u->next) {
            if(u->monitors && ELIST_SEARCH(u->monitors, this))
                  cli_print(cli, "monitor unit %s", u->name);
      }
      netams_rwlock_unlock(&Units->rwlock);     
}
//////////////////////////////////////////////////////////////////////////////////////////
int Service_Monitor::aMonitor(NetUnit *u, Flow *flow) {
      Monitor *m;

      struct flow_info_value        *flow_info  = (struct flow_info_value*)flow->get(ATTR_FLOW_INFO);
    struct ipv4_info_value          *ipv4_info  = (struct ipv4_info_value*)flow->get(ATTR_IPV4_INFO);
      struct ifindex_info_value     *ifindex_info     = (struct ifindex_info_value*)flow->get(ATTR_IFINDEX_INFO);
      struct as_info_value          *as_info    = (struct as_info_value*)flow->get(ATTR_AS_INFO);
      struct tcp_info_value         *tcp_info   = (struct tcp_info_value*)flow->get(ATTR_TCP_INFO);
      struct layer7_info_value      *layer7_info      = (struct layer7_info_value*)flow->get(ATTR_LAYER7_INFO);   
      
      unsigned long t   = flow_info->flow_last;
      unsigned flowt    = (unsigned)(flow_info->flow_last - flow_info->flow_first);
      
#ifdef DEBUG
      if(aDebug(DEBUG_MONITOR,"monitoring flow %p \n", flow)) {
            flow->Debug(DEBUG_MONITOR);
      }
#endif
      
      if(!ipv4_info) {
            aLog(D_WARN, "Trying to monitor non IPv4 packet, not enough code bits\n");
            goto EXIT;
      }

      netams_mutex_lock(&lock);
      
      total++;

      if(monitors[MON_FILE]) {
            //prepare
            char t_T[30];
            char *ptr=buf;
            timeU2T( t, (char*)&t_T);

            ptr+=sprintf(ptr, "%s %u %s %06X", (char*)&t_T, flowt, u->name?u->name:"<\?\?>", u->id);
            
            if(ipv4_info) {
                  char buf1[32], buf2[32];
                  inet_ntop(AF_INET, &(ipv4_info->ip_src), buf1, 32);
                  inet_ntop(AF_INET, &(ipv4_info->ip_dst), buf2, 32);
                  
                  ptr+=sprintf(ptr, " %02u", ipv4_info->ip_p);
            
                  if (tcp_info) {
                        ptr+=sprintf(ptr, " s:%s:%u d:%s:%u",
                              buf1, ntohs(tcp_info->src_port),
                              buf2, ntohs(tcp_info->dst_port));
                  } else 
                        ptr+=sprintf(ptr, " s:%s d:%s", buf1, buf2);
            }

            if(ifindex_info)
                  ptr+=sprintf(ptr, " if:%u->%u", ntohs(ifindex_info->if_in), ntohs(ifindex_info->if_out));
            if(as_info)
                  ptr+=sprintf(ptr, " as:%u->%u", ntohs(as_info->as_src), ntohs(as_info->as_dst));
            if (layer7_info && layer7_info->value)
                  ptr+=sprintf(ptr, " url:%s", layer7_info->value);

            ptr+=sprintf(ptr, " %lu %llu\n", flow_info->packets, flow_info->octets);
            
            //write to file
            for(m=monitors[MON_FILE]; m!=NULL; m=m->next) {
                  int res=fputs(buf, m->fd);
                  if(res<0) {
                        aLog(D_WARN, "Fail write to file %s, reopen it!\n", m->filename);
                        m->fd=fopen(m->filename, "at");
                        res=fputs(buf, m->fd);
                  }
                  if(res>0) 
                        m->total++;
            }
      } 
      
      if(monitors[MON_FILE_XML]) {
            //prepare
            char t_T[30];
            char *ptr=buf;
            timeU2T( t, (char*)&t_T);

            ptr+=sprintf(ptr, "<slink date=\"%s\" flowtime=\"%u\" name=\"%s\" id=\"%06X\"", (char*)&t_T, flowt, u->name?u->name:"<\?\?>", u->id);
            
            if(ipv4_info) {
                  char buf1[32], buf2[32];
                  inet_ntop(AF_INET, &(ipv4_info->ip_src), buf1, 32);
                  inet_ntop(AF_INET, &(ipv4_info->ip_dst), buf2, 32);
                  
                  ptr+=sprintf(ptr, " proto=\"%02u\" srcip=\"%s\" dstip=\"%s\"", ipv4_info->ip_p, buf1, buf2);
            
                  if (tcp_info) {
                        ptr+=sprintf(ptr, " srcport=\"%u\" dstport=\"%u\"",
                              ntohs(tcp_info->src_port),
                              ntohs(tcp_info->dst_port));
                  }
            }

            if(ifindex_info)
                  ptr+=sprintf(ptr, " srcif=\"%u\" dstif=\"%u\"", ntohs(ifindex_info->if_in), ntohs(ifindex_info->if_out));
            if(as_info)
                  ptr+=sprintf(ptr, " srcas=\"%u\" dstas=\"%u\"", ntohs(as_info->as_src), ntohs(as_info->as_dst));
            if (layer7_info && layer7_info->value)
                  ptr+=sprintf(ptr, " url=\"%s\"", layer7_info->value);

            ptr+=sprintf(ptr, " packets=\"%lu\" bytes=\"%llu\"/>\n", flow_info->packets, flow_info->octets);
            
            //write to file
            for(m=monitors[MON_FILE_XML]; m!=NULL; m=m->next) {
                  int res=fputs(buf, m->fd);
                  if(res<0) {
                        aLog(D_WARN, "Fail write to file %s, reopen it!\n", m->filename);
                        m->fd=fopen(m->filename, "at");
                        fclose(m->fd);
                        m->fd=fopen(m->filename, "r+t");
                        fseek(m->fd, 0, SEEK_END);
                        xmlopen(m);
                        res=fputs(buf, m->fd);
                  }
                  if(res>0) 
                        m->total++;
            }
      }
      
      if (monitors[MON_STORAGE]) {
            sprintf(buf, "%lu,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,%u,%lu,%llu,%s\n",
                  t,
                  flowt,
                  u->id,
                  ipv4_info->ip_p,
                  (unsigned)ntohl(ipv4_info->ip_src.s_addr),
                  tcp_info?ntohs(tcp_info->src_port):0,
                  (unsigned)ntohl(ipv4_info->ip_dst.s_addr),
                  tcp_info?ntohs(tcp_info->dst_port):0,
                  ifindex_info?ntohs(ifindex_info->if_in):0,
                  ifindex_info?ntohs(ifindex_info->if_out):0,
                  as_info?ntohs(as_info->as_src):0,
                  as_info?ntohs(as_info->as_dst):0,
                  flow_info->packets,
                  flow_info->octets,
                  layer7_info?(layer7_info->value?layer7_info->value:""):"");
                  
            for(m=monitors[MON_STORAGE]; m!=NULL; m=m->next) {
                  if(!m->fd) {
                        m->fd=fopen(m->filename,"at");
                        if(!m->fd) {
                              aLog(D_WARN, "Can't open temporary file %s : %s\n",m->filename,strerror(errno));
                              continue;
                        }
                        setlinebuf(m->fd);
                  }
                  fputs(buf, m->fd);
                  m->total++;
            }
            
            if( storage_time < t)  {
                  storage_time=t+MONITOR_DELAY;
                  Service *st;

                  for(m=monitors[MON_STORAGE]; m!=NULL; m=m->next) {
                        fclose(m->fd);
                        m->fd=NULL;
                st = Services->getService(SERVICE_STORAGE, m->storage_no);
                if(st) {
                              ((Service_Storage_Interface*)st)->SaveFile(m->filename,ST_CONN_MONITOR);
                              m->commit_num++;
                        }
                  }
            } 
        }

      if (monitors[MON_NETFLOW]) {
            IPFlow5Stat  *record=&message->records[message->header.count];                

            record->dOctets   =     htonl(flow_info->octets);
            record->dPkts     =     htonl(flow_info->packets);
            record->input     =     ifindex_info?ifindex_info->if_in:0;
            record->output    =     ifindex_info?ifindex_info->if_out:0;
            record->nexthop.s_addr=INADDR_ANY;

            record->tos =     ipv4_info->ip_tos;
            record->prot      =     ipv4_info->ip_p;
            record->First     =     htonl((flow_info->flow_first)*1000);   //trick here, see PR 65
            record->Last      =     htonl((flow_info->flow_last)*1000);   //trick here, see PR 65

            record->dst_as    =     as_info?as_info->as_dst:0;
            record->dst_mask= 0;
            record->dstaddr.s_addr  =     ipv4_info->ip_dst.s_addr;
            record->dstport   =     tcp_info?tcp_info->dst_port:0;

            record->src_as    =     as_info?as_info->as_src:0;
            record->src_mask= 0;
            record->srcaddr.s_addr  =     ipv4_info->ip_src.s_addr;
            record->srcport   =     tcp_info?tcp_info->src_port:0;

            message->header.count++;
      
            if (message->header.count==V5_MAXFLOWS || netflow_time < t) {
                  netflow_time=t+MONITOR_DELAY;
                  
                  for(m=monitors[MON_NETFLOW]; m!=NULL; m=m->next) {
                        SendNetflow(m);
                  }
            }
      }

EXIT:
      netams_mutex_unlock(&lock);
      return 1;
}
//////////////////////////////////////////////////////////////////////////////////////////
void Service_Monitor::Cancel(){
      Monitor *m, *tmp;
      
      //clear MON_FILE
      for(m=monitors[MON_FILE];m!=NULL;m=tmp) {
            tmp=m->next;
            if(m->fd) fclose(m->fd);
      }

      //clear MON_FILE_XML
      for(m=monitors[MON_FILE_XML];m!=NULL;m=tmp) {
            tmp=m->next;
            if(m->fd) {
                xmlclose(m);
                fclose(m->fd);
            }
      }

      //clear MON_STORAGE
      for(m=monitors[MON_STORAGE];m!=NULL;m=tmp) {
            tmp=m->next;
            Service *st = Services->getService(SERVICE_STORAGE, m->storage_no);
                  if(m->fd) fclose(m->fd);
            if(st) {
                  ((Service_Storage_Interface*)st)->SaveFile(m->filename, ST_CONN_MONITOR);
            ((Service_Storage_Interface*)st)->Close(ST_CONN_MONITOR);
            }
      }
      
      //clear MON_NETFLOW
      for(m=monitors[MON_NETFLOW];m!=NULL;m=tmp) {
            tmp=m->next;
            SendNetflow(m);
      }
      if(message) {
            shutdown(sock, SHUT_RDWR);
            close(sock);
      }
}
//////////////////////////////////////////////////////////////////////////////////////////
u_char Service_Monitor::SendNetflow(Monitor *m) {
      u_char res=1;
      if(message->header.count==0) return 0;
      unsigned count=message->header.count;
      message->header.count=htons(message->header.count);
      message->header.unix_secs=htonl(message->header.unix_secs);

      int size = sizeof(struct Flow5StatHdr)+count*sizeof(struct IPFlow5Stat);
      if (sendto(sock, message, size, 0, (struct sockaddr *)m->dest, sizeof(struct sockaddr_in)) <0) {
            char buf[32];
            inet_ntop(AF_INET, &(m->dest->sin_addr), buf, 32);
            aLog(D_WARN, "UDP send netflow to %s:%u failed: %s", buf, ntohs(m->dest->sin_port), strerror(errno));
            res=0;
      } else {
            m->total+=count;
            m->commit_num++;
      }
      
      message->header.unix_secs=netflow_time;
      message->header.count=0;
      message->header.flow_sequence=htonl(total);

      return res;
}

void xmlopen(Monitor *m) {
    long    pos, minpos;
    char    buf[10];
    int           rc;
    
    // Oops, can't get current position, do nothing
    if((pos = ftell(m->fd)) == -1) {
      aLog(D_WARN, "Can't get current position for file %s : %s\n",m->filename,strerror(errno));
      return;
    }

    // If this is new file, should write xml start code
    if(pos == 0) {
      fprintf(m->fd, "%s\n%s\n", xml_start_str, xml_root_start_str);
      return;
    }

    // If file isn't broken, the closing XML element should be maximum at 
    // (end_of_file_position - (length_of_element + 2)
    // To be more relaxed we allowing to search two times farther
    minpos = pos - (strlen(xml_root_close_str) * 2 + 2);
    if(minpos < 0)
      minpos = 0;

    // minus length of closing element
    pos -= strlen(xml_root_close_str);

    // Seeking closing element
    buf[0] = '\0';
    while(pos > minpos) {
      if(fseek(m->fd, pos, SEEK_SET) == -1) {
          aLog(D_WARN, "fseek %s : %s\n",m->filename,strerror(errno));
          return;
      }

      fgets(buf, sizeof(buf), m->fd);
      
      if(buf[0] == '<')
          break;
      --pos;
    }
    
    if(strncmp(buf, xml_root_close_str, strlen(xml_root_close_str)) != 0)
      rc = fseek(m->fd, 0, SEEK_END);
    else
      rc = fseek(m->fd, pos, SEEK_SET);

    if(rc == -1)
        aLog(D_WARN, "Problems with file %s : %s\n",m->filename,strerror(errno));
}

void xmlclose(Monitor *m) {
    fprintf(m->fd, "%s\n", xml_root_close_str);
}
//////////////////////////////////////////////////////////////////////////////////////////

Generated by  Doxygen 1.6.0   Back to index