// Logo Search packages:
// Sourcecode: netams version File versions  Download package

// netflow.c

/*************************************************************************
***   Authentication, authorization, accounting + firewalling package
***   Copyright 1998-2002 Anton Vinokurov <anton@netams.com>
***   Copyright 2002-2008 NeTAMS Development Team
***   This code is GPL v3
***   For latest version and more info, visit this project web page
***   located at http://www.netams.com
***
*************************************************************************/
/* $Id: netflow.c,v 1.43 2009-08-01 09:23:55 anton Exp $ */

#include "config.h"
extern "C" {
      #include "lib.h"
}

#define NETFLOW_CLASS
#include "netflow.h"

void Debug(const char *msg,...);

#ifdef DEBUG
static char buf1[32], buf2[32];
#endif


// Allocation wrappers used throughout this file.
// aMalloc(x) returns x bytes of zero-initialized memory (calloc of one element),
// or NULL on failure; aFree(x) releases it with free().
#define aMalloc(x) calloc(1,x)
#define aFree(x)   free(x)

//////////////////////////////////////////////////////////////////////////
// Builds a NetFlow v5 exporter that sends its datagrams to dst_host:dst_port.
//
// dst_host  collector address in IPv4 dotted notation; it is parsed with
//           inet_aton(), so host names are NOT resolved
// dst_port  collector UDP port in host byte order
//
// The constructor opens the UDP socket, asks bigsockbuf() to adjust its send
// buffer, installs the default timeouts, and allocates the flow hash table and
// the single outgoing message buffer. The process is terminated with exit(5)
// when the socket, the send buffer or the working memory cannot be set up.
NetFlow::NetFlow(const char *dst_host, u_short dst_port) {
      // destination of every exported datagram
      bzero(&udp_dest, sizeof(udp_dest));
      udp_dest.sin_family=AF_INET;
      udp_dest.sin_port=htons(dst_port);
      if (dst_host==NULL || inet_aton(dst_host, &udp_dest.sin_addr)==0) {
            // The address stays zeroed, exactly as it did before this check
            // existed; the only difference is that the operator is told.
            fprintf(stderr, "invalid NetFlow destination address: %s\n", dst_host ? dst_host : "(null)");
      }

      udp_socket_fd=socket(AF_INET, SOCK_DGRAM, 0);
      if (udp_socket_fd==-1)
      {
            fprintf(stderr, "failed to create UDP socket: %s\n", strerror(errno));
            exit(5);
      }

      // NOTE(review): a *_RCV_* constant is used for SO_SNDBUF; presumably it is
      // just the desired buffer size -- confirm against lib.h.
      int status=bigsockbuf(udp_socket_fd, SO_SNDBUF, FT_SO_RCV_BUFSIZE);
      if (status<0) {
            fprintf(stderr, "failed to setsockopt send buffer: %s\n", strerror(errno));
            exit(5);
      } else {
            Debug("send bufer set to %u\n", status);
      }

      // default timeouts, in seconds (all compared against time(NULL) values)
      t_active=60;
      t_inactive=60;
      t_expired=7;
      t_msg_expired=300;

      // bookkeeping starts empty
      e_used=0;
      e_total=0;
      t_now=t=t_msg=time(NULL);
      table=NULL;
      ready=NULL;
      active=NULL;

      last_id = expired_id = sent_flows = sent_packets = 0;

      // Both buffers are dereferenced unconditionally by the rest of the class,
      // so an allocation failure is fatal here rather than a crash later.
      table=(entry_hash*)aMalloc(IPV4_HASH_SIZE*sizeof(entry_hash));
      message=(IPStat5Msg*)aMalloc(sizeof(IPStat5Msg));
      if (table==NULL || message==NULL) {
            fprintf(stderr, "failed to allocate NetFlow tables: %s\n", strerror(errno));
            exit(5);
      }

      // Header of the first export message. count and unix_secs are kept in
      // host byte order while records accumulate; DoSend() converts them.
      message->header.version=htons(FLOW_VERSION_5);
      message->header.count=0;
      message->header.unix_secs=t_now; //see PR 65
      message->header.sysUpTime=0; //see PR 65
      message->header.engine_id=0;
      // NOTE(review): if engine_type is a one-byte field, htons(31) truncates
      // to 0 on little-endian hosts -- confirm against netflow.h.
      message->header.engine_type=htons(31);
      message->header.flow_sequence=htonl(sent_flows);
}
//////////////////////////////////////////////////////////////////////////
// Tears the exporter down: every flow still in the table is exported and
// recycled by FlushAll(), after which the recycled entries, the hash table,
// the message buffer and the UDP socket are released.
NetFlow::~NetFlow() {
      FlushAll();

      // FlushAll() moved every live entry onto the `ready` list; free them all.
      for (entry *node=ready; node!=NULL; ) {
            entry *following=node->next;
            aFree(node);
            node=following;
      }
      ready=NULL;

      aFree(table);
      aFree(message);
      close(udp_socket_fd);
}
//////////////////////////////////////////////////////////////////////////
// Overrides the timeouts set by the constructor.
// A zero argument means "leave that timeout unchanged".
//
// active       upper bound on a flow's (last - start) span before export
// inactive     idle time after which a flow is considered expired
// expired      minimum interval between two Expiresearch() table scans
// msg_expired  age after which a partially filled message is sent anyway
void NetFlow::SetTimeouts(unsigned active,unsigned inactive,unsigned expired, unsigned msg_expired) {
      if (active!=0) {
            t_active=active;
      }
      if (inactive!=0) {
            t_inactive=inactive;
      }
      if (expired!=0) {
            t_expired=expired;
      }
      if (msg_expired!=0) {
            t_msg_expired=msg_expired;
      }
}
//////////////////////////////////////////////////////////////////////////
// Marks flow entry `e` as expired when either limit is exceeded:
//  - its recorded activity span (last - start) is longer than t_active, or
//  - it has been idle (t_now - last) for longer than t_inactive.
// The flag is only ever set here, never cleared.
void NetFlow::CheckExpire(entry *e){
      const bool ran_too_long = ((unsigned)e->last - (unsigned)e->start > t_active);
      if (ran_too_long || (t_now - (unsigned)e->last > t_inactive)) {
            e->expired=1;
      }
#ifdef DEBUG
      if(e->expired) Debug("%p expired\n", e);
#endif
}
//////////////////////////////////////////////////////////////////////////
void NetFlow::Processpacket(struct ip *ip) {
      u_short hash;
      u_short sp=0, dp=0;
      in_addr_t src,dst;
      u_char proto;
      u_char tos;
      entry *e;
      entry_hash *h;
      u_short chunk=0;

      dst = ip->ip_dst.s_addr;
      src = ip->ip_src.s_addr;
      proto = ip->ip_p;
      tos = ip->ip_tos;

      if (ntohs(ip->ip_off) & (IP_MF | IP_OFFMASK)) {
            //packet fragmented. check obtained from /usr/src/sys/netinet/ip_fastfwd.c
            sp=dp=0;
            hash=IPV4_ADDR_HASH(src, dst);
        } else
      if (ip->ip_p==IPPROTO_TCP || ip->ip_p==IPPROTO_UDP) {
            struct tcphdr *th;
            th=(struct tcphdr *)((unsigned char *)ip + ip->ip_hl*4);
            sp=th->th_sport;
            dp=th->th_dport;
            hash=IPV4_FULL_HASH(src, dst, sp, dp);
      } else {
            hash=IPV4_ADDR_HASH(src, dst);
      }

      h = &table[hash];

      for(e=h->root; e!=NULL; e=e->next){
            if (e->ip_dst==dst && e->ip_src==src &&
            e->ip_proto==proto && e->ip_tos==tos &&
            e->src_port==sp && e->dst_port==dp) {
                  e->ip_len+=ntohs(ip->ip_len);
                  if(!e->count) e->start=t_now;
                  e->count++;
                  e->last=t_now;
                  break;
            }
            chunk++;
      }

      if (e==NULL) {
            if(chunk > MAX_CHUNK) FlushAll(); //protection against DoS
            if(ready) { //we'll use ready empty entry
                  e=ready;
                  ready=ready->next;
            } else { // we must create a new flow record
                  e=(entry*)aMalloc(sizeof(entry));
                  e_total++;
            }
            e_used++;
            e->ip_dst=dst;
            e->ip_src=src;
            e->dst_port=dp;
            e->src_port=sp;
            e->ip_len=ntohs(ip->ip_len);
            e->ip_proto=proto;
            e->ip_tos=tos;
            e->start=e->last=t_now;
            e->expired=0;
            e->count=1;
            last_id++;

            if(h->root == NULL) {
                  //organize active list
                  h->next_active = active;
                  active = h;
            }

            //plug new entry
            e->next=h->root;
            h->root=e;
#ifdef DEBUG
            inet_ntop(AF_INET, &(ip->ip_src), buf1, 32);
            inet_ntop(AF_INET, &(ip->ip_dst), buf2, 32);
            Debug("[%u]-%p %u s:%s:%u d:%s:%u %u\n", hash, e, proto, buf1, ntohs(sp), buf2, ntohs(dp), ntohs(ip->ip_len));
#endif
      }
}
//////////////////////////////////////////////////////////////////////////
// Periodic maintenance of the flow table.
//
// Refreshes t_now and, at most once every t_expired seconds, walks every
// active hash bucket:
//  - an expired entry that carries traffic (count != 0) is handed to ToSend()
//    and re-armed with zeroed counters, since the same flow is likely to be
//    seen again soon;
//  - an expired entry that stayed empty since it was re-armed is unlinked and
//    moved to the `ready` free list;
//  - a bucket that ends up empty is unlinked from the active list.
// Finally a pending message older than t_msg_expired is sent.
void NetFlow::Expiresearch(){

      t_now=time(NULL);
      if((unsigned)(t_now-t)<t_expired) return;
      t=t_now;

      entry_hash *prev_h=NULL; // last bucket that stayed on the active list
      entry_hash *next_h;

      // t_now is cast because it is printed with %u
      Debug("ExpireSearch t:[%u] a:[%u] at %u: \n", last_id,last_id-expired_id, (unsigned)t_now);

      for(entry_hash *h = active; h!=NULL; h = next_h) {
            // saved first: h may be unlinked from the active list below
            next_h = h->next_active;

            entry *prev_e=NULL; // last entry that stayed in this bucket
            entry *next_e;
            for(entry *e = h->root; e!=NULL; e = next_e) {
                  // saved first: e->next is overwritten when e is recycled
                  next_e = e->next;

                  CheckExpire(e);
                  if (!e->expired) {
                        prev_e = e;
                        continue;
                  }

                  if(e->count) {
                        // export the accumulated counters and start a fresh
                        // accounting period for the same flow
                        ToSend(e);
                        e->count=0;
                        e->ip_len=0;
                        e->expired=0;
                        e->start=e->last=t_now;
                        prev_e = e;
                        continue;
                  }

                  // the flow saw no traffic since it was re-armed: recycle it
                  expired_id++;

                  //remove from table
                  if(e == h->root) h->root = next_e;
                  else prev_e->next = next_e;

                  //move it to the ready list
                  e->next=ready;
                  ready=e;
                  e_used--;
            }

            //remove non-active buckets from the list
            if(h->root == NULL) {
                  if(h == active) active = h->next_active;
                  else prev_h->next_active = h->next_active;
            } else
                  prev_h = h;
      }

      if(unsigned(t-t_msg) > t_msg_expired) DoSend();
}
//////////////////////////////////////////////////////////////////////////
// Empties the whole flow table.
//
// Every entry of every active bucket is exported through ToSend() when it
// carries traffic (count != 0) and is then moved to the `ready` free list.
// All buckets are left empty, the active list is reset, and whatever is
// pending in the message buffer is sent with DoSend().
void NetFlow::FlushAll() {
      entry *next_e;

      Debug("FlushAll::begin\n");
      // t_now is cast because it is printed with %u
      Debug("FlushAll t:[%u] a:[%u] at %u: ", last_id, last_id-expired_id, (unsigned)t_now);

      for (entry_hash *h = active; h!=NULL; h = h->next_active) {
            for(entry *e = h->root; e!=NULL; e = next_e) {
                  // saved first: e->next is overwritten when e is recycled
                  next_e = e->next;

                  expired_id++;

                  if(e->count) ToSend(e);
                  e->next=ready;
                  ready=e;
                  e_used--;
            }
            h->root=NULL;
      }
      active = NULL;

      // it's time to check if there is stalled unsent data in message
      DoSend();
      Debug("FlushAll::end\n");
}
//////////////////////////////////////////////////////////////////////////
// Appends the counters of flow entry `e` as the next record of the pending
// export message, and sends the message as soon as it holds V5_MAXFLOWS
// records. The entry itself is not modified.
void NetFlow::ToSend(entry *e) {
      IPFlow5Stat &slot=message->records[message->header.count];

      // flow key; addresses and ports are copied exactly as stored in the entry
      slot.srcaddr.s_addr=e->ip_src;
      slot.srcport=e->src_port;
      slot.dstaddr.s_addr=e->ip_dst;
      slot.dstport=e->dst_port;
      slot.prot=e->ip_proto;
      slot.tos=e->ip_tos;

      // traffic counters
      slot.dPkts=htonl(e->count);
      slot.dOctets=htonl(e->ip_len);

      // timestamps as millisecond offsets from the header's unix_secs
      slot.First=htonl((e->start-message->header.unix_secs)*1000);   //trick here, see PR 65
      slot.Last=htonl((e->last-message->header.unix_secs)*1000);   //trick here, see PR 65

      // routing details are not tracked by this exporter
      slot.nexthop.s_addr=INADDR_ANY;
      slot.input=0;
      slot.output=0;
      slot.src_as=0;
      slot.src_mask=0;
      slot.dst_as=0;
      slot.dst_mask=0;

      message->header.count++;
      Debug("record %u in packet %llu prepared, %u\n", message->header.count-1, sent_packets+1, e->ip_len);
      if (message->header.count==V5_MAXFLOWS) {
            DoSend();
      }
}
//////////////////////////////////////////////////////////////////////////
// Sends the pending export message, if it holds at least one record, and
// prepares the header for the next one.
//
// The packet/flow totals are advanced before the send, and a send failure is
// only reported through Debug(): the records of that message are not retried.
void NetFlow::DoSend() {
      if (message==NULL) return;
      if (message->header.count==0) return;

      Debug("DoSend::begin\n");
      sent_packets++;
      sent_flows+=message->header.count;
      Debug("sending NetFlow v5 packet with %u flows\n", message->header.count);

      // size on the wire: header plus only the records actually filled in
      const unsigned flows=message->header.count;
      const size_t wire_len=sizeof(struct Flow5StatHdr)+flows*sizeof(struct IPFlow5Stat);

      // these two fields are kept in host order while records accumulate
      message->header.count=htons(message->header.count);
      message->header.unix_secs=htonl(message->header.unix_secs);

      const ssize_t rc=sendto(udp_socket_fd, message, wire_len, 0, (struct sockaddr *)&udp_dest, sizeof(udp_dest));
      if (rc<0) {
            Debug("UDP send failed[%u]: %s", errno, strerror(errno));
      }

      Debug("DoSend::end\n");
      t_msg=t_now;

      // header for the next message, back in host order where applicable
      message->header.unix_secs=t_now;
      message->header.count=0;
      message->header.flow_sequence=htonl(sent_flows);
}
//////////////////////////////////////////////////////////////////////////
// Reports the exporter's counters through Debug(): the number of flows created
// so far, how many of them have not been recycled yet, and the totals of
// exported flows and packets.
void NetFlow::Status() {
      Debug("\nlast_id=%u, active=%u\n",
            last_id,
            last_id-expired_id);
      Debug("sent %llu flows in %llu packets\n",
            sent_flows,
            sent_packets);
}
//////////////////////////////////////////////////////////////////////////
// Run-time switches for Debug(): output is produced only when `debug` is
// non-zero and `quiet` is zero.
u_char debug=0, quiet=0;

// printf-style diagnostic output.
//
// msg  printf format string, followed by its arguments
//
// When the file is built with DEBUG defined, and `debug` is set and `quiet` is
// clear, the formatted text is written to stdout prefixed with the current
// time(NULL) value, and stdout is flushed. Text longer than the 2048-character
// buffer is truncated. Without DEBUG the function does nothing.
void Debug(const char *msg,...){
#ifdef DEBUG
      va_list args;
      char params[2049];

      if (!debug || quiet || msg==NULL) return;

      va_start(args, msg);
      // bounded formatting: a long message is cut instead of overflowing
      vsnprintf(params, sizeof(params), msg, args);
      va_end(args);

      // time_t is cast so that it matches the %lu conversion
      fprintf(stdout, "[%lu]: %s", (unsigned long)time(NULL), params);
      fflush(stdout);
#endif
}
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////

// Generated by  Doxygen 1.6.0   Back to index