mirror of
https://github.com/Zhouchaowen/prism.git
synced 2025-09-26 20:11:19 +08:00
369 lines
9.3 KiB
Go
369 lines
9.3 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"github.com/cilium/ebpf"
|
|
"github.com/cilium/ebpf/perf"
|
|
"github.com/cilium/ebpf/ringbuf"
|
|
"github.com/cilium/ebpf/rlimit"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/vishvananda/netlink"
|
|
"golang.org/x/sys/unix"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
|
|
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS ringbuf ./bpf/http/tc_http.c -type http_data_event -- -I./bpf/headers
|
|
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS perf ./bpf/http/tc_http_perf.c -type http_data_event -- -I./bpf/headers
|
|
|
|
const version = "v0.0.1"
|
|
|
|
var (
|
|
InterfaceName string
|
|
DataPath string
|
|
Debug bool
|
|
Verbose bool
|
|
)
|
|
|
|
func init() {
|
|
flag.StringVar(&InterfaceName, "n", "lo", "a network interface name")
|
|
flag.StringVar(&DataPath, "p", "./db", "a network interface name")
|
|
flag.BoolVar(&Debug, "d", false, "output debug information")
|
|
flag.BoolVar(&Verbose, "v", false, "output more detailed information")
|
|
}
|
|
|
|
func main() {
|
|
flag.Parse()
|
|
|
|
kernelVersion, err := GetKernelVersion()
|
|
if err != nil {
|
|
log.Fatalf("kernel version: NOT OK")
|
|
}
|
|
if !isMinKernelVer(kernelVersion) {
|
|
log.Fatalf("kernel version: NOT OK: minimal supported kernel "+
|
|
"version is %s; kernel version that is running is: %s", minKernelVer, kernelVersion)
|
|
}
|
|
|
|
// set rlimit Memlock to INFINITY before creating any bpf resources.
|
|
if err := rlimit.RemoveMemlock(); err != nil {
|
|
log.Fatalf("unable to set memory resource limits, error:%s", err.Error())
|
|
}
|
|
|
|
if len(InterfaceName) == 0 {
|
|
log.Fatalf("Please specify a network interface")
|
|
}
|
|
|
|
// Look up the network interface by name.
|
|
iface, err := net.InterfaceByName(InterfaceName)
|
|
if err != nil {
|
|
log.Fatalf("lookup network iface %s: %s", InterfaceName, err)
|
|
}
|
|
|
|
link, err := netlink.LinkByIndex(iface.Index)
|
|
if err != nil {
|
|
log.Fatalf("create net link failed: %v", err)
|
|
}
|
|
|
|
// Wait for a signal and close the XDP program,
|
|
stopper := make(chan os.Signal, 1)
|
|
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
|
|
|
|
log.Printf("Kernel version: %s", kernelVersion.String())
|
|
log.Printf(" ____ _ ")
|
|
log.Printf(" | _ \\ _ __(_)___ _ __ ___ ")
|
|
log.Printf(" | |_) | '__| / __| '_ ` _ \\ ")
|
|
log.Printf(" | __/| | | \\__ \\ | | | | |")
|
|
log.Printf(" |_| |_| |_|___/_| |_| |_|")
|
|
log.Printf("")
|
|
log.Printf("Version %s", version)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
if isMaxKernelVer(kernelVersion) {
|
|
go attachRingBuf(ctx, link)
|
|
} else {
|
|
go attachPerf(ctx, link)
|
|
}
|
|
|
|
log.Printf("Attached TC program to iface %q (index %d)", iface.Name, iface.Index)
|
|
log.Printf("Press Ctrl-C to exit and remove the program")
|
|
log.Printf("Successfully started! Please run \"sudo cat /sys/kernel/debug/tracing/trace_pipe\" to see output of the BPF programs\n")
|
|
|
|
<-stopper
|
|
cancel()
|
|
log.Println("Received signal, exiting TC program..")
|
|
}
|
|
|
|
func attachRingBuf(ctx context.Context, link netlink.Link) {
|
|
// Load pre-compiled programs into the kernel.
|
|
objs := ringbufObjects{}
|
|
if err := loadRingbufObjects(&objs, nil); err != nil {
|
|
log.Fatalf("loading objects: %s", err)
|
|
}
|
|
defer objs.Close()
|
|
|
|
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
|
|
if err != nil {
|
|
log.Fatalf("attach tc ingress failed, %v", err)
|
|
}
|
|
defer netlink.FilterDel(infIngress)
|
|
|
|
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
|
|
if err != nil {
|
|
log.Fatalf("attach tc egress failed, %v", err)
|
|
}
|
|
defer netlink.FilterDel(infEgress)
|
|
|
|
rd, err := ringbuf.NewReader(objs.HttpEvents)
|
|
if err != nil {
|
|
log.Fatalf("opening ringbuf reader: %s", err)
|
|
}
|
|
|
|
// task queue
|
|
queueTask := make(chan []byte, 100)
|
|
|
|
go func() {
|
|
// Wait for a signal and close the ringbuf reader,
|
|
// which will interrupt rd.Read() and make the program exit.
|
|
<-ctx.Done()
|
|
close(queueTask)
|
|
|
|
if err := rd.Close(); err != nil {
|
|
log.Fatalf("closing perf event reader: %s", err)
|
|
}
|
|
}()
|
|
|
|
// run parse,save,query
|
|
runRingBuf(ctx, queueTask, rd)
|
|
}
|
|
|
|
func runRingBuf(ctx context.Context, queueTask chan []byte, rd *ringbuf.Reader) {
|
|
log.Printf("Ring buf listening for events..")
|
|
db, err := leveldb.OpenFile(DataPath, nil)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
|
|
saveChan := make(chan model, 100)
|
|
go func() {
|
|
for task := range queueTask {
|
|
ParseHttp(task)
|
|
}
|
|
}()
|
|
|
|
// mage http data
|
|
go MageHttp(ctx, saveChan)
|
|
|
|
// save to db
|
|
go SaveHttpData(db, saveChan)
|
|
|
|
// gin listening
|
|
go RunListening(db)
|
|
|
|
var merge []uint8
|
|
for {
|
|
// ringbufHttpDataEvent is generated by bpf2go.
|
|
var event ringbufHttpDataEvent
|
|
record, err := rd.Read()
|
|
if err != nil {
|
|
if errors.Is(err, perf.ErrClosed) {
|
|
log.Printf("file already closed")
|
|
return
|
|
}
|
|
log.Printf("reading from perf event reader: %s", err)
|
|
continue
|
|
}
|
|
|
|
// Parse the perf event entry into a bpfHttpDataEventT structure.
|
|
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
|
|
log.Printf("parsing perf event: %s", err)
|
|
continue
|
|
}
|
|
|
|
if Debug && Verbose {
|
|
log.Printf("truncation:%d maxLen:%d maxLen:%d data:%+v", event.Truncation,
|
|
event.MaxLen, event.DataLen, event.Data)
|
|
}
|
|
|
|
if event.Truncation == 0 {
|
|
queueTask <- event.Data[:event.DataLen]
|
|
continue
|
|
}
|
|
|
|
if event.Truncation == 1 {
|
|
merge = append(merge, event.Data[:event.DataLen]...)
|
|
|
|
if int(event.MaxLen) <= len(merge) {
|
|
queueTask <- merge
|
|
merge = make([]byte, 0)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func attachPerf(ctx context.Context, link netlink.Link) {
|
|
//Load pre-compiled programs into the kernel.
|
|
objs := perfObjects{}
|
|
if err := loadPerfObjects(&objs, nil); err != nil {
|
|
log.Fatalf("loading objects: %s", err)
|
|
}
|
|
defer objs.Close()
|
|
|
|
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
|
|
if err != nil {
|
|
log.Fatalf("attach tc ingress failed, %v", err)
|
|
}
|
|
defer netlink.FilterDel(infIngress)
|
|
|
|
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
|
|
if err != nil {
|
|
log.Fatalf("attach tc egress failed, %v", err)
|
|
}
|
|
defer netlink.FilterDel(infEgress)
|
|
|
|
// Open a perf event reader from userspace on the PERF_EVENT_ARRAY map
|
|
// described in the eBPF C program.
|
|
rd, err := perf.NewReader(objs.HttpEvents, os.Getpagesize()*1024*4)
|
|
if err != nil {
|
|
log.Fatalf("creating perf event reader: %s", err)
|
|
}
|
|
defer rd.Close()
|
|
|
|
// task queue
|
|
queueTask := make(chan []byte, 100)
|
|
|
|
go func() {
|
|
// Wait for a signal and close the ringbuf reader,
|
|
// which will interrupt rd.Read() and make the program exit.
|
|
<-ctx.Done()
|
|
close(queueTask)
|
|
|
|
if err := rd.Close(); err != nil {
|
|
log.Fatalf("closing perf event reader: %s", err)
|
|
}
|
|
}()
|
|
|
|
runPerf(ctx, queueTask, rd)
|
|
}
|
|
|
|
func runPerf(ctx context.Context, queueTask chan []byte, rd *perf.Reader) {
|
|
log.Printf("Perf listening for events..")
|
|
db, err := leveldb.OpenFile(DataPath, nil)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
|
|
saveChan := make(chan model, 100)
|
|
go func() {
|
|
for task := range queueTask {
|
|
ParseHttp(task)
|
|
}
|
|
}()
|
|
|
|
// mage http data
|
|
go MageHttp(ctx, saveChan)
|
|
|
|
// save to db
|
|
go SaveHttpData(db, saveChan)
|
|
|
|
// gin listening
|
|
go RunListening(db)
|
|
|
|
var merge []uint8
|
|
for {
|
|
// perfHttpDataEvent is generated by bpf2go.
|
|
var event perfHttpDataEvent
|
|
record, err := rd.Read()
|
|
if err != nil {
|
|
if errors.Is(err, perf.ErrClosed) {
|
|
log.Printf("file already closed")
|
|
return
|
|
}
|
|
log.Printf("reading from perf event reader: %s", err)
|
|
continue
|
|
}
|
|
|
|
if record.LostSamples != 0 {
|
|
log.Printf("perf event ring buffer full, dropped %d samples", record.LostSamples)
|
|
continue
|
|
}
|
|
|
|
// Parse the perf event entry into a bpfHttpDataEventT structure.
|
|
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
|
|
log.Printf("parsing perf event: %s", err)
|
|
continue
|
|
}
|
|
|
|
if Debug && Verbose {
|
|
log.Printf("truncation:%d maxLen:%d maxLen:%d data:%+v", event.Truncation,
|
|
event.MaxLen, event.DataLen, event.Data)
|
|
}
|
|
|
|
if event.Truncation == 0 {
|
|
queueTask <- event.Data[:event.DataLen]
|
|
continue
|
|
}
|
|
|
|
if event.Truncation == 1 {
|
|
merge = append(merge, event.Data[:event.DataLen]...)
|
|
|
|
if int(event.MaxLen) <= len(merge) {
|
|
queueTask <- merge
|
|
merge = make([]byte, 0)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// attach TC program
|
|
func attachTC(link netlink.Link, prog *ebpf.Program, progName string, qdiscParent uint32) (*netlink.BpfFilter, error) {
|
|
if err := replaceQdisc(link); err != nil {
|
|
return nil, fmt.Errorf("replacing clsact qdisc for interface %s: %w", link.Attrs().Name, err)
|
|
}
|
|
|
|
filter := &netlink.BpfFilter{
|
|
FilterAttrs: netlink.FilterAttrs{
|
|
LinkIndex: link.Attrs().Index,
|
|
Parent: qdiscParent,
|
|
Handle: 1,
|
|
Protocol: unix.ETH_P_ALL,
|
|
Priority: 1,
|
|
},
|
|
Fd: prog.FD(),
|
|
Name: fmt.Sprintf("%s-%s", progName, link.Attrs().Name),
|
|
DirectAction: true,
|
|
}
|
|
|
|
if err := netlink.FilterReplace(filter); err != nil {
|
|
return nil, fmt.Errorf("replacing tc filter: %w", err)
|
|
}
|
|
|
|
return filter, nil
|
|
}
|
|
|
|
// replace Qdisc queue
|
|
func replaceQdisc(link netlink.Link) error {
|
|
attrs := netlink.QdiscAttrs{
|
|
LinkIndex: link.Attrs().Index,
|
|
Handle: netlink.MakeHandle(0xffff, 0),
|
|
Parent: netlink.HANDLE_CLSACT,
|
|
}
|
|
|
|
qdisc := &netlink.GenericQdisc{
|
|
QdiscAttrs: attrs,
|
|
QdiscType: "clsact",
|
|
}
|
|
|
|
return netlink.QdiscReplace(qdisc)
|
|
}
|