mirror of
https://github.com/telanflow/mps.git
synced 2025-09-26 20:41:25 +08:00
a simple http proxy examples
This commit is contained in:
60
examples/simple-http-proxy/main.go
Normal file
60
examples/simple-http-proxy/main.go
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"github.com/telanflow/mps"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"regexp"
|
||||||
|
"syscall"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A simple http proxy server
|
||||||
|
func main() {
|
||||||
|
quitSignChan := make(chan os.Signal)
|
||||||
|
|
||||||
|
// create a http proxy server
|
||||||
|
proxy := mps.NewHttpProxy()
|
||||||
|
proxy.UseFunc(func(req *http.Request, ctx *mps.Context) (*http.Response, error) {
|
||||||
|
log.Printf("[INFO] middleware -- %s", req.URL)
|
||||||
|
return ctx.Next(req)
|
||||||
|
})
|
||||||
|
|
||||||
|
reqGroup := proxy.OnRequest(mps.FilterHostMatches(regexp.MustCompile("^.*$")))
|
||||||
|
reqGroup.DoFunc(func(req *http.Request, ctx *mps.Context) (*http.Request, *http.Response) {
|
||||||
|
log.Printf("[INFO] req -- %s\n", req.URL)
|
||||||
|
return req, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
respGroup := proxy.OnResponse()
|
||||||
|
respGroup.DoFunc(func(resp *http.Response, ctx *mps.Context) (*http.Response, error) {
|
||||||
|
log.Printf("[INFO] resp -- %d\n", resp.StatusCode)
|
||||||
|
return resp, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Start server
|
||||||
|
srv := &http.Server{
|
||||||
|
Addr: "127.0.0.1:8081",
|
||||||
|
Handler: proxy,
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
log.Printf("HttpProxy started listen: http://%s", srv.Addr)
|
||||||
|
err := srv.ListenAndServe()
|
||||||
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
quitSignChan <- syscall.SIGKILL
|
||||||
|
log.Fatalf("HttpProxy start fail: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// quit signal
|
||||||
|
signal.Notify(quitSignChan, syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
|
||||||
|
|
||||||
|
<-quitSignChan
|
||||||
|
_ = srv.Close()
|
||||||
|
log.Fatal("HttpProxy server stop!")
|
||||||
|
}
|
@@ -124,7 +124,6 @@ func (mitm *MitmHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
|||||||
req.URL, err = url.Parse("https://" + r.Host + req.URL.String())
|
req.URL, err = url.Parse("https://" + r.Host + req.URL.String())
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
//ctx.Warnf("Illegal URL %s", "https://"+r.Host+req.URL.Path)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -145,7 +144,6 @@ func (mitm *MitmHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// always use 1.1 to support chunked encoding
|
// always use 1.1 to support chunked encoding
|
||||||
if _, err := io.WriteString(rawClientTls, "HTTP/1.1"+" "+statusCode+status+"\r\n"); err != nil {
|
if _, err := io.WriteString(rawClientTls, "HTTP/1.1"+" "+statusCode+status+"\r\n"); err != nil {
|
||||||
//ctx.Warnf("Cannot write TLS response HTTP status from mitm'd client: %v", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -2,7 +2,14 @@ package pool
|
|||||||
|
|
||||||
import "net"
|
import "net"
|
||||||
|
|
||||||
|
// ConnContainer connection pool interface
|
||||||
type ConnContainer interface {
|
type ConnContainer interface {
|
||||||
|
// Get returned a idle net.Conn
|
||||||
Get(addr string) (net.Conn, error)
|
Get(addr string) (net.Conn, error)
|
||||||
|
|
||||||
|
// Put place a idle net.Conn into the pool
|
||||||
Put(conn net.Conn) error
|
Put(conn net.Conn) error
|
||||||
|
|
||||||
|
// Release connection pool
|
||||||
|
Release() error
|
||||||
}
|
}
|
||||||
|
@@ -63,8 +63,6 @@ RETRY:
|
|||||||
func (p *ConnProvider) Put(conn net.Conn) error {
|
func (p *ConnProvider) Put(conn net.Conn) error {
|
||||||
closed := atomic.LoadInt32(&p.closed)
|
closed := atomic.LoadInt32(&p.closed)
|
||||||
if closed == 1 {
|
if closed == 1 {
|
||||||
// pool is closed, this conn must be closed
|
|
||||||
conn.Close()
|
|
||||||
return errors.New("pool is closed")
|
return errors.New("pool is closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,7 +78,6 @@ func (p *ConnProvider) Put(conn net.Conn) error {
|
|||||||
// The timeout will be verified at the next `Get()`
|
// The timeout will be verified at the next `Get()`
|
||||||
err := conn.SetDeadline(time.Now().Add(p.options.Timeout))
|
err := conn.SetDeadline(time.Now().Add(p.options.Timeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_ = conn.Close()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,8 +85,7 @@ func (p *ConnProvider) Put(conn net.Conn) error {
|
|||||||
case p.idleConnMap[addr] <- conn:
|
case p.idleConnMap[addr] <- conn:
|
||||||
return nil
|
return nil
|
||||||
default:
|
default:
|
||||||
err := conn.Close()
|
return fmt.Errorf("beyond max capacity")
|
||||||
return fmt.Errorf("beyond max capacity. conn closed: %v", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -96,7 +96,13 @@ func (tunnel *TunnelHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request
|
|||||||
// If the ConnContainer is exists,
|
// If the ConnContainer is exists,
|
||||||
// When io.CopyBuffer is complete,
|
// When io.CopyBuffer is complete,
|
||||||
// put the idle connection into the ConnContainer so can reuse it next time
|
// put the idle connection into the ConnContainer so can reuse it next time
|
||||||
defer tunnel.connContainer().Put(targetConn)
|
defer func() {
|
||||||
|
err := tunnel.connContainer().Put(targetConn)
|
||||||
|
if err != nil {
|
||||||
|
// put conn fail, conn must be closed
|
||||||
|
_ = targetConn.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// The cascade proxy needs to forward the request
|
// The cascade proxy needs to forward the request
|
||||||
if isCascadeProxy {
|
if isCascadeProxy {
|
||||||
@@ -118,6 +124,26 @@ func (tunnel *TunnelHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request
|
|||||||
tunnel.buffer().Put(buf)
|
tunnel.buffer().Put(buf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use registers an Middleware to proxy
|
||||||
|
func (tunnel *TunnelHandler) Use(middleware ...Middleware) {
|
||||||
|
tunnel.Ctx.Use(middleware...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UseFunc registers an MiddlewareFunc to proxy
|
||||||
|
func (tunnel *TunnelHandler) UseFunc(fus ...MiddlewareFunc) {
|
||||||
|
tunnel.Ctx.UseFunc(fus...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnRequest filter requests through Filters
|
||||||
|
func (tunnel *TunnelHandler) OnRequest(filters ...Filter) *ReqFilterGroup {
|
||||||
|
return &ReqFilterGroup{ctx: tunnel.Ctx, filters: filters}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnResponse filter response through Filters
|
||||||
|
func (tunnel *TunnelHandler) OnResponse(filters ...Filter) *RespFilterGroup {
|
||||||
|
return &RespFilterGroup{ctx: tunnel.Ctx, filters: filters}
|
||||||
|
}
|
||||||
|
|
||||||
func (tunnel *TunnelHandler) ConnectDial(network, addr string) (net.Conn, error) {
|
func (tunnel *TunnelHandler) ConnectDial(network, addr string) (net.Conn, error) {
|
||||||
if tunnel.Ctx.Transport != nil && tunnel.Ctx.Transport.DialContext != nil {
|
if tunnel.Ctx.Transport != nil && tunnel.Ctx.Transport.DialContext != nil {
|
||||||
return tunnel.Ctx.Transport.DialContext(tunnel.context(), network, addr)
|
return tunnel.Ctx.Transport.DialContext(tunnel.context(), network, addr)
|
||||||
@@ -125,7 +151,7 @@ func (tunnel *TunnelHandler) ConnectDial(network, addr string) (net.Conn, error)
|
|||||||
return net.DialTimeout(network, addr, 30*time.Second)
|
return net.DialTimeout(network, addr, 30*time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transport
|
// Transport get http.Transport instance
|
||||||
func (tunnel *TunnelHandler) Transport() *http.Transport {
|
func (tunnel *TunnelHandler) Transport() *http.Transport {
|
||||||
return tunnel.Ctx.Transport
|
return tunnel.Ctx.Transport
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user