diff --git a/Makefile b/Makefile index 862a6b2..01c8c6f 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ pl: scp -r root@10.2.0.105:/root/prism/* . test-run: - go run -exec sudo main.go bpf_bpfel.go -n $(DEV) + export GO111MODULE=on && go run -exec sudo main.go ringbuf_bpfel.go perf_bpfel.go parse_http.go save.go web.go utils.go -n $(DEV) build: docker build -t $(IMAGE) . diff --git a/README.md b/README.md index 24d1d4d..c355201 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Prism is an eBPF-Based interface generator that captures the HTTP traffic of Ing # Run -- kernel >= 5.8.0 +- Kernel >= 5.8.0 ## operating run @@ -28,7 +28,7 @@ docker run --net host --privileged --name prism -itd prism:v0.0.1 ./prism -n = 14 - Clang >= 14 - Golang >= 1.18 -- cmake +- Cmake ```bash # Ubuntu 22.04 diff --git a/bpf/http/tc_http.c b/bpf/http/tc_http.c index 2f15bd5..2805be6 100644 --- a/bpf/http/tc_http.c +++ b/bpf/http/tc_http.c @@ -29,11 +29,6 @@ struct http_data_event { __u32 data_len; }; -//struct -//{ -// __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY); -//} http_events SEC(".maps"); - // BPF ringbuf map struct { __uint(type, BPF_MAP_TYPE_RINGBUF); @@ -139,7 +134,6 @@ static __inline int capture_packets(struct __sk_buff *skb,enum tc_type type) { name_pos++; } -// bpf_perf_event_output(skb, &http_events, BPF_F_CURRENT_CPU, event,sizeof(struct http_data_event)); bpf_ringbuf_submit(event, 0); return TC_ACT_OK; diff --git a/go.mod b/go.mod index 7b2c512..f12e0ed 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module prism go 1.18 require ( + github.com/blang/semver/v4 v4.0.0 github.com/cilium/ebpf v0.11.0 github.com/gin-gonic/gin v1.9.1 github.com/google/gopacket v1.1.19 diff --git a/main.go b/main.go index 696c557..3bc5500 100644 --- a/main.go +++ b/main.go @@ -20,7 +20,8 @@ import ( ) // $BPF_CLANG and $BPF_CFLAGS are set by the Makefile. -//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf ./bpf/http/tc_http.c -- -I./bpf/headers +//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS ringbuf ./bpf/http/tc_http.c -type http_data_event -- -I./bpf/headers +//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS perf ./bpf/http/tc_http_perf.c -type http_data_event -- -I./bpf/headers const version = "v0.0.1" @@ -50,30 +51,16 @@ func main() { log.Fatalf("lookup network iface %s: %s", InterfaceName, err) } - // Load pre-compiled programs into the kernel. - objs := bpfObjects{} - if err := loadBpfObjects(&objs, nil); err != nil { - log.Fatalf("loading objects: %s", err) - } - defer objs.Close() - - link, err := netlink.LinkByIndex(iface.Index) + kernelVersion, err := GetKernelVersion() if err != nil { - log.Fatalf("create net link failed: %v", err) + log.Fatalf("kernel version: NOT OK") + } + if !isMinKernelVer(kernelVersion) { + log.Fatalf("kernel version: NOT OK: minimal supported kernel "+ + "version is %s; kernel version that is running is: %s", minKernelVer, kernelVersion) } - infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS) - if err != nil { - log.Fatalf("attach tc ingress failed, %v", err) - } - defer netlink.FilterDel(infIngress) - - infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS) - if err != nil { - log.Fatalf("attach tc egress failed, %v", err) - } - defer netlink.FilterDel(infEgress) - + log.Printf("Kernel version: %s", kernelVersion.String()) log.Printf(" ____ _ ") log.Printf(" | _ \\ _ __(_)___ _ __ ___ ") log.Printf(" | |_) | '__| / __| '_ ` _ \\ ") @@ -81,41 +68,118 @@ func main() { log.Printf(" |_| |_| |_|___/_| |_| |_|") log.Printf("") log.Printf("Version %s", version) - log.Printf("Attached TC program to iface %q (index %d)", iface.Name, iface.Index) log.Printf("Press Ctrl-C to exit and remove the program") log.Printf("Successfully started! Please run \"sudo cat /sys/kernel/debug/tracing/trace_pipe\" to see output of the BPF programs\n") - // Wait for a signal and close the XDP program, - stopper := make(chan os.Signal, 1) - signal.Notify(stopper, os.Interrupt, syscall.SIGTERM) - - rd, err := ringbuf.NewReader(objs.HttpEvents) - if err != nil { - log.Fatalf("opening ringbuf reader: %s", err) - } - - // task queue - queueTask := make(chan bpfHttpDataEvent, 100) - - go func() { - // Wait for a signal and close the ringbuf reader, - // which will interrupt rd.Read() and make the program exit. - <-stopper - close(queueTask) - - if err := rd.Close(); err != nil { - log.Fatalf("closing perf event reader: %s", err) + if isMaxKernelVer(kernelVersion) { + // Load pre-compiled programs into the kernel. + objs := ringbufObjects{} + if err := loadRingbufObjects(&objs, nil); err != nil { + log.Fatalf("loading objects: %s", err) } - }() + defer objs.Close() - // run parse,save,query - run(queueTask, rd) + link, err := netlink.LinkByIndex(iface.Index) + if err != nil { + log.Fatalf("create net link failed: %v", err) + } + + infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS) + if err != nil { + log.Fatalf("attach tc ingress failed, %v", err) + } + defer netlink.FilterDel(infIngress) + + infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS) + if err != nil { + log.Fatalf("attach tc egress failed, %v", err) + } + defer netlink.FilterDel(infEgress) + + // Wait for a signal and close the XDP program, + stopper := make(chan os.Signal, 1) + signal.Notify(stopper, os.Interrupt, syscall.SIGTERM) + + rd, err := ringbuf.NewReader(objs.HttpEvents) + if err != nil { + log.Fatalf("opening ringbuf reader: %s", err) + } + + // task queue + queueTask := make(chan ringbufHttpDataEvent, 100) + + go func() { + // Wait for a signal and close the ringbuf reader, + // which will interrupt rd.Read() and make the program exit. + <-stopper + close(queueTask) + + if err := rd.Close(); err != nil { + log.Fatalf("closing perf event reader: %s", err) + } + }() + + // run parse,save,query + runRingBuf(queueTask, rd) + } else { + // Load pre-compiled programs into the kernel. + objs := perfObjects{} + if err := loadPerfObjects(&objs, nil); err != nil { + log.Fatalf("loading objects: %s", err) + } + defer objs.Close() + + link, err := netlink.LinkByIndex(iface.Index) + if err != nil { + log.Fatalf("create net link failed: %v", err) + } + + infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS) + if err != nil { + log.Fatalf("attach tc ingress failed, %v", err) + } + defer netlink.FilterDel(infIngress) + + infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS) + if err != nil { + log.Fatalf("attach tc egress failed, %v", err) + } + defer netlink.FilterDel(infEgress) + + // Wait for a signal and close the XDP program, + stopper := make(chan os.Signal, 1) + signal.Notify(stopper, os.Interrupt, syscall.SIGTERM) + + // Open a perf event reader from userspace on the PERF_EVENT_ARRAY map + // described in the eBPF C program. + rd, err := perf.NewReader(objs.HttpEvents, os.Getpagesize()) + if err != nil { + log.Fatalf("creating perf event reader: %s", err) + } + defer rd.Close() + + // task queue + queueTask := make(chan perfHttpDataEvent, 100) + + go func() { + // Wait for a signal and close the ringbuf reader, + // which will interrupt rd.Read() and make the program exit. + <-stopper + close(queueTask) + + if err := rd.Close(); err != nil { + log.Fatalf("closing perf event reader: %s", err) + } + }() + + runPerf(queueTask, rd) + } log.Println("Received signal, exiting TC program..") } -func run(queueTask chan bpfHttpDataEvent, rd *ringbuf.Reader) { +func runRingBuf(queueTask chan ringbufHttpDataEvent, rd *ringbuf.Reader) { log.Printf("Listening for events..") db, err := leveldb.OpenFile(DataPath, nil) if err != nil { @@ -138,7 +202,50 @@ func run(queueTask chan bpfHttpDataEvent, rd *ringbuf.Reader) { // bpfHttpDataEventT is generated by bpf2go. for { - var event bpfHttpDataEvent + var event ringbufHttpDataEvent + record, err := rd.Read() + if err != nil { + if errors.Is(err, perf.ErrClosed) { + log.Printf("file already closed") + return + } + log.Printf("reading from perf event reader: %s", err) + continue + } + + // Parse the perf event entry into a bpfHttpDataEventT structure. + if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil { + log.Printf("parsing perf event: %s", err) + continue + } + queueTask <- event + } +} + +func runPerf(queueTask chan perfHttpDataEvent, rd *perf.Reader) { + log.Printf("Listening for events..") + db, err := leveldb.OpenFile(DataPath, nil) + if err != nil { + log.Fatal(err) + } + defer db.Close() + + saveChan := make(chan *MergeBuilder, 100) + go func() { + for task := range queueTask { + parseHttp(saveChan, task.Data[:task.DataLen]) + } + }() + + // save to db + go saveHttpData(db, saveChan) + + // gin listening + go runListening(db) + + // bpfHttpDataEventT is generated by bpf2go. + for { + var event perfHttpDataEvent record, err := rd.Read() if err != nil { if errors.Is(err, perf.ErrClosed) {