netsampler / goflow2
High performance sFlow/IPFIX/NetFlow Collector
License: BSD 3-Clause "New" or "Revised" License
Hi,
We are using goflow2 as a library and are looking for a way to detect missing flows using the NetFlow 5/NetFlow 9/IPFIX Sequence Number.
There seems to be currently no easy, reliable way to detect whether sFlow/NetFlow/IPFIX packets have been dropped between the exporting device and the goflow2 collector.
Should goflow2 provide a way to monitor/report missing packets using the Sequence Number?
Sequence Number behaviour differs between NetFlow 5 and NetFlow 9:
Sequence Number meaning in NetFlow 9:
Incremental sequence counter of all export packets sent by this export device;
this value is cumulative, and it can be used to identify whether any export packets have been missed.
Note: This is a change from the NetFlow Version 5 and Version 8 headers, where this number represented "total flows."
nfdump example: https://github.com/phaag/nfdump/blob/28ad878ac807e82fb95a77df6fc9b98000bcc81c/bin/netflow_v9.c#L2094-L2106
Sequence Number in NetFlow 5, 6, 7, and 8:
The sequence number is equal to the sequence number of the previous datagram plus the number of flows in the
previous datagram. After receiving a new datagram, the receiving application can subtract the expected
sequence number from the sequence number in the header to derive the number of missed flows.
nfdump example: https://github.com/phaag/nfdump/blob/b0c7a5ec2e11a683460b312ba192bc00590c4acd/bin/netflow_v5_v7.c#L382-L395
Since the field SequenceNum is part of FlowMessage, I tried to use it to detect whether flows have been dropped, but this does not work well for NetFlow 9.
For NetFlow 5, it should work since SequenceNum represents the total number of flows. We can compare the change in SequenceNum to the actual number of FlowMessage records received.
For NetFlow 9, analysing FlowMessage.SequenceNum might not work, since template flows are counted in the sequence number but are not reported as FlowMessage.
Example: https://github.com/Graylog2/graylog-plugin-netflow/blob/master/src/test/resources/netflow-data/nprobe-netflow9-3.pcap
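To make the two counting rules concrete, here is a minimal sketch of a per-exporter gap detector. The seqTracker type and its observe method are hypothetical names and not part of goflow2. The caller is assumed to pass the header sequence number together with the amount that packet adds to the counter: the record count for NetFlow 5, and 1 for NetFlow 9.

```go
package main

import "fmt"

// seqTracker is a hypothetical helper (not a goflow2 type). Keep one per
// exporter, and for NetFlow 9 one per source ID as well.
type seqTracker struct {
	next    uint32 // sequence number expected in the next header
	started bool   // false until the first packet has been seen
}

// observe returns how many units were missed before this packet:
// flows for NetFlow 5, export packets for NetFlow 9.
// increment is the record count (v5) or 1 (v9).
func (t *seqTracker) observe(seq, increment uint32) uint32 {
	var missed uint32
	if t.started {
		// Unsigned subtraction stays correct across the 32-bit wrap-around.
		// Reordered packets or an exporter restart show up as a huge value,
		// which a real implementation would have to filter out.
		missed = seq - t.next
	}
	t.started = true
	t.next = seq + increment
	return missed
}

func main() {
	v5 := &seqTracker{}
	// Three v5 datagrams of 30 flows each; the one starting at 60 is lost.
	for _, seq := range []uint32{0, 30, 90} {
		fmt.Println("v5 missed flows:", v5.observe(seq, 30))
	}

	v9 := &seqTracker{}
	// v9 counts packets, so packet 3 is the one that went missing.
	for _, seq := range []uint32{1, 2, 4} {
		fmt.Println("v9 missed packets:", v9.observe(seq, 1))
	}
}
```

Because a NetFlow 9 header is counted whether it carries data or only templates, this check has to run on packet headers rather than on decoded FlowMessage records, which is the limitation described above.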
Hi there,
I capture IPFIX data on port 2055; the template is sent every 5 minutes. If I start goflow2 with the parameters:
goflow2 -loglevel="debug" -transport.file="./goflow2.output2" -listen="sflow://[IPV4_IP]:6343,netflow:///[IPV4_IP]:4739"
I see only packets with EtypeName:"IPv6", like this:
{"Type":"IPFIX","TimeReceived":1658818721,"SequenceNum":1466255583,"SamplingRate":1,"SamplerAddress":"111.111.111.111","TimeFlowStart":1658818606,"TimeFlowEnd":1658818606,"TimeFlowStartMs":1658818606182,"TimeFlowEndMs":1658818606182,"Bytes":143,"Packets":1,"SrcAddr":"::","DstAddr":"::","Etype":34525,"Proto":17,"SrcPort":58635,"DstPort":27016,"InIf":0,"OutIf":0,"SrcMac":"b0:a8:6e:7c:07:c2","DstMac":"30:b6:4f:e0:22:ce","SrcVlan":0,"DstVlan":0,"VlanId":0,"IngressVrfID":1187,"EgressVrfID":0,"IPTos":32,"ForwardingStatus":0,"IPTTL":0,"TCPFlags":0,"IcmpType":0,"IcmpCode":0,"IPv6FlowLabel":0,"FragmentId":0,"FragmentOffset":0,"BiFlowDirection":0,"SrcAS":0,"DstAS":0,"NextHop":"::","NextHopAS":0,"SrcNet":0,"DstNet":0,"EtypeName":"IPv6","ProtoName":"UDP","IcmpName":""}
And I see no IP addresses.
But I know that these packets are IPv4 (cross-checked with pmacct). I ran goflow2 for 20 minutes, so a template must have been sent. That was cross-checked with pmacct as well. I made a traffic dump to compare the tools' output.
I can see that goflow2 does not capture all flows: goflow2 has around 11500, pmacct 12886.
My suspicion is that goflow2 treats everything as IPv6 and can't read the IP addresses when an IPFIX flow is IPv4.
If a flow is IPv6, it works.
Am I doing something wrong?
Does anybody have an idea?
Help would be much appreciated.
Thanks and best regards,
Christian
The workflow docker.yaml references the action actions/checkout using the reference v1. However, this reference is missing the commit a6747255bd19d7a757dbdda8c654a9f84db19839, which may contain a fix for a vulnerability.
The vulnerability fix missing from this version of the action could be related to:
(1) a CVE fix
(2) an upgrade of a vulnerable dependency
(3) a fix for a secret leak, among others.
Please consider updating the reference to the action.
In order to support log rotation, it would be nice if the file transport would close and open the log file on SIGHUP.
Something like https://github.com/nytimes/logrotate/ could be used.
Hi! I need to store Flow Data Records when the corresponding Template Records have not arrived yet, as it says here. I read the code of the NetFlow decoder and did not find any handler for this situation; it only returns ErrorTemplateNotFound. I think I can catch ErrorTemplateNotFound when decoding a payload and store the payload with the corresponding template data (Version, DomainId, TemplateId). The problem is that I can't use the fields of ErrorTemplateNotFound because they are not exported.
Is it possible to export these fields? I think I could make a PR.
Hello!
I'm trying to map IP to AS with the enricher, but I get these errors:
ERRO[0002] unexpected EOF
ERRO[0002] proto: invalid field number
ERRO[0002] proto: mismatching end group marker
ERRO[0002] unexpected EOF
ERRO[0002] proto: invalid field number
ERRO[0002] proto: mismatching end group marker
ERRO[0002] unexpected EOF
ERRO[0002] proto: invalid field number
ERRO[0002] proto: mismatching end group marker
ERRO[0002] unexpected EOF
ERRO[0002] proto: invalid field number
GoFlow2 v1.0.4 (2021-06-12T16:48:35+0000)
I run this pipeline as follows:
goflow2 -format=pb | ./enricher -db.asn=/var/lib/geoip/GeoLite2-ASN.mmdb -db.country=/var/lib/geoip/GeoLite2-Country.mmdb
And if I understand correctly, the full pipe must look like this, if I want to map IP to AS:
sflow_stream -> goflow2 -> enricher -> kafka -> ch
Am I correct?
I don't know if it's on purpose or not, but the NetFlowStats metric counts flow sets: https://github.com/netsampler/goflow2/blob/main/utils/netflow.go#L223-L228
From its name / description, I would assume it should count flows instead:
prometheus.CounterOpts{
Name: "flow_process_nf_count",
Help: "NetFlows processed.",
},
Is it ok to change it so it counts flows? Or should we introduce another metric?
Any plans to support Netflow v5 for goflow2? We are receiving errors in the goflow2 adapter when attempting to deserialize netflow v5 traffic using the latest build of goflow2.
time="2022-07-20T12:17:31Z" level=error msg="Error from: NetFlow (1) duration: 2.79µs. NetFlow/IPFIX version error: 5"
I'm trying to follow the example in parsing a custom mapping file for ipfix at https://github.com/netsampler/goflow2/blob/main/cmd/goflow2/main.go#L73
The parsed config file doesn't have any Type values because there's no yaml name tag for this field. The rest of the fields work because the struct field name is the same as the yaml name. The diff below fixes things for me:
diff --git a/producer/reflect.go b/producer/reflect.go
index 6655870..1b91b0b 100644
--- a/producer/reflect.go
+++ b/producer/reflect.go
@@ -73,11 +73,11 @@ func MapCustom(flowMessage *flowmessage.FlowMessage, v []byte, destination strin
}
type NetFlowMapField struct {
- PenProvided bool `json:"penprovided"`
- Type uint16 `json:"field"`
- Pen uint32 `json:"pen"`
+ PenProvided bool `json:"penprovided" yaml:"penprovided"`
+ Type uint16 `json:"field" yaml:"field"`
+ Pen uint32 `json:"pen" yaml:"pen"`
- Destination string `json:"destination"`
+ Destination string `json:"destination" yaml:"destination"`
//DestinationLength uint8 `json:"dlen"` // could be used if populating a slice of uint16 that aren't in protobuf
}
Hi,
I get strange values in TCPFlags. According to the documentation, the sum of all flags can be at most 63, but I get larger values:
SELECT
multiIf(
EType = 2048, IPv4NumToString(reinterpretAsUInt32(substring(reverse(SrcAddr), 13, 4))),
EType = 34525, IPv6NumToString(SrcAddr),
'') AS SrcAddr,
multiIf(
EType = 2048, IPv4NumToString(reinterpretAsUInt32(substring(reverse(DstAddr), 13, 4))),
EType = 34525, IPv6NumToString(DstAddr),
'') AS DstAddr,
TCPFlags
FROM kafka
WHERE TCPFlags > 32
┌─SrcAddr────────┬─DstAddr────────┬─TCPFlags─┐
│ 91.145.253.180 │ 46.151.199.42 │ 245 │
│ 91.145.217.139 │ 94.176.199.211 │ 202 │
│ 62.122.202.122 │ 194.44.64.42 │ 194 │
│ 91.145.198.194 │ 188.190.54.223 │ 238 │
│ 91.145.232.239 │ 94.176.199.211 │ 212 │
│ 91.145.238.141 │ 94.176.199.215 │ 255 │
│ 91.145.218.50 │ 94.176.199.212 │ 60 │
│ 176.37.98.58 │ 77.120.247.111 │ 194 │
│ 91.145.244.58 │ 94.176.199.213 │ 94 │
│ 35.241.52.229 │ 46.175.253.143 │ 82 │
└────────────────┴────────────────┴──────────┘
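For context, 63 is the maximum only for the six classic flags (URG, ACK, PSH, RST, SYN, FIN). The same header byte also carries the ECN-related ECE and CWR bits defined in RFC 3168, so an exporter that reports the full byte can produce values up to 255. The sketch below decodes a few of the values from the table; that these exporters actually report the full byte is an assumption, and tcpFlagString is a hypothetical helper.

```go
package main

import (
	"fmt"
	"strings"
)

// tcpFlagNames lists the eight bits of the TCP flags byte, high bit first.
var tcpFlagNames = []struct {
	bit  uint8
	name string
}{
	{0x80, "CWR"}, {0x40, "ECE"}, {0x20, "URG"}, {0x10, "ACK"},
	{0x08, "PSH"}, {0x04, "RST"}, {0x02, "SYN"}, {0x01, "FIN"},
}

// tcpFlagString returns the names of all bits set in flags, joined by "|".
func tcpFlagString(flags uint8) string {
	var set []string
	for _, f := range tcpFlagNames {
		if flags&f.bit != 0 {
			set = append(set, f.name)
		}
	}
	return strings.Join(set, "|")
}

func main() {
	// Values taken from the query result above.
	for _, v := range []uint8{245, 202, 194, 60} {
		fmt.Printf("%3d = %08b = %s\n", v, v, tcpFlagString(v))
	}
}
```

A flow record typically carries the bitwise OR of the flags seen on all packets of the flow, so combinations that could never occur in a single packet (such as SYN together with FIN) are not unusual in this field.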
Please add SCTP support for ProtoName in IPFIX/NetFlow v9/sFlow; currently the collector reports an empty string:
"ProtoName": "",
I am trying to use the enricher with the kcg flow-pipeline.
This is what I am doing (I'm almost sure it is not the best approach):
In the Dockerfile I build the enricher and add it to the image,
and I run the following command:
./goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | ./enricher -transport.kafka.brokers=kafka:9092 -transport=kafka -transport.kafka.topic=flows -format=pb -format.protobuf.fixedlen=true -db.asn=/etc/clickhouse-server/GeoLite2-ASN.mmdb -db.country=/etc/clickhouse-server/GeoLite2-Country.mmdb
But it looks like it prints the output to STDOUT instead of sending it to Kafka (I don't see a connection in netstat).
IPFIX seems to work just fine; however, when feeding goflow2 with NetFlow v5 data, I get: "ERRO[0027] Error from: NetFlow (0) duration: 1.581644ms. NetFlow/IPFIX version error: 5". Tested on OpenBSD with the native pflow exporter. nfcapd digests the v5 data without issues, so I believe it's not an OpenBSD implementation quirk. Did I miss some config options?
Likely just Makefile and GitHub Actions configuration.
Would also need to do a docker manifest create for Docker.
Hi 👋,
I just encountered this error.
panic: send on closed channel
goroutine 405 [running]:
github.com/netsampler/goflow2/utils.UDPStoppableRoutine.func1()
go/pkg/mod/github.com/netsampler/[email protected]/utils/utils.go:171 +0x9a
created by github.com/netsampler/goflow2/utils.UDPStoppableRoutine
go/pkg/mod/github.com/netsampler/[email protected]/utils/utils.go:164 +0x952
It seems that udpDataCh might be closed here: https://github.com/netsampler/goflow2/blob/main/utils/utils.go#L184
But data will still be pushed to it from here: https://github.com/netsampler/goflow2/blob/main/utils/utils.go#L171
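One common way to avoid this class of panic is to let only the sending goroutine close the data channel, and to signal shutdown through a separate done channel. The sketch below shows the pattern in isolation; produce and runUntil are hypothetical names, and this is not the actual goflow2 fix.

```go
package main

import (
	"fmt"
	"sync"
)

// produce sends increasing integers on out until done is closed.
// It is the only goroutine that ever closes out, so a send on a
// closed channel cannot happen.
func produce(out chan<- int, done <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(out)
	for i := 0; ; i++ {
		select {
		case out <- i:
		case <-done:
			return
		}
	}
}

// runUntil consumes n values, then asks the producer to stop and waits
// for it to exit. It returns the number of values received.
func runUntil(n int) int {
	out := make(chan int)
	done := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)
	go produce(out, done, &wg)

	received := 0
	for range out {
		received++
		if received == n {
			break
		}
	}
	close(done) // request shutdown instead of closing the data channel
	wg.Wait()
	return received
}

func main() {
	fmt.Println("received:", runUntil(3))
}
```

The receiver never touches the data channel's lifetime; it only closes done, which is safe because nothing is ever sent on it.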
I can't figure out how to build a project with the RabbitMQ transport. Please tell me how to do it.
Right now 3 fields in the protobuf are allocated.
Should change into a list.
Could also embed the exp/s flags.
RFC 6313 (https://www.rfc-editor.org/rfc/rfc6313.html) defines an extension to IPFIX which includes more complex data structures like lists.
Cisco seems to be using this for their nvzFlow product -- https://blogs.cisco.com/security/an-introduction-to-the-new-cisco-network-visibility-flow-protocol-nvzflow.
Right now, when VZFLOW is sent to Goflow it results in errors like:
2022-08-31T22:15:17.823 ktranslate/netflow9 [Error] flow Error from: NetFlow (0) duration: 40.168µs. NetFlow/IPFIX version error: 25701
I also found an ElastiFlow bug on this, but I'm not sure of the fix: robcowart/elastiflow#343.
Hi, I'm not exactly sure if this is a bug or the correct behavior.
The issue is that when I capture flows of L2 traffic from my Nexus 6k, the DstMac attribute in the protobuf is zero.
As far as I was able to determine, this is due to the code here:
goflow2/producer/producer_nf.go
Lines 254 to 258 in 88cbf58
I changed this in my code, and now I get the correct MAC addresses.
// Mac
case netflow.NFV9_FIELD_IN_SRC_MAC:
DecodeUNumber(v, &(flowMessage.SrcMac))
case netflow.NFV9_FIELD_IN_DST_MAC:
DecodeUNumber(v, &(flowMessage.DstMac))
Please tell me if this is normal, or why the original uses the outgoing destination MAC address for the destination MAC address field.
The feature request is to have the ability to externalise the template cache,
so that when goflow2 restarts, it can immediately pick up the cache and serve the data (minimising recovery time).
As an extension, perhaps we could use the mechanism to create an HA solution,
but I think that's a separate piece of work.
I noticed that there is already some work done in a related area.
What are your thoughts?
P.S.
Great to see the project being actively developed!
Hello,
I tried goflow2/ovs with sFlow and it is working fine. With IPFIX it throws the following:
fatal error: runtime: out of memory
runtime stack:
runtime.throw(0x9ae1e7, 0x16)
/usr/local/go/src/runtime/panic.go:1117 +0x72
runtime.sysMap(0xc164000000, 0x58000000, 0xd561f0)
/usr/local/go/src/runtime/mem_linux.go:169 +0xc6
runtime.(*mheap).sysAlloc(0xd3cca0, 0x55800000, 0x42dc77, 0xd3cca8)
/usr/local/go/src/runtime/malloc.go:729 +0x1e5
runtime.(*mheap).grow(0xd3cca0, 0x2abf6, 0x0)
/usr/local/go/src/runtime/mheap.go:1346 +0x85
runtime.(*mheap).allocSpan(0xd3cca0, 0x2abf6, 0x0, 0x7f8680e73d28)
/usr/local/go/src/runtime/mheap.go:1173 +0x609
runtime.(*mheap).alloc.func1()
/usr/local/go/src/runtime/mheap.go:910 +0x59
runtime.systemstack(0x0)
/usr/local/go/src/runtime/asm_amd64.s:379 +0x66
runtime.mstart()
/usr/local/go/src/runtime/proc.go:1246
goroutine 39 [running]:
runtime.systemstack_switch()
/usr/local/go/src/runtime/asm_amd64.s:339 fp=0xc000099290 sp=0xc000099288 pc=0x46db40
runtime.(*mheap).alloc(0xd3cca0, 0x2abf6, 0x100, 0x8f4ae0)
/usr/local/go/src/runtime/mheap.go:904 +0x85 fp=0xc0000992e0 sp=0xc000099290 pc=0x429925
runtime.(*mcache).allocLarge(0x7f86b9f52108, 0x557ec000, 0xc000030001, 0x0)
/usr/local/go/src/runtime/mcache.go:224 +0x97 fp=0xc000099338 sp=0xc0000992e0 pc=0x419f97
runtime.mallocgc(0x557ec000, 0x92e2e0, 0xc00003cc01, 0x0)
/usr/local/go/src/runtime/malloc.go:1078 +0x925 fp=0xc0000993c0 sp=0xc000099338 pc=0x40f9e5
runtime.growslice(0x92e2e0, 0xc0f5480000, 0x16cc755, 0x16cc755, 0x16cc756, 0xc1615ea738, 0x1, 0x1)
/usr/local/go/src/runtime/slice.go:230 +0x1e9 fp=0xc000099428 sp=0xc0000993c0 pc=0x4515a9
github.com/netsampler/goflow2/decoders/netflow.DecodeOptionsDataSet(0xc00000000a, 0xc0000997c8, 0xc0002a8750, 0x1, 0x1, 0xc0002a87a0, 0x1, 0x1, 0x20300000000000, 0x7f8693620fff, ...)
/build/decoders/netflow/netflow.go:246 +0x47b fp=0xc000099530 sp=0xc000099428 pc=0x85cb1b
github.com/netsampler/goflow2/decoders/netflow.DecodeMessage(0xc0002a1ef0, 0xa3eea0, 0xc00000e6c0, 0xa, 0xc00009ae08, 0xd25001, 0x8de0)
/build/decoders/netflow/netflow.go:464 +0x991 fp=0xc000099a38 sp=0xc000099530 pc=0x85e031
github.com/netsampler/goflow2/utils.(*StateNetFlow).DecodeFlow(0xc0001a0640, 0x961520, 0xc00019ff20, 0x18558eba, 0xb56c5fe8f5)
/build/utils/netflow.go:102 +0x4ff fp=0xc000099e78 sp=0xc000099a38 pc=0x87127f
github.com/netsampler/goflow2/utils.(*StateNetFlow).DecodeFlow-fm(0x961520, 0xc00019ff20, 0xd250e0, 0x1)
/build/utils/netflow.go:64 +0x47 fp=0xc000099eb0 sp=0xc000099e78 pc=0x8794c7
github.com/netsampler/goflow2/decoders.Worker.Start.func1(0x0, 0xc000188670, 0x9cac58, 0xc000188690, 0xc0001cc000, 0x9a5e3f, 0x7, 0xc0001cc060, 0xc0001cc0c0)
/build/decoders/decoder.go:48 +0x168 fp=0xc000099f98 sp=0xc000099eb0 pc=0x86fae8
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1371 +0x1 fp=0xc000099fa0 sp=0xc000099f98 pc=0x46f981
created by github.com/netsampler/goflow2/decoders.Worker.Start
/build/decoders/decoder.go:39 +0x70
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0001b2238)
/usr/local/go/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc0001b2230)
/usr/local/go/src/sync/waitgroup.go:130 +0x65
main.main()
/build/cmd/goflow2/main.go:153 +0x43d
goroutine 36 [IO wait]:
internal/poll.runtime_pollWait(0x7f8693239dd0, 0x72, 0x0)
/usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc000114418, 0x72, 0x0, 0x0, 0x9a5f96)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Accept(0xc000114400, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:401 +0x212
net.(*netFD).accept(0xc000114400, 0x30, 0x30, 0x7f86b9f52108)
/usr/local/go/src/net/fd_unix.go:172 +0x45
net.(*TCPListener).accept(0xc00000e6a8, 0xc00004bdd0, 0x40fb78, 0x30)
/usr/local/go/src/net/tcpsock_posix.go:139 +0x32
net.(*TCPListener).Accept(0xc00000e6a8, 0x960020, 0xc000128930, 0x8ff2a0, 0xd13340)
/usr/local/go/src/net/tcpsock.go:261 +0x65
net/http.(*Server).Serve(0xc00013e000, 0xa42870, 0xc00000e6a8, 0x0, 0x0)
/usr/local/go/src/net/http/server.go:2981 +0x285
net/http.(*Server).ListenAndServe(0xc00013e000, 0xc00013e000, 0x8)
/usr/local/go/src/net/http/server.go:2910 +0xba
net/http.ListenAndServe(...)
/usr/local/go/src/net/http/server.go:3164
main.httpServer()
/build/cmd/goflow2/main.go:56 +0xbc
created by main.main
/build/cmd/goflow2/main.go:90 +0x30c
goroutine 37 [IO wait]:
internal/poll.runtime_pollWait(0x7f8693239ce8, 0x72, 0x0)
/usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc000194198, 0x72, 0x2300, 0x2328, 0x0)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
/usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).ReadFrom(0xc000194180, 0xc0001d36d0, 0x2328, 0x2328, 0x0, 0x0, 0x0, 0x0, 0x0)
/usr/local/go/src/internal/poll/fd_unix.go:222 +0x1e6
net.(*netFD).readFrom(0xc000194180, 0xc0001d36d0, 0x2328, 0x2328, 0x0, 0x0, 0xc000194180, 0xc0001d3618, 0x5b2688)
/usr/local/go/src/net/fd_posix.go:61 +0x5b
net.(*UDPConn).readFrom(0xc00018c0b8, 0xc0001d36d0, 0x2328, 0x2328, 0xd25720, 0xc00018ef30, 0x0, 0x0)
/usr/local/go/src/net/udpsock_posix.go:47 +0x6a
net.(*UDPConn).ReadFromUDP(0xc00018c0b8, 0xc0001d36d0, 0x2328, 0x2328, 0x5, 0x0, 0xc000188720, 0x9a573c)
/usr/local/go/src/net/udpsock.go:106 +0x5d
github.com/netsampler/goflow2/utils.UDPRoutine(0x9a573c, 0x5, 0xc000188700, 0x1, 0x9b2896, 0x0, 0x18c7, 0x0, 0xa48878, 0xc000187570, ...)
/build/utils/utils.go:138 +0x7dd
github.com/netsampler/goflow2/utils.(*StateSFlow).FlowRoutine(...)
/build/utils/sflow.go:151
main.main.func1(0xc0001b2230, 0xc0001885d0, 0xc0001885e0, 0x9b288e, 0xd)
/build/cmd/goflow2/main.go:125 +0x9f1
created by main.main
/build/cmd/goflow2/main.go:97 +0x419
goroutine 38 [chan receive]:
github.com/netsampler/goflow2/decoders.Processor.ProcessMessage(...)
/build/decoders/decoder.go:113
github.com/netsampler/goflow2/utils.UDPRoutine(0x9a5e3f, 0x7, 0xc000188670, 0x1, 0x9b28a6, 0x0, 0x807, 0x40fb00, 0xa48878, 0xc000187570, ...)
/build/utils/utils.go:147 +0x9e5
github.com/netsampler/goflow2/utils.(*StateNetFlow).FlowRoutine(0xc0001a0640, 0x1, 0x9b28a6, 0x0, 0x807, 0x0, 0x0, 0x0)
/build/utils/netflow.go:370 +0x15c
main.main.func1(0xc0001b2230, 0xc0001885d0, 0xc0001885e0, 0x9b289c, 0xf)
/build/cmd/goflow2/main.go:132 +0x849
created by main.main
/build/cmd/goflow2/main.go:97 +0x419
goroutine 40 [select]:
github.com/netsampler/goflow2/decoders.Worker.Start.func1(0x0, 0xc000188700, 0x9cac58, 0xc000188720, 0xc0001cc120, 0x9a573c, 0x5, 0xc0001cc180, 0xc0001cc1e0)
/build/decoders/decoder.go:42 +0xbd
created by github.com/netsampler/goflow2/decoders.Worker.Start
/build/decoders/decoder.go:39 +0x70
I'm using the ELK compose setup and I want to add the enricher to the stream. How could I do that?
RFC 3954 and the NetFlow Version 9 Flow-Record Format both specify the fields FIRST_SWITCHED and LAST_SWITCHED as system uptime in milliseconds; however, goflow2 produces them in seconds:
goflow2/producer/producer_nf.go
Lines 337 to 350 in 58f0f97
Is this an error, or are both fields produced in seconds for a reason I am unaware of?
goflow2 keeps working correctly if it loses Kafka after it has started, but goflow2 can't start if Kafka is down:
level=fatal msg="kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"
This sometimes happens in real life when a server restarts and goflow2 can't start during boot because it can't reach Kafka.
Is it possible to ignore this and start despite Kafka being unreachable, and begin producing messages once the Kafka brokers are online?
Description: Add an option for the producer, when transport.type=kafka, to configure a compression type, per the Kafka configuration parameter "compression.type". See Kafka's documentation for details: https://kafka.apache.org/documentation/#brokerconfigs_compression.type
Steps to Implement:
Modified Class:
package kafka
import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"flag"
"fmt"
"os"
"strings"
"time"
sarama "github.com/Shopify/sarama"
"github.com/netsampler/goflow2/transport"
"github.com/netsampler/goflow2/utils"
log "github.com/sirupsen/logrus"
)
type KafkaDriver struct {
kafkaTLS bool
kafkaSASL bool
kafkaTopic string
kafkaSrv string
kafkaBrk string
kafkaMaxMsgBytes int
kafkaFlushBytes int
kafkaFlushFrequency time.Duration
kafkaLogErrors bool
kafkaHashing bool
kafkaVersion string
kafkaCompressionType string
producer sarama.AsyncProducer
q chan bool
}
func (d *KafkaDriver) Prepare() error {
flag.BoolVar(&d.kafkaTLS, "transport.kafka.tls", false, "Use TLS to connect to Kafka")
flag.BoolVar(&d.kafkaSASL, "transport.kafka.sasl", false, "Use SASL/PLAIN data to connect to Kafka (TLS is recommended and the environment variables KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set)")
flag.StringVar(&d.kafkaTopic, "transport.kafka.topic", "flow-messages", "Kafka topic to produce to")
flag.StringVar(&d.kafkaSrv, "transport.kafka.srv", "", "SRV record containing a list of Kafka brokers (or use brokers)")
flag.StringVar(&d.kafkaBrk, "transport.kafka.brokers", "127.0.0.1:9092,[::1]:9092", "Kafka brokers list separated by commas")
flag.IntVar(&d.kafkaMaxMsgBytes, "transport.kafka.maxmsgbytes", 1000000, "Kafka max message bytes")
flag.IntVar(&d.kafkaFlushBytes, "transport.kafka.flushbytes", int(sarama.MaxRequestSize), "Kafka flush bytes")
flag.DurationVar(&d.kafkaFlushFrequency, "transport.kafka.flushfreq", time.Second*5, "Kafka flush frequency")
flag.BoolVar(&d.kafkaLogErrors, "transport.kafka.log.err", false, "Log Kafka errors")
flag.BoolVar(&d.kafkaHashing, "transport.kafka.hashing", false, "Enable partition hashing")
//flag.StringVar(&d.kafkaKeying, "transport.kafka.key", "SamplerAddress,DstAS", "Kafka list of fields to do hashing on (partition) separated by commas")
flag.StringVar(&d.kafkaVersion, "transport.kafka.version", "2.8.0", "Kafka version")
flag.StringVar(&d.kafkaCompressionType, "transport.kafka.compression.type", "lz4", "Kafka compression type")
return nil
}
func (d *KafkaDriver) Init(context.Context) error {
kafkaConfigVersion, err := sarama.ParseKafkaVersion(d.kafkaVersion)
if err != nil {
return err
}
kafkaConfig := sarama.NewConfig()
kafkaConfig.Version = kafkaConfigVersion
kafkaConfig.Producer.Return.Successes = false
kafkaConfig.Producer.Return.Errors = d.kafkaLogErrors
kafkaConfig.Producer.MaxMessageBytes = d.kafkaMaxMsgBytes
kafkaConfig.Producer.Flush.Bytes = d.kafkaFlushBytes
kafkaConfig.Producer.Flush.Frequency = d.kafkaFlushFrequency
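// sarama.CompressionCodec is an integer type, so the flag string cannot be
// converted directly; map the known names to their codecs instead.
switch strings.ToLower(d.kafkaCompressionType) {
case "", "none":
kafkaConfig.Producer.Compression = sarama.CompressionNone
case "gzip":
kafkaConfig.Producer.Compression = sarama.CompressionGZIP
case "snappy":
kafkaConfig.Producer.Compression = sarama.CompressionSnappy
case "lz4":
kafkaConfig.Producer.Compression = sarama.CompressionLZ4
case "zstd":
kafkaConfig.Producer.Compression = sarama.CompressionZSTD
default:
return fmt.Errorf("unknown Kafka compression type: %q", d.kafkaCompressionType)
}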
if d.kafkaTLS {
rootCAs, err := x509.SystemCertPool()
if err != nil {
return errors.New(fmt.Sprintf("Error initializing TLS: %v", err))
}
kafkaConfig.Net.TLS.Enable = true
kafkaConfig.Net.TLS.Config = &tls.Config{RootCAs: rootCAs}
}
if d.kafkaHashing {
kafkaConfig.Producer.Partitioner = sarama.NewHashPartitioner
}
if d.kafkaSASL {
if !d.kafkaTLS /*&& log != nil*/ {
log.Warn("Using SASL without TLS will transmit the authentication in plaintext!")
}
kafkaConfig.Net.SASL.Enable = true
kafkaConfig.Net.SASL.User = os.Getenv("KAFKA_SASL_USER")
kafkaConfig.Net.SASL.Password = os.Getenv("KAFKA_SASL_PASS")
if kafkaConfig.Net.SASL.User == "" && kafkaConfig.Net.SASL.Password == "" {
return errors.New("Kafka SASL config from environment was unsuccessful. KAFKA_SASL_USER and KAFKA_SASL_PASS need to be set.")
} else /*if log != nil*/ {
log.Infof("Authenticating as user '%s'...", kafkaConfig.Net.SASL.User)
}
}
var addrs []string
if d.kafkaSrv != "" {
addrs, _ = utils.GetServiceAddresses(d.kafkaSrv)
} else {
addrs = strings.Split(d.kafkaBrk, ",")
}
kafkaProducer, err := sarama.NewAsyncProducer(addrs, kafkaConfig)
if err != nil {
return err
}
d.producer = kafkaProducer
d.q = make(chan bool)
if d.kafkaLogErrors {
go func() {
for {
select {
case msg := <-kafkaProducer.Errors():
//if log != nil {
log.Error(msg)
//}
case <-d.q:
return
}
}
}()
}
return err
}
func (d *KafkaDriver) Send(key, data []byte) error {
d.producer.Input() <- &sarama.ProducerMessage{
Topic: d.kafkaTopic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(data),
}
return nil
}
func (d *KafkaDriver) Close(context.Context) error {
d.producer.Close()
close(d.q)
return nil
}
func init() {
d := &KafkaDriver{}
transport.RegisterTransportDriver("kafka", d)
}
Hi Louis,
I've finished a PoC with goflow2 in k8s, sending JSON data to Kafka and then to ELK, but I see incorrect graphs: goflow2 doesn't automatically multiply bytes and packets by the sampling rate before sending data to Kafka or to a file, so the NetFlow/IPFIX volumes are incorrect.
For example, I have a sampling rate of 100 on Juniper routers and I see ~15GB over a 15-minute timeframe on another collector (nfacctd), but on the same diagram in Kibana fed from goflow2 I see only ~1.3MB.
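Since each record carries a SamplingRate field next to the raw Bytes and Packets counters, the scaling can be done by whatever consumes the messages. A minimal sketch of that step follows; the scaled function is hypothetical, and treating a rate of 0 as "unknown, leave the counter unchanged" is an assumption.

```go
package main

import "fmt"

// scaled estimates the real traffic volume from a sampled counter.
// A sampling rate of 0 is treated as unknown and leaves the counter as is.
func scaled(count, samplingRate uint64) uint64 {
	if samplingRate == 0 {
		return count
	}
	return count * samplingRate
}

func main() {
	// A record reporting 143 bytes at a 1-in-100 sampling rate.
	fmt.Println(scaled(143, 100)) // 14300
}
```

The same multiplication can be done at query time in the datastore instead, which keeps the raw sampled values available.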
MikroTik devices do not include the SAMPLING_INTERVAL field in the Option Data Sets, instead it's included in the Data Set and defined by the Data Template - the problem is GoFlow2 never looks for or uses the SAMPLING_INTERVAL field in the Data Set, so the SamplingRate field in the output is always 0.
I am not sure if this is out of spec and the MikroTik doing things wrong, but to resolve the issue I had to tweak producer/producer_nf.go:
Add the following in ConvertNetFlowDataSet()
case netflow.NFV9_FIELD_SAMPLING_INTERVAL:
DecodeUNumber(v, &(flowMessage.SamplingRate))
Alter the line fmsg.SamplingRate = uint64(samplingRate) to prevent overwriting the above with 0:
if samplingRate > 0 {
fmsg.SamplingRate = uint64(samplingRate)
}
I haven't submitted the above as a pull request as I'm not 100% sure this is the best way to handle this scenario, alongside the existing sampling system logic. I have a pcap if useful.
Sidenote: MikroTik devices seem to send the sampling interval in little endian rather than big endian expected by DecodeUNumber(), so an interval of 256 is actually coming through as 65536 - I'm correcting that at a later stage.
I looked into the code, and it seems that goflow2 just collapses both into a byte array without any differentiation between IPv4 and IPv6 addresses. When data is read from (for example) Kafka, IPv4 addresses are mapped into arbitrary parts of the IPv6 address space, and there is no reliable way to separate them.
Maybe a flag for the address family could help?
Hey @lspgn ,
We'd like to get TimeFlowStart/End in ms (at least). Currently they are in seconds, and the IPFIX fields that carry better precision are all approximated to seconds, as we can see here: https://github.com/netsampler/goflow2/blob/main/producer/producer_nf.go#L353-L382
How could we do that without breaking compatibility? Provide a separate field for milliseconds, or have a config option to choose the time precision?
There seems to be some issue with go.dev detecting this repo's license file. According to the docs, they employ https://pkg.go.dev/github.com/google/licensecheck.
This is how goflow2's docs look for me at the moment:
Hi,
When running go test ./utils -race, the race detector raises the warnings below.
I think in practice it shouldn't be an issue.
But some projects using goflow2 as a library might have CI tests (with the race detector enabled) failing because of this.
I think it might be worth fixing this theoretical race condition. What do you think?
$ go test ./utils -race
==================
WARNING: DATA RACE
Write at 0x00c000226c30 by goroutine 27:
github.com/netsampler/goflow2/utils.(*stopper).Shutdown()
/go/src/github.com/goflow2/utils/stopper.go:32 +0xc4
github.com/netsampler/goflow2/utils.TestStopper()
/go/src/github.com/goflow2/utils/stopper_test.go:16 +0x112
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
Previous read at 0x00c000226c30 by goroutine 28:
github.com/netsampler/goflow2/utils.(*routine).StartRoutine.func1()
/go/src/github.com/goflow2/utils/stopper_test.go:46 +0x45
Goroutine 27 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x726
testing.runTests.func1()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1598 +0x99
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.runTests()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1596 +0x7ca
testing.(*M).Run()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1504 +0x9d1
main.main()
_testmain.go:49 +0x22b
Goroutine 28 (finished) created at:
github.com/netsampler/goflow2/utils.(*routine).StartRoutine()
/go/src/github.com/goflow2/utils/stopper_test.go:44 +0x118
github.com/netsampler/goflow2/utils.TestStopper()
/go/src/github.com/goflow2/utils/stopper_test.go:14 +0xbc
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
==================
WARNING: DATA RACE
Read at 0x00c000226c50 by goroutine 30:
github.com/netsampler/goflow2/utils.TestStopper.func1()
/go/src/github.com/goflow2/utils/stopper_test.go:18 +0x34
github.com/stretchr/testify/assert.Eventually.func1()
/go/pkg/mod/github.com/stretchr/[email protected]/assert/assertions.go:1655 +0x39
Previous write at 0x00c000226c50 by goroutine 28:
github.com/netsampler/goflow2/utils.(*routine).StartRoutine.func1()
/go/src/github.com/goflow2/utils/stopper_test.go:47 +0x64
Goroutine 30 (running) created at:
github.com/stretchr/testify/assert.Eventually()
/go/pkg/mod/github.com/stretchr/[email protected]/assert/assertions.go:1655 +0x3b7
github.com/netsampler/goflow2/utils.TestStopper()
/go/src/github.com/goflow2/utils/stopper_test.go:17 +0x196
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
Goroutine 28 (finished) created at:
github.com/netsampler/goflow2/utils.(*routine).StartRoutine()
/go/src/github.com/goflow2/utils/stopper_test.go:44 +0x118
github.com/netsampler/goflow2/utils.TestStopper()
/go/src/github.com/goflow2/utils/stopper_test.go:14 +0xbc
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
--- FAIL: TestStopper (0.00s)
testing.go:1152: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c000226e00 by goroutine 34:
github.com/netsampler/goflow2/utils.TestCancelUDPRoutine.func3()
/go/src/github.com/goflow2/utils/utils_test.go:41 +0x5e
github.com/netsampler/goflow2/utils.TestCancelUDPRoutine()
/go/src/github.com/goflow2/utils/utils_test.go:51 +0x318
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
Previous write at 0x00c000226e00 by goroutine 35:
github.com/netsampler/goflow2/utils.(*dummyFlowProcessor).FlowRoutine()
/go/src/github.com/goflow2/utils/utils_test.go:74 +0x90
github.com/netsampler/goflow2/utils.TestCancelUDPRoutine.func1()
/go/src/github.com/goflow2/utils/utils_test.go:20 +0x53
Goroutine 34 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x726
testing.runTests.func1()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1598 +0x99
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.runTests()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1596 +0x7ca
testing.(*M).Run()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1504 +0x9d1
main.main()
_testmain.go:49 +0x22b
Goroutine 35 (running) created at:
github.com/netsampler/goflow2/utils.TestCancelUDPRoutine()
/go/src/github.com/goflow2/utils/utils_test.go:19 +0x1e4
testing.tRunner()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1259 +0x22f
testing.(*T).Run·dwrap·21()
/usr/local/Cellar/go/1.17.6/libexec/src/testing/testing.go:1306 +0x47
==================
--- FAIL: TestCancelUDPRoutine (0.10s)
testing.go:1152: race detected during execution of test
FAIL
FAIL github.com/netsampler/goflow2/utils 0.463s
FAIL
Does it support sending data to Kafka in JSON format?
Hey Louis,
First of all, a big thanks for this amazing piece of software! <3
We are using goflow2 with your sample enricher as a simple POC pipeline for sFlow. We are only using Juniper gear (MX480, MX204). Today we stumbled upon the following problem...
Execution:
/root/goflow2/goflow2 -transport.file.sep= -format=pb -format.protobuf.fixedlen=true | /root/goflow2/enricher -db.asn /root/goflow2/GeoLite2-ASN.mmdb -db.country /root/goflow2/GeoLite2-Country.mmdb -transport.kafka.brokers=localhost:9092 -transport=kafka -transport.kafka.topic=flows -format=pb -format.protobuf.fixedlen=true
Error:
INFO[0000] Starting collection hostname= port=6343 scheme=sflow
INFO[0000] Starting collection hostname= port=2055 scheme=netflow
INFO[0000] Starting enricher
[root@flowtest ~]# ERRO[2006] proto: cannot parse invalid wire-format data
ERRO[2006] proto: cannot parse invalid wire-format data
panic: runtime error: slice bounds out of range [:9] with capacity 4
goroutine 1 [running]:
main.main()
/root/goflow2/cmd/enricher/main.go:166 +0xdc5
[root@flowtest ~]#
I am not sure whether we have hit an issue in goflow2 or whether it is related to Juniper's crappy sFlow implementation...
We have just pulled the latest code from the main branch to check whether your latest commit was related to our problem. The problem still exists...
It looks like when this was forked, cloudflare/goflow#73 was dropped from the formatting controls.
The "normal" format is useful for simplified parsing with things like mtail which doesn't really handle json well.
It's also useful to be able to drop unused fields from the output by filtering which fields are included.
Is there any way to get the template ID and other packet-related fields in flowMessageSet?
Hi there..
I'm experimenting with the goflow2, Kafka, ClickHouse pipeline and I'm looking for a way to make the IP addresses of the inserted IPFIX flows human-readable. This may be more of a ClickHouse question, but I want to try my luck here as well ;)
As seen in the flow.proto protobuf schema, the fields holding IP addresses use the bytes type. That is not really handy for storing them in the ClickHouse database, because I don't know how to read or use the byte values in SELECT queries. When I followed the DB pipeline tutorial, I saw that the ClickHouse column for them is FixedString(16), but with that I also got raw bytes in the table.
--- snip ----
// Sampler information
bytes SamplerAddress = 11;
// Found inside packet
uint64 TimeFlowStart = 38;
uint64 TimeFlowEnd = 5;
uint64 TimeFlowStartMs = 63;
uint64 TimeFlowEndMs = 64;
// Size of the sampled packet
uint64 Bytes = 9;
uint64 Packets = 10;
// Source/destination addresses
bytes SrcAddr = 6;
bytes DstAddr = 7;
---- snap ---
My attempts to change the type of these fields in flow.proto (SrcAddr, DstAddr, SamplerAddress) to string (because I think the Docker container compiles the proto file at startup?) still ended up with byte values in the corresponding ClickHouse columns.
This is the DB schema used to read from Kafka:
CREATE TABLE IF NOT EXISTS sisr_dev.kafka_raw_goflow2_test1_proto_ipfix_consumer ON CLUSTER "dev-ch-cluster" (
Samplingrate UInt64,
SequenceNum UInt32,
SrcAddr String,
DstAddr String,
Bytes UInt64,
Packets UInt64,
SamplerAddress String,
NextHop String,
Proto UInt32,
SrcPort UInt32,
DstPort UInt32,
SrcMac UInt64,
DstMac UInt64,
Etype UInt32,
TimeFlowStart UInt64,
IngressVrfID UInt32,
TimeFlowStartMs UInt64,
TimeFlowEndMs UInt64,
)
ENGINE = Kafka ()
SETTINGS kafka_broker_list =
[kafka_broker]
kafka_topic_list = 'goflow2_test1_raw_ipfix_proto',
kafka_group_name = 'goflow2_raw_ch_kafka', kafka_num_consumers = 8,
kafka_thread_per_consumer = 4,
kafka_format = 'Protobuf',
kafka_schema = 'flow.proto:FlowMessage',
kafka_skip_broken_messages = 1
Data is written correctly into the database. Everything works like a charm except for the byte-format IP addresses.
Can anybody help me? Or does anyone have an example that would help me understand and use the data?
Best regards and thanks,
Christian
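For readers who consume these messages in Go, a minimal sketch of turning the raw `bytes` address fields into readable text is shown below. It assumes the field holds either 4 bytes (IPv4) or 16 bytes (IPv6, possibly an IPv4-mapped address). `addrToString` is a hypothetical helper, not part of goflow2. ClickHouse also ships conversion functions such as `IPv6NumToString` for FixedString(16) values; whether they fit the schema above has not been checked here.

```go
package main

import (
	"fmt"
	"net/netip"
)

// addrToString converts a 4- or 16-byte address into its textual form.
// IPv4-mapped IPv6 addresses are printed as plain IPv4.
// The boolean is false when the slice has any other length.
func addrToString(b []byte) (string, bool) {
	addr, ok := netip.AddrFromSlice(b)
	if !ok {
		return "", false
	}
	return addr.Unmap().String(), true
}

func main() {
	v4 := []byte{192, 0, 2, 1}
	v6 := []byte{0x20, 0x01, 0x0d, 0xb8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}

	for _, raw := range [][]byte{v4, v6} {
		if s, ok := addrToString(raw); ok {
			fmt.Println(s)
		}
	}
}
```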
I recently switched over from goflow (v1) to goflow2, with mostly identical settings and setup.
Using the Kafka backend under a fair load (>20-30k messages/sec), goflow2 periodically emits the error message below.
(Note that I haven't tested at what threshold the issue starts to occur.)
ERRO[3562] kafka: Failed to produce message to topic : kafka server: Message was too large, server rejected it to avoid allocation error
Relevant settings for reference (other settings omitted) :
-format=pb
-transport=kafka
-transport.kafka.log.err
-transport.kafka.hashing=false
-transport.kafka.version="2.4.1"
-workers=10
Kafka (v2.4) settings are close to stock.
Amount/load :
Hi, I tried to use a flow generator, https://github.com/sheacloud/flowgen.
I can see the tool generates IPFIX packets with sequenceNum=0, and goflow2 parses these packets, whereas other parsers do not capture them.
I'm not sure if this is valid behaviour.
Hey @lspgn 👋
I recently did a POC of one of your example pipelines (Flows + Kafka + ClickHouse + Grafana) deployed on top of k8s.
Would you be interested in that? I still have all of the commands and files available.
Hi Louis,
I have another question; maybe you would be so kind as to help out.
In my IPFIX flows I see the fields Dot1q_Vlan, Post_Dot1q_Vlan, Dot1q_Cvlan, Post_Dot1q_Cvlan in pmacct and tcpdump.
I followed your manual to add exotic fields by mapping.
This is my mapping:
ipfix:
  mapping:
    - field: 252
      destination: InIf
    - field: 253
      destination: OutIf
    - field: 243
      destination: Dot1q_Vlan
    - field: 254
      destination: Post_Dot1q_Vlan
    - field: 245
      destination: Dot1q_Cvlan
    - field: 255
      destination: Post_Dot1q_Cvlan
Furthermore, I added the following to flow.proto:
// Q_Vlan
uint32 Dot1q_Vlan = 243;
uint32 Post_Dot1q_Vlan = 254;
uint32 Dot1q_Cvlan = 245;
uint32 Post_Dot1q_Cvlan = 255;
After that I rebuilt the container with "docker build ." and started the goflow2 container.
But I don't get values for those 4 new fields in ClickHouse, although, as you can see in the data I sent in the last issue, the fields are exported. In the ClickHouse DB the columns are zero.
Did I configure something wrong?
Do you have an idea?
Thanks a lot again..
Best regards
Christian
I installed goflow2 from the RPM and am using the config below to write the NetFlow data to a file, but the NetFlow data is also written to /var/log/messages. How can we avoid writing the NetFlow data to /var/log/messages?
/usr/bin/goflow2 -listen nfl://10.0.0.53:8888 -transport file -transport.file /var/log/goflow/goflow.log -format text
Hello,
I've recently patched the producer_nf.go file to produce some MPLS fields. Some fields were already present in the flow.proto file but I needed to add one more.
I added this code to producer_nf.go (after line 335):
//MPLS
case netflow.IPFIX_FIELD_mplsTopLabelStackSection:
var mplsLabel uint32
DecodeUNumber(v, &mplsLabel)
flowMessage.MPLS1Label = uint32(mplsLabel >> 4)
flowMessage.HasMPLS = true
case netflow.IPFIX_FIELD_mplsLabelStackSection2:
var mplsLabel uint32
DecodeUNumber(v, &mplsLabel)
flowMessage.MPLS2Label = uint32(mplsLabel >> 4)
case netflow.IPFIX_FIELD_mplsLabelStackSection3:
var mplsLabel uint32
DecodeUNumber(v, &mplsLabel)
flowMessage.MPLS3Label = uint32(mplsLabel >> 4)
case netflow.IPFIX_FIELD_mplsTopLabelIPv4Address:
flowMessage.MPLSTopLabelIpv4Addr = v
To support the new field "MPLSTopLabelIpv4Addr", I modified flow.proto like this:
bytes MPLSTopLabelIpv4Addr = 63; // mplsTopLabelIPv4Address
Could you please consider including this enhancement in the main branch?
BR
David
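The `>> 4` in the patch above drops the low four bits of a 3-octet label stack entry. A minimal standalone sketch of that decoding follows, assuming the usual layout for these IPFIX elements: a 20-bit label, then 3 Exp bits, then the 1-bit bottom-of-stack flag. `mplsEntry` and `decodeMPLSStackSection` are hypothetical names, not goflow2 functions.

```go
package main

import (
	"errors"
	"fmt"
)

// mplsEntry holds the fields of one 3-octet MPLS label stack section.
type mplsEntry struct {
	Label         uint32 // 20-bit label value
	Exp           uint8  // 3-bit experimental / traffic class bits
	BottomOfStack bool   // S bit
}

// decodeMPLSStackSection splits a 3-byte stack section into its fields.
func decodeMPLSStackSection(b []byte) (mplsEntry, error) {
	if len(b) != 3 {
		return mplsEntry{}, errors.New("MPLS stack section must be 3 bytes")
	}
	v := uint32(b[0])<<16 | uint32(b[1])<<8 | uint32(b[2])
	return mplsEntry{
		Label:         v >> 4,
		Exp:           uint8(v>>1) & 0x7,
		BottomOfStack: v&1 == 1,
	}, nil
}

func main() {
	// Label 16, Exp 0, bottom of stack set: (16 << 4) | 1 = 0x000101.
	entry, err := decodeMPLSStackSection([]byte{0x00, 0x01, 0x01})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("label=%d exp=%d bottom=%t\n", entry.Label, entry.Exp, entry.BottomOfStack)
}
```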
Hi!
Great resource @lspgn, thanks for taking goflow to the moon!
I tried it and liked it very much: very high performance with minimal resource use.
The only problem I have right now is with hsflowd packets.
What would be the way to update goflow2 so that it can decode the new sFlow types and insert them all together into the same DB on one machine?
e.g.; hsflowd + sflow v5 ---> goflow2 ---> kafka --> ch
flow_data | 0 | 2100 | extended_socket_ipv4 | sFlow Host Structures
flow_data | 0 | 2101 | extended_socket_ipv6 | sFlow Host Structures
+
sample_data | 0 | 1 | flow_sample | sFlow Version 5
sample_data | 0 | 2 | counter_sample | sFlow Version 5
sample_data | 0 | 3 | flow_sample_expanded | sFlow Version 5
sample_data | 0 | 4 | counter_sample_expanded | sFlow Version 5
flow_data | 0 | 1 | sampled_header | sFlow Version 5
flow_data | 0 | 2 | sampled_ethernet | sFlow Version 5
flow_data | 0 | 3 | sampled_ipv4 | sFlow Version 5
flow_data | 0 | 4 | sampled_ipv6 | sFlow Version 5
flow_data | 0 | 1001 | extended_switch | sFlow Version 5
flow_data | 0 | 1002 | extended_router | sFlow Version 5
flow_data | 0 | 1003 | extended_gateway | sFlow Version 5
flow_data | 0 | 1004 | extended_user | sFlow Version 5
flow_data | 0 | 1005 | extended_url (deprecated) | sFlow Version 5
flow_data | 0 | 1006 | extended_mpls | sFlow Version 5
flow_data | 0 | 1007 | extended_nat | sFlow Version 5
flow_data | 0 | 1008 | extended_mpls_tunnel | sFlow Version 5
flow_data | 0 | 1009 | extended_mpls_vc | sFlow Version 5
flow_data | 0 | 1010 | extended_mpls_FTN | sFlow Version 5
flow_data | 0 | 1011 | extended_mpls_LDP_FEC | sFlow Version 5
flow_data | 0 | 1012 | extended_vlantunnel | sFlow Version 5
or another combination..
Sincerely,
F.
I have been looking through the goflow2 project, and I see that you add the sampling rate provided by the options template. Since the sampling rate is tied to the Observation Domain, you maintain a map from version + Observation Domain to sampling rate, which I think is absolutely right. However, I have a question: how can I correlate the interface name/description provided by an options template with the NetFlow data flows? Thanks.
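One way to picture the question is a second lookup table next to the sampling-rate map, keyed by exporter, Observation Domain and interface index. The sketch below is purely illustrative: `optionsCache`, `samplingKey` and `interfaceKey` are hypothetical types, not goflow2's API, and which option fields carry the interface index and name depends on the exporter.

```go
package main

import (
	"fmt"
	"sync"
)

// samplingKey mirrors the "version + Observation Domain" key described above.
type samplingKey struct {
	Version     uint16
	ObsDomainID uint32
}

// interfaceKey identifies one interface of one exporter (hypothetical).
type interfaceKey struct {
	Exporter    string
	ObsDomainID uint32
	IfIndex     uint32
}

// optionsCache stores values learned from options data records.
type optionsCache struct {
	mu       sync.RWMutex
	sampling map[samplingKey]uint32
	ifNames  map[interfaceKey]string
}

func newOptionsCache() *optionsCache {
	return &optionsCache{
		sampling: make(map[samplingKey]uint32),
		ifNames:  make(map[interfaceKey]string),
	}
}

func (c *optionsCache) SetSamplingRate(k samplingKey, rate uint32) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sampling[k] = rate
}

func (c *optionsCache) SetInterfaceName(k interfaceKey, name string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.ifNames[k] = name
}

func (c *optionsCache) SamplingRate(k samplingKey) (uint32, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	rate, ok := c.sampling[k]
	return rate, ok
}

func (c *optionsCache) InterfaceName(k interfaceKey) (string, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	name, ok := c.ifNames[k]
	return name, ok
}

func main() {
	cache := newOptionsCache()
	// Values that would come from options data records.
	cache.SetSamplingRate(samplingKey{Version: 9, ObsDomainID: 1}, 1000)
	cache.SetInterfaceName(interfaceKey{Exporter: "192.0.2.10", ObsDomainID: 1, IfIndex: 5}, "xe-0/0/1")

	// When a data record arrives, look up the name for its input interface index.
	if name, ok := cache.InterfaceName(interfaceKey{Exporter: "192.0.2.10", ObsDomainID: 1, IfIndex: 5}); ok {
		fmt.Println("input interface:", name)
	}
}
```

Including the exporter address in the key keeps interface indexes from different devices apart, since an index is only meaningful per device.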
I could decode NetFlow v5 packets in goflow version 1. Can you release a version that can decode NetFlow v5?
Opening an issue to jot down a few ideas.
I think a refactor is going to be necessary; I'm no longer satisfied with how samples are processed.
Use-cases have changed over the years:
The producer element is too strict, I believe. It should consider using reflect and allow populating maps.
The message dispatch could also be improved (sharing socket for NetFlow). Less buffer copy operations?
Transport could be improved, especially with Kafka, by adding delivery guarantees and improving access to configuration options.
At the same time, would like to avoid becoming a multi-output tool like Telegraf or Vector.
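To illustrate the reflect idea from the list above, here is a minimal sketch that writes a decoded value into a struct field chosen by name and falls back to a map for unknown destinations. `flowRecord` and `populate` are hypothetical and only show the general technique, not a proposed goflow2 design.

```go
package main

import (
	"fmt"
	"reflect"
)

// flowRecord is a hypothetical output record with a few fixed fields and
// a map for everything that has no dedicated field.
type flowRecord struct {
	InIf   uint32
	OutIf  uint32
	Custom map[string]uint64
}

// populate stores value in the unsigned integer field named destination
// when such a field exists and the value fits; otherwise it stores the
// value in rec.Custom under the same name.
func populate(rec *flowRecord, destination string, value uint64) {
	field := reflect.ValueOf(rec).Elem().FieldByName(destination)
	if field.IsValid() && field.CanSet() {
		switch field.Kind() {
		case reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
			if !field.OverflowUint(value) {
				field.SetUint(value)
				return
			}
		}
	}
	if rec.Custom == nil {
		rec.Custom = make(map[string]uint64)
	}
	rec.Custom[destination] = value
}

func main() {
	rec := &flowRecord{}
	populate(rec, "InIf", 12)
	populate(rec, "Dot1q_Vlan", 100) // no such struct field, lands in the map
	fmt.Println(rec.InIf, rec.Custom["Dot1q_Vlan"])
}
```

The trade-off is the usual one with reflection: mappings become configurable at run time, at the cost of some speed and compile-time checking.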
Amazon Managed Service for Kafka (MSK) does not support SASL/PLAIN authentication, even over TLS. However, it does support SASL/SCRAM.
[2022-09-12 03:44:30,722] INFO [SocketServer brokerId=2] Failed authentication with ip-10-10-101-111.us-west-2.compute.internal/INTERNAL_IP (Unsupported SASL mechanism PLAIN) (org.apache.kafka.common.network.Selector)
For our use case (distributed goflow2 agents sending flows to regional MSK deployments), we need goflow2 to be able to support SCRAM. Fortunately, it looks like Sarama already supports it, so this should (hopefully) be straightforward. See example code from the Sarama repository.
I'm happy to take a crack at adding this to goflow2, if it's something you think is valuable.