package main

import (
	"bytes"
	"context"
	"encoding/binary"
	"errors"
	"flag"
	"fmt"
	"log"
	"net"
	"os"
	"os/signal"
	"syscall"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/ringbuf"
	"github.com/cilium/ebpf/rlimit"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/vishvananda/netlink"
	"golang.org/x/sys/unix"
)

// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS ringbuf ./bpf/http/tc_http.c -type http_data_event -- -I./bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS perf ./bpf/http/tc_http_perf.c -type http_data_event -- -I./bpf/headers

// version is the program version printed in the start-up banner.
const version = "v0.0.1"

// Command-line configuration, populated by flag.Parse in main.
var (
	// InterfaceName is the network interface the TC programs are attached to.
	InterfaceName string
	// DataPath is the directory handed to leveldb.OpenFile.
	DataPath string
	// Debug and Verbose together enable per-event dumps in the read loops.
	Debug   bool
	Verbose bool
)

// init registers the command-line flags.
func init() {
	flag.StringVar(&InterfaceName, "n", "lo", "a network interface name")
	flag.StringVar(&DataPath, "p", "./db", "path of the leveldb data directory")
	flag.BoolVar(&Debug, "d", false, "output debug information")
	flag.BoolVar(&Verbose, "v", false, "output more detailed information")
}

// main validates the environment, starts the capture pipeline for the
// selected interface, and blocks until SIGINT/SIGTERM. On shutdown it cancels
// the pipeline and waits for it to finish so that its deferred cleanup
// (removing the TC filters, closing the BPF objects) actually runs.
func main() {
	flag.Parse()

	kernelVersion, err := GetKernelVersion()
	if err != nil {
		log.Fatalf("kernel version: NOT OK: %v", err)
	}
	if !isMinKernelVer(kernelVersion) {
		log.Fatalf("kernel version: NOT OK: minimal supported kernel "+
			"version is %s; kernel version that is running is: %s", minKernelVer, kernelVersion)
	}

	// Set rlimit Memlock to INFINITY before creating any bpf resources.
	if err := rlimit.RemoveMemlock(); err != nil {
		log.Fatalf("unable to set memory resource limits, error:%s", err.Error())
	}

	if len(InterfaceName) == 0 {
		log.Fatalf("Please specify a network interface")
	}

	// Look up the network interface by name.
	iface, err := net.InterfaceByName(InterfaceName)
	if err != nil {
		log.Fatalf("lookup network iface %s: %s", InterfaceName, err)
	}
	link, err := netlink.LinkByIndex(iface.Index)
	if err != nil {
		log.Fatalf("create net link failed: %v", err)
	}

	// Signals that trigger shutdown of the TC programs.
	stopper := make(chan os.Signal, 1)
	signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)

	log.Printf("Kernel version: %s", kernelVersion.String())
	log.Printf(" ____ _ ")
	log.Printf(" | _ \\ _ __(_)___ _ __ ___ ")
	log.Printf(" | |_) | '__| / __| '_ ` _ \\ ")
	log.Printf(" | __/| | | \\__ \\ | | | | |")
	log.Printf(" |_| |_| |_|___/_| |_| |_|")
	log.Printf("")
	log.Printf("Version %s", version)

	ctx, cancel := context.WithCancel(context.Background())

	// done is closed once the attach function has returned, i.e. after all of
	// its deferred cleanup has executed.
	done := make(chan struct{})
	// NOTE(review): isMaxKernelVer is defined elsewhere; presumably it reports
	// whether the kernel is new enough for the ring buffer variant — confirm.
	useRingBuf := isMaxKernelVer(kernelVersion)
	go func() {
		defer close(done)
		if useRingBuf {
			attachRingBuf(ctx, link)
		} else {
			attachPerf(ctx, link)
		}
	}()

	// NOTE(review): attachment happens asynchronously in the goroutine above,
	// so this message may be printed before the filters are in place.
	log.Printf("Attached TC program to iface %q (index %d)", iface.Name, iface.Index)
	log.Printf("Press Ctrl-C to exit and remove the program")
	log.Printf("Successfully started! Please run \"sudo cat /sys/kernel/debug/tracing/trace_pipe\" to see output of the BPF programs\n")

	<-stopper
	cancel()
	log.Println("Received signal, exiting TC program..")

	// Wait for the pipeline to unwind; a second signal forces the exit in case
	// cleanup gets stuck.
	select {
	case <-done:
	case <-stopper:
		log.Println("Received second signal, exiting without waiting for cleanup")
	}
}

// attachRingBuf loads the ring-buffer flavour of the BPF objects, attaches the
// ingress and egress classifiers to link, and runs the read loop until ctx is
// cancelled. The TC filters and BPF objects are released when it returns.
//
// Setup failures still terminate the process via log.Fatalf, which skips the
// deferred cleanup registered up to that point.
func attachRingBuf(ctx context.Context, link netlink.Link) {
	// Load pre-compiled programs into the kernel.
	objs := ringbufObjects{}
	if err := loadRingbufObjects(&objs, nil); err != nil {
		log.Fatalf("loading objects: %s", err)
	}
	defer objs.Close()

	infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
	if err != nil {
		log.Fatalf("attach tc ingress failed, %v", err)
	}
	defer netlink.FilterDel(infIngress)

	infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
	if err != nil {
		log.Fatalf("attach tc egress failed, %v", err)
	}
	defer netlink.FilterDel(infEgress)

	rd, err := ringbuf.NewReader(objs.HttpEvents)
	if err != nil {
		log.Fatalf("opening ringbuf reader: %s", err)
	}

	// Task queue between the read loop (sender) and the parser (receiver).
	queueTask := make(chan []byte, 100)

	go func() {
		// Wait for cancellation and close the ringbuf reader, which
		// interrupts rd.Read() and makes the read loop return.
		<-ctx.Done()
		if err := rd.Close(); err != nil {
			log.Fatalf("closing ringbuf reader: %s", err)
		}
	}()

	// Run parse, save, query.
	runRingBuf(ctx, queueTask, rd)

	// The read loop is the only sender; closing here, after it has returned,
	// cannot race with a send.
	close(queueTask)
}

// runRingBuf opens the database at DataPath, starts the processing workers,
// and reads events from rd until the reader is closed. Complete payloads are
// pushed onto queueTask; the caller closes queueTask after this returns.
func runRingBuf(ctx context.Context, queueTask chan []byte, rd *ringbuf.Reader) {
	log.Printf("Ring buf listening for events..")

	db, err := leveldb.OpenFile(DataPath, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	startHTTPPipeline(ctx, db, queueTask)

	// merge accumulates the fragments of a truncated payload.
	var merge []uint8
	for {
		// ringbufHttpDataEvent is generated by bpf2go. A fresh value per
		// iteration is required: slices of event.Data are handed to queueTask.
		var event ringbufHttpDataEvent

		record, err := rd.Read()
		if err != nil {
			if errors.Is(err, ringbuf.ErrClosed) {
				log.Printf("file already closed")
				return
			}
			log.Printf("reading from ringbuf reader: %s", err)
			continue
		}

		// Parse the raw sample into the generated event structure.
		if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Printf("parsing ringbuf event: %s", err)
			continue
		}

		if Debug && Verbose {
			log.Printf("truncation:%d maxLen:%d dataLen:%d data:%+v", event.Truncation, event.MaxLen, event.DataLen, event.Data)
		}

		merge = forwardPayload(queueTask, merge, int(event.Truncation), int(event.MaxLen), event.Data[:event.DataLen])
	}
}

// attachPerf loads the perf-event flavour of the BPF objects, attaches the
// ingress and egress classifiers to link, and runs the read loop until ctx is
// cancelled. The TC filters and BPF objects are released when it returns.
//
// Setup failures still terminate the process via log.Fatalf, which skips the
// deferred cleanup registered up to that point.
func attachPerf(ctx context.Context, link netlink.Link) {
	// Load pre-compiled programs into the kernel.
	objs := perfObjects{}
	if err := loadPerfObjects(&objs, nil); err != nil {
		log.Fatalf("loading objects: %s", err)
	}
	defer objs.Close()

	infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
	if err != nil {
		log.Fatalf("attach tc ingress failed, %v", err)
	}
	defer netlink.FilterDel(infIngress)

	infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
	if err != nil {
		log.Fatalf("attach tc egress failed, %v", err)
	}
	defer netlink.FilterDel(infEgress)

	// Open a perf event reader from userspace on the PERF_EVENT_ARRAY map
	// described in the eBPF C program.
	rd, err := perf.NewReader(objs.HttpEvents, os.Getpagesize()*1024*4)
	if err != nil {
		log.Fatalf("creating perf event reader: %s", err)
	}
	defer rd.Close()

	// Task queue between the read loop (sender) and the parser (receiver).
	queueTask := make(chan []byte, 100)

	go func() {
		// Wait for cancellation and close the perf reader, which interrupts
		// rd.Read() and makes the read loop return.
		<-ctx.Done()
		if err := rd.Close(); err != nil {
			log.Fatalf("closing perf event reader: %s", err)
		}
	}()

	runPerf(ctx, queueTask, rd)

	// The read loop is the only sender; closing here, after it has returned,
	// cannot race with a send.
	close(queueTask)
}

// runPerf opens the database at DataPath, starts the processing workers, and
// reads events from rd until the reader is closed. Complete payloads are
// pushed onto queueTask; the caller closes queueTask after this returns.
func runPerf(ctx context.Context, queueTask chan []byte, rd *perf.Reader) {
	log.Printf("Perf listening for events..")

	db, err := leveldb.OpenFile(DataPath, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	startHTTPPipeline(ctx, db, queueTask)

	// merge accumulates the fragments of a truncated payload.
	var merge []uint8
	for {
		// perfHttpDataEvent is generated by bpf2go. A fresh value per
		// iteration is required: slices of event.Data are handed to queueTask.
		var event perfHttpDataEvent

		record, err := rd.Read()
		if err != nil {
			if errors.Is(err, perf.ErrClosed) {
				log.Printf("file already closed")
				return
			}
			log.Printf("reading from perf event reader: %s", err)
			continue
		}

		if record.LostSamples != 0 {
			// NOTE(review): a lost fragment may leave merge holding an
			// incomplete payload — confirm whether it should be reset here.
			log.Printf("perf event ring buffer full, dropped %d samples", record.LostSamples)
			continue
		}

		// Parse the raw sample into the generated event structure.
		if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
			log.Printf("parsing perf event: %s", err)
			continue
		}

		if Debug && Verbose {
			log.Printf("truncation:%d maxLen:%d dataLen:%d data:%+v", event.Truncation, event.MaxLen, event.DataLen, event.Data)
		}

		merge = forwardPayload(queueTask, merge, int(event.Truncation), int(event.MaxLen), event.Data[:event.DataLen])
	}
}

// startHTTPPipeline launches the background workers shared by both read
// loops: a parser draining queueTask until it is closed, plus MageHttp,
// SaveHttpData and RunListening (all defined elsewhere in the package).
func startHTTPPipeline(ctx context.Context, db *leveldb.DB, queueTask <-chan []byte) {
	saveChan := make(chan model, 100)

	go func() {
		for task := range queueTask {
			ParseHttp(task)
		}
	}()

	// NOTE(review): presumably assembles parsed HTTP data into saveChan — confirm.
	go MageHttp(ctx, saveChan)
	// Save to db.
	go SaveHttpData(db, saveChan)
	// Gin listening.
	go RunListening(db)
}

// forwardPayload routes one event payload and returns the updated reassembly
// buffer.
//
//   - truncation == 0: payload is sent to queueTask as-is; pending is untouched.
//   - truncation == 1: payload is appended to pending; once pending holds at
//     least maxLen bytes it is sent to queueTask and a nil buffer is returned.
//   - any other value: the payload is ignored.
func forwardPayload(queueTask chan<- []byte, pending []byte, truncation, maxLen int, payload []byte) []byte {
	switch truncation {
	case 0:
		queueTask <- payload
	case 1:
		pending = append(pending, payload...)
		if maxLen <= len(pending) {
			queueTask <- pending
			// Ownership of the backing array moved to the receiver.
			pending = nil
		}
	}
	return pending
}

// attachTC ensures a clsact qdisc exists on link and installs prog as a
// direct-action BPF filter under qdiscParent. It returns the installed filter
// so the caller can delete it later.
func attachTC(link netlink.Link, prog *ebpf.Program, progName string, qdiscParent uint32) (*netlink.BpfFilter, error) {
	if err := replaceQdisc(link); err != nil {
		return nil, fmt.Errorf("replacing clsact qdisc for interface %s: %w", link.Attrs().Name, err)
	}

	filter := &netlink.BpfFilter{
		FilterAttrs: netlink.FilterAttrs{
			LinkIndex: link.Attrs().Index,
			Parent:    qdiscParent,
			Handle:    1,
			Protocol:  unix.ETH_P_ALL,
			Priority:  1,
		},
		Fd:           prog.FD(),
		Name:         fmt.Sprintf("%s-%s", progName, link.Attrs().Name),
		DirectAction: true,
	}

	if err := netlink.FilterReplace(filter); err != nil {
		return nil, fmt.Errorf("replacing tc filter: %w", err)
	}

	return filter, nil
}

// replaceQdisc installs (or replaces) the clsact qdisc on link.
func replaceQdisc(link netlink.Link) error {
	attrs := netlink.QdiscAttrs{
		LinkIndex: link.Attrs().Index,
		Handle:    netlink.MakeHandle(0xffff, 0),
		Parent:    netlink.HANDLE_CLSACT,
	}

	qdisc := &netlink.GenericQdisc{
		QdiscAttrs: attrs,
		QdiscType:  "clsact",
	}

	return netlink.QdiscReplace(qdisc)
}