libipfix's Introduction

DISCONTINUING WORK ON THIS PROJECT

I'm going to look at using https://github.com/logstash-plugins/logstash-codec-netflow instead, which is at least more actively maintained than I can resource for this particular project.

libipfix (forked version of 1.0 with many patches to make json-milestone-2 or so)

This package contains source code for a library to export and collect IPFIX measurement and accounting data. The library is complemented with an IPFIX collector and a basic IPFIX probe.

The library supports IPFIX (draft-ietf-ipfix-protocol-24.txt, draft-ietf-ipfix-info-15.txt, draft-ietf-psamp-info-05.txt) and Netflow9 (RFC 3954) using TCP, UDP and SCTP as transport protocol.

There are some small example programs containing code that demonstrates how to use the library.

Find more information at http://libipfix.sourceforge.net/ (upstream; will not relate to this fork)

The latest release is available from http://sourceforge.net/projects/libipfix/ (upstream) https://github.com/cameronkerrnz/libipfix (this fork)

Please send inquiries/comments/reports about this fork to Cameron Kerr [email protected]

Compiling (for JSONlines output)

These instructions are for Red Hat Enterprise Linux 6. I hope to make an RPM for the next release.

sudo yum install "@Development Tools" libpcap-devel
git clone https://github.com/cameronkerrnz/libipfix.git
cd libipfix
./configure --prefix=/opt/libipfix --enable-jsonlines --enable-fallback-templates
make
make   # yes, a second time to work around some faulty Makefile rules (patch welcome!)
sudo make install

Running the software initially

Because the transcript above installed the libraries in a non-standard place, we need to tell the dynamic linker where to find them. We can set LD_LIBRARY_PATH, or add the path to a file such as /etc/ld.so.conf.d/libipfix.conf and run ldconfig, or ... I'm sure there's a more clever way of specifying this with the linker...

$ LD_LIBRARY_PATH=/opt/libipfix/lib /opt/libipfix/bin/ipfix_collector --help

ipfix collector ($Revision: 1.12 $ Mar 26 2015)

usage: ipfix_collector [options]

options:
  -h                          this help
  -4                          accept connections via AF_INET socket
  -6                          accept connections via AF_INET6 socket
  -o <datadir>                store files of collected data in this dir
  -p <portno>                 listen on this port (default=4739)
  -s                          support SCTP clients
  -t                          support TCP clients
  -u                          support UDP clients
  -v                          increase verbose level
jsonlines options:
  --json                      export JSON to a file; one JSON doc/line
  --jsonfile <filename>       file to append to, or '-' for stdout
  --json-record-unknown-sets         include bytes of sets dropped due to no template
fallback templates:
  --fallback-templates=netscaler

example: ipfix_collector -stu -vv -o .

Let's test that we can receive our IPFIX / Appflow messages. Ensure you have configured your appliance to send to the port that you will listen to (the standard ipfix port is UDP/4739 -- ipfix is also specified on TCP and SCTP -- I have only tested / developed with UDP currently, because that is all that the Netscalers offer).

It is useful to bear in mind that this traffic is both unencrypted and unauthenticated, other than by whatever network-layer restrictions you provide.

We'll write the output to a file /tmp/data.json

$ LD_LIBRARY_PATH=/opt/libipfix/lib /opt/libipfix/bin/ipfix_collector -4 -u -vv --json --jsonfile /tmp/data.json --fallback-templates=netscaler
[ipfix_collector] listen on port 4739, write to stdout ...
[ipfix_collector] data goes to file /tmp/data.json as one JSON document per line
... you should soon see DATA RECORDS and TEMPLATE RECORDS flow up your screen with -vv
...
Template source is fallback for template ID 258
DATA RECORD:
 template id:  258
 nfields:      37
 observationPointId: 6914017
 exportingProcessId: 0
 flowId: 27059599
 netscaler_transaction_id: 2958993
 netscaler_connection_id: 27059599
 ipVersion: 4
 protocolIdentifier: 6
 paddingOctets: 0x0000
 sourceIPv4Address: «IP»
 destinationIPv4Address: «IP»
 sourceTransportPort: 50473
 destinationTransportPort: 443
 packetDeltaCount: 1
 octetDeltaCount: 682
 tcpControlBits: 24
 netscaler_flow_flags: 84025344
 flowStartMicroseconds: 15617890647616118639
 flowEndMicroseconds: 15617890647616118639
 ingressInterface: 2
 egressInterface: 2147483651
 netscaler_app_name_app_id: 9541
 netscaler_app_unit_name_app_id: 0
 netscaler_http_res_forw_fb: 0
 netscaler_http_res_forw_lb: 0
 netscaler_connection_chain_id: 0x00000000000000000000000000000000
 netscaler_connection_chain_hop_count: 0
 netscaler_http_req_url: «Request URL»
 netscaler_http_req_cookie: «Cookies sent»
 netscaler_http_req_referer: «HTTP referrer»
 netscaler_http_req_method: GET
 netscaler_http_req_host: «HTTP host header»
 netscaler_http_req_user_agent: «HTTP user-agent string»
 netscaler_http_content_type:
 netscaler_http_req_authorization:
 netscaler_http_req_via:
 netscaler_http_req_x_forwarded_for:
 netscaler_http_domain_name:
...
^C
[ipfix_collector] got signo 2, bye.

I've anonymised various fields. Now have a look at /tmp/data.json. Because each record is one long line, I'll use Python's pretty-printer to show a single line spread over multiple lines.

$ tail -1 /tmp/data.json | python -mjson.tool
{
    "destinationIPv4Address": "«IP»",
    "destinationTransportPort": 443,
    "egressInterface": 2147483651,
    "exportingProcessId": 0,
    "flowEndMicroseconds": "2015-03-26T01:52:12.000Z",
    "flowId": 27059599,
    "flowStartMicroseconds": "2015-03-26T01:52:12.000Z",
    "ingressInterface": 2,
    "ipVersion": 4,
    "ipfix_exporter_ip": "«IP»",
    "ipfix_template_id": "258",
    "ipfix_template_source": "fallback",
    "ipfix_timestamp": "2015-03-26T01:52:12Z",
    "netscaler_app_name_app_id": 9541,
    "netscaler_app_unit_name_app_id": 0,
    "netscaler_connection_chain_hop_count": 0,
    "netscaler_connection_chain_id": "00 00 00 00  00 00 00 00  00 00 00 00  00 00 00 00",
    "netscaler_connection_id": 27059599,
    "netscaler_flow_flags": 84025344,
    "netscaler_http_req_cookie": "«Cookies»",
    "netscaler_http_req_host": "«Host»",
    "netscaler_http_req_method": "GET",
    "netscaler_http_req_referer": "«Referrer»",
    "netscaler_http_req_url": "«Request URI»",
    "netscaler_http_req_user_agent": "«User agent»",
    "netscaler_http_res_forw_fb": "2036-02-07T06:28:16.000Z",
    "netscaler_http_res_forw_lb": "2036-02-07T06:28:16.000Z",
    "netscaler_transaction_id": 2958993,
    "observationPointId": 6914017,
    "octetDeltaCount": 682,
    "packetDeltaCount": 1,
    "protocolIdentifier": 6,
    "sourceIPv4Address": "«IP»",
    "sourceTransportPort": 50473,
    "tcpControlBits": 24
}
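
A side note on the timestamps. The raw flowStartMicroseconds value in the -vv trace (15617890647616118639) and the JSON value (2015-03-26T01:52:12.000Z) look consistent with a 64-bit NTP-style timestamp: seconds since 1900 in the upper 32 bits and a binary fraction of a second in the lower 32 bits. That is an inference from this one sample, not something confirmed from the libipfix source, and the function names below are hypothetical. A minimal C sketch of the conversion under that assumption:

```c
#include <stdint.h>

/* Seconds between the NTP epoch (1900-01-01) and the Unix epoch (1970-01-01). */
#define NTP_UNIX_OFFSET 2208988800ULL

/* Hypothetical helper: whole seconds since the Unix epoch.
 * Assumes the upper 32 bits hold seconds since 1900 (NTP era 0). */
int64_t ntp64_to_unix_seconds(uint64_t ntp)
{
    return (int64_t)(ntp >> 32) - (int64_t)NTP_UNIX_OFFSET;
}

/* Hypothetical helper: the fractional part, converted to microseconds. */
uint32_t ntp64_fraction_usec(uint64_t ntp)
{
    uint64_t frac = ntp & 0xFFFFFFFFULL;   /* units of 1/2^32 second */
    return (uint32_t)((frac * 1000000ULL) >> 32);
}
```

For the sample above, ntp64_to_unix_seconds(15617890647616118639ULL) gives 1427334732, which is 2015-03-26 01:52:12 UTC and so matches the JSON. Under the same reading, the 2036-02-07T06:28:16Z shown for the zero-valued netscaler_http_res_forw_* fields coincides with the NTP era rollover (2^32 seconds after 1900), so those values are probably best treated as "not set".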

At this point, it's useful to remember that every LINE is a separate JSON document. But the FILE is NOT a valid JSON data-structure, so you can't process the file (or more than one line of the file) using a tool that expects JSON (unless it can handle JSONlines).

Note: JSON permits (but does not require) the '/' character to be escaped as '\/', which may come as something of a surprise if you see it in the output. You'll also perhaps notice that the NetScaler logs already escape the URL according to the Common Log Format (CLF) convention. I shall perhaps look at decoding them and encoding them as UTF-8, but that is not a priority.

$ tail -2 /tmp/data.json | python -mjson.tool
Extra data: line 2 column 1 - line 3 column 1 (char 911 - 2430)

Running as a daemon

Run this under a service manager such as systemd or Supervisord. I'm deploying this on RHEL6 with Supervisord installed from pip (this currently requires Python 2.7, which you can get from Red Hat's SCL channel).

There is an example supervisord configuration in the doc/ directory.

Add a service account

I'll create a local user that this software will run as. That user will only need access to write to the log file. I suggest you set the group permissions for whatever will be reading the logs (eg. nxlog, as shown later). Naturally, change the path to suit your needs.

sudo /usr/sbin/useradd --system --user-group ipfix
sudo install --directory --owner ipfix --group nxlog --mode 0750 /logs/current/ipfix/

Let's see how to run it by hand.

sudo su - ipfix
export LD_LIBRARY_PATH=/opt/libipfix/lib
/opt/libipfix/bin/ipfix_collector -4 -u --json --jsonfile /logs/current/ipfix/data.json --fallback-templates=netscaler

Log files must be rotated

So now we have the process running, and logging data. Logging data needs to be rotated, so let's do that now before we forget and cause a problem later on. Assuming that you're running logrotate, creating a log rotation policy is fairly easy. Note that because we don't record a PID, as a proper daemon would, the postrotate step below finds the process by user and command name, so it may not work as intended if multiple such processes are found.

# cat /etc/logrotate.d/ipfix
/logs/current/ipfix/data.json {
    nodateext
    rotate 3
    daily
    compress
    delaycompress
    postrotate
        skill -HUP -u ipfix -c ipfix_collector
    endscript
}

Note that I've specified a rather short rotation lifetime, because I'm passing all this to nxlog, and nxlog will be looking after retention. Alter to suit your environment and needs.

Force a rotation and check that a new file has opened. I like to make my files in /etc/logrotate.d/ fairly standalone so I can force a rotation on a particular policy.

logrotate -f /etc/logrotate.d/ipfix

Do something with the data

Where you put the file and what you do with it will depend on your use-case. I will show you how you can use nxlog to tail the file, add some extra information, and send it on to something like Logstash and Elasticsearch, where you can then view it with Kibana.

Read the data with nxlog and forward it to logstash

Note that logstash can tail a file (I believe), but I prefer to have the data go into nxlog, because I set nxlog the task of managing data retention, and it will add some extra data which will help me use the data inside of the rest of my logging system. Nothing about this program requires (or even knows about) nxlog, or ELK. Its only assumption is that you can tail a file where each line is a JSON document.

Here is about the simplest config for nxlog that will read the file, add some extra data (for illustration), and send it to logstash.

<Input in_ipfix_netscalers>
    Module        im_file
    File          "/logs/current/ipfix/data.json"
    SavePos       TRUE
    ReadFromLast  TRUE
    InputType     LineBased
    Exec          parse_json(); \
                  $SITE_application_stack = "some_group_of_netscalers"; \
                  $SITE_log_type = "ipfix_appflow"; \
                  $SITE_environment = "dev"; \
                  $EventTime = parsedate($ipfix_timestamp);
</Input>

# IMPORTANT
# =========
#
# When receiving input as JSON, and then modifying it, beware that the default
# presentation of the output is the same as the input. So, if you add
# SITE_application_stack etc. to the incoming object, and then proceed to write
# it out without having first had to_json() applied to it, you will not get the
# SITE_application_stack attribute added to the outgoing JSON document; this
# messes up the message routing. So remember to apply to_json() in each output
# that is outputting JSON (and similarly for any other such output).
#
<Output out_logstash>
    Module      om_tcp
    Host        mylogstash.example.com
    Port        5140
    Exec        to_json();
</Output>

<Route route_all>
    Path  in_ipfix_netscalers => out_logstash
</Route>

From nxlog, you may not need to do anything further, but if you like to process things further in the likes of logstash (eg. putting different major systems in different sets of indexes inside Elasticsearch), then you may need something like the following snippets (treat these as inspiration). Before we go further, just check your nxlog logging in case of an error. You could even verify that it is sending data to logstash with tcpdump (assuming that tcpdump is up at the time).

tcpdump -q -p -nn -s0 -i lo -A tcp and port 5140 | grep netscalers

Here's the start of a suitable logstash configuration.

input
{
    tcp
    {
        host => "0.0.0.0"
        port => 5140
        mode => "server"
        codec => "json_lines"
    }
}

filter
{
    # We create a different index for each day, which makes removing old data
    # fairly easy. It also means that we can optimise old indexes (which we
    # shouldn't need to do unless we've deleted documents from an index), or
    # reduce the number of replicas for old data, or change where an index is
    # stored.
    #
    # Some application stacks are very heavy in terms of log volume. To
    # give us more flexibility in how we handle those indexes (such as
    # removing or reducing replica count earlier than we would otherwise),
    # we can put them into different indexes in a case-by-case basis, and
    # the rest will go into a common index.
    #
    # Note that the variable name must be lowercased in the template name
    # (and ONLY in the template name); I think it is interpreted by Elastic
    # Search, not by LogStash, and ES seems to want it lowercase.
    #
    # One symptom of the template not applying is that the .raw attributes,
    # such as username.raw, aren't available.
    #
    if [SITE_application_stack] in ["bigone", "megaone", "netscalers"]
    {
        alter
        {
            add_field =>
            {
                "site_index_basename" => "%{SITE_application_stack}"
            }
        }
    }
    else
    {
        alter
        {
            add_field =>
            {
                "site_index_basename" => "logstash"
            }
        }
    }

    date
    {
        match => ["EventTime", "YYYY-MM-dd HH:mm:ss"]
    }
}

output
{
    # Kibana 4 (up to at least beta 4) requires all nodes to be ES version 1.4.0+,
    # as it doesn't know (although the data is there) how to differentiate a
    # client node
    #
    # Doc: http://logstash.net/docs/1.4.2/outputs/elasticsearch_http
    #
    elasticsearch_http
    {
        host => "127.0.0.1"
        template_name => "%{site_index_basename}"
        index => "%{site_index_basename}-%{+YYYY.MM.dd}"
    }
}

There are other common things you could do, such as geoip lookups and user-agent breakdown, but that's well outside the scope of this document.

If you are sorting things into different groups of indexes, then you may need to do something with your templates in Elasticsearch. Access the REST interface (I suggest using the kopf plugin, but use whatever you are comfortable with) and get the template for 'logstash'.

# curl -XGET localhost:9200/_template/logstash?pretty
{
  "logstash" : {
    "order" : 0,
    "template" : "logstash-*",
    "settings" : {
      "index.refresh_interval" : "30s",
      "index.number_of_replicas" : "1"
    },
    "mappings" : {
      "_default_" : {
        "dynamic_templates" : [ {
          "string_fields" : {
            "mapping" : {
              "index" : "analyzed",
              "omit_norms" : true,
              "type" : "string",
              "fields" : {
                "raw" : {
                  "ignore_above" : 256,
                  "index" : "not_analyzed",
                  "type" : "string"
                }
              }
            },
            "match_mapping_type" : "string",
            "match" : "*"
          }
        } ],
        "properties" : {
          "geoip" : {
            "path" : "full",
            "dynamic" : true,
            "type" : "object",
            "properties" : {
              "location" : {
                "type" : "geo_point"
              }
            }
          },
          "@version" : {
            "index" : "not_analyzed",
            "type" : "string"
          }
        },
        "_all" : {
          "enabled" : true
        }
      }
    },
    "aliases" : { }
  }
}

I increase the refresh interval to about 30s for larger things (this is more efficient). Change the bit where it says "logstash-" to be "netscalers-" and, after removing the outer layer as shown, PUT the new template.

curl -XPUT localhost:9200/_template/netscalers -d '
{
  "template" : "netscalers-*",
  "settings" : {
    "index.refresh_interval" : "30s",
    "index.number_of_replicas" : "1"
  },
  "mappings" : {
    "_default_" : {
      "dynamic_templates" : [ {
        "string_fields" : {
          "mapping" : {
            "index" : "analyzed",
            "omit_norms" : true,
            "type" : "string",
            "fields" : {
              "raw" : {
                "ignore_above" : 256,
                "index" : "not_analyzed",
                "type" : "string"
              }
            }
          },
          "match_mapping_type" : "string",
          "match" : "*"
        }
      } ],
      "properties" : {
        "geoip" : {
          "path" : "full",
          "dynamic" : true,
          "type" : "object",
          "properties" : {
            "location" : {
              "type" : "geo_point"
            }
          }
        },
        "@version" : {
          "index" : "not_analyzed",
          "type" : "string"
        }
      },
      "_all" : {
        "enabled" : true
      }
    }
  },
  "aliases" : { }
}
'

NOTE: I have not attempted to optimise the mapping that this template would produce. I know there is plenty of work in that area that could be done.

Make sure you get the following output

{"acknowledged":true}

libipfix's Issues

"record2: msg too short" for Netscaler Appflow record 258 (Netscaler version 11 onwards?)

The output (stdout) from the process says the following. Note that there is evidently an alignment issue.

The destinationIPv4Address would suggest that there have been 16 bits inserted somewhere. Earlier than that, the ipVersion looks doubtful; it should be 4 in healthy messages.

DATA RECORD: 
 template id:  258 
 nfields:      37
 observationPointId: 16777472
 exportingProcessId: 16777410
 flowId: 10001368872483028992
 netscaler_transaction_id: 5952
 netscaler_connection_id: 2835678359
 ipVersion: 114
 protocolIdentifier: 238
 paddingOctets: 0xbb01
 sourceIPv4Address: 242.235.4.6
 destinationIPv4Address: 0.0.10.116
 sourceTransportPort: 8260
 destinationTransportPort: 2626
 packetDeltaCount: 77102093285654528
 octetDeltaCount: 131072
 tcpControlBits: 0
 netscaler_flow_flags: 1343225856
 flowStartMicroseconds: 5506685000838
 flowEndMicroseconds: 15841974686597700742
 ingressInterface: 3688497163
 egressInterface: 123863040
 netscaler_app_name_app_id: 229376
 netscaler_app_unit_name_app_id: 196608
 netscaler_http_res_forw_fb: 2729462849163231232
 netscaler_http_res_forw_lb: 0
 netscaler_connection_chain_id: 0x00000000000000000000000000000000
 netscaler_connection_chain_hop_count: 0
 netscaler_http_req_url: 
 netscaler_http_req_cookie: 
 netscaler_http_req_referer: 
 netscaler_http_req_method: 
 netscaler_http_req_host: 
 netscaler_http_req_user_agent: 
 netscaler_http_content_type: 
 netscaler_http_req_authorization: 
 netscaler_http_req_via: 
 netscaler_http_req_x_forwarded_for: 
 netscaler_http_domain_name: 
[ipfix_decode_datarecord] record3: msg too short
[ipfix_parse_msg] set1: decode record failed: Input/output error

Create a mechanism to persist template definitions based on a whitelist.

The NetScaler, and I suspect probably many others, have a static mapping between Template ID and template definition. So for example, it appears that Template ID 258 is always the HTTP report.

Templates are unique within a transport session and observation domain, but in this use-case, it would (very likely) still be a NetScaler if the transport session was the same but the observation domain was different.

So I propose the following change:

Ideally, we would need a set of rules (a config element) that we can use to determine whether something should have 'netscaler' template definitions assigned. That's kinda complex, and it would be easier to simply add a command-line argument '--netscaler' that just blanket applies the definition (but this would require that only netscalers are sending data to it, which is still a likely use-case and a useful hit-the-ground-running target for this milestone).

The template definitions only come in from the wire, and if a new static template definition was to be added, it might be useful to have the templates stored as PCAP files... hmmm, that would mean that we would need to collect the template for each device, and persist them. Tempting.

When we see a template definition, we could record it to disk, keyed by some hash or combination of source-destination IP address, destination UDP port (ie. transport session) and observation domain, simply writing those packets to disk. For 'security' we could make them sticky (ie. not overwrite them once they are judged to be 'sound' by administrative fiat). When starting up, we could read them back in.

This needs a bit more thought before committing to a path for this milestone.

Create a mapping to give templates a useful name

Currently JSON exports ipfix_template_id with a (string representation of) a number. That is not very user-friendly, and it would be better to have some human-readable identifier so we can easily determine which kind of messages we want to work with.

It would be good to have a (even incomplete) mapping so we can at least say that a given template ID can be reported under a given name, eg. that template 277 gets reported as being a template called 'NetScaler licence report'.
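
As a rough illustration of such a mapping, here is a small C lookup table. The struct, the function name and the table itself are hypothetical; the two entries come from this discussion (258 appears to be the NetScaler HTTP report, 277 the licence report), and the exact name strings are only suggestions.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical, deliberately incomplete table; names are illustrative. */
struct template_name {
    uint16_t    id;
    const char *name;
};

static const struct template_name netscaler_template_names[] = {
    { 258, "NetScaler HTTP report" },
    { 277, "NetScaler licence report" },
};

/* Returns a friendly name, or NULL when the ID is not in the table, so the
 * caller can fall back to emitting the numeric ID alone. */
const char *template_name_lookup(uint16_t template_id)
{
    size_t n = sizeof(netscaler_template_names)
             / sizeof(netscaler_template_names[0]);

    for (size_t i = 0; i < n; i++) {
        if (netscaler_template_names[i].id == template_id)
            return netscaler_template_names[i].name;
    }
    return NULL;
}
```

A table like this is only safe to apply when the exporter is known to be a NetScaler, since template IDs carry no vendor information of their own.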

For reference:

{
    "exportingProcessId": 1,
    "ipfix_template_id": "277",
    "netscaler_current_license_consumed": 0,
    "netscaler_license_type": 1,
    "netscaler_max_license_count": 5,
    "observationPointId": 10882033
}

and another one, note how the above contains observationPointId while the one below contains observationDomainId instead:

{
    "ipfix_template_id": "265",
    "netscaler_app_name": "mrr",
    "netscaler_app_name_app_id": 9660,
    "netscaler_app_name_incarnation_number": 48,
    "netscaler_app_template_name": "",
    "observationDomainId": 2
}

One problem is that template IDs are shared, and don't carry a concept of a PEN (Private Enterprise Number).

RFC7011 clarifies that Template IDs are only unique within the particular transport session and observation domain:

  Template ID

  Each Template Record is given a unique Template ID in the range
  256 to 65535.  This uniqueness is local to the Transport Session
  and Observation Domain that generated the Template ID.  Since
  Template IDs are used as Set IDs in the Sets they describe (see
  Section 3.4.3), values 0-255 are reserved for special Set types
  (e.g., Template Sets themselves), and Templates and Options
  Templates (see Section 3.4.2) cannot share Template IDs within a
  Transport Session and Observation Domain.  There are no
  constraints regarding the order of the Template ID allocation.  As
  Exporting Processes are free to allocate Template IDs as they see
  fit, Collecting Processes MUST NOT assume incremental Template
  IDs, or anything about the contents of a Template based on its
  Template ID alone.

and elsewhere in that RFC

Different Observation Domains within a Transport Session MAY use the
same Template ID value to refer to different Templates; Collecting
Processes MUST properly handle this case.
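
In collector terms, the passages above mean a template cache has to be keyed on more than the Template ID. A minimal C sketch of such a key follows; the struct and function are hypothetical (they are not libipfix types), and session_id stands in for however the collector chooses to identify a transport session.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical lookup key for a collector's template cache. */
struct template_key {
    uint32_t session_id;            /* assumed: collector-assigned ID per transport session */
    uint32_t observation_domain_id; /* from the IPFIX message header */
    uint16_t template_id;           /* 256..65535 */
};

/* Two templates are the same only if all three parts of the key match. */
bool template_key_equal(const struct template_key *a,
                        const struct template_key *b)
{
    return a->session_id == b->session_id
        && a->observation_domain_id == b->observation_domain_id
        && a->template_id == b->template_id;
}
```

With this key, template 258 from observation domain 1 and template 258 from observation domain 2 are distinct cache entries, even on the same transport session.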

Definitions from RFC

   Observation Point

      An Observation Point is a location in the network where packets
      can be observed.  Examples include a line to which a probe is
      attached; a shared medium, such as an Ethernet-based LAN; a single
      port of a router; or a set of interfaces (physical or logical) of
      a router.

      Note that every Observation Point is associated with an
      Observation Domain (defined below) and that one Observation Point
      may be a superset of several other Observation Points.  For
      example, one Observation Point can be an entire line card.  That
      would be the superset of the individual Observation Points at the
      line card's interfaces.

   Observation Domain

      An Observation Domain is the largest set of Observation Points for
      which Flow information can be aggregated by a Metering Process.
      For example, a router line card may be an Observation Domain if it
      is composed of several interfaces, each of which is an Observation
      Point.  In the IPFIX Message it generates, the Observation Domain
      includes its Observation Domain ID, which is unique per Exporting
      Process.  That way, the Collecting Process can identify the
      specific Observation Domain from the Exporter that sends the IPFIX
      Messages.  Every Observation Point is associated with an
      Observation Domain.  It is RECOMMENDED that Observation Domain IDs
      also be unique per IPFIX Device.

I think I'll put this one on hold until I can expose the software to some production data.

ipfix_print_newmsg outputs faulty timestamp

From a debugging trace, we see a very wrong timestamp.

IPFIX-HDR:
 version=10, length=1425
 unixtime=1425527867 (4429280-03-28 11:44:11 NZDT)
...

From the code:

/** netflow9 header format
 **  0                   1                   2                   3
 **    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |       Version Number          |            Count              |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                           sysUpTime                           |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                           UNIX Secs                           |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                       Sequence Number                         |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                        Source ID                              |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 */
/** ipfix header format
 **  0                   1                   2                   3
 **    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |       Version Number          |            Length             |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                         Export Time                           |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                       Sequence Number                         |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 **   |                     Observation Domain ID                     |
 **   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 */
typedef struct {
    uint16_t   version;     /* version of Flow Record format of this packet */
    union {
        struct {
            uint16_t   count;       /* total number of record in this packet */
            uint32_t   sysuptime;   /* sysuptime in milliseconds */
            uint32_t   unixtime;    /* seconds since 1970 */
        } nf9;
        struct {
            uint16_t   length;      /* total length of this message in octets */
            uint32_t   exporttime;  /* seconds since 1970 */
        } ipfix;
    } u;
    uint32_t   seqno;       /* incremental sequence counter */
    uint32_t   sourceid;    /* sourceid / observation domain id */

} ipfix_hdr_t;

I can't test for NetFlow 9, but I suspect it's just for IPFIX.

The following shows that the value of u.ipfix.exporttime is correct:

$ date --date="@1425527867"
Thu Mar  5 16:57:47 NZDT 2015

So the problem lies in the formulation of timebuf.

I think the issue is a casting one.

The problem seems to be how we're passing in the export time. We're casting an always-32-bit unsigned value to a variable-sized unsigned long.

There is likely surrounding code that has similar bugs and is worth auditing.
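
The call site that builds timebuf isn't quoted here, so the following is only a sketch of the usual safe pattern, not the fix that went into libipfix. The idea is to copy the 32-bit header field into a real time_t by value before handing its address to gmtime_r(). On a typical 64-bit Linux build time_t is 8 bytes wide, so a pointer cast such as (time_t *)&hdr->u.ipfix.exporttime would also read the 4 bytes that follow the field. The function name is hypothetical.

```c
#define _POSIX_C_SOURCE 200809L  /* for gmtime_r */
#include <stddef.h>
#include <stdint.h>
#include <time.h>

/* Hypothetical helper: format a 32-bit "seconds since 1970" header field.
 * Returns the number of characters written, or 0 on failure. */
size_t format_export_time(uint32_t exporttime, char *buf, size_t buflen)
{
    time_t    t = (time_t)exporttime;  /* widen by value, never via a pointer cast */
    struct tm tm_utc;

    if (gmtime_r(&t, &tm_utc) == NULL)
        return 0;
    return strftime(buf, buflen, "%Y-%m-%d %H:%M:%S UTC", &tm_utc);
}
```

For the value in the trace above, format_export_time(1425527867, ...) produces "2015-03-05 03:57:47 UTC", which is the same instant as the NZDT time that date printed.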

Emit a timestamp for each log message; don't rely on others.

Where the logs are picked up by a log shipper such as nxlog, logstash, or others, the shipper would currently be putting in the timestamp itself. This is a problem if the shipper has to back-fill logs or catch up (perhaps due to a restart), so we shall have to emit the timestamp ourselves (in such a way that others won't have to).

Eg. In nxlog, you might end up having the following in the relayed JSON (ie. if having nxlog call to_json() on its input).

{"EventReceivedTime":"2015-03-05 13:05:44", ...

It also gives us EventTime (if nxlog was able to find a timestamp), but in this case, we don't have one, so we only have EventReceivedTime

Nxlog can use whatever we like. Eg.

$EventTime = parsedate($date + " " + $time);

So the attribute name is entirely flexible, and the format is open to many standard formats (see http://nxlog.org/documentation/nxlog-community-edition-reference-manual-v20928#core_func_parsedate)

My preferred format would be either of:

1977-09-06T01:02:03.004Z
1977-09-06T01:02:03.004+02:00

The numeric offset from GMT being a bit more user-friendly and easier for a user (in that timezone at least) to validate. But given that NetScalers can easily span multiple timezones in various deployments, and that the amount of work to convert between timezones is not spectacularly small, the case could be made for the GMT format (not least being that's what ElasticSearch would expect, and the code would be simpler). So in the interest of getting the job done, I'll go with GMT (Zero timezone offset).

For performance reasons, we'll generate and cache the time stamp at the beginning of each incoming message.

Use gettimeofday(&tv, NULL) to get the time in seconds+microseconds since the epoch.
Use struct tm *gmtime_r(const time_t *timep, struct tm *result) to convert to broken-down time in GMT.
Use strftime("%Y-%m-%dT%H:%M:%S") to print most of the timestamp.
Use snprintf(".%03dZ", (int)(tv.tv_usec / 1000)) to append the milliseconds to the previous string.
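
The steps above can be sketched in C roughly as follows. The function name and buffer handling are hypothetical; the library calls are the ones listed. Note that tv_usec holds microseconds, so it is divided by 1000 to get the three millisecond digits.

```c
#define _POSIX_C_SOURCE 200809L  /* for gmtime_r */
#include <stdio.h>
#include <sys/time.h>
#include <time.h>

/* Hypothetical helper: write e.g. "1977-09-06T01:02:03.004Z" into buf.
 * Returns 0 on success, -1 if the time cannot be converted or buf is too small. */
int format_iso8601_utc(const struct timeval *tv, char *buf, size_t buflen)
{
    struct tm tm_utc;
    time_t    secs = tv->tv_sec;
    size_t    used;
    int       n;

    if (gmtime_r(&secs, &tm_utc) == NULL)
        return -1;

    /* Date and time down to whole seconds. */
    used = strftime(buf, buflen, "%Y-%m-%dT%H:%M:%S", &tm_utc);
    if (used == 0)
        return -1;

    /* tv_usec is microseconds; divide by 1000 for milliseconds. */
    n = snprintf(buf + used, buflen - used, ".%03ldZ",
                 (long)(tv->tv_usec / 1000));
    if (n < 0 || (size_t)n >= buflen - used)
        return -1;
    return 0;
}
```

A caller would run gettimeofday(&tv, NULL) once at the start of each incoming message, format the result into a buffer of at least 25 bytes, and reuse that string for every record in the message.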

Alternatively, we could potentially use time() to get the seconds since the Epoch, and then populate the tv structure ourselves (setting tv_usec to 0). If we don't care about sub-second precision, then that might allow for some optimisation in reducing the number of system-calls. (eg. have a signal handler connected to a timer that fires once a second). But I don't like using signals when I don't need to...

Where should we put this code though? We could put it in ipfix_parse_msg, but that does assume that we're reading messages 'live'.... that seems reasonable though as we don't have a mechanism to read from a file. But presumably we might be able to read from a captured packet stream, in which case timestamp information would come from the packet stream, not the global system state. That might point the onus on providing timestamp information to the caller of ipfix_parse_msg, as the caller knows where the data came from. Callers of ipfix_parse_msg include process_client_udp, process_client_tcp, process_client_sctp and process_client_ssl at this time.

I'm not sure how many pieces of software make use of libipfix at this stage, but I don't think it's many. I'm inclined to simply modify the (public function) ipfix_parse_msg to accept a struct timeval *, which, if non-NULL, should be a provided timestamp source, and if NULL, is a signal to ipfix_parse_msg to generate one itself (if it deems it useful to do) -- perhaps by something in the exporter's callback table.

Omit JSON attributes if value is an empty string

eg.

"netscaler_http_req_url": "",
"netscaler_http_req_cookie": "",
"netscaler_http_req_referer": "",
"netscaler_http_req_method": "",
"netscaler_http_req_host": "",
"netscaler_http_req_user_agent": "",
"netscaler_http_content_type": "",
"netscaler_http_req_authorization": "",
"netscaler_http_req_via": "",
"netscaler_http_req_x_forwarded_for": "",
"netscaler_aaa_username": "",

Having a lot of "" entries (which are not the same as missing -- but is null the same as missing?) makes it harder to exclude such things from terms panels in Kibana.

With the NetScalers, at least, you can select which items to export (eg. HTTP Referrer), but this just says which parts of the template to fill in.
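
One way to do the omission, sketched under the assumption that attributes are appended to a buffer one at a time. The helper json_append_nonempty is hypothetical, and it does no JSON escaping, which a real emitter would need.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of libipfix): append "name":"value" to buf
 * unless value is NULL or empty. *used tracks the bytes written so far and
 * a comma is inserted between attributes.
 * Assumes name and value need no JSON escaping.
 * Returns 1 if appended, 0 if skipped, -1 if buf is too small. */
int json_append_nonempty(char *buf, size_t len, size_t *used,
                         const char *name, const char *value)
{
    int n;

    if (value == NULL || value[0] == '\0')
        return 0;                       /* omit empty attributes entirely */

    if (*used >= len)
        return -1;

    n = snprintf(buf + *used, len - *used, "%s\"%s\":\"%s\"",
                 (*used > 0) ? "," : "", name, value);
    if (n < 0 || (size_t)n >= len - *used) {
        buf[*used] = '\0';              /* drop the truncated attribute */
        return -1;
    }

    *used += (size_t)n;
    return 1;
}
```

Skipping at the point of emission keeps the template handling untouched, at the cost of no longer being able to tell "empty" from "not in the template" downstream.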

DB table ipfix_messages is still being added to per-message with JSON output

I think we need to modify the ipfix_export_newmsg_db function so that it only adds a record to the database if the message carries a template set, rather than only data sets. Unfortunately, we can't determine this from the header of the IPFIX message.

int ipfix_export_newmsg_db( ipfixs_node_t *s, ipfix_hdr_t *hdr, void *arg )
{
    ipfixe_data_db_t *data = (ipfixe_data_db_t*)arg;

    if ( data->mysql ) {
        snprintf( query, MAXQUERYLEN,
                  "INSERT INTO `%s` SET `%s`='%u', `%s`='%lu'",
                  IPFIX_DB_MESSAGETABLE,
                  IPFIX_DB_MSGT_EXPID, s->exporterid, IPFIX_DB_MSGT_TIME,
                  (hdr->version==IPFIX_VERSION_NF9)?
                  (u_long)hdr->u.nf9.unixtime:(u_long)hdr->u.ipfix.exporttime );

        if ( mysql_query( data->mysql, query ) !=0 ) {
            mlogf( 0, "[export_newmsg_db] mysql_query(%s) failed: %s\n",
                   query, mysql_error(data->mysql) );
            return -1;
        }
        s->last_msgid = (unsigned int) mysql_insert_id( data->mysql );
    }

    return 0;
}

This gets called from ipfix_col.c

/*
 * name:        ipfix_export_hdr()
 * parameters:
 * return:      0/-1
 */
int ipfix_export_hdr( ipfixs_node_t *s, ipfix_hdr_t *hdr )
{
    ipfixe_node_t *e;
    int           retval=0;

    if ( !hdr || !s)
        return -1;

    /** call exporter funcs
     */
    for ( e=g_exporter; e!=NULL; e=e->next ) {
        if ( e->elem->export_newmsg )
            if ( e->elem->export_newmsg( s, hdr, e->elem->data ) <0 )
                retval=-1;
    }

    return retval;
}

But does ipfix_export_hdr know if this is a data or template message? No, it's called in the ipfix_parse_msg function just before it gets into processing the sets (data/template) in each message.

To implement, we could:

  1. Add a local variable to track if we have run ipfix_export_hdr for this message.
  2. Condition the call of ipfix_export_hdr to be run unless we're outputting as JSON:
        /** read rest of ipfix message
         */
        if ( (setid == IPFIX_SETID_TEMPLATE_NF9)
             || (setid == IPFIX_SETID_OPTTEMPLATE_NF9)
             || (setid == IPFIX_SETID_TEMPLATE)
             || (setid == IPFIX_SETID_OPTTEMPLATE) ) {
  3. In the 'template' branch, IF we're outputting as JSON, THEN run ipfix_export_hdr if we haven't already.
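
The three steps above can be modelled roughly as follows. This is a stand-alone sketch, not the real ipfix_parse_msg: count_hdr_exports and the json_output flag are hypothetical, and it only counts how often the header export would fire. The set ID values are the ones from RFC 3954 (NetFlow v9) and RFC 7011 (IPFIX).

```c
#include <stddef.h>

/* Set IDs reserved for templates by NetFlow v9 and IPFIX. */
enum {
    SETID_TEMPLATE_NF9    = 0,
    SETID_OPTTEMPLATE_NF9 = 1,
    SETID_TEMPLATE        = 2,
    SETID_OPTTEMPLATE     = 3
};

static int is_template_set(int setid)
{
    return setid == SETID_TEMPLATE_NF9 || setid == SETID_OPTTEMPLATE_NF9
        || setid == SETID_TEMPLATE     || setid == SETID_OPTTEMPLATE;
}

/* Hypothetical model of the set loop: returns how many times the header
 * export (ipfix_export_hdr in the real code) would run for one message. */
int count_hdr_exports(const int *setids, size_t nsets, int json_output)
{
    int hdr_exported = 0;   /* step 1: have we exported the header yet? */
    int calls = 0;
    size_t i;

    /* step 2: when not outputting JSON, export the header up front */
    if (!json_output) {
        calls++;
        hdr_exported = 1;
    }

    for (i = 0; i < nsets; i++) {
        /* step 3: in JSON mode, export once, on the first template set */
        if (json_output && is_template_set(setids[i]) && !hdr_exported) {
            calls++;
            hdr_exported = 1;
        }
    }

    return calls;
}
```

With this shape, a JSON-mode message containing only data sets never touches ipfix_messages, while a message with several template sets adds exactly one row.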

Although it might be long-term better to change the callback structure so that a similar callback can be registered (export_set) that would be called at the beginning of each set. Collectors would then need to determine if they have already done something with this message. This would be cleaner in the calling logic also, and cleaner from an API point of view.

Emit events via JSON to capture 'no template for XXX, skip data set' so we can see data loss

IPFIX-HDR:
 version=10, length=1425
 unixtime=1425336290 (4461672-07-13 09:38:42 NZST)
 seqno=2732, odid=2
[ipfix_parse_msg] no template for 262, skip data set
[ipfix_parse_msg] no template for 257, skip data set
[ipfix_parse_msg] no template for 258, skip data set
[ipfix_parse_msg] no template for 257, skip data set
[ipfix_parse_msg] no template for 258, skip data set
[ipfix_parse_msg] no template for 262, skip data set
[ipfix_parse_msg] no template for 257, skip data set
[ipfix_parse_msg] no template for 258, skip data set

This is annoying, because with default settings, you can lose a lot of data. I rather suspect that it would be useful to pre-load templates for some probes, as they will likely be the same for some devices, such as the NetScaler (ie. 258 is always an HTTP report)
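
A sketch of what such an event could look like as a JSON line. The attribute names ("event", "odid", "set_id") are invented for illustration, as is the function name; nothing here is an existing libipfix format.

```c
#include <stdio.h>

/* Hypothetical helper (not part of libipfix): write one JSON line recording
 * that a data set was skipped because its template is unknown.
 * The attribute names are illustrative only.
 * Returns 0 on success, -1 if buf is too small. */
int format_missing_template_event(char *buf, size_t len,
                                  unsigned long odid, unsigned int setid)
{
    int n = snprintf(buf, len,
                     "{\"event\":\"no_template\",\"odid\":%lu,\"set_id\":%u}\n",
                     odid, setid);

    if (n < 0 || (size_t)n >= len)
        return -1;
    return 0;
}
```

Emitting one line per skipped set would make the loss countable downstream (eg. a Kibana panel of skipped sets per template ID) instead of being buried in the debug log.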

Be able to specify custom static attributes to add to each message.

This could be useful in the likes of Logstash for message processing. For example, you could use it to specify which environment the data pertains to, or perhaps you might use it to steer data into a different set of indexes in Elasticsearch, or to help drive data retention policies.

Log IP address of device sending the messages

This is useful when you have multiple devices sending data.

I haven't verified it, but I suspect that the Template ID doesn't have an enterprise number, so I would caution against having one receiver receive data from multiple different vendors, lest you get some confusion over what a particular template ID is.

Failing to create table due to duplicate paddingOctets IE

Sample from logs

...
  field25: ie=0.210, len=1 (paddingOctets)
  field26: ie=0.210, len=2 (paddingOctets)
...
[ipfix_db] CREATE TABLE ipfix_14 ( ... , ie0_d2 VARBINARY(4096) , ie0_d2 VARBINARY(4096) , ...  )
[ipfix_db] mysql_query() failed: Duplicate column name 'ie0_d2'
[export_trecord_db] cannot build table name for template ...
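
One possible fix is to make repeated IEs unique by adding a counter suffix. The sketch below assumes the column naming seen in the log above ("ie" + enterprise number in hex + "_" + IE number in hex, so 0.210 becomes ie0_d2); the struct and function names are hypothetical.

```c
#include <stdio.h>
#include <string.h>

#define COLNAME_MAX 32

/* Hypothetical description of one template field: enterprise number + IE id. */
struct ie_ref {
    unsigned int eno;
    unsigned int ienum;
};

/* Hypothetical helper (not part of libipfix): build one column name per
 * field. The first occurrence of an IE keeps the plain name; later
 * occurrences of the same IE get "_2", "_3", ... appended. */
void build_unique_columns(const struct ie_ref *fields, size_t n,
                          char names[][COLNAME_MAX])
{
    size_t i, j;

    for (i = 0; i < n; i++) {
        unsigned int seen = 1;

        /* Count earlier fields in this template with the same IE. */
        for (j = 0; j < i; j++) {
            if (fields[j].eno == fields[i].eno
                && fields[j].ienum == fields[i].ienum)
                seen++;
        }

        if (seen == 1)
            snprintf(names[i], COLNAME_MAX, "ie%x_%x",
                     fields[i].eno, fields[i].ienum);
        else
            snprintf(names[i], COLNAME_MAX, "ie%x_%x_%u",
                     fields[i].eno, fields[i].ienum, seen);
    }
}
```

For paddingOctets specifically, simply not creating a column at all might be the better answer, since padding carries no data.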

Report template flush events via JSON

Occasionally, when looking at the debug output, you see:

[ipfix_col] drop template 0:279
[ipfix_col] drop template 0:278
[ipfix_col] drop template 0:277
[ipfix_col] drop template 0:276
[ipfix_col] drop template 0:275
[ipfix_col] drop template 0:274
[ipfix_col] drop template 0:273
[ipfix_col] drop template 0:272
[ipfix_col] drop template 0:271
[ipfix_col] drop template 0:270
[ipfix_col] drop template 0:269
[ipfix_col] drop template 0:267
[ipfix_col] drop template 0:266
[ipfix_col] drop template 0:265
[ipfix_col] drop template 0:264
[ipfix_col] drop template 0:263
[ipfix_col] drop template 0:262
[ipfix_col] drop template 0:261
[ipfix_col] drop template 0:260
[ipfix_col] drop template 0:259
[ipfix_col] drop template 0:258
[ipfix_col] drop template 0:257
[ipfix_col] drop template 0:256

Like #22, it would be useful to have some sort of status-lock on the templates: pre-loading or cache-locking at least would be useful.

Completely divorce the JSON emitter from MySQL, making a separate collector

This follows on from my realisation in #19

Completely divorce the JSON emitter from MySQL. Undo any such changes to MySQL collector. Make the JSON collector a separate collector in its own right (perhaps starting from the MySQL collector, and then gutting it appropriately).

Undo JSON related changes to the MySQL collector.

Remove "reverse " prefix for generated attribute names.

The most common use case (to my mind) for reverse attribute names is to turn them into things like column names (when IENAME_COLUMNS is in use), or (eventually) to turn them into JSON attribute names.

Adding "reverse " (note the inclusion of a space) creates issues: the space means the identifier has to be quoted, and the extra characters make it much more likely to hit MySQL's 64-character limit on identifier names.

Eg.

+----------------------------------------------+---------------------+------+-----+---------+-------+
| Field                                        | Type                | Null | Key | Default | Extra |
+----------------------------------------------+---------------------+------+-----+---------+-------+
| id_ipfix_messages                            | int(10) unsigned    | NO   |     | NULL    |       |
| observationPointId                           | int(10) unsigned    | YES  |     | NULL    |       |
| exportingProcessId                           | int(10) unsigned    | YES  |     | NULL    |       |
| flowId                                       | bigint(20) unsigned | YES  |     | NULL    |       |
| reverse netscaler_transaction_id             | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_connection_id              | int(10) unsigned    | YES  |     | NULL    |       |
| ipVersion                                    | int(10) unsigned    | YES  |     | NULL    |       |
| protocolIdentifier                           | int(10) unsigned    | YES  |     | NULL    |       |
| sourceIPv4Address                            | varbinary(4096)     | YES  |     | NULL    |       |
| destinationIPv4Address                       | varbinary(4096)     | YES  |     | NULL    |       |
| sourceTransportPort                          | int(10) unsigned    | YES  |     | NULL    |       |
| destinationTransportPort                     | int(10) unsigned    | YES  |     | NULL    |       |
| packetDeltaCount                             | bigint(20) unsigned | YES  |     | NULL    |       |
| octetDeltaCount                              | bigint(20) unsigned | YES  |     | NULL    |       |
| tcpControlBits                               | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_flow_flags                 | bigint(20) unsigned | YES  |     | NULL    |       |
| flowStartMicroseconds                        | varbinary(4096)     | YES  |     | NULL    |       |
| flowEndMicroseconds                          | varbinary(4096)     | YES  |     | NULL    |       |
| ingressInterface                             | int(10) unsigned    | YES  |     | NULL    |       |
| egressInterface                              | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_app_name_app_id            | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_app_unit_name_app_id       | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_http_res_forw_fb           | varbinary(4096)     | YES  |     | NULL    |       |
| reverse netscaler_http_res_forw_lb           | varbinary(4096)     | YES  |     | NULL    |       |
| reverse netscaler_connection_chain_id        | varbinary(4096)     | YES  |     | NULL    |       |
| reverse netscaler_connection_chain_hop_count | int(10) unsigned    | YES  |     | NULL    |       |
| reverse netscaler_aaa_username               | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_url               | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_cookie            | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_referer           | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_method            | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_host              | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_user_agent        | text                | YES  |     | NULL    |       |
| reverse netscaler_http_content_type          | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_authorization     | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_via               | text                | YES  |     | NULL    |       |
| reverse netscaler_http_req_x_forwarded_for   | text                | YES  |     | NULL    |       |
| reverse netscaler_http_domain_name           | text                | YES  |     | NULL    |       |
+----------------------------------------------+---------------------+------+-----+---------+-------+
38 rows in set (0.00 sec)

(This was evident after fixing the code to at least surround the identifiers with a backtick -- still not proper, but at least now it doesn't self-destruct)
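
Stripping the prefix at the point where names are generated could be as small as this (strip_reverse_prefix is a hypothetical helper, not an existing libipfix function):

```c
#include <string.h>

/* Hypothetical helper (not part of libipfix): return a pointer to name with
 * a leading "reverse " prefix skipped. The input is not modified or copied. */
const char *strip_reverse_prefix(const char *name)
{
    static const char prefix[] = "reverse ";
    size_t plen = sizeof prefix - 1;    /* length without the NUL */

    if (strncmp(name, prefix, plen) == 0)
        return name + plen;
    return name;
}
```

Names without the prefix, such as flowId, pass through unchanged.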
