
goflow2's Issues

Detect missing flows using NetFlow5/NetFlow9/IPFIX Sequence Number

Hi,

We are using goflow2 as a library and trying to see if there is a way to detect that some flows are missing, using the NetFlow v5/NetFlow v9/IPFIX sequence number.

It seems there is currently no easy, reliable way to detect whether sFlow/NetFlow/IPFIX packets have been dropped between the device/exporter sending the flows and the goflow2 collector.

Should goflow2 provide a way to monitor/report missing packets using the sequence number?


Sequence number behaviour differs a bit between NetFlow v5 and NetFlow v9:

Sequence number meaning in NetFlow v9:
Incremental sequence counter of all export packets sent by this export device;
this value is cumulative, and it can be used to identify whether any export packets have been missed.
Note: This is a change from the NetFlow Version 5 and Version 8 headers, where this number represented "total flows."
nfdump example: https://github.com/phaag/nfdump/blob/b0c7a5ec2e11a683460b312ba192bc00590c4acd/bin/netflow_v5_v7.c#L382-L395

Sequence Number in NetFlow 5, 6, 7, and Version 8:
The sequence number is equal to the sequence number of the previous datagram plus the number of flows in the
previous datagram. After receiving a new datagram, the receiving application can subtract the expected
sequence number from the sequence number in the header to derive the number of missed flows.
nfdump example: https://github.com/phaag/nfdump/blob/28ad878ac807e82fb95a77df6fc9b98000bcc81c/bin/netflow_v9.c#L2094-L2106
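For illustration, here is a minimal Go sketch (not part of the goflow2 API) of the v5-style detection described above, tracking the last header sequence number and flow count per exporter:

package main

import "fmt"

// v5SeqTracker implements the NetFlow v5 rule quoted above: the expected
// sequence number is the previous sequence plus the previous datagram's
// flow count, so any positive difference is the number of missed flows.
type v5SeqTracker struct {
	lastSeq   map[string]uint32 // exporter address -> last header sequence
	lastCount map[string]uint32 // exporter address -> flow count of last datagram
}

func newV5SeqTracker() *v5SeqTracker {
	return &v5SeqTracker{lastSeq: map[string]uint32{}, lastCount: map[string]uint32{}}
}

// Observe returns the number of flows presumed missed before this datagram.
func (t *v5SeqTracker) Observe(exporter string, seq, count uint32) uint32 {
	var missed uint32
	if prev, ok := t.lastSeq[exporter]; ok {
		expected := prev + t.lastCount[exporter] // uint32 wrap-around works out
		missed = seq - expected                  // 0 when nothing was lost
	}
	t.lastSeq[exporter] = seq
	t.lastCount[exporter] = count
	return missed
}

func main() {
	tr := newV5SeqTracker()
	tr.Observe("192.0.2.1", 0, 30)               // first datagram carries 30 flows
	fmt.Println(tr.Observe("192.0.2.1", 40, 25)) // expected seq 30, got 40: 10 flows missed
}

For NetFlow v9, per the quoted header definition, the same bookkeeping would count export packets instead of flows, which sidesteps the template-counting problem mentioned below.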


Since the field SequenceNum is part of FlowMessage, I tried to use it to detect whether flows have been dropped, but it does not work well for NetFlow v9.

For NetFlow v5, it should work, since SequenceNum represents the total number of flows: we can compare the SequenceNum delta to the actual number of FlowMessages.

For NetFlow v9, analysing FlowMessage.SequenceNum might not work, since template flows are counted in the sequence number but are not reported as FlowMessages.
Example: https://github.com/Graylog2/graylog-plugin-netflow/blob/master/src/test/resources/netflow-data/nprobe-netflow9-3.pcap

IPFIX data only on IPv6?

Hi there,

I capture IPFIX data on port 2055; the template is sent every 5 minutes. If I start goflow2 with the parameters:

goflow2 -loglevel="debug" -transport.file="./goflow2.output2" -listen="sflow://[IPV4_IP]:6343,netflow:///[IPV4_IP]:4739"

I only see packets with EtypeName:"IPv6", like this:

{"Type":"IPFIX","TimeReceived":1658818721,"SequenceNum":1466255583,"SamplingRate":1,"SamplerAddress":"111.111.111.111,"TimeFlowStart":1658818606,"TimeFlowEnd":1658818606,"TimeFlowStartMs":1658818606182,"TimeFlowEndMs":1658818606182,"Bytes":143,"Packets":1,"**SrcAddr":"::","DstAddr":"::"**,"Etype":34525,"Proto":17,"SrcPort":58635,"DstPort":27016,"InIf":0,"OutIf":0,"SrcMac":"b0:a8:6e:7c:07:c2","DstMac":"30:b6:4f:e0:22:ce","SrcVlan":0,"DstVlan":0,"VlanId":0,"IngressVrfID":1187,"EgressVrfID":0,"IPTos":32,"ForwardingStatus":0,"IPTTL":0,"TCPFlags":0,"IcmpType":0,"IcmpCode":0,"IPv6FlowLabel":0,"FragmentId":0,"FragmentOffset":0,"BiFlowDirection":0,"SrcAS":0,"DstAS":0,"NextHop":"::","NextHopAS":0,"SrcNet":0,"DstNet":0,"**EtypeName":"IPv6"**,"ProtoName":"UDP","IcmpName":""}

And I see no IP addresses

But I know these packets are IPv4 (cross-checked with pmacct). I ran goflow2 for 20 minutes, so a template must have been sent; that was cross-checked with pmacct as well. I made a traffic dump to compare the tools' output.

I can also see that goflow2 does not capture all flows: goflow2 around 11,500 vs. pmacct 12,886.

My suspicion is that goflow2 treats everything as IPv6 and can't read the IP addresses when an IPFIX flow is IPv4.

If a flow is IPv6, it works.

Am I doing something wrong?

Does anybody have an idea?

Help would be much appreciated.

thanks and best regards

Christian

[Security] Workflow docker.yaml is using vulnerable action actions/checkout

The workflow docker.yaml references the action actions/checkout at v1. However, this reference is missing commit a6747255bd19d7a757dbdda8c654a9f84db19839, which may contain a fix for a vulnerability.
The fix missing from this action version could be related to:
(1) a CVE fix
(2) an upgrade of a vulnerable dependency
(3) a fix for a secret leak, among others.
Please consider updating the reference to the action.

Making fields of "ErrorTemplateNotFound" exported

Hi! I need to store Flow Data Records for the case where there are no Template Records yet, as it says here. I read the code of the NetFlow decoder and did not find any handler for this situation other than throwing ErrorTemplateNotFound. I think I can catch ErrorTemplateNotFound when I try to decode a payload and store the payload with the corresponding template data (Version, DomainId, TemplateId). The problem is that I can't use the fields of ErrorTemplateNotFound because they are not exported.

Is it possible to make these fields exported? I could submit a PR.
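For illustration, a self-contained sketch of the intended usage with errors.As, built around a hypothetical mirror of the error with exported fields (the names Version/ObsDomainID/TemplateID are illustrative, not goflow2's actual API):

package main

import (
	"errors"
	"fmt"
)

// Hypothetical mirror of ErrorTemplateNotFound with exported fields;
// not goflow2's actual type.
type ErrorTemplateNotFound struct {
	Version     uint16
	ObsDomainID uint32
	TemplateID  uint16
}

func (e *ErrorTemplateNotFound) Error() string {
	return fmt.Sprintf("template %d not found (version %d, domain %d)",
		e.TemplateID, e.Version, e.ObsDomainID)
}

// bufferUntilTemplate sketches the intended usage: on a template miss, keep
// the raw payload keyed by (version, domain, template) for re-decoding once
// the template arrives.
func bufferUntilTemplate(payload []byte, err error, pending map[[3]uint32][][]byte) {
	var tnf *ErrorTemplateNotFound
	if errors.As(err, &tnf) {
		key := [3]uint32{uint32(tnf.Version), tnf.ObsDomainID, uint32(tnf.TemplateID)}
		pending[key] = append(pending[key], payload)
	}
}

func main() {
	pending := map[[3]uint32][][]byte{}
	err := error(&ErrorTemplateNotFound{Version: 10, ObsDomainID: 1, TemplateID: 256})
	bufferUntilTemplate([]byte{0x00}, err, pending)
	fmt.Println(len(pending)) // 1
}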

enricher mismatching end group marker

Hello!
I'm trying to map IPs to ASes with the enricher, but I get these errors:

ERRO[0002] unexpected EOF                               
ERRO[0002] proto: invalid field number                  
ERRO[0002] proto: mismatching end group marker          
ERRO[0002] unexpected EOF                               
ERRO[0002] proto: invalid field number                  
ERRO[0002] proto: mismatching end group marker          
ERRO[0002] unexpected EOF                               
ERRO[0002] proto: invalid field number                  
ERRO[0002] proto: mismatching end group marker          
ERRO[0002] unexpected EOF                               
ERRO[0002] proto: invalid field number     

GoFlow2 v1.0.4 (2021-06-12T16:48:35+0000)

I run the pipe as follows:
goflow2 -format=pb | ./enricher -db.asn=/var/lib/geoip/GeoLite2-ASN.mmdb -db.country=/var/lib/geoip/GeoLite2-Country.mmdb

And if I understand correctly, the full pipeline should look like this if I want to map IPs to ASes:
sflow_stream -> goflow2 -> enricher -> kafka -> ch
Am I correct?

Support for Netflow v5

Any plans to support NetFlow v5 in goflow2? We receive errors in the goflow2 adapter when attempting to deserialize NetFlow v5 traffic using the latest build of goflow2.

time="2022-07-20T12:17:31Z" level=error msg="Error from: NetFlow (1) duration: 2.79µs. NetFlow/IPFIX version error: 5"

Bug With Parsing Custom Fields In Yaml

I'm trying to follow the example of parsing a custom mapping file for IPFIX at https://github.com/netsampler/goflow2/blob/main/cmd/goflow2/main.go#L73

The parsed config file doesn't have any Type values because there's no yaml name tag for this field. The rest of the fields work because the struct field name matches the yaml key. The diff below fixes things for me:

diff --git a/producer/reflect.go b/producer/reflect.go
index 6655870..1b91b0b 100644
--- a/producer/reflect.go
+++ b/producer/reflect.go
@@ -73,11 +73,11 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin
 }
 
 type NetFlowMapField struct {
-       PenProvided bool   `json:"penprovided"`
-       Type        uint16 `json:"field"`
-       Pen         uint32 `json:"pen"`
+       PenProvided bool   `json:"penprovided" yaml:"penprovided"`
+       Type        uint16 `json:"field" yaml:"field"`
+       Pen         uint32 `json:"pen" yaml:"pen"`
 
-       Destination string `json:"destination"`
+       Destination string `json:"destination" yaml:"destination"`
        //DestinationLength uint8  `json:"dlen"` // could be used if populating a slice of uint16 that aren't in protobuf
 }

Strange result in TcpFlags

Hi,

I get strange values in TCPFlags. According to the documentation, the sum of all flags should be at most 63, but I see larger values:

SELECT
    multiIf(
        EType = 2048, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
        EType = 34525, IPv6NumToString(SrcAddr),
    '') AS SrcAddr,
    multiIf(
        EType = 2048, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
        EType = 34525, IPv6NumToString(DstAddr),
    '') AS DstAddr,
    TCPFlags
FROM kafka
WHERE TCPFlags > 32


┌─SrcAddr────────┬─DstAddr────────┬─TCPFlags─┐
│ 91.145.253.180 │ 46.151.199.42  │      245 │
│ 91.145.217.139 │ 94.176.199.211 │      202 │
│ 62.122.202.122 │ 194.44.64.42   │      194 │
│ 91.145.198.194 │ 188.190.54.223 │      238 │
│ 91.145.232.239 │ 94.176.199.211 │      212 │
│ 91.145.238.141 │ 94.176.199.215 │      255 │
│ 91.145.218.50  │ 94.176.199.212 │       60 │
│ 176.37.98.58   │ 77.120.247.111 │      194 │
│ 91.145.244.58  │ 94.176.199.213 │       94 │
│ 35.241.52.229  │ 46.175.253.143 │       82 │
└────────────────┴────────────────┴──────────┘

SCTP support in ProtoName

Please add SCTP support for ProtoName in IPFIX/NetFlow v9/sFlow; currently the collector reports an empty value:
"ProtoName": "",

enricher with kcg pipeline

I am trying to use the enricher with the kcg flow-pipeline.
This is what I am doing (almost certainly not the best approach):
in the Dockerfile I build the enricher and add it to the image,
and I run the following command:

./goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | ./enricher -transport.kafka.brokers=kafka:9092 -transport=kafka -transport.kafka.topic=flows -format=pb -format.protobuf.fixedlen=true -db.asn=/etc/clickhouse-server/GeoLite2-ASN.mmdb -db.country=/etc/clickhouse-server/GeoLite2-Country.mmdb

But it looks like it prints the output to STDOUT instead of sending it to Kafka (I don't see a connection in netstat).

Netflow v5 problem

IPFIX seems to work just fine; however, when feeding goflow2 NetFlow v5 data, I get: "ERRO[0027] Error from: NetFlow (0) duration: 1.581644ms. NetFlow/IPFIX version error: 5". Tested on OpenBSD with the native pflow exporter; nfcapd digests the v5 data without issues, so I believe it's not an OpenBSD implementation quirk. Did I miss some config options?

Add ARM builds

Likely just Makefile and GitHub Actions configuration.
Would also need to do a docker manifest create for Docker.

utils.go udpDataCh: panic: send on closed channel

Hi 👋,

I just encountered this error.

panic: send on closed channel

goroutine 405 [running]:
github.com/netsampler/goflow2/utils.UDPStoppableRoutine.func1()
	go/pkg/mod/github.com/netsampler/goflow2@<version>/utils/utils.go:171 +0x9a
created by github.com/netsampler/goflow2/utils.UDPStoppableRoutine
	go/pkg/mod/github.com/netsampler/goflow2@<version>/utils/utils.go:164 +0x952

It seems that udpDataCh may be closed here: https://github.com/netsampler/goflow2/blob/main/utils/utils.go#L184

but data can still be pushed to it from here: https://github.com/netsampler/goflow2/blob/main/utils/utils.go#L171
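For illustration, a self-contained sketch of the pattern and of a common fix (a stop signal plus a WaitGroup, so the producer drains before the channel is closed); it mirrors the shape of the problem, not goflow2's actual code:

package main

import "sync"

func main() {
	data := make(chan int)
	stop := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // producer (plays the role of the UDP read loop)
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-stop:
				return // stop sending before the channel is closed
			case data <- i:
			}
		}
	}()

	go func() { // consumer (plays the role of the decoder workers)
		for range data {
		}
	}()

	close(stop) // ask the producer to stop...
	wg.Wait()   // ...wait until it can no longer send...
	close(data) // ...and only then close the data channel safely
}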

Refactor the MPLS protobuf

Right now, three MPLS label fields are allocated in the protobuf.
This should be changed into a list.

Could also embed the exp/s flags.

Support for RFC 6313 Flow (Cisco VZFLOW)

RFC 6313 (https://www.rfc-editor.org/rfc/rfc6313.html) defines an extension to IPFIX which includes more complex data structures like lists.

Cisco seems to be using this for their nvzFlow product -- https://blogs.cisco.com/security/an-introduction-to-the-new-cisco-network-visibility-flow-protocol-nvzflow.

Right now, when nvzFlow is sent to goflow2, it results in errors like:

2022-08-31T22:15:17.823 ktranslate/netflow9 [Error] flow Error from: NetFlow (0) duration: 40.168µs. NetFlow/IPFIX version error: 25701

I also found an elastiflow bug about this, but I'm not sure of the fix -- robcowart/elastiflow#343.

incorrect display ip address in clickhouse

I started the Flows + Kafka + ClickHouse + Grafana + Prometheus example project and generated traffic with the hsflowd service. In ClickHouse, the IPs in the SamplerAddress, SrcAddr, and DstAddr fields display incorrectly.
Displayed value:
d�� �Rr� �U�^

Destination mac address capture behavior

Hi, I'm not exactly sure if this is a bug or the correct behavior.
The issue is that when I capture flows of L2 traffic from my Nexus 6k, the DstMac attribute in the proto is zero.

As far as I could determine, this is due to the following code:

// Mac
case netflow.NFV9_FIELD_IN_SRC_MAC:
	DecodeUNumber(v, &(flowMessage.SrcMac))
case netflow.NFV9_FIELD_OUT_DST_MAC:
	DecodeUNumber(v, &(flowMessage.DstMac))

For DstMac, field type 57 (NFV9_FIELD_OUT_DST_MAC) is used, while apparently field type 80 (NFV9_FIELD_IN_DST_MAC) should be used.

I changed this in my code, and now I get the correct MAC addresses.

// Mac
case netflow.NFV9_FIELD_IN_SRC_MAC:
  DecodeUNumber(v, &(flowMessage.SrcMac))
case netflow.NFV9_FIELD_IN_DST_MAC:
  DecodeUNumber(v, &(flowMessage.DstMac))

Please tell me whether this is normal, or why the original uses the outgoing destination MAC address for the destination MAC field.

Externalising template cache

The feature request is the ability to externalise the template cache,
such that when goflow2 restarts, it can immediately pick up the cache and serve the data, minimising recovery time.
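For illustration, a hedged sketch of what externalising could look like: snapshot the in-memory templates to a file on shutdown and reload them on start. The cache layout is illustrative, not goflow2's actual template system:

package main

import (
	"encoding/json"
	"fmt"
	"os"
)

// templateCache maps "version/obsDomain/templateID" to a serialized
// template body; the string key keeps the JSON snapshot trivial.
type templateCache map[string]json.RawMessage

func save(path string, c templateCache) error {
	b, err := json.Marshal(c)
	if err != nil {
		return err
	}
	return os.WriteFile(path, b, 0o644)
}

func load(path string) (templateCache, error) {
	c := templateCache{}
	b, err := os.ReadFile(path)
	if err != nil {
		return c, err // e.g. first start: no snapshot yet
	}
	return c, json.Unmarshal(b, &c)
}

func main() {
	c := templateCache{"9/1/256": json.RawMessage(`{"fields":[]}`)}
	_ = save("/tmp/templates.json", c)
	restored, _ := load("/tmp/templates.json")
	fmt.Println(len(restored)) // 1
}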

As an extension, perhaps we could use the same mechanism to create an HA solution,
but I'd think that's a separate piece of work.

I noticed there is already some work done in this area.

What are your thoughts?

P.S.
Great to see the project being actively developed!

IPFIX (with OVS) not working

Hello,

I tried goflow2 with OVS and sFlow and it works fine. With IPFIX it throws the following:

fatal error: runtime: out of memory


runtime stack:
runtime.throw(0x9ae1e7, 0x16)
        /usr/local/go/src/runtime/panic.go:1117 +0x72
runtime.sysMap(0xc164000000, 0x58000000, 0xd561f0)
        /usr/local/go/src/runtime/mem_linux.go:169 +0xc6
runtime.(*mheap).sysAlloc(0xd3cca0, 0x55800000, 0x42dc77, 0xd3cca8)
        /usr/local/go/src/runtime/malloc.go:729 +0x1e5
runtime.(*mheap).grow(0xd3cca0, 0x2abf6, 0x0)
        /usr/local/go/src/runtime/mheap.go:1346 +0x85
runtime.(*mheap).allocSpan(0xd3cca0, 0x2abf6, 0x0, 0x7f8680e73d28)
        /usr/local/go/src/runtime/mheap.go:1173 +0x609
runtime.(*mheap).alloc.func1()
        /usr/local/go/src/runtime/mheap.go:910 +0x59
runtime.systemstack(0x0)
        /usr/local/go/src/runtime/asm_amd64.s:379 +0x66
runtime.mstart()
        /usr/local/go/src/runtime/proc.go:1246

goroutine 39 [running]:
runtime.systemstack_switch()
        /usr/local/go/src/runtime/asm_amd64.s:339 fp=0xc000099290 sp=0xc000099288 pc=0x46db40
runtime.(*mheap).alloc(0xd3cca0, 0x2abf6, 0x100, 0x8f4ae0)
        /usr/local/go/src/runtime/mheap.go:904 +0x85 fp=0xc0000992e0 sp=0xc000099290 pc=0x429925
runtime.(*mcache).allocLarge(0x7f86b9f52108, 0x557ec000, 0xc000030001, 0x0)
        /usr/local/go/src/runtime/mcache.go:224 +0x97 fp=0xc000099338 sp=0xc0000992e0 pc=0x419f97
runtime.mallocgc(0x557ec000, 0x92e2e0, 0xc00003cc01, 0x0)
        /usr/local/go/src/runtime/malloc.go:1078 +0x925 fp=0xc0000993c0 sp=0xc000099338 pc=0x40f9e5
runtime.growslice(0x92e2e0, 0xc0f5480000, 0x16cc755, 0x16cc755, 0x16cc756, 0xc1615ea738, 0x1, 0x1)
        /usr/local/go/src/runtime/slice.go:230 +0x1e9 fp=0xc000099428 sp=0xc0000993c0 pc=0x4515a9
github.com/netsampler/goflow2/decoders/netflow.DecodeOptionsDataSet(0xc00000000a, 0xc0000997c8, 0xc0002a8750, 0x1, 0x1, 0xc0002a87a0, 0x1, 0x1, 0x20300000000000, 0x7f8693620fff, ...)
        /build/decoders/netflow/netflow.go:246 +0x47b fp=0xc000099530 sp=0xc000099428 pc=0x85cb1b
github.com/netsampler/goflow2/decoders/netflow.DecodeMessage(0xc0002a1ef0, 0xa3eea0, 0xc00000e6c0, 0xa, 0xc00009ae08, 0xd25001, 0x8de0)
        /build/decoders/netflow/netflow.go:464 +0x991 fp=0xc000099a38 sp=0xc000099530 pc=0x85e031
github.com/netsampler/goflow2/utils.(*StateNetFlow).DecodeFlow(0xc0001a0640, 0x961520, 0xc00019ff20, 0x18558eba, 0xb56c5fe8f5)
        /build/utils/netflow.go:102 +0x4ff fp=0xc000099e78 sp=0xc000099a38 pc=0x87127f
github.com/netsampler/goflow2/utils.(*StateNetFlow).DecodeFlow-fm(0x961520, 0xc00019ff20, 0xd250e0, 0x1)
        /build/utils/netflow.go:64 +0x47 fp=0xc000099eb0 sp=0xc000099e78 pc=0x8794c7
github.com/netsampler/goflow2/decoders.Worker.Start.func1(0x0, 0xc000188670, 0x9cac58, 0xc000188690, 0xc0001cc000, 0x9a5e3f, 0x7, 0xc0001cc060, 0xc0001cc0c0)
        /build/decoders/decoder.go:48 +0x168 fp=0xc000099f98 sp=0xc000099eb0 pc=0x86fae8
runtime.goexit()
        /usr/local/go/src/runtime/asm_amd64.s:1371 +0x1 fp=0xc000099fa0 sp=0xc000099f98 pc=0x46f981
created by github.com/netsampler/goflow2/decoders.Worker.Start
        /build/decoders/decoder.go:39 +0x70

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0001b2238)
        /usr/local/go/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc0001b2230)
        /usr/local/go/src/sync/waitgroup.go:130 +0x65
main.main()
        /build/cmd/goflow2/main.go:153 +0x43d

goroutine 36 [IO wait]:
internal/poll.runtime_pollWait(0x7f8693239dd0, 0x72, 0x0)
        /usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc000114418, 0x72, 0x0, 0x0, 0x9a5f96)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Accept(0xc000114400, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:401 +0x212
net.(*netFD).accept(0xc000114400, 0x30, 0x30, 0x7f86b9f52108)
        /usr/local/go/src/net/fd_unix.go:172 +0x45
net.(*TCPListener).accept(0xc00000e6a8, 0xc00004bdd0, 0x40fb78, 0x30)
        /usr/local/go/src/net/tcpsock_posix.go:139 +0x32
net.(*TCPListener).Accept(0xc00000e6a8, 0x960020, 0xc000128930, 0x8ff2a0, 0xd13340)
        /usr/local/go/src/net/tcpsock.go:261 +0x65
net/http.(*Server).Serve(0xc00013e000, 0xa42870, 0xc00000e6a8, 0x0, 0x0)
        /usr/local/go/src/net/http/server.go:2981 +0x285
net/http.(*Server).ListenAndServe(0xc00013e000, 0xc00013e000, 0x8)
        /usr/local/go/src/net/http/server.go:2910 +0xba
net/http.ListenAndServe(...)
        /usr/local/go/src/net/http/server.go:3164
main.httpServer()
        /build/cmd/goflow2/main.go:56 +0xbc
created by main.main
        /build/cmd/goflow2/main.go:90 +0x30c

goroutine 37 [IO wait]:
internal/poll.runtime_pollWait(0x7f8693239ce8, 0x72, 0x0)
        /usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc000194198, 0x72, 0x2300, 0x2328, 0x0)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
        /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).ReadFrom(0xc000194180, 0xc0001d36d0, 0x2328, 0x2328, 0x0, 0x0, 0x0, 0x0, 0x0)
        /usr/local/go/src/internal/poll/fd_unix.go:222 +0x1e6
net.(*netFD).readFrom(0xc000194180, 0xc0001d36d0, 0x2328, 0x2328, 0x0, 0x0, 0xc000194180, 0xc0001d3618, 0x5b2688)
        /usr/local/go/src/net/fd_posix.go:61 +0x5b
net.(*UDPConn).readFrom(0xc00018c0b8, 0xc0001d36d0, 0x2328, 0x2328, 0xd25720, 0xc00018ef30, 0x0, 0x0)
        /usr/local/go/src/net/udpsock_posix.go:47 +0x6a
net.(*UDPConn).ReadFromUDP(0xc00018c0b8, 0xc0001d36d0, 0x2328, 0x2328, 0x5, 0x0, 0xc000188720, 0x9a573c)
        /usr/local/go/src/net/udpsock.go:106 +0x5d
github.com/netsampler/goflow2/utils.UDPRoutine(0x9a573c, 0x5, 0xc000188700, 0x1, 0x9b2896, 0x0, 0x18c7, 0x0, 0xa48878, 0xc000187570, ...)
        /build/utils/utils.go:138 +0x7dd
github.com/netsampler/goflow2/utils.(*StateSFlow).FlowRoutine(...)
        /build/utils/sflow.go:151
main.main.func1(0xc0001b2230, 0xc0001885d0, 0xc0001885e0, 0x9b288e, 0xd)
        /build/cmd/goflow2/main.go:125 +0x9f1
created by main.main
        /build/cmd/goflow2/main.go:97 +0x419

goroutine 38 [chan receive]:
github.com/netsampler/goflow2/decoders.Processor.ProcessMessage(...)
        /build/decoders/decoder.go:113
github.com/netsampler/goflow2/utils.UDPRoutine(0x9a5e3f, 0x7, 0xc000188670, 0x1, 0x9b28a6, 0x0, 0x807, 0x40fb00, 0xa48878, 0xc000187570, ...)
        /build/utils/utils.go:147 +0x9e5
github.com/netsampler/goflow2/utils.(*StateNetFlow).FlowRoutine(0xc0001a0640, 0x1, 0x9b28a6, 0x0, 0x807, 0x0, 0x0, 0x0)
        /build/utils/netflow.go:370 +0x15c
main.main.func1(0xc0001b2230, 0xc0001885d0, 0xc0001885e0, 0x9b289c, 0xf)
        /build/cmd/goflow2/main.go:132 +0x849
created by main.main
        /build/cmd/goflow2/main.go:97 +0x419

goroutine 40 [select]:
github.com/netsampler/goflow2/decoders.Worker.Start.func1(0x0, 0xc000188700, 0x9cac58, 0xc000188720, 0xc0001cc120, 0x9a573c, 0x5, 0xc0001cc180, 0xc0001cc1e0)
        /build/decoders/decoder.go:42 +0xbd
created by github.com/netsampler/goflow2/decoders.Worker.Start
        /build/decoders/decoder.go:39 +0x70

FIRST_SWITCHED and LAST_SWITCHED not corresponding to RFC or Cisco

RFC 3954 and the NetFlow Version 9 Flow-Record Format document both specify the FIRST_SWITCHED and LAST_SWITCHED fields as system uptime in milliseconds; however, goflow2 produces them in seconds:

if version == 9 {
	// NetFlow v9 time works with a differential based on router's uptime
	switch df.Type {
	case netflow.NFV9_FIELD_FIRST_SWITCHED:
		var timeFirstSwitched uint32
		DecodeUNumber(v, &timeFirstSwitched)
		timeDiff := (uptime - timeFirstSwitched) / 1000
		flowMessage.TimeFlowStart = uint64(baseTime - timeDiff)
	case netflow.NFV9_FIELD_LAST_SWITCHED:
		var timeLastSwitched uint32
		DecodeUNumber(v, &timeLastSwitched)
		timeDiff := (uptime - timeLastSwitched) / 1000
		flowMessage.TimeFlowEnd = uint64(baseTime - timeDiff)
	}
}

Is this an error, or are both fields produced in seconds for a reason I am unaware of?

Start when kafka is down

goflow2 works correctly if it loses Kafka while already started, but it can't start at all if Kafka is down:
level=fatal msg="kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"

This sometimes happens in real life: when the server restarts, goflow2 can't start during boot because it can't reach Kafka.

Is it possible to ignore the error and start despite Kafka being unreachable, then begin producing messages once the Kafka brokers are back online?
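For illustration, a sketch of replacing the fatal exit with a retry loop around the Sarama producer (assuming retry-at-startup is acceptable; flag plumbing omitted):

package kafka

import (
	"log"
	"time"

	sarama "github.com/Shopify/sarama"
)

// connectWithRetry keeps trying to create the producer instead of failing
// fatally, so the collector can come up before Kafka does.
func connectWithRetry(addrs []string, cfg *sarama.Config) sarama.AsyncProducer {
	for {
		p, err := sarama.NewAsyncProducer(addrs, cfg)
		if err == nil {
			return p
		}
		log.Printf("kafka unavailable, retrying in 5s: %v", err)
		time.Sleep(5 * time.Second)
	}
}

Note that this only covers startup; messages produced while Kafka is down would still need to be buffered or dropped.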

Add compression type option for kafka transport type

Description: add an option for the producer, when transport=kafka, to configure a compression type per Kafka's "compression.type" configuration parameter. See details in Kafka's documentation here: https://kafka.apache.org/documentation/#brokerconfigs_compression.type

Steps to Implement:

  1. Modify https://github.com/netsampler/goflow2/blob/main/transport/kafka/kafka.go
  2. Add transport.kafka.compression.type as a String type parameter
  3. Add it to the kafka producer configuration options (default to lz4)

Modified file:

package kafka

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"errors"
	"flag"
	"fmt"
	"os"
	"strings"
	"time"

	sarama "github.com/Shopify/sarama"
	"github.com/netsampler/goflow2/transport"
	"github.com/netsampler/goflow2/utils"

	log "github.com/sirupsen/logrus"
)

type KafkaDriver struct {
	kafkaTLS            bool
	kafkaSASL           bool
	kafkaTopic          string
	kafkaSrv            string
	kafkaBrk            string
	kafkaMaxMsgBytes    int
	kafkaFlushBytes     int
	kafkaFlushFrequency time.Duration

	kafkaLogErrors bool

	kafkaHashing         bool
	kafkaVersion         string
	kafkaCompressionType string

	producer sarama.AsyncProducer

	q chan bool
}

func (d *KafkaDriver) Prepare() error {
	flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")

	flag.BoolVar(&d.kafkaSASL, "transport.kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)")
	flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
	flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
	flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
	flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes")
	flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes")
	flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency")

	flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors")
	flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")

	//flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
	flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version")
	flag.StringVar(&d.kafkaCompressionType, "transport.kafka.compression.type", "lz4", "Kafka compression type")

	return nil
}

func (d *KafkaDriver) Init(context.Context) error {
	kafkaConfigVersion, err := sarama.ParseKafkaVersion(d.kafkaVersion)
	if err != nil {
		return err
	}

	kafkaConfig := sarama.NewConfig()
	kafkaConfig.Version = kafkaConfigVersion
	kafkaConfig.Producer.Return.Successes = false
	kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors
	kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
	kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
	kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
	// sarama.CompressionCodec is an integer type, so the string flag must be
	// parsed (via its TextUnmarshaler) rather than converted directly
	var compressionCodec sarama.CompressionCodec
	if err := compressionCodec.UnmarshalText([]byte(d.kafkaCompressionType)); err != nil {
		return err
	}
	kafkaConfig.Producer.Compression = compressionCodec

	if d.kafkaTLS {
		rootCAs, err := x509.SystemCertPool()
		if err != nil {
			return errors.New(fmt.Sprintf("Error initializing TLS: %v", err))
		}
		kafkaConfig.Net.TLS.Enable = true
		kafkaConfig.Net.TLS.Config = &tls.Config{RootCAs: rootCAs}
	}

	if d.kafkaHashing {
		kafkaConfig.Producer.Partitioner = sarama.NewHashPartitioner
	}

	if d.kafkaSASL {
		if !d.kafkaTLS /*&& log != nil*/ {
			log.Warn("Using SASL without TLS will transmit the authentication in plaintext!")
		}
		kafkaConfig.Net.SASL.Enable = true
		kafkaConfig.Net.SASL.User = os.Getenv("KAFKA_SASL_USER")
		kafkaConfig.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASS")
		if kafkaConfig.Net.SASL.User == "" && kafkaConfig.Net.SASL.Password == "" {
			return errors.New("Kafka SASL config from environment was unsuccessful. KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set.")
		} else /*if log != nil*/ {
			log.Infof("Authenticating as user '%s'...", kafkaConfig.Net.SASL.User)
		}
	}

	var addrs []string
	if d.kafkaSrv != "" {
		addrs, _ = utils.GetServiceAddresses(d.kafkaSrv)
	} else {
		addrs = strings.Split(d.kafkaBrk, ",")
	}

	kafkaProducer, err := sarama.NewAsyncProducer(addrs, kafkaConfig)
	if err != nil {
		return err
	}
	d.producer = kafkaProducer

	d.q = make(chan bool)

	if d.kafkaLogErrors {
		go func() {
			for {
				select {
				case msg := <-kafkaProducer.Errors():
					//if log != nil {
					log.Error(msg)
					//}
				case <-d.q:
					return
				}
			}
		}()
	}

	return err
}

func (d *KafkaDriver) Send(key, data []byte) error {
	d.producer.Input() <- &sarama.ProducerMessage{
		Topic: d.kafkaTopic,
		Key:   sarama.ByteEncoder(key),
		Value: sarama.ByteEncoder(data),
	}
	return nil
}

func (d *KafkaDriver) Close(context.Context) error {
	d.producer.Close()
	close(d.q)
	return nil
}

func init() {
	d := &KafkaDriver{}
	transport.RegisterTransportDriver("kafka", d)
}

goflow2 doesn't automatically multiply sampling rate with bytes/packets

Hi Louis,

I've finished a PoC with goflow2 in k8s, sending JSON data to Kafka and then to ELK, but I see incorrect graphs: goflow2 doesn't automatically multiply the sampling rate into bytes and packets before sending data to Kafka or a file, so the NetFlow/IPFIX data is incorrect.

For example, I have sampling rate 100 on Juniper routers and I see ~15 GB over a 15-minute timeframe on another collector (nfacctd), but for the same diagram in Kibana fed by goflow2 I see only ~1.3 MB.
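For clarity, the requested scaling amounts to multiplying the reported counters by the sampling rate:

package main

import "fmt"

func main() {
	// A sampled flow of 1500 bytes / 1 packet at sampling rate 100 stands
	// for an estimated 150000 bytes / 100 packets on the wire.
	var bytes, packets, samplingRate uint64 = 1500, 1, 100
	fmt.Println(bytes*samplingRate, packets*samplingRate) // 150000 100
}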

SAMPLING_INTERVAL not used when included in the data template

MikroTik devices do not include the SAMPLING_INTERVAL field in the Options Data Sets; instead, it's included in the Data Set and defined by the Data Template. The problem is goflow2 never looks for or uses the SAMPLING_INTERVAL field in the Data Set, so the SamplingRate field in the output is always 0.

I am not sure if this is out of spec and MikroTik is doing things wrong, but to resolve the issue I had to tweak producer/producer_nf.go:

Add the following in ConvertNetFlowDataSet()

case netflow.NFV9_FIELD_SAMPLING_INTERVAL:
	DecodeUNumber(v, &(flowMessage.SamplingRate))

Alter the line fmsg.SamplingRate = uint64(samplingRate) to prevent it from overwriting the above with 0:

if samplingRate > 0 {
	fmsg.SamplingRate = uint64(samplingRate)
}

I haven't submitted the above as a pull request as I'm not 100% sure this is the best way to handle this scenario, alongside the existing sampling system logic. I have a pcap if useful.

Sidenote: MikroTik devices seem to send the sampling interval in little endian rather than the big endian expected by DecodeUNumber(), so an interval of 256 actually comes through as 65536. I'm correcting that at a later stage.
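A small self-contained demonstration of the sidenote's byte-order observation:

package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	// An interval of 256 sent little-endian arrives as 00 01 00 00 and
	// decodes big-endian to 65536, matching the numbers reported above.
	b := []byte{0x00, 0x01, 0x00, 0x00}
	fmt.Println(binary.BigEndian.Uint32(b))    // 65536 (what a big-endian decode sees)
	fmt.Println(binary.LittleEndian.Uint32(b)) // 256 (what the router meant)
}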

How to differentiate between IPv4 and IPv6 address of the sampler?

I looked into the code, and it seems that goflow2 collapses both into a byte array without differentiating IPv4 and IPv6 addresses. When the data is read from (e.g.) Kafka, IPv4 addresses end up mapped into arbitrary parts of the IPv6 numbering plan, and there is no reliable way to separate them.

Maybe a flag for the address family would help?

Race condition in stopper.go when shutting down

Hi,

When running go test ./utils -race, the race detector raises the warnings below.

I think in practice it shouldn't be an issue.

But projects using goflow2 as a library might have CI tests (with the race detector enabled) fail because of this.

I think it might be worth fixing this theoretical race condition. What do you think?

$ go test ./utils -race
==================
WARNING: DATA RACE
Write at 0x00c000226c30 by goroutine 27:
  github.com/netsampler/goflow2/utils.(*stopper).Shutdown()
      /go/src/github.com/goflow2/utils/stopper.go:32 +0xc4
  github.com/netsampler/goflow2/utils.TestStopper()
      /go/src/github.com/goflow2/utils/stopper_test.go:16 +0x112
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47

Previous read at 0x00c000226c30 by goroutine 28:
  github.com/netsampler/goflow2/utils.(*routine).StartRoutine.func1()
      /go/src/github.com/goflow2/utils/stopper_test.go:46 +0x45

Goroutine 27 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x726
  testing.runTests.func1()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1598 +0x99
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.runTests()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1596 +0x7ca
  testing.(*M).Run()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1504 +0x9d1
  main.main()
      _testmain.go:49 +0x22b

Goroutine 28 (finished) created at:
  github.com/netsampler/goflow2/utils.(*routine).StartRoutine()
      /go/src/github.com/goflow2/utils/stopper_test.go:44 +0x118
  github.com/netsampler/goflow2/utils.TestStopper()
      /go/src/github.com/goflow2/utils/stopper_test.go:14 +0xbc
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
==================
WARNING: DATA RACE
Read at 0x00c000226c50 by goroutine 30:
  github.com/netsampler/goflow2/utils.TestStopper.func1()
      /go/src/github.com/goflow2/utils/stopper_test.go:18 +0x34
  github.com/stretchr/testify/assert.Eventually.func1()
      /go/pkg/mod/github.com/stretchr/testify@<version>/assert/assertions.go:1655 +0x39

Previous write at 0x00c000226c50 by goroutine 28:
  github.com/netsampler/goflow2/utils.(*routine).StartRoutine.func1()
      /go/src/github.com/goflow2/utils/stopper_test.go:47 +0x64

Goroutine 30 (running) created at:
  github.com/stretchr/testify/assert.Eventually()
      /go/pkg/mod/github.com/stretchr/testify@<version>/assert/assertions.go:1655 +0x3b7
  github.com/netsampler/goflow2/utils.TestStopper()
      /go/src/github.com/goflow2/utils/stopper_test.go:17 +0x196
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47

Goroutine 28 (finished) created at:
  github.com/netsampler/goflow2/utils.(*routine).StartRoutine()
      /go/src/github.com/goflow2/utils/stopper_test.go:44 +0x118
  github.com/netsampler/goflow2/utils.TestStopper()
      /go/src/github.com/goflow2/utils/stopper_test.go:14 +0xbc
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
--- FAIL: TestStopper (0.00s)
    testing.go:1152: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c000226e00 by goroutine 34:
  github.com/netsampler/goflow2/utils.TestCancelUDPRoutine.func3()
      /go/src/github.com/goflow2/utils/utils_test.go:41 +0x5e
  github.com/netsampler/goflow2/utils.TestCancelUDPRoutine()
      /go/src/github.com/goflow2/utils/utils_test.go:51 +0x318
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47

Previous write at 0x00c000226e00 by goroutine 35:
  github.com/netsampler/goflow2/utils.(*dummyFlowProcessor).FlowRoutine()
      /go/src/github.com/goflow2/utils/utils_test.go:74 +0x90
  github.com/netsampler/goflow2/utils.TestCancelUDPRoutine.func1()
      /go/src/github.com/goflow2/utils/utils_test.go:20 +0x53

Goroutine 34 (running) created at:
  testing.(*T).Run()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x726
  testing.runTests.func1()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1598 +0x99
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.runTests()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1596 +0x7ca
  testing.(*M).Run()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1504 +0x9d1
  main.main()
      _testmain.go:49 +0x22b

Goroutine 35 (running) created at:
  github.com/netsampler/goflow2/utils.TestCancelUDPRoutine()
      /go/src/github.com/goflow2/utils/utils_test.go:19 +0x1e4
  testing.tRunner()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
--- FAIL: TestCancelUDPRoutine (0.10s)
    testing.go:1152: race detected during execution of test
FAIL
FAIL  github.com/netsampler/goflow2/utils 0.463s
FAIL

Runtime Error

Hey Louis,

first of all, a big thanks for this amazing piece of software! <3

We are using goflow2 with your sample enricher as a simple PoC pipeline for sFlow. We only use Juniper gear (MX480, MX204). Today we stumbled upon the following problem...

Execution:
/root/goflow2/goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | /root/goflow2/enricher -db.asn /root/goflow2/GeoLite2-ASN.mmdb -db.country /root/goflow2/GeoLite2-Country.mmdb -transport.kafka.brokers=localhost:9092 -transport=kafka -transport.kafka.topic=flows -format=pb -format.protobuf.fixedlen=true

Error:

INFO[0000] Starting collection                           hostname= port=6343 scheme=sflow
INFO[0000] Starting collection                           hostname= port=2055 scheme=netflow
INFO[0000] Starting enricher

[root@flowtest ~]# ERRO[2006] proto: cannot parse invalid wire-format data
ERRO[2006] proto: cannot parse invalid wire-format data
panic: runtime error: slice bounds out of range [:9] with capacity 4

goroutine 1 [running]:
main.main()
        /root/goflow2/cmd/enricher/main.go:166 +0xdc5
[root@flowtest ~]#

I am not sure if we have hit some issue with goflow2 or if it is related to Juniper's crappy sFlow implementation...

We just pulled the latest code from the main branch to check whether your latest commit was our problem. The problem still exists...

Add back "normal" output

It looks like when this was forked, cloudflare/goflow#73 was dropped from the formatting controls.

The "normal" format is useful for simplified parsing with things like mtail which doesn't really handle json well.

It's also useful to be able to drop unused fields from the output by filtering which fields are included.
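For illustration, a minimal sketch of such a flat key=value formatter with field filtering (illustrative, not the original goflow "normal" implementation):

package main

import (
	"fmt"
	"sort"
	"strings"
)

// formatNormal renders a decoded flow as stable key=value pairs, which
// regex-based tools like mtail can match; fields selects and orders the
// output and doubles as the filtering mentioned above (nil means all).
func formatNormal(msg map[string]any, fields []string) string {
	if fields == nil {
		for k := range msg {
			fields = append(fields, k)
		}
		sort.Strings(fields)
	}
	parts := make([]string, 0, len(fields))
	for _, k := range fields {
		if v, ok := msg[k]; ok {
			parts = append(parts, fmt.Sprintf("%s=%v", k, v))
		}
	}
	return strings.Join(parts, " ")
}

func main() {
	msg := map[string]any{"Type": "SFLOW_5", "Bytes": 1500, "Proto": 6}
	fmt.Println(formatNormal(msg, []string{"Type", "Bytes"})) // Type=SFLOW_5 Bytes=1500
}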

Get values of IPFIX IP addresses in human readable format..

Hi there..

I'm experimenting with the goflow2, Kafka, ClickHouse pipeline and searching for a way to make the IP addresses of the inserted IPFIX flows human readable. Maybe it is a question more related to ClickHouse, but I want to try my luck here as well ;).

As seen in the flow.proto protobuf schema, the fields holding IP addresses use the bytes type. That is not really handy for saving in the ClickHouse database, because I don't know how to read/use the byte objects in SELECTs. Following the DB pipeline tutorial, the ClickHouse column for them is FixedString(16), but with that I also get byte objects in the DB table.

--- snip ----

// Sampler information
bytes SamplerAddress = 11;

// Found inside packet
uint64 TimeFlowStart = 38;
uint64 TimeFlowEnd = 5;
uint64 TimeFlowStartMs = 63;
uint64 TimeFlowEndMs = 64;

// Size of the sampled packet
uint64 Bytes = 9;
uint64 Packets = 10;

// Source/destination addresses
bytes SrcAddr = 6;
bytes DstAddr = 7;

---- snap ---

My attempts to change the types in flow.proto (SrcAddr, DstAddr, SamplerAddress) to string (because I think the Docker container compiles the proto file at startup?) still ended up with byte objects in those columns of the ClickHouse DB.
This is the DB schema reading from Kafka:

CREATE TABLE IF NOT EXISTS sisr_dev.kafka_raw_goflow2_test1_proto_ipfix_consumer ON CLUSTER "dev-ch-cluster" (
Samplingrate UInt64,
SequenceNum UInt32,
SrcAddr String,
DstAddr String,
Bytes UInt64,
Packets UInt64,
SamplerAddress String,
NextHop String,
Proto UInt32,
SrcPort UInt32,
DstPort UInt32,
SrcMac UInt64,
DstMac UInt64,
Etype UInt32,
TimeFlowStart UInt64,
IngressVrfID UInt32,
TimeFlowStartMs UInt64,
TimeFlowEndMs UInt64
)
ENGINE = Kafka()
SETTINGS kafka_broker_list =
[kafka_broker],
kafka_topic_list = 'goflow2_test1_raw_ipfix_proto',
kafka_group_name = 'goflow2_raw_ch_kafka', kafka_num_consumers = 8,
kafka_thread_per_consumer = 4,
kafka_format = 'Protobuf',
kafka_schema = 'flow.proto:FlowMessage',
kafka_skip_broken_messages = 1

Data is written correctly into the database; everything works like a charm except for the byte-format IP addresses.

Can anybody help me, or share an example for better understanding and usage of the data?

best regards and thanks ..
Christian
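For reference, a minimal Go sketch of how a consumer can turn the bytes fields above back into printable addresses, assuming goflow2 emits the raw address bytes (4 for IPv4, 16 for IPv6) and the consumer receives them unpadded; a ClickHouse FixedString(16) column pads with trailing zeros, which is why the SQL side uses expressions like IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))) instead:

package main

import (
	"fmt"
	"net"
)

// addrString relies on the slice length carrying the address family:
// 4 raw bytes for IPv4, 16 for IPv6; net.IP handles the textual form.
func addrString(b []byte) string {
	if len(b) != 4 && len(b) != 16 {
		return ""
	}
	return net.IP(b).String()
}

func main() {
	fmt.Println(addrString([]byte{192, 0, 2, 1})) // 192.0.2.1
	fmt.Println(addrString(make([]byte, 16)))     // ::
}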

Kafka error: Message was too large

Was switching over from goflow(1) to goflow2 recently, with mostly identical settings and setup.
Using the Kafka backend with a fair amount of traffic (>20-30k messages/sec), the error message below is periodically emitted by goflow2.

(Note that I haven't tested at what threshold the issue occurs.)

ERRO[3562] kafka: Failed to produce message to topic : kafka server: Message was too large, server rejected it to avoid allocation error

Relevant settings for reference (other settings omitted):

-format=pb
-transport=kafka
-transport.kafka.log.err
-transport.kafka.hashing=false
-transport.kafka.version="2.4.1"
-workers=10

Kafka (v2.4) settings are close to stock.

Amount/load:

  • Goflow2 to Kafka: ~20-50k messages/s.

kcg on k8s

Hey @lspgn 👋
I've done a PoC recently of one of your example pipelines (Flows + Kafka + ClickHouse + Grafana) deployed on top of k8s.
Would you be interested in that? I still have all of the commands and files available.

Question about adding exported IPFIX flow fields not shown in goflow2 output

Hi Louis,

I have another question; maybe you would be so kind as to help out.

In my IPFIX flows I see the fields Dot1q_Vlan, Post_Dot1q_Vlan, Dot1q_Cvlan, and Post_Dot1q_Cvlan in pmacct and tcpdump.
I followed your manual for adding exotic fields via the mapping.

This is my mapping:

ipfix:
  mapping:
    - field: 252
      destination: InIf
    - field: 253
      destination: OutIf
    - field: 243
      destination: Dot1q_Vlan
    - field: 254
      destination: Post_Dot1q_Vlan
    - field: 245
      destination: Dot1q_Cvlan
    - field: 255
      destination: Post_Dot1q_Cvlan

Furthermore, I added the following to flow.proto:

// Q_Vlan
uint32 Dot1q_Vlan = 243;
uint32 Post_Dot1q_Vlan = 254;
uint32 Dot1q_Cvlan = 245;
uint32 Post_Dot1q_Cvlan = 255;

After that I rebuilt the container with "docker build ." and started the goflow2 container.

But I don't get values for those 4 new fields in ClickHouse, even though, as you can see in the data I sent in the last issue, the data is being exported. In the ClickHouse DB the columns are zero.

Did I configure something wrong?

Do you have an idea?

Thanks a lot again..

Best regards
Christian

flow logs are sent to /var/log/messages

I installed goflow2 using the RPM and use the config below to transport the NetFlow data to a file, but the NetFlow data is also written to /var/log/messages. How do we avoid writing the NetFlow data to /var/log/messages?

/usr/bin/goflow2 -listen nfl://10.0.0.53:8888 -transport file -transport.file /var/log/goflow/goflow.log -format text

Add the support of some MPLS fields for IPFIX (Junos MPLS-IP template)

Hello,

I've recently patched the producer_nf.go file to produce some MPLS fields. Some fields were already present in the flow.proto file, but I needed to add one more.

I added this code in producer_nf.go (after line 335):

	//MPLS
	case netflow.IPFIX_FIELD_mplsTopLabelStackSection:
		var mplsLabel uint32
		DecodeUNumber(v, &mplsLabel)
		flowMessage.MPLS1Label = uint32(mplsLabel >> 4)
		flowMessage.HasMPLS = true
	case netflow.IPFIX_FIELD_mplsLabelStackSection2:
		var mplsLabel uint32
		DecodeUNumber(v, &mplsLabel)
		flowMessage.MPLS2Label = uint32(mplsLabel >> 4)
	case netflow.IPFIX_FIELD_mplsLabelStackSection3:
		var mplsLabel uint32
		DecodeUNumber(v, &mplsLabel)
		flowMessage.MPLS3Label = uint32(mplsLabel >> 4)
	case netflow.IPFIX_FIELD_mplsTopLabelIPv4Address:
		flowMessage.MPLSTopLabelIpv4Addr = v

And to support the new field "MPLSTopLabelIpv4Addr", I modified flow.proto like this:

bytes MPLSTopLabelIpv4Addr = 63; // mplsTopLabelIPv4Address

Could you please consider this enhancement for the main branch?

BR
David

Procedures to decode hsflowd along with sflow v5

Hi!
Great resource @lspgn, thanks for taking goflow to the moon!
I tried it and liked it very much: very high performance with minimal resource use.
The only problem I have right now is hsflowd packets.
What would be the way to update goflow2 so it can decode and insert the new sFlow record types all together into the same DB on one machine?
e.g.: hsflowd + sflow v5 ---> goflow2 ---> kafka --> ch

flow_data | 0 | 2100 | extended_socket_ipv4 | sFlow Host Structures
flow_data | 0 | 2101 | extended_socket_ipv6 | sFlow Host Structures

                                        +

sample_data | 0 | 1 | flow_sample | sFlow Version 5
sample_data | 0 | 2 | counter_sample | sFlow Version 5
sample_data | 0 | 3 | flow_sample_expanded | sFlow Version 5
sample_data | 0 | 4 | counter_sample_expanded | sFlow Version 5
flow_data | 0 | 1 | sampled_header | sFlow Version 5
flow_data | 0 | 2 | sampled_ethernet | sFlow Version 5
flow_data | 0 | 3 | sampled_ipv4 | sFlow Version 5
flow_data | 0 | 4 | sampled_ipv6 | sFlow Version 5
flow_data | 0 | 1001 | extended_switch | sFlow Version 5
flow_data | 0 | 1002 | extended_router | sFlow Version 5
flow_data | 0 | 1003 | extended_gateway | sFlow Version 5
flow_data | 0 | 1004 | extended_user | sFlow Version 5
flow_data | 0 | 1005 | extended_url (deprecated) | sFlow Version 5
flow_data | 0 | 1006 | extended_mpls | sFlow Version 5
flow_data | 0 | 1007 | extended_nat | sFlow Version 5
flow_data | 0 | 1008 | extended_mpls_tunnel | sFlow Version 5
flow_data | 0 | 1009 | extended_mpls_vc | sFlow Version 5
flow_data | 0 | 1010 | extended_mpls_FTN | sFlow Version 5
flow_data | 0 | 1011 | extended_mpls_LDP_FEC | sFlow Version 5
flow_data | 0 | 1012 | extended_vlantunnel | sFlow Version 5

or another combination..

Sincerely,
F.

How to add the interface name/description provided by options templates into the NetFlow data flow

I have been looking through the goflow2 project, and I know you add the sampling rate provided by the options template. The sampling rate is related to the Observation Domain, so you maintain a map from version + observation domain to sampling rate; I think that is absolutely right. However, my question is how I can correlate the interface name/description provided by the options template with the NetFlow data flow. Thanks.
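For illustration, a minimal sketch following the same pattern the question describes for the sampling rate: key the options-template data by observation domain plus ifIndex (names are illustrative, not goflow2 API):

package main

import "fmt"

// ifKey mirrors the version+observation-domain keying used for the
// sampling rate, extended with the interface index from the options data.
type ifKey struct {
	Version   uint16
	ObsDomain uint32
	IfIndex   uint32
}

func main() {
	ifNames := map[ifKey]string{}

	// Populate when an options data record is decoded...
	ifNames[ifKey{Version: 10, ObsDomain: 1, IfIndex: 5}] = "xe-0/0/5"

	// ...then look up while producing a data flow record (InIf/OutIf).
	fmt.Println(ifNames[ifKey{10, 1, 5}]) // xe-0/0/5
}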

Netflow V5 Decode

I could decode NetFlow v5 packets in goflow version 1. Can you release a version that can decode NetFlow v5?

Refactoring roadmap?

Opening an issue to jot down a few ideas.
I think a refactor is going to be necessary; I'm not super satisfied with how samples are processed anymore.

Use-cases have changed over the years:

  • More towards JSON export, protobuf still strong for unifying diverse samples
  • More fields necessary (Enterprise/custom)
  • Requiring custom handling over the messages that aren't data (eg: Options templates in NetFlow/IPFIX)
  • Fewer code edits when adding fields; the decoding library is used externally

The producer element is too strict, I believe. We should think about using reflect and allowing maps to be populated.
The message dispatch could also be improved (sharing the socket for NetFlow). Fewer buffer copy operations?
Transport could be improved, especially with Kafka, by adding delivery guarantees and improving access to configuration options.
At the same time, I would like to avoid becoming a multi-output tool like Telegraf or Vector.

SASL/SCRAM authentication for Kafka

Amazon Managed Service for Kafka (MSK) does not support SASL/PLAIN authentication, even over TLS. However, it does support SASL/SCRAM.

[2022-09-12 03:44:30,722] INFO [SocketServer brokerId=2] Failed authentication with ip-10-10-101-111.us-west-2.compute.internal/INTERNAL_IP (Unsupported SASL mechanism PLAIN) (org.apache.kafka.common.network.Selector)

For our use case (distributed goflow2 agents sending flows to regional MSK deployments), we need goflow2 to be able to support SCRAM. Fortunately, it looks like Sarama already supports it, so this should (hopefully) be straightforward. See example code from the Sarama repository.

I'm happy to take a crack at adding this to goflow2, if it's something you think is valuable.
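For reference, a sketch of wiring SCRAM into the Kafka driver, following the pattern of the Sarama repository's SCRAM example (the XDGSCRAMClient wrapper around github.com/xdg-go/scram); an illustration, not a finished patch:

package kafka

import (
	sarama "github.com/Shopify/sarama"
	"github.com/xdg-go/scram"
)

// XDGSCRAMClient adapts xdg-go/scram to sarama's SCRAMClient interface,
// as in the Sarama repository's SCRAM example code.
type XDGSCRAMClient struct {
	*scram.Client
	*scram.ClientConversation
	scram.HashGeneratorFcn
}

func (x *XDGSCRAMClient) Begin(userName, password, authzID string) (err error) {
	x.Client, err = x.HashGeneratorFcn.NewClient(userName, password, authzID)
	if err != nil {
		return err
	}
	x.ClientConversation = x.Client.NewConversation()
	return nil
}

func (x *XDGSCRAMClient) Step(challenge string) (string, error) {
	return x.ClientConversation.Step(challenge)
}

func (x *XDGSCRAMClient) Done() bool {
	return x.ClientConversation.Done()
}

// enableSCRAM shows where the driver's Init() could opt into SCRAM-SHA-512
// instead of PLAIN (flag plumbing omitted).
func enableSCRAM(cfg *sarama.Config) {
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
	cfg.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient {
		return &XDGSCRAMClient{HashGeneratorFcn: scram.SHA512}
	}
}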
