Logo Search packages:      
Sourcecode: netams version File versions  Download package

ds_netflow.c

/*************************************************************************
***   Authentication, authorization, accounting + firewalling package
***   Copyright 1998-2002 Anton Vinokurov <anton@netams.com>
***   Copyright 2002-2008 NeTAMS Development Team
***   This code is GPL v3
***   For latest version and more info, visit this project web page
***   located at http://www.netams.com
***
*************************************************************************/
/* $Id: ds_netflow.c,v 1.88 2008-02-23 08:35:02 anton Exp $ */

#include "netams.h"
#include "ds_any.h"
#include "netflow.h"

//////////////////////////////////////////////////////////////////////////
// Per-datagram accounting entry points, one per supported NetFlow export version.
// Both are called from ds_netflow() with the sender address and the receive time.
inline unsigned int     sDS_AcctNetFlowV5(Service_DS* ds, in_addr *ip_from, void *packet, time_t time);
inline unsigned int     sDS_AcctNetFlowV9(Service_DS* ds, in_addr *ip_from, u_char *packet, time_t time,unsigned int len);
// pthread cleanup handler; ptr points at the listening socket descriptor (an int).
void        ds_netflow_cancel(void *ptr);
// NetFlow v9 helpers. Both return the number of bytes they consumed from packet.
u_short           processV9Template(u_char *packet, NFSource *nfSource);
u_short           processV9Data(u_char *packet, struct NetFlowV9Template *nftmpl, Flow *flow);
// Reads an integer field of `size` bytes; `tohbo` selects network-to-host conversion.
u_int64_t   v9ConvertIntField(void *field, int size, int tohbo);
// Attaches `attribute` to `flow` via flow->put() only while *value is still NULL.
void        createFlowAttribute(Flow *flow, const unsigned short attribute, void **value);

//////////////////////////////////////////////////////////////////////////
/*
 * Receive loop of a NetFlow data source.
 *
 * Creates a UDP socket bound to ds->l_addr:ds->port, then loops forever:
 * waits for input, reads one datagram into ds->packet, dispatches it to the
 * v5 or v9 accounting routine according to the 16-bit version word at the
 * start of the datagram, and finally reports the datagram to ds->Measure().
 *
 * The function returns only when the socket cannot be set up; in that case
 * the socket (if already created) is closed before returning. Once the loop
 * is running, the socket is released by the ds_netflow_cancel() cleanup
 * handler on thread cancellation.
 *
 * Malformed or unsupported datagrams are counted in ds->total_errors and
 * otherwise ignored.
 */
void  ds_netflow(Service_DS *ds) {
    struct sockaddr_in  sin;
    int                 socketid, reuseaddr, status;
    int                 len;
    void                *packet;
    struct timeval      start;
    socklen_t           size_ds;

    packet      = ds->packet;
    size_ds     = sizeof(sin);
    reuseaddr   = 1;

    socketid = socket(PF_INET, SOCK_DGRAM, 0);
    if( socketid < 0 ) {
        aLog(D_ERR, "failed to create netflow listening socket: %s", strerror(errno));
        return;
    }

    bzero(&sin, size_ds);
    sin.sin_family      = AF_INET;
    sin.sin_addr.s_addr = ds->l_addr.s_addr;
    sin.sin_port        = htons(ds->port);

    // set options
    // From here on every failure path must close the socket; errno is
    // logged first because close() may overwrite it.
    status = bigsockbuf(socketid, SO_RCVBUF, FT_SO_RCV_BUFSIZE);
    if( status < 0 ) {
        aLog(D_ERR, "failed to setsockopt receive buffer: %s", strerror(errno));
        close(socketid);
        return;
    }
    aLog(D_INFO, "receive bufer set to %u\n", status);

    status = setsockopt(socketid, SOL_SOCKET, SO_REUSEADDR, (void *)&reuseaddr, sizeof(reuseaddr));
    if( status < 0 ) {
        aLog(D_ERR, "failed to setsockopt netflow listening socket: %s", strerror(errno));
        close(socketid);
        return;
    }

    // bind socket with serv_addr
    status = bind(socketid, (struct sockaddr *)&sin, size_ds);
    if( status < 0 ) {
        aLog(D_ERR, "failed to bind netflow listening socket: %s", strerror(errno));
        close(socketid);
        return;
    }
    // NOTE(review): SET_POLL presumably sets up the `pfd` poll descriptor
    // that CHECK_POLL and the POLLIN test below rely on -- confirm in the macro.
    SET_POLL( socketid );

    pthread_cleanup_push(ds_netflow_cancel, (void *)&socketid);
    pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
    pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);

    ds->ds_flow = new Flow(ds->instance,ds->max_flow_slots);

    char buffer[32];
    aLog(D_INFO,"NetFlow packet processing for data-source:%u initialized\n", ds->instance);
    aLog(D_INFO,"Listening on %s:%u\n", inet_ntop(AF_INET, &(ds->l_addr), buffer, 32), ds->port);

    while( 1 ) {
        CHECK_POLL(ds,status);
        if( !status ) continue;

        if((pfd.revents & POLLIN) == 0)
            continue;

        // starting from here we need additional sanity checks
        // recvfrom() treats the address length as in/out, so reset it each time.
        size_ds = sizeof(sin);
        len = recvfrom(socketid, packet, MAX_PKT_SIZE, 0, (struct sockaddr *)&sin, &size_ds);
        if(len == -1) {
            aLog(D_ERR, "Can not read from network\n");
            continue;
        }

        netams_gettimeofday(&start, NULL);
        ds->timestamp = start.tv_sec;

        bool junk = false;

        if((unsigned)len < sizeof(u_int16_t)) {
            // too short to even carry the version word
            junk = true;
        } else {
            u_int16_t version;
            bcopy(packet, &version, sizeof(version));
            version = ntohs(version);

            switch(version) {
                case FLOW_VERSION_5:
                    if((unsigned)len < sizeof(struct Flow5StatHdr)) {
                        junk = true;
                        break;
                    }
                    {
                        // The header advertises how many records follow; make sure
                        // they were actually received before anyone indexes them.
                        // NOTE(review): assumes records follow the header without
                        // padding, as in the v5 wire format -- confirm against netflow.h.
                        size_t need = sizeof(struct Flow5StatHdr)
                            + (size_t)ntohs(((struct Flow5StatHdr *)packet)->count) * sizeof(IPFlow5Stat);
                        if((size_t)len < need) {
                            junk = true;
                            break;
                        }
                    }
                    sDS_AcctNetFlowV5(ds, &sin.sin_addr, packet, (time_t)start.tv_sec);
                    break;
                case FLOW_VERSION_9:
                    if((unsigned)len < sizeof(struct NetFlowV9Header)) {
                        junk = true;
                        break;
                    }
                    sDS_AcctNetFlowV9(ds, &sin.sin_addr, (u_char *)packet, (time_t)start.tv_sec, len);
                    break;
                default:
                    aLog(D_WARN, " NetFlow v%d packet unsupported\n", version);
                    ds->total_errors++;
                    break;
            }
        }

        if( junk ) {
            aLog(D_ERR, "Junk packet received, dropping.\n");
            ds->total_errors++;
        }

        ds->Measure(&start, len);
    }
    pthread_cleanup_pop(0);
    return;
}
//////////////////////////////////////////////////////////////////////////
/*
 * Cleanup handler registered by ds_netflow() with pthread_cleanup_push().
 * ptr points at the int holding the listening socket descriptor; the
 * descriptor is shut down in both directions and then closed.
 */
void ds_netflow_cancel(void *ptr) {
      const int *fd_slot = (const int *)ptr;
      const int  fd      = *fd_slot;

      shutdown(fd, SHUT_RDWR);
      close(fd);
}

/*
 * Account one NetFlow v9 export datagram.
 *
 *   ds       data source that received the datagram
 *   ip_from  sender address, matched against the configured NFSource list
 *   packet   start of the datagram (header fields are byte-swapped IN PLACE)
 *   time     local receive time, used when the exporter clock is not trusted
 *   len      number of valid bytes at packet
 *
 * Returns 0 when the whole datagram was walked, 1 when it was rejected
 * (unknown source) or parsing stopped early because of a malformed FlowSet.
 *
 * Every length taken from the datagram is validated before it is used to
 * advance through the buffer: a FlowSet must be at least as long as its own
 * header, and template/data records must fit inside their FlowSet.
 */
inline unsigned int sDS_AcctNetFlowV9(Service_DS *ds, in_addr *ip_from, u_char *packet, time_t time, unsigned int len) {
      struct      NetFlowV9Header         *nfHeader;
      struct      NetFlowV9FlowSetHeader  *nfFlowSetHeader;
      u_short                       offset;
      NFSource                *nfSource;

      // Defensive: the header is dereferenced and subtracted from len below.
      if(len < sizeof(struct NetFlowV9Header)) {
            aLog(D_ERR, "Unexpected end of packet, stop processing.\n");
            return 1;
      }

      nfHeader                = (struct NetFlowV9Header *)packet;
      nfHeader->sourceId            = ntohl(nfHeader->sourceId);

      //look for source
      for(nfSource = ds->nfroot; nfSource != NULL; nfSource = nfSource->next) {
            if(ip_from->s_addr == nfSource->src_addr
            && (!nfSource->engine_id || nfSource->engine_id == nfHeader->sourceId) )
                  break;
      }
      if( !nfSource ) {
            ds->total_errors++;
            char buffer[32];
            aLog(D_WARN, "NetFlow packet from unknown source: %s\n", inet_ntop(AF_INET, ip_from, buffer, 32));
            return 1;
      }
      nfSource->packets++;
      nfSource->timestamp = ds->timestamp;

      // Count is strange. It looks like padding to 32. Will open TAC
      // to clarify that 'cause docs do not cover that
      nfHeader->count               = ntohs(nfHeader->count);
      nfHeader->sysUpTime           = ntohl(nfHeader->sysUpTime);
      nfHeader->unixSeconds         = ntohl(nfHeader->unixSeconds);
      nfHeader->packageSequence     = ntohl(nfHeader->packageSequence);

      // v9 sequence numbers count export packets, so the expected value is
      // simply the previous one plus one.
      if( nfSource->seq_id != 0 && nfSource->seq_id != (nfHeader->packageSequence - 1) ) {
          aLog(D_WARN, "NF:%u awaited seq %u, received %u from %02X (%u flows are lost)\n",
            ds->instance, nfSource->seq_id+1, nfHeader->packageSequence,
            nfHeader->sourceId, nfHeader->packageSequence - nfSource->seq_id - 1);
            nfSource->errors++;
            ds->total_errors++;
      }

      nfSource->seq_id = nfHeader->packageSequence;  // storing sequence

      packet      += sizeof(struct NetFlowV9Header);
      len   -= sizeof(struct NetFlowV9Header);

      while(len>0) {
          if(sizeof(struct NetFlowV9FlowSetHeader) > len) {
            aLog(D_ERR, "Unexpected end of packet, stop processing.\n");
            return 1;
          }
          nfFlowSetHeader           = (struct NetFlowV9FlowSetHeader *)packet;
          nfFlowSetHeader->flowSetId      = ntohs(nfFlowSetHeader->flowSetId);
          nfFlowSetHeader->length   = ntohs(nfFlowSetHeader->length);

          // A FlowSet shorter than its own header would never advance the
          // cursor past this header (length 0 loops forever).
          if(nfFlowSetHeader->length < sizeof(struct NetFlowV9FlowSetHeader)) {
            aLog(D_ERR, "Malformed FlowSet length %u, stop processing.\n", nfFlowSetHeader->length);
            return 1;
          }
          if(nfFlowSetHeader->length > len) {
            aLog(D_ERR, "Unexpected end of packet, stop processing.\n");
            return 1;
          }

          offset=sizeof(struct NetFlowV9FlowSetHeader);

          if(nfFlowSetHeader->flowSetId == FLOW_VERSION_9_TMPL_ID) {
            // Walk template records; anything shorter than a template header
            // at the tail is padding.
            while( nfFlowSetHeader->length - offset >= (int)sizeof(struct NetFlowV9TemplateHeader) ) {
               struct NetFlowV9TemplateHeader *peek = (struct NetFlowV9TemplateHeader *)(packet + offset);
               // Peek at the field count (without modifying the packet) so the
               // template is only parsed when all of it lies inside this FlowSet.
               size_t need = sizeof(struct NetFlowV9TemplateHeader)
                     + (size_t)ntohs(peek->fieldCount) * sizeof(struct NetFlowV9FieldInfo);
               if(need > (size_t)(nfFlowSetHeader->length - offset)) {
                   aLog(D_ERR, "Truncated V9 template record, skipping rest of FlowSet.\n");
                   break;
               }
               offset +=processV9Template(packet + offset, nfSource);
            }
          } else if(nfFlowSetHeader->flowSetId == FLOW_VERSION_9_TMPL_OPT_ID) {
            aLog(D_INFO, "Received V9 options template. Not yet implemented\n");
          } else if(nfFlowSetHeader->flowSetId > 255) {
            struct NetFlowV9Template *nftmpl;
            struct flow_info_value  *flow_info;
            Flow *flow = ds->ds_flow;

            aDebug(DEBUG_FLOW, "Received v9 data packets from %u template\n", nfFlowSetHeader->flowSetId);
            for(u_short i = 0; i < nfSource->nfTemplate_num; i++) {
               nftmpl = nfSource->nfTemplate[i];
               if(nftmpl->header.templateId != nfFlowSetHeader->flowSetId)
                   continue;

               // Upper bound on the bytes one data record occupies according
               // to the template; used to keep the decoder inside the FlowSet.
               size_t recordMax = 0;
               for(u_short f = 0; f < nftmpl->header.fieldCount; f++)
                   recordMax += nftmpl->fields[f].length;

               if(recordMax == 0) {
                   // a zero-sized record would never advance the offset
                   aLog(D_WARN, "V9 template %u describes empty records, ignoring data.\n", nfFlowSetHeader->flowSetId);
                   break;
               }

               while( nfFlowSetHeader->length - offset > 3 //this check for padding to 32 bit
                   && recordMax <= (size_t)(nfFlowSetHeader->length - offset) ) {
                   u_short used = processV9Data(packet + offset, nftmpl, flow);
                   if(used == 0) {
                         // no progress possible; drop the partially filled flow
                         flow->reuse();
                         break;
                   }
                   offset += used;

                   //correct timers
                   flow_info = (struct flow_info_value *)flow->get(ATTR_FLOW_INFO);
                   if(ds->ds_flags == DS_CLOCK_REMOTE) { //e.g == CLOCK_REMOTE for now
                         //use flow time
                         flow_info->flow_first   = nfHeader->unixSeconds + flow_info->flow_first/1000 - nfHeader->sysUpTime/1000;
                         flow_info->flow_last    = nfHeader->unixSeconds + flow_info->flow_last/1000 - nfHeader->sysUpTime/1000;
                   } else {
                         // keep the flow duration but anchor its end at local receive time
                         flow_info->flow_first   = time - (flow_info->flow_last - flow_info->flow_first)/1000;
                         flow_info->flow_last    = time;
                   }

                   ds->FE->DoSend(flow);
                   flow->reuse();
               }
               break;
            }
          } else {
            aLog(D_WARN, "Received unknown template, ignoring.\n");
          }
          len    -= nfFlowSetHeader->length;
          packet += nfFlowSetHeader->length;
      }

      return 0;
}

u_short processV9Template(u_char *packet, NFSource *nfSource) {
    struct NetFlowV9TemplateHeader  *nfTemplateHeader;
    struct NetFlowV9Template        *nftmpl=NULL;
    u_short                   i;
    u_short                   len;

    // I know, this is weird, but by the first we will support only one template
    nfTemplateHeader = (struct NetFlowV9TemplateHeader *)packet;
    nfTemplateHeader->templateId = ntohs(nfTemplateHeader->templateId);
    nfTemplateHeader->fieldCount = ntohs(nfTemplateHeader->fieldCount);

    for(i = 0; i < nfSource->nfTemplate_num; i++) {
      if(nfSource->nfTemplate[i]->header.templateId == nfTemplateHeader->templateId){
                  nftmpl = nfSource->nfTemplate[i];
                  break;
            }
    }
    
    if(!nftmpl) { // new template received
      struct NetFlowV9Template **list;

      list=(struct NetFlowV9Template**)aMalloc(sizeof(struct NetFlowV9Template*)*(nfSource->nfTemplate_num+1));
      if(nfSource->nfTemplate)
            bcopy(nfSource->nfTemplate, list, sizeof(struct NetFlowV9Template*)*nfSource->nfTemplate_num);
      aFree(nfSource->nfTemplate);
      nfSource->nfTemplate = list;

      nftmpl=(struct NetFlowV9Template*)aMalloc(sizeof(struct NetFlowV9Template));
      nfSource->nfTemplate[nfSource->nfTemplate_num++] = nftmpl;
      aDebug(DEBUG_FLOW, "Initializing template ID=%u\n", nfTemplateHeader->templateId);
    }

    len = sizeof(struct NetFlowV9FieldInfo)*nfTemplateHeader->fieldCount;
    
    if(!memcmp(&nftmpl->header, nfTemplateHeader, sizeof(struct NetFlowV9TemplateHeader))
      && !memcmp(nftmpl->fields, packet + sizeof(struct NetFlowV9TemplateHeader), len)) 
            goto END;  //template same - no need to reinitialize

    if(nftmpl->fields != NULL)
      aFree(nftmpl->fields);

    bcopy(nfTemplateHeader, &nftmpl->header, sizeof(struct NetFlowV9TemplateHeader));
    nftmpl->fields = (struct NetFlowV9FieldInfo *)aMalloc(len);

    bcopy(packet + sizeof(struct NetFlowV9TemplateHeader), nftmpl->fields, len);

    for(i = 0; i < nfTemplateHeader->fieldCount; i++) {
      nftmpl->fields[i].type        = ntohs(nftmpl->fields[i].type);
      nftmpl->fields[i].length      = ntohs(nftmpl->fields[i].length);
    }
    aDebug(DEBUG_FLOW, "Template ID=%u reinitialized with %u fields\n",
            nfTemplateHeader->templateId, nfTemplateHeader->fieldCount);
END:    
    return sizeof(struct NetFlowV9TemplateHeader)+len;
}

/*
 * Make sure *value refers to the given attribute of flow.
 * If *value is already non-NULL nothing happens; otherwise the attribute is
 * requested from the flow with flow->put() and the result stored in *value.
 */
void createFlowAttribute(Flow *flow, const unsigned short attribute, void **value) {
    if(*value != NULL)
      return;     // attribute was attached earlier

    *value = flow->put(attribute);
}

/*
 * Decode one NetFlow v9 data record into flow, guided by a template.
 *
 *   packet  points at the first byte of the data record
 *   nftmpl  template describing type and length of every field, in order
 *   flow    flow object that receives the decoded attributes
 *
 * Returns the number of bytes consumed, i.e. the sum of all template field
 * lengths.
 *
 * ATTR_FLOW_INFO is always attached; the other attribute groups are attached
 * the first time a field belonging to them is seen. Counters and timestamps
 * are converted to host byte order (tohbo=1); addresses, ports, AS numbers
 * and interface indexes are stored as received (tohbo=0). Unknown field
 * types are skipped.
 *
 * The field length is kept as a 16-bit value: narrowing it to a byte would
 * make the record offset wrong for any field of 256 bytes or more.
 */
u_short processV9Data(u_char *packet, struct NetFlowV9Template *nftmpl, Flow *flow) {
    u_short i, offset;

    struct flow_info_value          *flow_info  = NULL;
    struct ipv4_info_value          *ipv4_info  = NULL;
    struct tcp_info_value           *tcp_info   = NULL;
    struct as_info_value            *as_info    = NULL;
    struct ifindex_info_value       *ifindex_info     = NULL;
    struct multicast_info_value           *mcast_info = NULL;
    struct nexthop_info_value       *nexthop_info     = NULL;

    flow_info = (struct flow_info_value *)flow->put(ATTR_FLOW_INFO);

    // iterating over fields
    for(i = 0, offset = 0; i < nftmpl->header.fieldCount; i++) {
      u_short length = nftmpl->fields[i].length;
      u_char  *field = packet + offset;

      switch( nftmpl->fields[i].type ) {
          case NFV9_IN_BYTES:
            flow_info->octets = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_IN_PKTS:
            flow_info->packets = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_FIRST_SWITCHED:
            flow_info->flow_first = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_LAST_SWITCHED:
            flow_info->flow_last = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_DIRECTION:
            flow_info->direction = (u_char)v9ConvertIntField(field, length, 0);
            break;
          case NFV9_PROTOCOL:
            createFlowAttribute(flow, ATTR_IPV4_INFO, (void **)&ipv4_info);
            ipv4_info->ip_p = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_SRC_TOS:
            createFlowAttribute(flow, ATTR_IPV4_INFO, (void **)&ipv4_info);
            ipv4_info->ip_tos = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_IPV4_SRC_ADDR:
            createFlowAttribute(flow, ATTR_IPV4_INFO, (void **)&ipv4_info);
            ipv4_info->ip_src.s_addr = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_IPV4_DST_ADDR:
            createFlowAttribute(flow, ATTR_IPV4_INFO, (void **)&ipv4_info);
            ipv4_info->ip_dst.s_addr = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_SRC_AS:
            createFlowAttribute(flow, ATTR_AS_INFO, (void **)&as_info);
            as_info->as_src = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_DST_AS:
            createFlowAttribute(flow, ATTR_AS_INFO, (void **)&as_info);
            as_info->as_dst = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_INPUT_SNMP:
            createFlowAttribute(flow, ATTR_IFINDEX_INFO, (void **)&ifindex_info);
            ifindex_info->if_in = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_OUTPUT_SNMP:
            createFlowAttribute(flow, ATTR_IFINDEX_INFO, (void **)&ifindex_info);
            ifindex_info->if_out = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_L4_SRC_PORT:
            createFlowAttribute(flow, ATTR_TCP_INFO, (void **)&tcp_info);
            tcp_info->src_port = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_L4_DST_PORT:
            createFlowAttribute(flow, ATTR_TCP_INFO, (void **)&tcp_info);
            tcp_info->dst_port = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_MUL_DST_PKTS:
            createFlowAttribute(flow, ATTR_MULTICAST_INFO, (void **)&mcast_info);
            mcast_info->dst_packets = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_MUL_DST_BYTES:
            createFlowAttribute(flow, ATTR_MULTICAST_INFO, (void **)&mcast_info);
            mcast_info->dst_bytes = v9ConvertIntField(field, length, 1);
            break;
          case NFV9_MUL_IGMP_TYPE:
            createFlowAttribute(flow, ATTR_MULTICAST_INFO, (void **)&mcast_info);
            mcast_info->igmp_type = (u_char)v9ConvertIntField(field, length, 0);
            break;
          case NFV9_IPV4_NEXT_HOP:
            createFlowAttribute(flow, ATTR_NEXTHOP_INFO, (void **)&nexthop_info);
            nexthop_info->ipv4_nexthop.s_addr = v9ConvertIntField(field, length, 0);
            break;
          case NFV9_BGP_IPV4_NEXT_HOP:
            createFlowAttribute(flow, ATTR_NEXTHOP_INFO, (void **)&nexthop_info);
            nexthop_info->bgpv4_nexthop.s_addr = v9ConvertIntField(field, length, 0);
            break;
          default:
            // field type not accounted for; it is skipped via the offset below
            break;
      }
      offset += length;
    }
    return offset;
}

/*
 * Read an unsigned integer field of `size` bytes from a (possibly unaligned)
 * packet position.
 *
 *   field  first byte of the field
 *   size   field width in bytes as given by the template
 *   tohbo  non-zero: treat the bytes as a big-endian (network order) number
 *          and return its value; zero: return the bytes unconverted, i.e.
 *          copied into an integer of the same width exactly as received
 *
 * With tohbo set every width from 1 to 8 bytes is decoded, since NetFlow v9
 * templates may choose counter widths other than 2/4/8. Without conversion
 * only widths 1, 2, 4 and 8 have a meaningful raw representation. Any other
 * request yields 0, and the field is not read at all.
 */
u_int64_t v9ConvertIntField(void *field, int size, int tohbo) {
    if(size < 1 || size > 8)
      return 0;

    if(tohbo) {
      // big-endian accumulate: independent of host byte order and of width
      const unsigned char *bytes = (const unsigned char *)field;
      u_int64_t value = 0;
      int k;

      for(k = 0; k < size; k++)
          value = (value << 8) | bytes[k];
      return value;
    }

    switch( size ) {
      case 1: {
          unsigned char res;
          memcpy(&res, field, 1);
          return (u_int64_t)(res);
      }
      case 2: {
          u_int16_t res;
          memcpy(&res, field, 2);
          return (u_int64_t)(res);
      }
      case 4: {
          u_int32_t res;
          memcpy(&res, field, 4);
          return (u_int64_t)(res);
      }
      case 8: {
          u_int64_t res;
          memcpy(&res, field, 8);
          return res;
      }
      default:
          return 0;
    }
}

/*
 * Account one NetFlow v5 export datagram.
 *
 *   ds       data source that received the datagram
 *   ip_from  sender address, matched against the configured NFSource list
 *   packet   start of the datagram (header fields are byte-swapped IN PLACE)
 *   time     local receive time, used when the exporter clock is not trusted
 *
 * Returns 0 when all records were forwarded to the flow engine, 1 when the
 * datagram was rejected (unknown source or implausible record count).
 *
 * The record count comes from the network and drives the loop over
 * msg->records[], so it is limited to V5_MAXFLOWS before anything else is
 * done with it.
 * NOTE(review): assumes msg->records[] holds V5_MAXFLOWS entries -- confirm
 * against the IPStat5Msg declaration.
 */
inline unsigned int sDS_AcctNetFlowV5(Service_DS* ds, in_addr *ip_from, void *packet, time_t time) {
      IPStat5Msg              *msg;
      IPFlow5Stat             *v5flow;
      Flow5StatHdr                  *hdr;

      struct flow_info_value        *flow_info;
      struct ipv4_info_value        *ipv4_info;
      struct tcp_info_value         *tcp_info;
      struct as_info_value          *as_info;
      struct ifindex_info_value     *ifindex_info;

      FlowEngine  *FE         = ds->FE;
      Flow        *flow       = ds->ds_flow;
      NFSource    *nfSource;

      msg   = (IPStat5Msg *)packet;
      hdr   = &msg->header;

      //look for source
      for(nfSource = ds->nfroot; nfSource != NULL; nfSource = nfSource->next) {
            if(ip_from->s_addr == nfSource->src_addr
            && (!nfSource->engine_id || nfSource->engine_id == hdr->engine_id) )
                  break;
      }
      if( !nfSource ) {
            ds->total_errors++;
            char buffer[32];
            aLog(D_WARN, "NetFlow packet from unknown source: %s\n", inet_ntop(AF_INET, ip_from, buffer, 32));
            return 1;
      }
      nfSource->packets++;
      nfSource->timestamp = ds->timestamp;

      hdr->count         = ntohs(hdr->count);
      hdr->sysUpTime     = ntohl(hdr->sysUpTime);
      hdr->unix_secs     = ntohl(hdr->unix_secs);
      hdr->unix_nsecs    = ntohl(hdr->unix_nsecs);
      hdr->flow_sequence = ntohl(hdr->flow_sequence);

      // Refuse datagrams that claim more records than a v5 packet can carry;
      // otherwise the record loop below would run past the message.
      if( hdr->count > V5_MAXFLOWS ) {
            aLog(D_WARN, "NF:%u packet with wrong number of flows (>V5FLOWS_PER_PAK), received from %02X \n", ds->instance, hdr->engine_id);
            nfSource->errors++;
            ds->total_errors++;
            return 1;
      }

      aDebug(DEBUG_FLOW, " ds seq %u, fl seq %u, %d records:\n", nfSource->seq_id, hdr->flow_sequence, hdr->count);

      // v5 sequence numbers count flows: seq_id holds the sequence expected
      // for the next datagram (last sequence + last record count).
      if( !nfSource->seq_id )
            nfSource->seq_id = hdr->flow_sequence;
      else if( nfSource->seq_id != hdr->flow_sequence ) {
            aLog(D_WARN, "NF:%u awaited seq %u, received %u from %02X (%u flows are lost)\n",
                  ds->instance, nfSource->seq_id, hdr->flow_sequence,
                  hdr->engine_id, hdr->flow_sequence-nfSource->seq_id);

            nfSource->seq_id = hdr->flow_sequence; // restoring back
            nfSource->errors++;
            ds->total_errors++;
      }

      //update counters
      nfSource->seq_id  += hdr->count;
      nfSource->flows         += hdr->count;
      ds->total_flows   += hdr->count;

      aDebug(DEBUG_FLOW, " NetFlow v5:%02X packet ds:%u seq %u/%u with %u records:\n",
            hdr->engine_id, ds->instance, hdr->flow_sequence, nfSource->seq_id, hdr->count);

      for (int i = 0; i < hdr->count; i++) {
            v5flow = &msg->records[i];

            flow->reuse();

            flow_info         = (struct flow_info_value*)flow->put(ATTR_FLOW_INFO);
            flow_info->packets      = ntohl(v5flow->dPkts);
            flow_info->octets = ntohl(v5flow->dOctets);

            if(ds->ds_flags == DS_CLOCK_REMOTE) { //e.g == CLOCK_REMOTE for now
                  //use flow time
                  flow_info->flow_first   = hdr->unix_secs + ntohl(v5flow->First)/1000 - hdr->sysUpTime/1000;
                  flow_info->flow_last    = hdr->unix_secs + ntohl(v5flow->Last)/1000 - hdr->sysUpTime/1000;
            } else {
                  // keep the flow duration but anchor its end at local receive time
                  flow_info->flow_first   = time - (ntohl(v5flow->Last) - ntohl(v5flow->First))/1000;
                  flow_info->flow_last    = time;
            }

            // addresses, ports, AS numbers and interface indexes are copied
            // as received, without byte-order conversion
            ipv4_info         = (struct ipv4_info_value*)flow->put(ATTR_IPV4_INFO);
            ipv4_info->ip_p         = v5flow->prot;
            ipv4_info->ip_tos = v5flow->tos;
            ipv4_info->ip_src.s_addr= v5flow->srcaddr.s_addr;
            ipv4_info->ip_dst.s_addr= v5flow->dstaddr.s_addr;

            if(ipv4_info->ip_p == IPPROTO_TCP || ipv4_info->ip_p == IPPROTO_UDP) {
                  tcp_info          = (struct tcp_info_value*)flow->put(ATTR_TCP_INFO);
                  tcp_info->src_port      = v5flow->srcport;
                  tcp_info->dst_port      = v5flow->dstport;
            }

            as_info           = (struct as_info_value*)flow->put(ATTR_AS_INFO);
            as_info->as_src   = v5flow->src_as;
            as_info->as_dst   = v5flow->dst_as;

            ifindex_info      = (struct ifindex_info_value*)flow->put(ATTR_IFINDEX_INFO);
            ifindex_info->if_in     = v5flow->input;
            ifindex_info->if_out    = v5flow->output;

            FE->DoSend(flow);
      }

      return 0;
}

//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////


Generated by  Doxygen 1.6.0   Back to index