This commit is contained in:
zhouchaowen
2023-09-09 22:09:55 +08:00
parent cccdbd32be
commit f8c08b2207
5 changed files with 159 additions and 57 deletions

View File

@@ -29,7 +29,7 @@ pl:
scp -r root@10.2.0.105:/root/prism/* .
test-run:
go run -exec sudo main.go bpf_bpfel.go -n $(DEV)
export GO111MODULE=on && go run -exec sudo main.go ringbuf_bpfel.go perf_bpfel.go parse_http.go save.go web.go utils.go -n $(DEV)
build:
docker build -t $(IMAGE) .

View File

@@ -4,7 +4,7 @@ Prism is an eBPF-Based interface generator that captures the HTTP traffic of Ing
# Run
- kernel >= 5.8.0
- Kernel >= 5.8.0
## operating run
@@ -28,7 +28,7 @@ docker run --net host --privileged --name prism -itd prism:v0.0.1 ./prism -n <de
- LLvm >= 14
- Clang >= 14
- Golang >= 1.18
- cmake
- Cmake
```bash
# Ubuntu 22.04

View File

@@ -29,11 +29,6 @@ struct http_data_event {
__u32 data_len;
};
//struct
//{
// __uint(type, BPF_MAP_TYPE_PERF_EVENT_ARRAY);
//} http_events SEC(".maps");
// BPF ringbuf map
struct {
__uint(type, BPF_MAP_TYPE_RINGBUF);
@@ -139,7 +134,6 @@ static __inline int capture_packets(struct __sk_buff *skb,enum tc_type type) {
name_pos++;
}
// bpf_perf_event_output(skb, &http_events, BPF_F_CURRENT_CPU, event,sizeof(struct http_data_event));
bpf_ringbuf_submit(event, 0);
return TC_ACT_OK;

1
go.mod
View File

@@ -3,6 +3,7 @@ module prism
go 1.18
require (
github.com/blang/semver/v4 v4.0.0
github.com/cilium/ebpf v0.11.0
github.com/gin-gonic/gin v1.9.1
github.com/google/gopacket v1.1.19

203
main.go
View File

@@ -20,7 +20,8 @@ import (
)
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS bpf ./bpf/http/tc_http.c -- -I./bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS ringbuf ./bpf/http/tc_http.c -type http_data_event -- -I./bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS perf ./bpf/http/tc_http_perf.c -type http_data_event -- -I./bpf/headers
const version = "v0.0.1"
@@ -50,30 +51,16 @@ func main() {
log.Fatalf("lookup network iface %s: %s", InterfaceName, err)
}
// Load pre-compiled programs into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
}
defer objs.Close()
link, err := netlink.LinkByIndex(iface.Index)
kernelVersion, err := GetKernelVersion()
if err != nil {
log.Fatalf("create net link failed: %v", err)
log.Fatalf("kernel version: NOT OK")
}
if !isMinKernelVer(kernelVersion) {
log.Fatalf("kernel version: NOT OK: minimal supported kernel "+
"version is %s; kernel version that is running is: %s", minKernelVer, kernelVersion)
}
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
if err != nil {
log.Fatalf("attach tc ingress failed, %v", err)
}
defer netlink.FilterDel(infIngress)
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
if err != nil {
log.Fatalf("attach tc egress failed, %v", err)
}
defer netlink.FilterDel(infEgress)
log.Printf("Kernel version: %s", kernelVersion.String())
log.Printf(" ____ _ ")
log.Printf(" | _ \\ _ __(_)___ _ __ ___ ")
log.Printf(" | |_) | '__| / __| '_ ` _ \\ ")
@@ -81,41 +68,118 @@ func main() {
log.Printf(" |_| |_| |_|___/_| |_| |_|")
log.Printf("")
log.Printf("Version %s", version)
log.Printf("Attached TC program to iface %q (index %d)", iface.Name, iface.Index)
log.Printf("Press Ctrl-C to exit and remove the program")
log.Printf("Successfully started! Please run \"sudo cat /sys/kernel/debug/tracing/trace_pipe\" to see output of the BPF programs\n")
// Wait for a signal and close the XDP program,
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
rd, err := ringbuf.NewReader(objs.HttpEvents)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
// task queue
queueTask := make(chan bpfHttpDataEvent, 100)
go func() {
// Wait for a signal and close the ringbuf reader,
// which will interrupt rd.Read() and make the program exit.
<-stopper
close(queueTask)
if err := rd.Close(); err != nil {
log.Fatalf("closing perf event reader: %s", err)
if isMaxKernelVer(kernelVersion) {
// Load pre-compiled programs into the kernel.
objs := ringbufObjects{}
if err := loadRingbufObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
}
}()
defer objs.Close()
// run parse,save,query
run(queueTask, rd)
link, err := netlink.LinkByIndex(iface.Index)
if err != nil {
log.Fatalf("create net link failed: %v", err)
}
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
if err != nil {
log.Fatalf("attach tc ingress failed, %v", err)
}
defer netlink.FilterDel(infIngress)
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
if err != nil {
log.Fatalf("attach tc egress failed, %v", err)
}
defer netlink.FilterDel(infEgress)
// Wait for a signal and close the XDP program,
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
rd, err := ringbuf.NewReader(objs.HttpEvents)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
// task queue
queueTask := make(chan ringbufHttpDataEvent, 100)
go func() {
// Wait for a signal and close the ringbuf reader,
// which will interrupt rd.Read() and make the program exit.
<-stopper
close(queueTask)
if err := rd.Close(); err != nil {
log.Fatalf("closing perf event reader: %s", err)
}
}()
// run parse,save,query
runRingBuf(queueTask, rd)
} else {
// Load pre-compiled programs into the kernel.
objs := perfObjects{}
if err := loadPerfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
}
defer objs.Close()
link, err := netlink.LinkByIndex(iface.Index)
if err != nil {
log.Fatalf("create net link failed: %v", err)
}
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
if err != nil {
log.Fatalf("attach tc ingress failed, %v", err)
}
defer netlink.FilterDel(infIngress)
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
if err != nil {
log.Fatalf("attach tc egress failed, %v", err)
}
defer netlink.FilterDel(infEgress)
// Wait for a signal and close the XDP program,
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
// Open a perf event reader from userspace on the PERF_EVENT_ARRAY map
// described in the eBPF C program.
rd, err := perf.NewReader(objs.HttpEvents, os.Getpagesize())
if err != nil {
log.Fatalf("creating perf event reader: %s", err)
}
defer rd.Close()
// task queue
queueTask := make(chan perfHttpDataEvent, 100)
go func() {
// Wait for a signal and close the ringbuf reader,
// which will interrupt rd.Read() and make the program exit.
<-stopper
close(queueTask)
if err := rd.Close(); err != nil {
log.Fatalf("closing perf event reader: %s", err)
}
}()
runPerf(queueTask, rd)
}
log.Println("Received signal, exiting TC program..")
}
func run(queueTask chan bpfHttpDataEvent, rd *ringbuf.Reader) {
func runRingBuf(queueTask chan ringbufHttpDataEvent, rd *ringbuf.Reader) {
log.Printf("Listening for events..")
db, err := leveldb.OpenFile(DataPath, nil)
if err != nil {
@@ -138,7 +202,50 @@ func run(queueTask chan bpfHttpDataEvent, rd *ringbuf.Reader) {
// bpfHttpDataEventT is generated by bpf2go.
for {
var event bpfHttpDataEvent
var event ringbufHttpDataEvent
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Printf("file already closed")
return
}
log.Printf("reading from perf event reader: %s", err)
continue
}
// Parse the perf event entry into a bpfHttpDataEventT structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing perf event: %s", err)
continue
}
queueTask <- event
}
}
func runPerf(queueTask chan perfHttpDataEvent, rd *perf.Reader) {
log.Printf("Listening for events..")
db, err := leveldb.OpenFile(DataPath, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
saveChan := make(chan *MergeBuilder, 100)
go func() {
for task := range queueTask {
parseHttp(saveChan, task.Data[:task.DataLen])
}
}()
// save to db
go saveHttpData(db, saveChan)
// gin listening
go runListening(db)
// bpfHttpDataEventT is generated by bpf2go.
for {
var event perfHttpDataEvent
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {