Kafka Output

pcap2json Kafka Integration

pcap2json can push flow snapshot JSON information to a Kafka bus using the libkafka client library. This is an alternative flow to directly pushing to Elastic Search, there are Pros/Cons of doing this which depends on customers infrastructure.

Example configuration file located in /opt/fmadio/etc/pcap2json.lua

local Config =
{
["General"] =
{
    IsMultiFE       = false,
    WatchdogTimeout = 5*60,
    --SingleStack       = "analytics",
}
,
["TCPEngine"]  =
{
    Enable      = false,
    Debug       = false,
    --Enable        = false,
}
,
["pcap2json"] =
{
    "-v",
    "--flow-samplerate 1e9 ",
    "--flow-index-depth 8",
    "--flow-max 4e6",
}
,
["backend"] =
{
    "--index-name pcap2json_index",

    "--output-buffercnt 128",
    --"--output-buffercnt 4",
    --"--output-stdout",
    --"--output-null",
    --"--output-espush",
    "--output-kafka",
    "--output-debug",

    "--pipe-window 30e9",

    "--kafka-broker 192.168.2.82:9092",
    "--kafka-topic pcap2json",

    --"--geoip /mnt/store0/etc/pcap2json_map.geo",
}
,
["RTT"] =
{
    ["Enable"]      = false,
    ["Period"]      = 120e9,
    ["Offset"]      = 120e9,
    ["Duration"]    = 60e9,
}
,
["stream_cat"] =
{
        --["BPF"]       = "net 192.168.1.0/24",
        --["FollowNow"] = 1e6,
}
}
return Config

In the above example its pushing to Kafaka broker at "192.168.2.82:9092" per

    "--kafka-broker 192.168.2.82:9092",

Also pushing to Kafka topic "pcap2json" per

    "--kafka-topic pcap2json",

Please modify based on you infrastructure, the other items can remain the same.

Running pcap2json Kafaka

Running pcap2json offline is as follows, in the example we are using the FMADIO Capture "interop17_20220927_0348"

Execute the command in the /opt/fmadio/analytics directory

fmadio@fmadio100v2-228U:/opt/fmadio/analytics$ ./pcap2json_realtime.lua  --force --offline interop17_20220927_0348

Example output is as follows

fmadio@fmadio100v2-228U:/opt/fmadio/analytics$ ./pcap2json_realtime.lua  --force --offline interop17_20220927_0348
fmad fmadlua Sep 21 2022 (/opt/fmadio/bin/fmadiolua ./pcap2json_realtime.lua --force --offline interop17_20220927_0348 )
calibrating...
0 : 2095073184           2.0951 cycles/nsec offset:4.927 Mhz
Cycles/Sec 2095073184.0000 Std:       0 cycle std(  0.00000000) Target:2.10 Ghz
Args: 1 --force
Args: 2 --offline
Args: 3 interop17_20220927_0348
OpenCtrl [/opt/fmadio/status/analytics] (fSysAnalytics_t*) Length 1048576B
Cmd[sudo killall stream_cat]
killall: stream_cat: no process killed
Cmd[sudo killall pcap2json]
killall: pcap2json: no process killed
Cmd[sudo killall pcap2json_backend]
killall: pcap2json_backend: no process killed
Cmd[sudo mkdir -p /mnt/store0/protocol/pcap2json]
Cmd[sudo chown fmadio.staff /mnt/store0/protocol/pcap2json]
Got BPF Filter []
Got  Filter []
Watchdog Timeout: 300
[Tue Sep 27 10:14:30 2022] pcap2json pcap2json init
RTT:Enable   false
RTT:Period   0.00
RTT:Duration 0.00
RTT:Offset   0.00
[Tue Sep 27 10:14:30 2022] pcap2json {"module":"pcap2json","subsystem":"system","timestamp":1664273670.566757,"event":"start","description":"RunLoop:0 MultiFE:false FollowNow:nil Watchdog:false"}
Dedicated pcap2json 100G node Analytics
Cmd[sudo /opt/fmadio/bin/pcap2json_backend --uid pcap2json_1664273670497136896 --cpu-flow 4 16 17 18 19 --cpu-pipe 0 15 --cpu-output 1 59   --pipe-count 1  --input-pipe 0 /opt/fmadio/queue/pcap2json_0  --index-name pcap2json_index --output-buffercnt 128 --output-kafka --output-debug --pipe-window 30e9 --kafka-broker 192.168.2.82:9092 --kafka-topic pcap2json --geoip /mnt/store0/etc/pcap2json_map.geo > /mnt/store0/log/pcap2json_backend_20220927_1014.stdout  2>&1 &]
Cmd[rm -R /mnt/store0/log/pcap2json_backend.stdout.cur]
Cmd[ln -s /mnt/store0/log/pcap2json_backend_20220927_1014.stdout /mnt/store0/log/pcap2json_backend.stdout.cur]
Cmd[sudo /opt/fmadio/bin/stream_cat -v --uid pcap2json_1664273670497136896 --shmring  --cpu 64  --ignore_fcs --pktslice 96   interop17_20220927_0348 | /opt/fmadio/bin/pcap2json --uid pcap2json_1664273670497136896 --cpu-core 50 --cpu-flow 24 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95  --output-pipe /opt/fmadio/queue/pcap2json_0  --instance-id 0 --instance-max 1  -v --flow-samplerate 1e9  --flow-index-depth 8 --flow-max 4e6 > /mnt/store0/log/pcap2js
on_0_20220927_1014.stdout  2>&1 &]
Cmd[rm -R /mnt/store0/log/pcap2json_0.cur]
Cmd[ln -s /mnt/store0/log/pcap2json_0_20220927_1014.stdout /mnt/store0/log/pcap2json_0.cur]
stream_cat UID [pcap2json_1664273670497136896]
SHM Ring Output
stream_cat CPU Affinity: 64
stream_cat: PktSlice: 96
stream_cat ioqueue: 4
map:0x7fba4dbb8000
calibrating...
[Tue Sep 27 10:14:35 2022] Starting
0 : 2095072468           2.0951 cycles/nsec offset:4.928 Mhz
Cycles/Sec 2095072468.0000 Std:       0 cycle std(  0.00000000) Target:2.10 Ghz
StartChunkID: 13512
StartChunk: 13512 Offset: 0 Stride: 1
StartChunk: 13512
SHM Ring Name [/stream_cat_1664273676726910000]
SHMRing Memory: 4.25 MB
Spining up worker thread CPU:65
[Tue Sep 27 10:14:36 2022] pcap2json {"module":"pcap2json","subsystem":"monitor","timestamp":1664273676.727087,"IsUp_Backend":true,"IsUp_stream_cat":true,"IsUp_Frontend":true,"FE1_Gbps": 0.00,"FE1_Klps":    0.00,"PCAPPendingByte":           0,"BE_LagSec":       0,"BE_dT": 0.0,"BE_dGB": 0.000,"BE_FlowPerSnapshot":       0,"BE_Gbps":  0.00,"BE_Mpps":  0.00,"ES_PushCnt":       0,"ES_Error":       0,"ES_DocCnt":     0,"ES_DocsPerSecK":0.000,"Watchdog_Error":"    1/  300"}
[Tue Sep 27 10:14:37 2022] pcap2json {"module":"pcap2json","subsystem":"monitor","timestamp":1664273677.853794,"IsUp_Backend":true,"IsUp_stream_cat":true,"IsUp_Frontend":true,"FE1_Gbps": 0.00,"FE1_Klps":    0.00,"PCAPPendingByte": 13432000000,"BE_LagSec":167258084,"BE_dT": 1.0,"BE_dGB": 9.000,"BE_FlowPerSnapshot":       0,"BE_Gbps": 14.38,"BE_Mpps":  1.56,"ES_PushCnt":       0,"ES_Error":       0,"ES_DocCnt":     0,"ES_DocsPerSecK":0.000,"Watchdog_Error":"    0/  300"}
[Tue Sep 27 10:14:39 2022] pcap2json {"module":"pcap2json","subsystem":"monitor","timestamp":1664273678.977203,"IsUp_Backend":true,"IsUp_stream_cat":true,"IsUp_Frontend":true,"FE1_Gbps":255.13,"FE1_Klps":    0.00,"PCAPPendingByte": 10384000000,"BE_LagSec":167258086,"BE_dT": 1.0,"BE_dGB": 0.000,"BE_FlowPerSnapshot":       0,"BE_Gbps": 11.99,"BE_Mpps":  1.30,"ES_PushCnt":       0,"ES_Error":       0,"ES_DocCnt":     0,"ES_DocsPerSecK":0.000,"Watchdog_Error":"    1/  300"}
[Tue Sep 27 10:14:40 2022] pcap2json {"module":"pcap2json","subsystem":"monitor","timestamp":1664273680.113995,"IsUp_Backend":true,"IsUp_stream_cat":true,"IsUp_Frontend":true,"FE1_Gbps":254.54,"FE1_Klps":  659.34,"PCAPPendingByte":  7371000000,"BE_LagSec":167258087,"BE_dT": 1.0,"BE_dGB":31.940,"BE_FlowPerSnapshot":       0,"BE_Gbps": 46.71,"BE_Mpps":  5.14,"ES_PushCnt":       0,"ES_Error":       0,"ES_DocCnt":139353,"ES_DocsPerSecK":139.194,"Watchdog_Error":"    0/  300"}
.
.
[0]    Worker thread EOF Offset:00000010 TS: 1664250879474030000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f965 Put:f968 Depth:3
[0]2nd Worker thread EOF Offset:00000010 TS: 1664250879474030000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f965 Put:f968 Depth:3
[0]    Worker thread EOF Offset:00000010 TS: 1664250879475718000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f966 Put:f968 Depth:2
[0]2nd Worker thread EOF Offset:00000010 TS: 1664250879475718000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f966 Put:f968 Depth:2
[0]    Worker thread EOF Offset:00000010 TS: 1664250879477502000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f967 Put:f968 Depth:1
[0]2nd Worker thread EOF Offset:00000010 TS: 1664250879477502000 LenCap:       0 LenWir:       0 Flag:       1 EOF:1 Get:f967 Put:f968 Depth:1
20220927_101442 : FMADRING Worker thread exiting 63848 PayloadCRC:00000000 Exit:0
20220927_101442 : straem_cat FMADRING exit is clean
20220927_101442 6.582s : Pkt:86781353 Byte:-1833097856 SUCCESS
[Tue Sep 27 10:14:42 2022] pcap2json {"module":"pcap2json","subsystem":"monitor","timestamp":1664273682.349006,"IsUp_Backend":true,"IsUp_stream_cat":false,"IsUp_Frontend":true,"FE1_Gbps":254.18,"FE1_Klps":  532.05,"PCAPPendingByte":  1307000000,"BE_LagSec":167258088,"BE_dT": 1.0,"BE_dGB": 3.470,"BE_FlowPerSnapshot":   98288,"BE_Gbps": 39.41,"BE_Mpps":  4.50,"ES_PushCnt":       0,"ES_Error":       0,"ES_DocCnt":550806,"ES_DocsPerSecK":255.588,"Watchdog_Error":"    0/  300"}
Tue Sep 27 10:14:42 2022 stream cat existed
[Tue Sep 27 10:14:42 2022] PCAP2JSON existed
[Tue Sep 27 10:14:42 2022] Shutting down sleep 0 backend:true
[Tue Sep 27 10:14:43 2022] Shutting down sleep 1 backend:true
[Tue Sep 27 10:14:44 2022] Shutting down sleep 2 backend:true
[Tue Sep 27 10:14:45 2022] Shutting down sleep 3 backend:true
[Tue Sep 27 10:14:46 2022] Shutting down sleep 4 backend:true
[Tue Sep 27 10:14:47 2022] Shutting down sleep 5 backend:true
[Tue Sep 27 10:14:48 2022] Shutting down sleep 6 backend:true
[Tue Sep 27 10:14:49 2022] Shutting down sleep 7 backend:false
Cmd[sudo killall stream_cat]
killall: stream_cat: no process killed
Cmd[sudo killall pcap2json]
killall: pcap2json: no process killed
Cmd[sudo killall pcap2json_backend]
killall: pcap2json_backend: no process killed
[Tue Sep 27 10:14:49 2022] pcap2json {"module":"pcap2json","subsystem":"system","timestamp":1664273689.699359,"event":"stop","description":"RunLoop:0"}
[Tue Sep 27 10:14:49 2022] 0 finished Took: 0.32006743466667 minoffline mode exiting
[Tue Sep 27 10:14:49 2022] finished Took: 0.32006866773333 mindone 19.238943Sec 0.320649Min
fmadio@fmadio100v2-228U:/opt/fmadio/analytics$

Depending on the size of the capture, this may take 10sec, or 10 minutes.

Debugging

Any errors with the configuration can be checked in the log file.

/mnt/store0/log/pcap2json_backend.cur

NOTE: this file is a symbolic link of the latest run of pcap2json.

Realtime Operation

After confirming operation in offline mode works correctly, enabling realtime via the GUI as follows

Last updated