fix add refresh interface

This commit is contained in:
zhouchaowen
2023-09-18 23:04:08 +08:00
parent c950d573b4
commit 2065cdbcd1
6 changed files with 104 additions and 103 deletions

View File

@@ -8,9 +8,10 @@ CFLAGS := -O2 -g -Wall -Werror $(CFLAGS)
DEV ?= lo
HOST ?= 10.2.0.105
STOREHOUSE ?= zmosquito
NAME ?= prism
VERSION ?= v0.0.1
IMAGE ?= $(NAME):$(VERSION)
IMAGE ?= $(STOREHOUSE)/$(NAME):$(VERSION)
format:
find . -type f -name "*.c" | xargs clang-format -i

View File

@@ -20,7 +20,7 @@
#define DNS_HLEN sizeof(struct dns_hdr)
#define HTTP_DATA_MIN_SIZE 91
#define MAX_DATA_SIZE 4000
#define MAX_DATA_SIZE 1024*4
#define MAX_TRUNCATION 10
enum tc_type { Egress, Ingress };

105
merge.go
View File

@@ -15,66 +15,69 @@ import (
"time"
)
var ackRequest = AckRequest{ackRequest: map[uint32]FlyHttp{}}
var ackResponse = AckResponse{ackResponse: map[uint32][]FlyHttp{}}
var ackToRequest = AckToRequest{mp: map[uint32]FlyHttp{}}
var ackToResponse = AckToResponse{mp: map[uint32][]FlyHttp{}}
var seqToAck = SeqToAck{seqToAck: map[uint32]uint32{}}
type AckRequest struct {
ackRequest map[uint32]FlyHttp
lock sync.RWMutex
// AckToRequest save the ACK and corresponding request in the HTTP request message
type AckToRequest struct {
mp map[uint32]FlyHttp
lock sync.RWMutex
}
func (a *AckRequest) Get(key uint32) (FlyHttp, bool) {
func (a *AckToRequest) Get(key uint32) (FlyHttp, bool) {
a.lock.Lock()
defer a.lock.Unlock()
v, ok := a.ackRequest[key]
v, ok := a.mp[key]
return v, ok
}
func (a *AckRequest) List() map[uint32]FlyHttp {
func (a *AckToRequest) List() map[uint32]FlyHttp {
a.lock.Lock()
defer a.lock.Unlock()
tmp, _ := json.Marshal(a.ackRequest)
tmp, _ := json.Marshal(a.mp)
ret := map[uint32]FlyHttp{}
json.Unmarshal(tmp, &ret)
return ret
}
func (a *AckRequest) Save(http FlyHttp) {
func (a *AckToRequest) Save(http FlyHttp) {
a.lock.Lock()
defer a.lock.Unlock()
a.ackRequest[http.Ack] = http
a.mp[http.Ack] = http
}
func (a *AckRequest) Delete(key uint32) {
func (a *AckToRequest) Delete(key uint32) {
a.lock.Lock()
defer a.lock.Unlock()
delete(a.ackRequest, key)
delete(a.mp, key)
}
type AckResponse struct {
ackResponse map[uint32][]FlyHttp
lock sync.RWMutex
// AckToResponse save the ACK and corresponding response in the HTTP response message
type AckToResponse struct {
mp map[uint32][]FlyHttp
lock sync.RWMutex
}
func (a *AckResponse) Get(key uint32) []FlyHttp {
func (a *AckToResponse) Get(key uint32) []FlyHttp {
a.lock.Lock()
defer a.lock.Unlock()
return a.ackResponse[key]
return a.mp[key]
}
func (a *AckResponse) Save(http FlyHttp) {
func (a *AckToResponse) Save(http FlyHttp) {
a.lock.Lock()
defer a.lock.Unlock()
a.ackResponse[http.Ack] = append(a.ackResponse[http.Ack], http)
a.mp[http.Ack] = append(a.mp[http.Ack], http)
}
func (a *AckResponse) Delete(key uint32) {
func (a *AckToResponse) Delete(key uint32) {
a.lock.Lock()
defer a.lock.Unlock()
delete(a.ackResponse, key)
delete(a.mp, key)
}
// SeqToAck save the association between HTTP request and HTTP response message, Seq and Ack
type SeqToAck struct {
seqToAck map[uint32]uint32
lock sync.RWMutex
@@ -106,36 +109,34 @@ func MageHttp(ctx context.Context, save chan<- model) {
case <-ctx.Done():
break
case <-ticker:
request := ackRequest.List()
request := ackToRequest.List()
for k, v := range request {
ack, ok := seqToAck.Get(k)
if !ok {
continue
}
if Verbose {
log.Printf("req ack:%+v\n\tseq:%+v,ack:%+v,url:%+v,value:%+v\n", k, v.Seq, v.Ack, v.Data.RequestLine, v.Data.Headers)
}
flyHttps := ackResponse.Get(ack)
if flyHttps == nil {
flyResponses := ackToResponse.Get(ack)
if flyResponses == nil {
continue
}
if Verbose {
log.Printf("res ack:%+v\n", ack)
for _, v := range flyHttps {
log.Printf("\tseq:%+v,ack:%+v,value:%+v\n", v.Seq, v.Ack, v.Data.Headers)
log.Printf("[PRISM] request ack:%+v\n", k)
log.Printf("[PRISM] \tseq:%+v,ack:%+v,url:%+v,value:%+v\n", v.Seq, v.Ack, v.Data.RequestLine, v.Data.Headers)
log.Printf("[PRISM] response ack:%+v\n", ack)
for _, v := range flyResponses {
log.Printf("[PRISM] \tseq:%+v,ack:%+v,value:%+v\n", v.Seq, v.Ack, v.Data.Headers)
}
}
if !checkoutBodyLen(flyHttps) {
if !checkoutBodyLen(flyResponses) {
continue
}
save <- mergeOperation(v, flyHttps)
ackRequest.Delete(k)
save <- mergeOperation(v, flyResponses)
ackToRequest.Delete(k)
seqToAck.Delete(k)
ackResponse.Delete(ack)
ackToResponse.Delete(ack)
}
}
}
@@ -154,7 +155,7 @@ func checkoutBodyLen(flyHttps []FlyHttp) bool {
}
if Verbose {
log.Printf("maxBody:%+v,currentBody:%+v", maxBody, currentBody)
log.Printf("[PRISM] HTTP max body len:%+v, current body len:%+v", maxBody, currentBody)
}
if currentBody == maxBody {
@@ -175,15 +176,15 @@ func checkoutBodyLen(flyHttps []FlyHttp) bool {
func mergeOperation(request FlyHttp, responses []FlyHttp) model {
fmt.Println()
log.Printf("request: %+v", request.Data.RequestLine)
log.Printf("[PRISM] HTTP request: %+v", request.Data.RequestLine)
if Debug {
printFormatHeader(request.Data.Headers)
log.Printf("request param: %+v", string(request.Data.Body))
log.Printf("[PRISM] HTTP request param: %+v", string(request.Data.Body))
}
urls, err := url.Parse(request.Data.RequestLine.URN)
if err != nil {
log.Printf("url.Parse error %+v", err.Error())
log.Printf("[ERROR] url parse (%+v)", err.Error())
return model{}
}
Parma, _ := url.ParseQuery(urls.RawQuery)
@@ -226,7 +227,7 @@ func mergeOperation(request FlyHttp, responses []FlyHttp) model {
if Verbose {
if _, ok := responseHeaders[ContentLength]; !ok {
log.Printf("response[%d] body: %+v", i, string(responses[i].Data.Body))
log.Printf("[PRISM] HTTP response[%d] body: %+v", i, string(responses[i].Data.Body))
}
}
@@ -254,7 +255,7 @@ func mergeOperation(request FlyHttp, responses []FlyHttp) model {
chunkData := make([]byte, chunkSize)
_, err = body.Read(chunkData)
if err != nil {
log.Printf("read chunked error %+v", err.Error())
log.Printf("[ERROR] read chunked (%+v)", err.Error())
continue
}
mergedBody.Write(chunkData)
@@ -269,7 +270,7 @@ func mergeOperation(request FlyHttp, responses []FlyHttp) model {
md.ResponseContextType = responseHeaders[ContentType]
if Debug {
log.Printf("response: %+v", responseLine.String())
log.Printf("[PRISM] HTTP response: %+v", responseLine.String())
printFormatHeader(responseHeaders)
}
@@ -277,20 +278,20 @@ func mergeOperation(request FlyHttp, responses []FlyHttp) model {
if encoding, ok := responseHeaders[ContentEncoding]; ok && encoding == "gzip" {
ret, err := parseGzip(mergedBody.Bytes())
if err != nil && err.Error() != "unexpected EOF" {
log.Printf("gzip decode err: %s", err.Error())
log.Printf("[PRISM] gzip decode (%s)", err.Error())
}
if contentType, ok := responseHeaders[ContentType]; ok &&
(strings.Contains(contentType, ContentTypePlain) ||
strings.Contains(contentType, ContentTypeJSON)) {
log.Printf("gzip response body: %+v", string(ret))
(strings.Contains(contentType, ContentTypePlain) || strings.Contains(contentType, ContentTypeJSON)) {
log.Printf("[PRISM] HTTP gzip response body: %+v", string(ret))
md.ResponseBody = string(ret)
}
} else {
if contentType, ok := responseHeaders[ContentType]; ok &&
(strings.Contains(contentType, ContentTypePlain) ||
strings.Contains(contentType, ContentTypeJSON)) {
log.Printf("response body: %+v", mergedBody.String())
(strings.Contains(contentType, ContentTypePlain) || strings.Contains(contentType, ContentTypeJSON)) {
log.Printf("[PRISM] HTTP response body: %+v", mergedBody.String())
md.ResponseBody = mergedBody.String()
}
}
@@ -318,9 +319,9 @@ func parseGzip(in []byte) ([]byte, error) {
}
func printFormatHeader(headers map[string]string) {
fmt.Printf("headers:\n")
log.Printf("[PRISM] HTTP headers:\n")
for k, v := range headers {
fmt.Printf("\t%s: %s\n", k, v)
log.Printf("[PRISM] \t\t%s: %s\n", k, v)
}
}

View File

@@ -31,27 +31,27 @@ const (
func ParseHttp(data []byte) error {
if Debug && Verbose {
log.Printf("[PACKAGE] data:%+v", data)
log.Printf("[PRISM] data:%+v", data)
}
flyHttp, err := extractFlyHttp(data)
if err != nil {
log.Printf("extract fly http error:%+v", err.Error())
log.Printf("[ERROR] extract fly http error (%+v)", err.Error())
return err
}
rType := flyHttp.Data.Type
if rType == IsRequest {
if Debug && Verbose {
log.Printf("[HTTP] Request Body: %+v", string(flyHttp.Data.Body))
log.Printf("[PRISM] HTTP Request Body: %+v", string(flyHttp.Data.Body))
log.Println()
}
ackRequest.Save(flyHttp)
ackToRequest.Save(flyHttp)
}
if rType == IsResponse {
if Debug && Verbose {
log.Printf("[HTTP] Response Body: %+v", flyHttp.Data.Body)
log.Printf("[PRISM] HTTP Response Body: %+v", flyHttp.Data.Body)
log.Println()
}
@@ -61,7 +61,7 @@ func ParseHttp(data []byte) error {
seqToAck.Save(flyHttp)
}
ackResponse.Save(flyHttp)
ackToResponse.Save(flyHttp)
}
return nil
@@ -89,43 +89,43 @@ func extractFlyHttp(data []byte) (FlyHttp, error) {
}
if Debug {
log.Printf("[ETH] SrcMAC: %s, DstMAC: %s", eth.SrcMAC, eth.DstMAC)
log.Printf("[IPV4] SrcIP: %s, DstIP: %s", ipv4.SrcIP, ipv4.DstIP)
log.Printf("[TCP] SrcPort: %s, DstPort: %s", tcp.SrcPort, tcp.DstPort)
log.Printf("[IPV4] Version: %d", ipv4.Version)
log.Printf("[IPV4] Length: %d", ipv4.Length)
log.Printf("[TCP] Seq: %d", tcp.Seq)
log.Printf("[TCP] Ack: %d", tcp.Ack)
log.Printf("[TCP] FIN: %t", tcp.FIN)
log.Printf("[TCP] SYN: %t", tcp.SYN)
log.Printf("[TCP] RST: %t", tcp.RST)
log.Printf("[TCP] PSH: %t", tcp.PSH)
log.Printf("[TCP] ACK: %t", tcp.ACK)
log.Printf("[TCP] URG: %t", tcp.URG)
log.Printf("[TCP] ECE: %t", tcp.ECE)
log.Printf("[TCP] CWR: %t", tcp.CWR)
log.Printf("[TCP] NS: %t", tcp.NS)
log.Printf("[TCP] Window: %d", tcp.Window)
log.Printf("[TCP] Checksum: %d", tcp.Checksum)
log.Printf("[TCP] Urgent: %d", tcp.Urgent)
log.Printf("[TCP] Options: %d", tcp.Options)
log.Printf("[TCP] Padding: %+v", tcp.Padding)
log.Printf("[PRISM] ETH SrcMAC: %s, DstMAC: %s", eth.SrcMAC, eth.DstMAC)
log.Printf("[PRISM] IPV4 SrcIP: %s, DstIP: %s", ipv4.SrcIP, ipv4.DstIP)
log.Printf("[PRISM] TCP SrcPort: %s, DstPort: %s", tcp.SrcPort, tcp.DstPort)
log.Printf("[PRISM] IPV4 Version: %d", ipv4.Version)
log.Printf("[PRISM] IPV4 Length: %d", ipv4.Length)
log.Printf("[PRISM] TCP Seq: %d", tcp.Seq)
log.Printf("[PRISM] TCP Ack: %d", tcp.Ack)
log.Printf("[PRISM] TCP FIN: %t", tcp.FIN)
log.Printf("[PRISM] TCP SYN: %t", tcp.SYN)
log.Printf("[PRISM] TCP RST: %t", tcp.RST)
log.Printf("[PRISM] TCP PSH: %t", tcp.PSH)
log.Printf("[PRISM] TCP ACK: %t", tcp.ACK)
log.Printf("[PRISM] TCP URG: %t", tcp.URG)
log.Printf("[PRISM] TCP ECE: %t", tcp.ECE)
log.Printf("[PRISM] TCP CWR: %t", tcp.CWR)
log.Printf("[PRISM] TCP NS: %t", tcp.NS)
log.Printf("[PRISM] TCP Window: %d", tcp.Window)
log.Printf("[PRISM] TCP Checksum: %d", tcp.Checksum)
log.Printf("[PRISM] TCP Urgent: %d", tcp.Urgent)
log.Printf("[PRISM] TCP Options: %d", tcp.Options)
log.Printf("[PRISM] TCP Padding: %+v", tcp.Padding)
}
reqOrResData := parseReqOrResData(data)
if reqOrResData.Type == IsRequest {
if Debug && !reqOrResData.IsTruncation {
log.Printf("[HTTP] Request Line: %+v", reqOrResData.RequestLine.String())
log.Printf("[HTTP] Request Headers: %+v", reqOrResData.Headers)
log.Printf("[HTTP] Request Line: %+v", reqOrResData.RequestLine.String())
log.Printf("[HTTP] Request Headers: %+v", reqOrResData.Headers)
log.Println()
}
}
if reqOrResData.Type == IsResponse {
if Debug && !reqOrResData.IsTruncation {
log.Printf("[HTTP] Response Line: %+v", reqOrResData.ResponseLine.String())
log.Printf("[HTTP] Response Headers: %+v", reqOrResData.Headers)
log.Printf("[PRISM] HTTP Response Line: %+v", reqOrResData.ResponseLine.String())
log.Printf("[PRISM] HTTP Response Headers: %+v", reqOrResData.Headers)
log.Println()
}
}
@@ -165,7 +165,7 @@ func parseReqOrResData(data []byte) ReqOrResData {
if IsTruncation {
if Debug {
log.Printf("is truncation")
log.Printf("[PRISM] HTTP is truncation")
}
// is truncation
return ReqOrResData{

View File

@@ -10,18 +10,18 @@ import (
func SaveHttpData(db *leveldb.DB, save <-chan model) {
for md := range save {
if !strings.Contains(md.ResponseContextType, "text/plain") && !strings.Contains(md.ResponseContextType, "application/json") {
log.Printf("package is no text/plain,application/json")
log.Printf("[PRISM] package is no text/plain,application/json")
continue
}
md.key()
byt, err := json.Marshal(md)
if err != nil {
log.Printf("marshal error %s", err.Error())
log.Printf("[ERROR] marshal error (%s)", err.Error())
continue
}
if err := db.Put([]byte(md.Id), byt, nil); err != nil {
log.Printf("put error %s", err.Error())
log.Printf("[ERROR] put error (%s)", err.Error())
continue
}
}

27
web.go
View File

@@ -22,19 +22,8 @@ func RunListening(db *leveldb.DB) {
db: db,
}
go func() {
ticker := time.Tick(60 * time.Second)
for {
select {
case <-ticker:
stat := time.Now()
h.load()
log.Printf("================load data success %fs================", time.Since(stat).Seconds())
}
}
}()
router.GET("/interface", h.list)
router.GET("/refresh", h.refresh)
router.Run(":8080")
}
@@ -100,6 +89,16 @@ func (h Handler) list(ctx *gin.Context) {
return
}
func (h Handler) refresh(ctx *gin.Context) {
stat := time.Now()
h.load()
log.Printf("[PRISM] load data success %fs", time.Since(stat).Seconds())
ctx.JSON(http.StatusOK, gin.H{
"msg": "success",
})
return
}
func (h *Handler) load() {
var ret []model
iter := h.db.NewIterator(nil, nil)
@@ -108,13 +107,13 @@ func (h *Handler) load() {
value := iter.Value()
md := model{}
if err := json.Unmarshal(value, &md); err != nil {
log.Printf("json.unmarshal:%s\n", err.Error())
log.Printf("[PRISM] json unmarshal error (%s)", err.Error())
}
ret = append(ret, md)
}
iter.Release()
if err := iter.Error(); err != nil {
log.Printf("iter error:%s\n", err.Error())
log.Printf("[PRISM] iter error (%s)", err.Error())
}
h.cache = &ret
}