Files
prism/main.go
2023-09-18 17:06:04 +08:00

369 lines
9.3 KiB
Go

package main
import (
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/cilium/ebpf/rlimit"
"github.com/syndtr/goleveldb/leveldb"
"github.com/vishvananda/netlink"
"golang.org/x/sys/unix"
"log"
"net"
"os"
"os/signal"
"syscall"
)
// $BPF_CLANG and $BPF_CFLAGS are set by the Makefile.
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS ringbuf ./bpf/http/tc_http.c -type http_data_event -- -I./bpf/headers
//go:generate bpf2go -cc $BPF_CLANG -cflags $BPF_CFLAGS perf ./bpf/http/tc_http_perf.c -type http_data_event -- -I./bpf/headers
const version = "v0.0.1"
var (
InterfaceName string
DataPath string
Debug bool
Verbose bool
)
func init() {
flag.StringVar(&InterfaceName, "n", "lo", "a network interface name")
flag.StringVar(&DataPath, "p", "./db", "a network interface name")
flag.BoolVar(&Debug, "d", false, "output debug information")
flag.BoolVar(&Verbose, "v", false, "output more detailed information")
}
func main() {
flag.Parse()
kernelVersion, err := GetKernelVersion()
if err != nil {
log.Fatalf("kernel version: NOT OK")
}
if !isMinKernelVer(kernelVersion) {
log.Fatalf("kernel version: NOT OK: minimal supported kernel "+
"version is %s; kernel version that is running is: %s", minKernelVer, kernelVersion)
}
// set rlimit Memlock to INFINITY before creating any bpf resources.
if err := rlimit.RemoveMemlock(); err != nil {
log.Fatalf("unable to set memory resource limits, error:%s", err.Error())
}
if len(InterfaceName) == 0 {
log.Fatalf("Please specify a network interface")
}
// Look up the network interface by name.
iface, err := net.InterfaceByName(InterfaceName)
if err != nil {
log.Fatalf("lookup network iface %s: %s", InterfaceName, err)
}
link, err := netlink.LinkByIndex(iface.Index)
if err != nil {
log.Fatalf("create net link failed: %v", err)
}
// Wait for a signal and close the XDP program,
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
log.Printf("Kernel version: %s", kernelVersion.String())
log.Printf(" ____ _ ")
log.Printf(" | _ \\ _ __(_)___ _ __ ___ ")
log.Printf(" | |_) | '__| / __| '_ ` _ \\ ")
log.Printf(" | __/| | | \\__ \\ | | | | |")
log.Printf(" |_| |_| |_|___/_| |_| |_|")
log.Printf("")
log.Printf("Version %s", version)
ctx, cancel := context.WithCancel(context.Background())
if isMaxKernelVer(kernelVersion) {
go attachRingBuf(ctx, link)
} else {
go attachPerf(ctx, link)
}
log.Printf("Attached TC program to iface %q (index %d)", iface.Name, iface.Index)
log.Printf("Press Ctrl-C to exit and remove the program")
log.Printf("Successfully started! Please run \"sudo cat /sys/kernel/debug/tracing/trace_pipe\" to see output of the BPF programs\n")
<-stopper
cancel()
log.Println("Received signal, exiting TC program..")
}
func attachRingBuf(ctx context.Context, link netlink.Link) {
// Load pre-compiled programs into the kernel.
objs := ringbufObjects{}
if err := loadRingbufObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
}
defer objs.Close()
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
if err != nil {
log.Fatalf("attach tc ingress failed, %v", err)
}
defer netlink.FilterDel(infIngress)
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
if err != nil {
log.Fatalf("attach tc egress failed, %v", err)
}
defer netlink.FilterDel(infEgress)
rd, err := ringbuf.NewReader(objs.HttpEvents)
if err != nil {
log.Fatalf("opening ringbuf reader: %s", err)
}
// task queue
queueTask := make(chan []byte, 100)
go func() {
// Wait for a signal and close the ringbuf reader,
// which will interrupt rd.Read() and make the program exit.
<-ctx.Done()
close(queueTask)
if err := rd.Close(); err != nil {
log.Fatalf("closing perf event reader: %s", err)
}
}()
// run parse,save,query
runRingBuf(ctx, queueTask, rd)
}
func runRingBuf(ctx context.Context, queueTask chan []byte, rd *ringbuf.Reader) {
log.Printf("Ring buf listening for events..")
db, err := leveldb.OpenFile(DataPath, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
saveChan := make(chan model, 100)
go func() {
for task := range queueTask {
ParseHttp(task)
}
}()
// mage http data
go MageHttp(ctx, saveChan)
// save to db
go SaveHttpData(db, saveChan)
// gin listening
go RunListening(db)
var merge []uint8
for {
// ringbufHttpDataEvent is generated by bpf2go.
var event ringbufHttpDataEvent
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Printf("file already closed")
return
}
log.Printf("reading from perf event reader: %s", err)
continue
}
// Parse the perf event entry into a bpfHttpDataEventT structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing perf event: %s", err)
continue
}
if Debug && Verbose {
log.Printf("truncation:%d maxLen:%d maxLen:%d data:%+v", event.Truncation,
event.MaxLen, event.DataLen, event.Data)
}
if event.Truncation == 0 {
queueTask <- event.Data[:event.DataLen]
continue
}
if event.Truncation == 1 {
merge = append(merge, event.Data[:event.DataLen]...)
if int(event.MaxLen) <= len(merge) {
queueTask <- merge
merge = make([]byte, 0)
}
}
}
}
func attachPerf(ctx context.Context, link netlink.Link) {
//Load pre-compiled programs into the kernel.
objs := perfObjects{}
if err := loadPerfObjects(&objs, nil); err != nil {
log.Fatalf("loading objects: %s", err)
}
defer objs.Close()
infIngress, err := attachTC(link, objs.IngressClsFunc, "classifier/ingress", netlink.HANDLE_MIN_INGRESS)
if err != nil {
log.Fatalf("attach tc ingress failed, %v", err)
}
defer netlink.FilterDel(infIngress)
infEgress, err := attachTC(link, objs.EgressClsFunc, "classifier/egress", netlink.HANDLE_MIN_EGRESS)
if err != nil {
log.Fatalf("attach tc egress failed, %v", err)
}
defer netlink.FilterDel(infEgress)
// Open a perf event reader from userspace on the PERF_EVENT_ARRAY map
// described in the eBPF C program.
rd, err := perf.NewReader(objs.HttpEvents, os.Getpagesize()*1024*4)
if err != nil {
log.Fatalf("creating perf event reader: %s", err)
}
defer rd.Close()
// task queue
queueTask := make(chan []byte, 100)
go func() {
// Wait for a signal and close the ringbuf reader,
// which will interrupt rd.Read() and make the program exit.
<-ctx.Done()
close(queueTask)
if err := rd.Close(); err != nil {
log.Fatalf("closing perf event reader: %s", err)
}
}()
runPerf(ctx, queueTask, rd)
}
func runPerf(ctx context.Context, queueTask chan []byte, rd *perf.Reader) {
log.Printf("Perf listening for events..")
db, err := leveldb.OpenFile(DataPath, nil)
if err != nil {
log.Fatal(err)
}
defer db.Close()
saveChan := make(chan model, 100)
go func() {
for task := range queueTask {
ParseHttp(task)
}
}()
// mage http data
go MageHttp(ctx, saveChan)
// save to db
go SaveHttpData(db, saveChan)
// gin listening
go RunListening(db)
var merge []uint8
for {
// perfHttpDataEvent is generated by bpf2go.
var event perfHttpDataEvent
record, err := rd.Read()
if err != nil {
if errors.Is(err, perf.ErrClosed) {
log.Printf("file already closed")
return
}
log.Printf("reading from perf event reader: %s", err)
continue
}
if record.LostSamples != 0 {
log.Printf("perf event ring buffer full, dropped %d samples", record.LostSamples)
continue
}
// Parse the perf event entry into a bpfHttpDataEventT structure.
if err := binary.Read(bytes.NewBuffer(record.RawSample), binary.LittleEndian, &event); err != nil {
log.Printf("parsing perf event: %s", err)
continue
}
if Debug && Verbose {
log.Printf("truncation:%d maxLen:%d maxLen:%d data:%+v", event.Truncation,
event.MaxLen, event.DataLen, event.Data)
}
if event.Truncation == 0 {
queueTask <- event.Data[:event.DataLen]
continue
}
if event.Truncation == 1 {
merge = append(merge, event.Data[:event.DataLen]...)
if int(event.MaxLen) <= len(merge) {
queueTask <- merge
merge = make([]byte, 0)
}
}
}
}
// attach TC program
func attachTC(link netlink.Link, prog *ebpf.Program, progName string, qdiscParent uint32) (*netlink.BpfFilter, error) {
if err := replaceQdisc(link); err != nil {
return nil, fmt.Errorf("replacing clsact qdisc for interface %s: %w", link.Attrs().Name, err)
}
filter := &netlink.BpfFilter{
FilterAttrs: netlink.FilterAttrs{
LinkIndex: link.Attrs().Index,
Parent: qdiscParent,
Handle: 1,
Protocol: unix.ETH_P_ALL,
Priority: 1,
},
Fd: prog.FD(),
Name: fmt.Sprintf("%s-%s", progName, link.Attrs().Name),
DirectAction: true,
}
if err := netlink.FilterReplace(filter); err != nil {
return nil, fmt.Errorf("replacing tc filter: %w", err)
}
return filter, nil
}
// replace Qdisc queue
func replaceQdisc(link netlink.Link) error {
attrs := netlink.QdiscAttrs{
LinkIndex: link.Attrs().Index,
Handle: netlink.MakeHandle(0xffff, 0),
Parent: netlink.HANDLE_CLSACT,
}
qdisc := &netlink.GenericQdisc{
QdiscAttrs: attrs,
QdiscType: "clsact",
}
return netlink.QdiscReplace(qdisc)
}