feat: grpc support

This commit is contained in:
langhuihui
2024-04-09 20:31:39 +08:00
parent e625fe51fc
commit 8284f956fe
21 changed files with 2843 additions and 115 deletions

View File

@@ -1,5 +1,7 @@
global: global:
loglevel: debug loglevel: debug
tcp:
listenaddr: :50051
rtmp: rtmp:
chunksize: 2048 chunksize: 2048
subscribe: subscribe:

12
go.mod
View File

@@ -4,16 +4,26 @@ go 1.22
toolchain go1.22.1 toolchain go1.22.1
require github.com/quic-go/quic-go v0.42.0 require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/quic-go/quic-go v0.42.0
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
)
require ( require (
github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-ole/go-ole v1.2.6 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect github.com/tklauser/numcpus v0.6.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
) )
require ( require (

35
go.sum
View File

@@ -12,8 +12,8 @@ github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
@@ -22,7 +22,11 @@ github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 h1:/c3QmbOGMGTOumP2iT/rCwB7b0QDGLKzqOmktBjT+Is=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1/go.mod h1:5SN9VR2LTsRFsrEC6FHgRbTWrTHu6tqPeKxEQv15giM=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -30,7 +34,6 @@ github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc= github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc=
github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY= github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q= github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=
github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k=
@@ -42,6 +45,8 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M= github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE= github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE=
github.com/shirou/gopsutil/v3 v3.24.3/go.mod h1:JpND7O217xa72ewWz9zN2eIIkPWsDN/3pl0H8Qt0uwg= github.com/shirou/gopsutil/v3 v3.24.3/go.mod h1:JpND7O217xa72ewWz9zN2eIIkPWsDN/3pl0H8Qt0uwg=
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
@@ -66,16 +71,12 @@ github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@@ -83,22 +84,30 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo=
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de h1:jFNzHPIeuzhdRwVhbZdiym9q0ory/xY3sA+v2wPg8I0=
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:5iCWqnniDlqZHrd3neWVTOwvh/v6s3232omMecelax8=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de h1:cZGRis4/ot9uVm639a+rHCUaG0JJHEsdyzSQTMX+suY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:H4O17MA/PE9BsGx3w+a+W2VOLLD1Qf7oJneAoU6WktY=
google.golang.org/grpc v1.63.2 h1:MUeiw1B2maTVZthpU5xvASfTh3LDbxHd6IJ6QQVU+xM=
google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -48,7 +48,7 @@ type (
AVFrame struct { AVFrame struct {
DataFrame DataFrame
Timestamp time.Duration // 绝对时间戳 Timestamp time.Duration // 绝对时间戳
Wrap IAVFrame `json:"-" yaml:"-"` // 封装格式 Wrap IAVFrame // 封装格式
} }
DataFrame struct { DataFrame struct {
sync.Cond `json:"-" yaml:"-"` sync.Cond `json:"-" yaml:"-"`

View File

@@ -20,45 +20,49 @@ type HTTP struct {
ReadTimeout time.Duration `desc:"读取超时"` ReadTimeout time.Duration `desc:"读取超时"`
WriteTimeout time.Duration `desc:"写入超时"` WriteTimeout time.Duration `desc:"写入超时"`
IdleTimeout time.Duration `desc:"空闲超时"` IdleTimeout time.Duration `desc:"空闲超时"`
mux *http.ServeMux mux http.Handler
server *http.Server server *http.Server
serverTLS *http.Server serverTLS *http.Server
middlewares []Middleware // middlewares []Middleware
} }
type HTTPConfig interface { type HTTPConfig interface {
GetHTTPConfig() *HTTP GetHTTPConfig() *HTTP
Handle(string, http.Handler) // Handle(string, http.Handler)
Handler(*http.Request) (http.Handler, string) // Handler(*http.Request) (http.Handler, string)
AddMiddleware(Middleware) // AddMiddleware(Middleware)
} }
func (config *HTTP) AddMiddleware(middleware Middleware) { func (config *HTTP) SetMux(mux http.Handler) {
config.middlewares = append(config.middlewares, middleware) config.mux = mux
} }
func (config *HTTP) Handle(path string, f http.Handler) { // func (config *HTTP) AddMiddleware(middleware Middleware) {
if config.mux == nil { // config.middlewares = append(config.middlewares, middleware)
config.mux = http.NewServeMux() // }
}
if config.CORS { // func (config *HTTP) Handle(path string, f http.Handler) {
// f = util.CORS(f) // if config.mux == nil {
} // config.mux = http.NewServeMux()
if config.UserName != "" && config.Password != "" { // }
// f = util.BasicAuth(config.UserName, config.Password, f) // if config.CORS {
} // // f = util.CORS(f)
for _, middleware := range config.middlewares { // }
f = middleware(path, f) // if config.UserName != "" && config.Password != "" {
} // // f = util.BasicAuth(config.UserName, config.Password, f)
config.mux.Handle(path, f) // }
} // for _, middleware := range config.middlewares {
// f = middleware(path, f)
// }
// config.mux.Handle(path, f)
// }
func (config *HTTP) GetHTTPConfig() *HTTP { func (config *HTTP) GetHTTPConfig() *HTTP {
return config return config
} }
func (config *HTTP) Handler(r *http.Request) (h http.Handler, pattern string) { // func (config *HTTP) Handler(r *http.Request) (h http.Handler, pattern string) {
return config.mux.Handler(r) // return config.mux.Handler(r)
} // }
func (config *HTTP) StopListen() { func (config *HTTP) StopListen() {
if config.server != nil { if config.server != nil {

417
pkg/pb/global.pb.go Normal file
View File

@@ -0,0 +1,417 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.28.1
// protoc v3.19.1
// source: global.proto
package pb
import (
_ "google.golang.org/genproto/googleapis/api/annotations"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type StreamSnapRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"`
}
func (x *StreamSnapRequest) Reset() {
*x = StreamSnapRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamSnapRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamSnapRequest) ProtoMessage() {}
func (x *StreamSnapRequest) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamSnapRequest.ProtoReflect.Descriptor instead.
func (*StreamSnapRequest) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{0}
}
func (x *StreamSnapRequest) GetStreamPath() string {
if x != nil {
return x.StreamPath
}
return ""
}
type Wrap struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Timestamp uint32 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
Size uint32 `protobuf:"varint,2,opt,name=size,proto3" json:"size,omitempty"`
Data string `protobuf:"bytes,3,opt,name=data,proto3" json:"data,omitempty"`
}
func (x *Wrap) Reset() {
*x = Wrap{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *Wrap) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*Wrap) ProtoMessage() {}
func (x *Wrap) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use Wrap.ProtoReflect.Descriptor instead.
func (*Wrap) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{1}
}
func (x *Wrap) GetTimestamp() uint32 {
if x != nil {
return x.Timestamp
}
return 0
}
func (x *Wrap) GetSize() uint32 {
if x != nil {
return x.Size
}
return 0
}
func (x *Wrap) GetData() string {
if x != nil {
return x.Data
}
return ""
}
type TrackSnapShot struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Sequence uint32 `protobuf:"varint,1,opt,name=sequence,proto3" json:"sequence,omitempty"`
Timestamp uint32 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
WriteTime uint64 `protobuf:"varint,3,opt,name=writeTime,proto3" json:"writeTime,omitempty"`
CanRead bool `protobuf:"varint,4,opt,name=canRead,proto3" json:"canRead,omitempty"`
Wrap *Wrap `protobuf:"bytes,5,opt,name=wrap,proto3" json:"wrap,omitempty"`
}
func (x *TrackSnapShot) Reset() {
*x = TrackSnapShot{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *TrackSnapShot) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*TrackSnapShot) ProtoMessage() {}
func (x *TrackSnapShot) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use TrackSnapShot.ProtoReflect.Descriptor instead.
func (*TrackSnapShot) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{2}
}
func (x *TrackSnapShot) GetSequence() uint32 {
if x != nil {
return x.Sequence
}
return 0
}
func (x *TrackSnapShot) GetTimestamp() uint32 {
if x != nil {
return x.Timestamp
}
return 0
}
func (x *TrackSnapShot) GetWriteTime() uint64 {
if x != nil {
return x.WriteTime
}
return 0
}
func (x *TrackSnapShot) GetCanRead() bool {
if x != nil {
return x.CanRead
}
return false
}
func (x *TrackSnapShot) GetWrap() *Wrap {
if x != nil {
return x.Wrap
}
return nil
}
type StreamSnapShot struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
VideoTrack []*TrackSnapShot `protobuf:"bytes,1,rep,name=videoTrack,proto3" json:"videoTrack,omitempty"`
AudioTrack []*TrackSnapShot `protobuf:"bytes,2,rep,name=audioTrack,proto3" json:"audioTrack,omitempty"`
}
func (x *StreamSnapShot) Reset() {
*x = StreamSnapShot{}
if protoimpl.UnsafeEnabled {
mi := &file_global_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *StreamSnapShot) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*StreamSnapShot) ProtoMessage() {}
func (x *StreamSnapShot) ProtoReflect() protoreflect.Message {
mi := &file_global_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use StreamSnapShot.ProtoReflect.Descriptor instead.
func (*StreamSnapShot) Descriptor() ([]byte, []int) {
return file_global_proto_rawDescGZIP(), []int{3}
}
func (x *StreamSnapShot) GetVideoTrack() []*TrackSnapShot {
if x != nil {
return x.VideoTrack
}
return nil
}
func (x *StreamSnapShot) GetAudioTrack() []*TrackSnapShot {
if x != nil {
return x.AudioTrack
}
return nil
}
var File_global_proto protoreflect.FileDescriptor
var file_global_proto_rawDesc = []byte{
0x0a, 0x0c, 0x67, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03,
0x6d, 0x37, 0x73, 0x1a, 0x1c, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x2f,
0x61, 0x6e, 0x6e, 0x6f, 0x74, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x22, 0x33, 0x0a, 0x11, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x4c, 0x0a, 0x04, 0x57, 0x72, 0x61, 0x70, 0x12, 0x1c,
0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28,
0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x12, 0x0a, 0x04,
0x73, 0x69, 0x7a, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x04, 0x73, 0x69, 0x7a, 0x65,
0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04,
0x64, 0x61, 0x74, 0x61, 0x22, 0xa0, 0x01, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e,
0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e,
0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x73, 0x65, 0x71, 0x75, 0x65, 0x6e,
0x63, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18,
0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70,
0x12, 0x1c, 0x0a, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20,
0x01, 0x28, 0x04, 0x52, 0x09, 0x77, 0x72, 0x69, 0x74, 0x65, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x18,
0x0a, 0x07, 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52,
0x07, 0x63, 0x61, 0x6e, 0x52, 0x65, 0x61, 0x64, 0x12, 0x1d, 0x0a, 0x04, 0x77, 0x72, 0x61, 0x70,
0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x09, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x57, 0x72, 0x61,
0x70, 0x52, 0x04, 0x77, 0x72, 0x61, 0x70, 0x22, 0x78, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61,
0x6d, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x12, 0x32, 0x0a, 0x0a, 0x76, 0x69, 0x64,
0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e,
0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61, 0x70, 0x53, 0x68, 0x6f,
0x74, 0x52, 0x0a, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x12, 0x32, 0x0a,
0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x18, 0x02, 0x20, 0x03, 0x28,
0x0b, 0x32, 0x12, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x54, 0x72, 0x61, 0x63, 0x6b, 0x53, 0x6e, 0x61,
0x70, 0x53, 0x68, 0x6f, 0x74, 0x52, 0x0a, 0x61, 0x75, 0x64, 0x69, 0x6f, 0x54, 0x72, 0x61, 0x63,
0x6b, 0x32, 0x6d, 0x0a, 0x06, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x12, 0x63, 0x0a, 0x0a, 0x53,
0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x12, 0x16, 0x2e, 0x6d, 0x37, 0x73, 0x2e,
0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e, 0x61, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x13, 0x2e, 0x6d, 0x37, 0x73, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x6e,
0x61, 0x70, 0x53, 0x68, 0x6f, 0x74, 0x22, 0x28, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x22, 0x12, 0x20,
0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x73, 0x6e, 0x61, 0x70,
0x2f, 0x7b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x3d, 0x2a, 0x2a, 0x7d,
0x42, 0x18, 0x5a, 0x16, 0x6d, 0x37, 0x73, 0x2e, 0x6c, 0x69, 0x76, 0x65, 0x2f, 0x6d, 0x37, 0x73,
0x2f, 0x76, 0x35, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
}
var (
file_global_proto_rawDescOnce sync.Once
file_global_proto_rawDescData = file_global_proto_rawDesc
)
func file_global_proto_rawDescGZIP() []byte {
file_global_proto_rawDescOnce.Do(func() {
file_global_proto_rawDescData = protoimpl.X.CompressGZIP(file_global_proto_rawDescData)
})
return file_global_proto_rawDescData
}
var file_global_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_global_proto_goTypes = []interface{}{
(*StreamSnapRequest)(nil), // 0: m7s.StreamSnapRequest
(*Wrap)(nil), // 1: m7s.Wrap
(*TrackSnapShot)(nil), // 2: m7s.TrackSnapShot
(*StreamSnapShot)(nil), // 3: m7s.StreamSnapShot
}
var file_global_proto_depIdxs = []int32{
1, // 0: m7s.TrackSnapShot.wrap:type_name -> m7s.Wrap
2, // 1: m7s.StreamSnapShot.videoTrack:type_name -> m7s.TrackSnapShot
2, // 2: m7s.StreamSnapShot.audioTrack:type_name -> m7s.TrackSnapShot
0, // 3: m7s.Global.StreamSnap:input_type -> m7s.StreamSnapRequest
3, // 4: m7s.Global.StreamSnap:output_type -> m7s.StreamSnapShot
4, // [4:5] is the sub-list for method output_type
3, // [3:4] is the sub-list for method input_type
3, // [3:3] is the sub-list for extension type_name
3, // [3:3] is the sub-list for extension extendee
0, // [0:3] is the sub-list for field type_name
}
func init() { file_global_proto_init() }
func file_global_proto_init() {
if File_global_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_global_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamSnapRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*Wrap); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*TrackSnapShot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_global_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*StreamSnapShot); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_global_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_global_proto_goTypes,
DependencyIndexes: file_global_proto_depIdxs,
MessageInfos: file_global_proto_msgTypes,
}.Build()
File_global_proto = out.File
file_global_proto_rawDesc = nil
file_global_proto_goTypes = nil
file_global_proto_depIdxs = nil
}

189
pkg/pb/global.pb.gw.go Normal file
View File

@@ -0,0 +1,189 @@
// Code generated by protoc-gen-grpc-gateway. DO NOT EDIT.
// source: global.proto
/*
Package pb is a reverse proxy.
It translates gRPC into RESTful JSON APIs.
*/
package pb
import (
"context"
"io"
"net/http"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)
// Suppress "imported and not used" errors
var _ codes.Code
var _ io.Reader
var _ status.Status
var _ = runtime.String
var _ = utilities.NewDoubleArray
var _ = metadata.Join
func request_Global_StreamSnap_0(ctx context.Context, marshaler runtime.Marshaler, client GlobalClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamSnapRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := client.StreamSnap(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err
}
func local_request_Global_StreamSnap_0(ctx context.Context, marshaler runtime.Marshaler, server GlobalServer, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) {
var protoReq StreamSnapRequest
var metadata runtime.ServerMetadata
var (
val string
ok bool
err error
_ = err
)
val, ok = pathParams["streamPath"]
if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "streamPath")
}
protoReq.StreamPath, err = runtime.String(val)
if err != nil {
return nil, metadata, status.Errorf(codes.InvalidArgument, "type mismatch, parameter: %s, error: %v", "streamPath", err)
}
msg, err := server.StreamSnap(ctx, &protoReq)
return msg, metadata, err
}
// RegisterGlobalHandlerServer registers the http handlers for service Global to "mux".
// UnaryRPC :call GlobalServer directly.
// StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906.
// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterGlobalHandlerFromEndpoint instead.
func RegisterGlobalHandlerServer(ctx context.Context, mux *runtime.ServeMux, server GlobalServer) error {
mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
var stream runtime.ServerTransportStream
ctx = grpc.NewContextWithServerTransportStream(ctx, &stream)
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, "/m7s.Global/StreamSnap", runtime.WithHTTPPathPattern("/api/stream/snap/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := local_request_Global_StreamSnap_0(annotatedContext, inboundMarshaler, server, req, pathParams)
md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer())
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_StreamSnap_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
// RegisterGlobalHandlerFromEndpoint is same as RegisterGlobalHandler but
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
func RegisterGlobalHandlerFromEndpoint(ctx context.Context, mux *runtime.ServeMux, endpoint string, opts []grpc.DialOption) (err error) {
conn, err := grpc.DialContext(ctx, endpoint, opts...)
if err != nil {
return err
}
defer func() {
if err != nil {
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
return
}
go func() {
<-ctx.Done()
if cerr := conn.Close(); cerr != nil {
grpclog.Infof("Failed to close conn to %s: %v", endpoint, cerr)
}
}()
}()
return RegisterGlobalHandler(ctx, mux, conn)
}
// RegisterGlobalHandler registers the http handlers for service Global to "mux".
// The handlers forward requests to the grpc endpoint over "conn".
func RegisterGlobalHandler(ctx context.Context, mux *runtime.ServeMux, conn *grpc.ClientConn) error {
return RegisterGlobalHandlerClient(ctx, mux, NewGlobalClient(conn))
}
// RegisterGlobalHandlerClient registers the http handlers for service Global
// to "mux". The handlers forward requests to the grpc endpoint over the given implementation of "GlobalClient".
// Note: the gRPC framework executes interceptors within the gRPC handler. If the passed in "GlobalClient"
// doesn't go through the normal gRPC flow (creating a gRPC client etc.) then it will be up to the passed in
// "GlobalClient" to call the correct interceptors.
func RegisterGlobalHandlerClient(ctx context.Context, mux *runtime.ServeMux, client GlobalClient) error {
mux.Handle("GET", pattern_Global_StreamSnap_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) {
ctx, cancel := context.WithCancel(req.Context())
defer cancel()
inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req)
var err error
var annotatedContext context.Context
annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, "/m7s.Global/StreamSnap", runtime.WithHTTPPathPattern("/api/stream/snap/{streamPath=**}"))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
return
}
resp, md, err := request_Global_StreamSnap_0(annotatedContext, inboundMarshaler, client, req, pathParams)
annotatedContext = runtime.NewServerMetadataContext(annotatedContext, md)
if err != nil {
runtime.HTTPError(annotatedContext, mux, outboundMarshaler, w, req, err)
return
}
forward_Global_StreamSnap_0(annotatedContext, mux, outboundMarshaler, w, req, resp, mux.GetForwardResponseOptions()...)
})
return nil
}
var (
pattern_Global_StreamSnap_0 = runtime.MustPattern(runtime.NewPattern(1, []int{2, 0, 2, 1, 2, 2, 3, 0, 4, 1, 5, 3}, []string{"api", "stream", "snap", "streamPath"}, ""))
)
var (
forward_Global_StreamSnap_0 = runtime.ForwardResponseMessage
)

35
pkg/pb/global.proto Normal file
View File

@@ -0,0 +1,35 @@
syntax = "proto3";
import "google/api/annotations.proto";
package m7s;
option go_package="m7s.live/m7s/v5/pkg/pb";
service Global {
rpc StreamSnap (StreamSnapRequest) returns (StreamSnapShot) {
option (google.api.http) = {
get: "/api/stream/snap/{streamPath=**}"
};
}
}
message StreamSnapRequest {
string streamPath = 1;
}
message Wrap {
uint32 timestamp = 1;
uint32 size = 2;
string data = 3;
}
message TrackSnapShot {
uint32 sequence = 1;
uint32 timestamp = 2;
uint64 writeTime = 3;
bool canRead = 4;
Wrap wrap = 5;
}
message StreamSnapShot {
repeated TrackSnapShot videoTrack = 1;
repeated TrackSnapShot audioTrack = 2;
}

105
pkg/pb/global_grpc.pb.go Normal file
View File

@@ -0,0 +1,105 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.19.1
// source: global.proto
package pb
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// GlobalClient is the client API for Global service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type GlobalClient interface {
StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error)
}
type globalClient struct {
cc grpc.ClientConnInterface
}
func NewGlobalClient(cc grpc.ClientConnInterface) GlobalClient {
return &globalClient{cc}
}
func (c *globalClient) StreamSnap(ctx context.Context, in *StreamSnapRequest, opts ...grpc.CallOption) (*StreamSnapShot, error) {
out := new(StreamSnapShot)
err := c.cc.Invoke(ctx, "/m7s.Global/StreamSnap", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// GlobalServer is the server API for Global service.
// All implementations must embed UnimplementedGlobalServer
// for forward compatibility
type GlobalServer interface {
StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error)
mustEmbedUnimplementedGlobalServer()
}
// UnimplementedGlobalServer must be embedded to have forward compatible implementations.
type UnimplementedGlobalServer struct {
}
func (UnimplementedGlobalServer) StreamSnap(context.Context, *StreamSnapRequest) (*StreamSnapShot, error) {
return nil, status.Errorf(codes.Unimplemented, "method StreamSnap not implemented")
}
func (UnimplementedGlobalServer) mustEmbedUnimplementedGlobalServer() {}
// UnsafeGlobalServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to GlobalServer will
// result in compilation errors.
type UnsafeGlobalServer interface {
mustEmbedUnimplementedGlobalServer()
}
func RegisterGlobalServer(s grpc.ServiceRegistrar, srv GlobalServer) {
s.RegisterService(&Global_ServiceDesc, srv)
}
func _Global_StreamSnap_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(StreamSnapRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(GlobalServer).StreamSnap(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/m7s.Global/StreamSnap",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(GlobalServer).StreamSnap(ctx, req.(*StreamSnapRequest))
}
return interceptor(ctx, in, info, handler)
}
// Global_ServiceDesc is the grpc.ServiceDesc for Global service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Global_ServiceDesc = grpc.ServiceDesc{
ServiceName: "m7s.Global",
HandlerType: (*GlobalServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "StreamSnap",
Handler: _Global_StreamSnap_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "global.proto",
}

View File

@@ -0,0 +1,31 @@
// Copyright 2015 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
import "google/api/http.proto";
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "AnnotationsProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
extend google.protobuf.MethodOptions {
// See `HttpRule`.
HttpRule http = 72295728;
}

View File

@@ -0,0 +1,104 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
import "google/protobuf/descriptor.proto";
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "FieldBehaviorProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
extend google.protobuf.FieldOptions {
// A designation of a specific field behavior (required, output only, etc.)
// in protobuf messages.
//
// Examples:
//
// string name = 1 [(google.api.field_behavior) = REQUIRED];
// State state = 1 [(google.api.field_behavior) = OUTPUT_ONLY];
// google.protobuf.Duration ttl = 1
// [(google.api.field_behavior) = INPUT_ONLY];
// google.protobuf.Timestamp expire_time = 1
// [(google.api.field_behavior) = OUTPUT_ONLY,
// (google.api.field_behavior) = IMMUTABLE];
repeated google.api.FieldBehavior field_behavior = 1052 [packed = false];
}
// An indicator of the behavior of a given field (for example, that a field
// is required in requests, or given as output but ignored as input).
// This **does not** change the behavior in protocol buffers itself; it only
// denotes the behavior and may affect how API tooling handles the field.
//
// Note: This enum **may** receive new values in the future.
enum FieldBehavior {
// Conventional default for enums. Do not use this.
FIELD_BEHAVIOR_UNSPECIFIED = 0;
// Specifically denotes a field as optional.
// While all fields in protocol buffers are optional, this may be specified
// for emphasis if appropriate.
OPTIONAL = 1;
// Denotes a field as required.
// This indicates that the field **must** be provided as part of the request,
// and failure to do so will cause an error (usually `INVALID_ARGUMENT`).
REQUIRED = 2;
// Denotes a field as output only.
// This indicates that the field is provided in responses, but including the
// field in a request does nothing (the server *must* ignore it and
// *must not* throw an error as a result of the field's presence).
OUTPUT_ONLY = 3;
// Denotes a field as input only.
// This indicates that the field is provided in requests, and the
// corresponding field is not included in output.
INPUT_ONLY = 4;
// Denotes a field as immutable.
// This indicates that the field may be set once in a request to create a
// resource, but may not be changed thereafter.
IMMUTABLE = 5;
// Denotes that a (repeated) field is an unordered list.
// This indicates that the service may provide the elements of the list
// in any arbitrary order, rather than the order the user originally
// provided. Additionally, the list's order may or may not be stable.
UNORDERED_LIST = 6;
// Denotes that this field returns a non-empty default value if not set.
// This indicates that if the user provides the empty value in a request,
// a non-empty value will be returned. The user will not be aware of what
// non-empty value to expect.
NON_EMPTY_DEFAULT = 7;
// Denotes that the field in a resource (a message annotated with
// google.api.resource) is used in the resource name to uniquely identify the
// resource. For AIP-compliant APIs, this should only be applied to the
// `name` field on the resource.
//
// This behavior should not be applied to references to other resources within
// the message.
//
// The identifier field of resources often have different field behavior
// depending on the request it is embedded in (e.g. for Create methods name
// is optional and unused, while for Update methods it is required). Instead
// of method-specific annotations, only `IDENTIFIER` is required.
IDENTIFIER = 8;
}

View File

@@ -0,0 +1,379 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/api/annotations;annotations";
option java_multiple_files = true;
option java_outer_classname = "HttpProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
// Defines the HTTP configuration for an API service. It contains a list of
// [HttpRule][google.api.HttpRule], each specifying the mapping of an RPC method
// to one or more HTTP REST API methods.
message Http {
// A list of HTTP configuration rules that apply to individual API methods.
//
// **NOTE:** All service configuration rules follow "last one wins" order.
repeated HttpRule rules = 1;
// When set to true, URL path parameters will be fully URI-decoded except in
// cases of single segment matches in reserved expansion, where "%2F" will be
// left encoded.
//
// The default behavior is to not decode RFC 6570 reserved characters in multi
// segment matches.
bool fully_decode_reserved_expansion = 2;
}
// # gRPC Transcoding
//
// gRPC Transcoding is a feature for mapping between a gRPC method and one or
// more HTTP REST endpoints. It allows developers to build a single API service
// that supports both gRPC APIs and REST APIs. Many systems, including [Google
// APIs](https://github.com/googleapis/googleapis),
// [Cloud Endpoints](https://cloud.google.com/endpoints), [gRPC
// Gateway](https://github.com/grpc-ecosystem/grpc-gateway),
// and [Envoy](https://github.com/envoyproxy/envoy) proxy support this feature
// and use it for large scale production services.
//
// `HttpRule` defines the schema of the gRPC/REST mapping. The mapping specifies
// how different portions of the gRPC request message are mapped to the URL
// path, URL query parameters, and HTTP request body. It also controls how the
// gRPC response message is mapped to the HTTP response body. `HttpRule` is
// typically specified as an `google.api.http` annotation on the gRPC method.
//
// Each mapping specifies a URL path template and an HTTP method. The path
// template may refer to one or more fields in the gRPC request message, as long
// as each field is a non-repeated field with a primitive (non-message) type.
// The path template controls how fields of the request message are mapped to
// the URL path.
//
// Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/{name=messages/*}"
// };
// }
// }
// message GetMessageRequest {
// string name = 1; // Mapped to URL path.
// }
// message Message {
// string text = 1; // The resource content.
// }
//
// This enables an HTTP REST to gRPC mapping as below:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456` | `GetMessage(name: "messages/123456")`
//
// Any fields in the request message which are not bound by the path template
// automatically become HTTP query parameters if there is no HTTP request body.
// For example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get:"/v1/messages/{message_id}"
// };
// }
// }
// message GetMessageRequest {
// message SubMessage {
// string subfield = 1;
// }
// string message_id = 1; // Mapped to URL path.
// int64 revision = 2; // Mapped to URL query parameter `revision`.
// SubMessage sub = 3; // Mapped to URL query parameter `sub.subfield`.
// }
//
// This enables a HTTP JSON to RPC mapping as below:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456?revision=2&sub.subfield=foo` |
// `GetMessage(message_id: "123456" revision: 2 sub: SubMessage(subfield:
// "foo"))`
//
// Note that fields which are mapped to URL query parameters must have a
// primitive type or a repeated primitive type or a non-repeated message type.
// In the case of a repeated type, the parameter can be repeated in the URL
// as `...?param=A&param=B`. In the case of a message type, each field of the
// message is mapped to a separate parameter, such as
// `...?foo.a=A&foo.b=B&foo.c=C`.
//
// For HTTP methods that allow a request body, the `body` field
// specifies the mapping. Consider a REST update method on the
// message resource collection:
//
// service Messaging {
// rpc UpdateMessage(UpdateMessageRequest) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "message"
// };
// }
// }
// message UpdateMessageRequest {
// string message_id = 1; // mapped to the URL
// Message message = 2; // mapped to the body
// }
//
// The following HTTP JSON to RPC mapping is enabled, where the
// representation of the JSON in the request body is determined by
// protos JSON encoding:
//
// HTTP | gRPC
// -----|-----
// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
// "123456" message { text: "Hi!" })`
//
// The special name `*` can be used in the body mapping to define that
// every field not bound by the path template should be mapped to the
// request body. This enables the following alternative definition of
// the update method:
//
// service Messaging {
// rpc UpdateMessage(Message) returns (Message) {
// option (google.api.http) = {
// patch: "/v1/messages/{message_id}"
// body: "*"
// };
// }
// }
// message Message {
// string message_id = 1;
// string text = 2;
// }
//
//
// The following HTTP JSON to RPC mapping is enabled:
//
// HTTP | gRPC
// -----|-----
// `PATCH /v1/messages/123456 { "text": "Hi!" }` | `UpdateMessage(message_id:
// "123456" text: "Hi!")`
//
// Note that when using `*` in the body mapping, it is not possible to
// have HTTP parameters, as all fields not bound by the path end in
// the body. This makes this option more rarely used in practice when
// defining REST APIs. The common usage of `*` is in custom methods
// which don't use the URL at all for transferring data.
//
// It is possible to define multiple HTTP methods for one RPC by using
// the `additional_bindings` option. Example:
//
// service Messaging {
// rpc GetMessage(GetMessageRequest) returns (Message) {
// option (google.api.http) = {
// get: "/v1/messages/{message_id}"
// additional_bindings {
// get: "/v1/users/{user_id}/messages/{message_id}"
// }
// };
// }
// }
// message GetMessageRequest {
// string message_id = 1;
// string user_id = 2;
// }
//
// This enables the following two alternative HTTP JSON to RPC mappings:
//
// HTTP | gRPC
// -----|-----
// `GET /v1/messages/123456` | `GetMessage(message_id: "123456")`
// `GET /v1/users/me/messages/123456` | `GetMessage(user_id: "me" message_id:
// "123456")`
//
// ## Rules for HTTP mapping
//
// 1. Leaf request fields (recursive expansion nested messages in the request
// message) are classified into three categories:
// - Fields referred by the path template. They are passed via the URL path.
// - Fields referred by the [HttpRule.body][google.api.HttpRule.body]. They
// are passed via the HTTP
// request body.
// - All other fields are passed via the URL query parameters, and the
// parameter name is the field path in the request message. A repeated
// field can be represented as multiple query parameters under the same
// name.
// 2. If [HttpRule.body][google.api.HttpRule.body] is "*", there is no URL
// query parameter, all fields
// are passed via URL path and HTTP request body.
// 3. If [HttpRule.body][google.api.HttpRule.body] is omitted, there is no HTTP
// request body, all
// fields are passed via URL path and URL query parameters.
//
// ### Path template syntax
//
// Template = "/" Segments [ Verb ] ;
// Segments = Segment { "/" Segment } ;
// Segment = "*" | "**" | LITERAL | Variable ;
// Variable = "{" FieldPath [ "=" Segments ] "}" ;
// FieldPath = IDENT { "." IDENT } ;
// Verb = ":" LITERAL ;
//
// The syntax `*` matches a single URL path segment. The syntax `**` matches
// zero or more URL path segments, which must be the last part of the URL path
// except the `Verb`.
//
// The syntax `Variable` matches part of the URL path as specified by its
// template. A variable template must not contain other variables. If a variable
// matches a single path segment, its template may be omitted, e.g. `{var}`
// is equivalent to `{var=*}`.
//
// The syntax `LITERAL` matches literal text in the URL path. If the `LITERAL`
// contains any reserved character, such characters should be percent-encoded
// before the matching.
//
// If a variable contains exactly one path segment, such as `"{var}"` or
// `"{var=*}"`, when such a variable is expanded into a URL path on the client
// side, all characters except `[-_.~0-9a-zA-Z]` are percent-encoded. The
// server side does the reverse decoding. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{var}`.
//
// If a variable contains multiple path segments, such as `"{var=foo/*}"`
// or `"{var=**}"`, when such a variable is expanded into a URL path on the
// client side, all characters except `[-_.~/0-9a-zA-Z]` are percent-encoded.
// The server side does the reverse decoding, except "%2F" and "%2f" are left
// unchanged. Such variables show up in the
// [Discovery
// Document](https://developers.google.com/discovery/v1/reference/apis) as
// `{+var}`.
//
// ## Using gRPC API Service Configuration
//
// gRPC API Service Configuration (service config) is a configuration language
// for configuring a gRPC service to become a user-facing product. The
// service config is simply the YAML representation of the `google.api.Service`
// proto message.
//
// As an alternative to annotating your proto file, you can configure gRPC
// transcoding in your service config YAML files. You do this by specifying a
// `HttpRule` that maps the gRPC method to a REST endpoint, achieving the same
// effect as the proto annotation. This can be particularly useful if you
// have a proto that is reused in multiple services. Note that any transcoding
// specified in the service config will override any matching transcoding
// configuration in the proto.
//
// Example:
//
// http:
// rules:
// # Selects a gRPC method and applies HttpRule to it.
// - selector: example.v1.Messaging.GetMessage
// get: /v1/messages/{message_id}/{sub.subfield}
//
// ## Special notes
//
// When gRPC Transcoding is used to map a gRPC to JSON REST endpoints, the
// proto to JSON conversion must follow the [proto3
// specification](https://developers.google.com/protocol-buffers/docs/proto3#json).
//
// While the single segment variable follows the semantics of
// [RFC 6570](https://tools.ietf.org/html/rfc6570) Section 3.2.2 Simple String
// Expansion, the multi segment variable **does not** follow RFC 6570 Section
// 3.2.3 Reserved Expansion. The reason is that the Reserved Expansion
// does not expand special characters like `?` and `#`, which would lead
// to invalid URLs. As the result, gRPC Transcoding uses a custom encoding
// for multi segment variables.
//
// The path variables **must not** refer to any repeated or mapped field,
// because client libraries are not capable of handling such variable expansion.
//
// The path variables **must not** capture the leading "/" character. The reason
// is that the most common use case "{var}" does not capture the leading "/"
// character. For consistency, all path variables must share the same behavior.
//
// Repeated message fields must not be mapped to URL query parameters, because
// no client library can support such complicated mapping.
//
// If an API needs to use a JSON array for request or response body, it can map
// the request or response body to a repeated field. However, some gRPC
// Transcoding implementations may not support this feature.
message HttpRule {
// Selects a method to which this rule applies.
//
// Refer to [selector][google.api.DocumentationRule.selector] for syntax
// details.
string selector = 1;
// Determines the URL pattern is matched by this rules. This pattern can be
// used with any of the {get|put|post|delete|patch} methods. A custom method
// can be defined using the 'custom' field.
oneof pattern {
// Maps to HTTP GET. Used for listing and getting information about
// resources.
string get = 2;
// Maps to HTTP PUT. Used for replacing a resource.
string put = 3;
// Maps to HTTP POST. Used for creating a resource or performing an action.
string post = 4;
// Maps to HTTP DELETE. Used for deleting a resource.
string delete = 5;
// Maps to HTTP PATCH. Used for updating a resource.
string patch = 6;
// The custom pattern is used for specifying an HTTP method that is not
// included in the `pattern` field, such as HEAD, or "*" to leave the
// HTTP method unspecified for this rule. The wild-card rule is useful
// for services that provide content to Web (HTML) clients.
CustomHttpPattern custom = 8;
}
// The name of the request field whose value is mapped to the HTTP request
// body, or `*` for mapping all request fields not captured by the path
// pattern to the HTTP body, or omitted for not having any HTTP request body.
//
// NOTE: the referred field must be present at the top-level of the request
// message type.
string body = 7;
// Optional. The name of the response field whose value is mapped to the HTTP
// response body. When omitted, the entire response message will be used
// as the HTTP response body.
//
// NOTE: The referred field must be present at the top-level of the response
// message type.
string response_body = 12;
// Additional HTTP bindings for the selector. Nested bindings must
// not contain an `additional_bindings` field themselves (that is,
// the nesting may only be one level deep).
repeated HttpRule additional_bindings = 11;
}
// A custom pattern is used for defining custom HTTP verb.
message CustomHttpPattern {
// The name of this custom HTTP verb.
string kind = 1;
// The path matched by this custom verb.
string path = 2;
}

View File

@@ -0,0 +1,81 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package google.api;
import "google/protobuf/any.proto";
option cc_enable_arenas = true;
option go_package = "google.golang.org/genproto/googleapis/api/httpbody;httpbody";
option java_multiple_files = true;
option java_outer_classname = "HttpBodyProto";
option java_package = "com.google.api";
option objc_class_prefix = "GAPI";
// Message that represents an arbitrary HTTP body. It should only be used for
// payload formats that can't be represented as JSON, such as raw binary or
// an HTML page.
//
//
// This message can be used both in streaming and non-streaming API methods in
// the request as well as the response.
//
// It can be used as a top-level request field, which is convenient if one
// wants to extract parameters from either the URL or HTTP template into the
// request fields and also want access to the raw HTTP body.
//
// Example:
//
// message GetResourceRequest {
// // A unique request id.
// string request_id = 1;
//
// // The raw HTTP body is bound to this field.
// google.api.HttpBody http_body = 2;
//
// }
//
// service ResourceService {
// rpc GetResource(GetResourceRequest)
// returns (google.api.HttpBody);
// rpc UpdateResource(google.api.HttpBody)
// returns (google.protobuf.Empty);
//
// }
//
// Example with streaming methods:
//
// service CaldavService {
// rpc GetCalendar(stream google.api.HttpBody)
// returns (stream google.api.HttpBody);
// rpc UpdateCalendar(stream google.api.HttpBody)
// returns (stream google.api.HttpBody);
//
// }
//
// Use of this type only changes how the request and response bodies are
// handled, all other features will continue to work unchanged.
message HttpBody {
// The HTTP Content-Type header value specifying the content type of the body.
string content_type = 1;
// The HTTP request/response body as raw binary.
bytes data = 2;
// Application specific response metadata. Must be set in the first response
// for streaming APIs.
repeated google.protobuf.Any extensions = 3;
}

File diff suppressed because it is too large Load Diff

View File

@@ -9,16 +9,22 @@ type Promise[T any] struct {
context.Context context.Context
context.CancelCauseFunc context.CancelCauseFunc
Value T Value T
// timer *time.Timer
} }
func NewPromise[T any](v T) *Promise[T] { func NewPromise[T any](v T) *Promise[T] {
p := &Promise[T]{Value: v} p := &Promise[T]{Value: v}
p.Context, p.CancelCauseFunc = context.WithCancelCause(context.Background()) p.Context, p.CancelCauseFunc = context.WithCancelCause(context.Background())
// p.timer = time.AfterFunc(time.Second, func() {
// p.CancelCauseFunc(ErrTimeout)
// })
return p return p
} }
var ErrResolve = errors.New("promise resolved") var ErrResolve = errors.New("promise resolved")
var ErrTimeout = errors.New("promise timeout")
func (p *Promise[T]) Fulfill(err error) { func (p *Promise[T]) Fulfill(err error) {
// p.timer.Stop()
p.CancelCauseFunc(Conditoinal(err == nil, ErrResolve, err)) p.CancelCauseFunc(Conditoinal(err == nil, ErrResolve, err))
} }

View File

@@ -126,11 +126,12 @@ func (r *Ring[T]) Len() int {
// Do calls function f on each element of the ring, in forward order. // Do calls function f on each element of the ring, in forward order.
// The behavior of Do is undefined if f changes *r. // The behavior of Do is undefined if f changes *r.
func (r *Ring[T]) Do(f func(T)) { func (r *Ring[T]) Do(f func(*T)) {
if r != nil { if r != nil {
f(r.Value) f(&r.Value)
for p := r.Next(); p != r; p = p.next { for p := r.Next(); p != r; p = p.next {
f(p.Value) f(&p.Value)
} }
} }
} }

View File

@@ -3,7 +3,6 @@ package m7s
import ( import (
"context" "context"
"net" "net"
"net/http"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@@ -160,7 +159,7 @@ func (p *Plugin) assign() {
} }
p.Config.ParseModifyFile(modifyConfig) p.Config.ParseModifyFile(modifyConfig)
} }
p.registerHandler() // p.registerHandler()
} }
func (p *Plugin) Stop(err error) { func (p *Plugin) Stop(err error) {
@@ -265,48 +264,48 @@ func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subsc
return subscriber, sendPromiseToServer(p.server, subscriber) return subscriber, sendPromiseToServer(p.server, subscriber)
} }
func (p *Plugin) registerHandler() { // func (p *Plugin) registerHandler() {
t := reflect.TypeOf(p.handler) // t := reflect.TypeOf(p.handler)
v := reflect.ValueOf(p.handler) // v := reflect.ValueOf(p.handler)
// 注册http响应 // // 注册http响应
for i, j := 0, t.NumMethod(); i < j; i++ { // for i, j := 0, t.NumMethod(); i < j; i++ {
name := t.Method(i).Name // name := t.Method(i).Name
if name == "ServeHTTP" { // if name == "ServeHTTP" {
continue // continue
} // }
switch handler := v.Method(i).Interface().(type) { // switch handler := v.Method(i).Interface().(type) {
case func(http.ResponseWriter, *http.Request): // case func(http.ResponseWriter, *http.Request):
patten := strings.ToLower(strings.ReplaceAll(name, "_", "/")) // patten := strings.ToLower(strings.ReplaceAll(name, "_", "/"))
p.handle(patten, http.HandlerFunc(handler)) // p.handle(patten, http.HandlerFunc(handler))
} // }
} // }
if rootHandler, ok := p.handler.(http.Handler); ok { // if rootHandler, ok := p.handler.(http.Handler); ok {
p.handle("/", rootHandler) // p.handle("/", rootHandler)
} // }
} // }
func (p *Plugin) logHandler(handler http.Handler) http.Handler { // func (p *Plugin) logHandler(handler http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { // return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
p.Debug("visit", "path", r.URL.String(), "remote", r.RemoteAddr) // p.Debug("visit", "path", r.URL.String(), "remote", r.RemoteAddr)
name := strings.ToLower(p.Meta.Name) // name := strings.ToLower(p.Meta.Name)
r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name) // r.URL.Path = strings.TrimPrefix(r.URL.Path, "/"+name)
handler.ServeHTTP(rw, r) // handler.ServeHTTP(rw, r)
}) // })
} // }
func (p *Plugin) handle(pattern string, handler http.Handler) { // func (p *Plugin) handle(pattern string, handler http.Handler) {
if p == nil { // if p == nil {
return // return
} // }
if !strings.HasPrefix(pattern, "/") { // if !strings.HasPrefix(pattern, "/") {
pattern = "/" + pattern // pattern = "/" + pattern
} // }
handler = p.logHandler(handler) // handler = p.logHandler(handler)
p.GetCommonConf().Handle(pattern, handler) // p.GetCommonConf().Handle(pattern, handler)
if p.server != p.handler { // if p.server != p.handler {
pattern = "/" + strings.ToLower(p.Meta.Name) + pattern // pattern = "/" + strings.ToLower(p.Meta.Name) + pattern
p.Debug("http handle added to server", "pattern", pattern) // p.Debug("http handle added to server", "pattern", pattern)
p.server.GetCommonConf().Handle(pattern, handler) // p.server.GetCommonConf().Handle(pattern, handler)
} // }
p.server.apiList = append(p.server.apiList, pattern) // p.server.apiList = append(p.server.apiList, pattern)
} // }

View File

@@ -29,6 +29,10 @@ func (avcc *RTMPData) GetSize() int {
return avcc.Length return avcc.Length
} }
func (avcc *RTMPData) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`{"Timestamp":%d,"Size":%d,"Data":"%s"}`, avcc.Timestamp, avcc.Length, avcc.Print())), nil
}
func (avcc *RTMPData) Print() string { func (avcc *RTMPData) Print() string {
return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5]) return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5])
} }

View File

@@ -7,6 +7,7 @@ import (
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/pb"
) )
type PublisherState int type PublisherState int
@@ -20,14 +21,14 @@ const (
type Publisher struct { type Publisher struct {
PubSubBase PubSubBase
sync.RWMutex sync.RWMutex `json:"-" yaml:"-"`
config.Publish config.Publish
State PublisherState State PublisherState
VideoTrack *AVTrack VideoTrack *AVTrack
AudioTrack *AVTrack AudioTrack *AVTrack
DataTrack *DataTrack DataTrack *DataTrack
TransTrack map[reflect.Type]*AVTrack TransTrack map[reflect.Type]*AVTrack `json:"-" yaml:"-"`
Subscribers map[*Subscriber]struct{} Subscribers map[*Subscriber]struct{} `json:"-" yaml:"-"`
GOP int GOP int
baseTs time.Duration baseTs time.Duration
lastTs time.Duration lastTs time.Duration
@@ -222,3 +223,42 @@ func (p *Publisher) TakeOver(old *Publisher) {
// track.ICodecCtx = nil // track.ICodecCtx = nil
// } // }
} }
func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) {
ret = &pb.StreamSnapShot{}
if p.VideoTrack != nil {
p.VideoTrack.Ring.Do(func(v *AVFrame) {
var snap pb.TrackSnapShot
snap.CanRead = v.CanRead
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp)
snap.WriteTime = uint64(v.WriteTime.UnixNano())
if v.Wrap != nil {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.Print(),
}
}
ret.VideoTrack = append(ret.VideoTrack, &snap)
})
}
if p.AudioTrack != nil {
p.AudioTrack.Ring.Do(func(v *AVFrame) {
var snap pb.TrackSnapShot
snap.CanRead = v.CanRead
snap.Sequence = v.Sequence
snap.Timestamp = uint32(v.Timestamp)
snap.WriteTime = uint64(v.WriteTime.UnixNano())
if v.Wrap != nil {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.Print(),
}
}
ret.AudioTrack = append(ret.AudioTrack, &snap)
})
}
return
}

View File

@@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"log/slog" "log/slog"
"net"
"net/http" "net/http"
"os" "os"
"path/filepath" "path/filepath"
@@ -13,10 +14,14 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/mcuadros/go-defaults" "github.com/mcuadros/go-defaults"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
. "m7s.live/m7s/v5/pkg" . "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/config" "m7s.live/m7s/v5/pkg/config"
"m7s.live/m7s/v5/pkg/pb"
"m7s.live/m7s/v5/pkg/util" "m7s.live/m7s/v5/pkg/util"
) )
@@ -34,6 +39,7 @@ var (
) )
type Server struct { type Server struct {
pb.UnimplementedGlobalServer
Plugin Plugin
config.Engine config.Engine
eventChan chan any eventChan chan any
@@ -69,8 +75,11 @@ func Run(ctx context.Context, conf any) error {
func (s *Server) Run(ctx context.Context, conf any) (err error) { func (s *Server) Run(ctx context.Context, conf any) (err error) {
s.Logger = slog.With("server", serverIndexG.Add(1)) s.Logger = slog.With("server", serverIndexG.Add(1))
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx) s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
mux := runtime.NewServeMux()
s.config.HTTP.SetMux(mux)
s.config.HTTP.ListenAddrTLS = ":8443" s.config.HTTP.ListenAddrTLS = ":8443"
s.config.HTTP.ListenAddr = ":8080" s.config.HTTP.ListenAddr = ":8080"
s.Info("start") s.Info("start")
var cg map[string]map[string]any var cg map[string]map[string]any
@@ -103,7 +112,7 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
var lv slog.LevelVar var lv slog.LevelVar
lv.UnmarshalText([]byte(s.LogLevel)) lv.UnmarshalText([]byte(s.LogLevel))
slog.SetLogLoggerLevel(lv.Level()) slog.SetLogLoggerLevel(lv.Level())
s.registerHandler() // s.registerHandler()
if s.config.HTTP.ListenAddrTLS != "" { if s.config.HTTP.ListenAddrTLS != "" {
s.Info("https listen at ", "addr", s.config.HTTP.ListenAddrTLS) s.Info("https listen at ", "addr", s.config.HTTP.ListenAddrTLS)
go func() { go func() {
@@ -116,6 +125,24 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
s.Stop(s.config.HTTP.Listen()) s.Stop(s.config.HTTP.Listen())
}() }()
} }
if s.config.TCP.ListenAddr != "" {
lis, err := net.Listen("tcp", s.config.TCP.ListenAddr)
if err != nil {
s.Error("failed to listen", "error", err)
return err
}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
pb.RegisterGlobalServer(grpcServer, s)
gwopts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
if err = pb.RegisterGlobalHandlerFromEndpoint(ctx, mux, s.config.TCP.ListenAddr, gwopts); err != nil {
s.Error("register handler faild", "error", err)
return err
}
go func() {
s.Stop(grpcServer.Serve(lis))
}()
}
for _, plugin := range plugins { for _, plugin := range plugins {
plugin.Init(s, cg[strings.ToLower(plugin.Name)]) plugin.Init(s, cg[strings.ToLower(plugin.Name)])
} }
@@ -138,6 +165,16 @@ func (s *Server) eventLoop() {
defer pulse.Stop() defer pulse.Stop()
cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.Done())}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(pulse.C)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.eventChan)}} cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.Done())}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(pulse.C)}, {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(s.eventChan)}}
var pubCount, subCount int var pubCount, subCount int
addPublisher := func(publisher *Publisher) {
if nl := len(s.Publishers); nl > pubCount {
pubCount = nl
if subCount == 0 {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(publisher.Done())})
} else {
cases = slices.Insert(cases, 3+pubCount, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(publisher.Done())})
}
}
}
for { for {
switch chosen, rev, _ := reflect.Select(cases); chosen { switch chosen, rev, _ := reflect.Select(cases); chosen {
case 0: case 0:
@@ -161,18 +198,17 @@ func (s *Server) eventLoop() {
event := rev.Interface() event := rev.Interface()
switch v := event.(type) { switch v := event.(type) {
case *util.Promise[*Publisher]: case *util.Promise[*Publisher]:
v.Fulfill(s.OnPublish(v.Value)) err := s.OnPublish(v.Value)
if v.Fulfill(err); err != nil {
continue
}
event = v.Value event = v.Value
if nl := len(s.Publishers); nl > pubCount { addPublisher(v.Value)
pubCount = nl
if subCount == 0 {
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())})
} else {
cases = slices.Insert(cases, 3+pubCount, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())})
}
}
case *util.Promise[*Subscriber]: case *util.Promise[*Subscriber]:
v.Fulfill(s.OnSubscribe(v.Value)) err := s.OnSubscribe(v.Value)
if v.Fulfill(err); err != nil {
continue
}
if nl := len(s.Subscribers); nl > subCount { if nl := len(s.Subscribers); nl > subCount {
subCount = nl subCount = nl
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())}) cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(v.Value.Done())})
@@ -182,19 +218,24 @@ func (s *Server) eventLoop() {
} }
event = v.Value event = v.Value
case *util.Promise[*Puller]: case *util.Promise[*Puller]:
err := s.OnPublish(&v.Value.Publisher)
if err != nil {
v.Fulfill(err)
} else {
if _, ok := s.Pulls[v.Value.StreamPath]; ok { if _, ok := s.Pulls[v.Value.StreamPath]; ok {
v.Fulfill(ErrStreamExist) v.Fulfill(ErrStreamExist)
continue
} else { } else {
err := s.OnPublish(&v.Value.Publisher)
v.Fulfill(err)
if err != nil {
continue
}
s.Pulls[v.Value.StreamPath] = v.Value s.Pulls[v.Value.StreamPath] = v.Value
s.Pullers = append(s.Pullers, v.Value) s.Pullers = append(s.Pullers, v.Value)
v.Fulfill(nil) addPublisher(&v.Value.Publisher)
event = v.Value event = v.Value
} }
} case *util.Promise[*StreamSnapShot]:
v.Value.Publisher = s.Streams[v.Value.StreamPath]
v.Fulfill(nil)
continue
} }
for _, plugin := range s.Plugins { for _, plugin := range s.Plugins {
if plugin.Disabled { if plugin.Disabled {
@@ -315,3 +356,18 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "%s\n", api) fmt.Fprintf(w, "%s\n", api)
} }
} }
type StreamSnapShot struct {
StreamPath string
*Publisher
}
func (s *Server) StreamSnap(ctx context.Context, req *pb.StreamSnapRequest) (res *pb.StreamSnapShot, err error) {
promise := util.NewPromise(&StreamSnapShot{StreamPath: req.StreamPath})
s.eventChan <- promise
<-promise.Done()
if promise.Value.Publisher == nil {
return nil, context.Cause(promise.Context)
}
return promise.Value.SnapShot(), nil
}

View File

@@ -16,11 +16,11 @@ import (
type PubSubBase struct { type PubSubBase struct {
Unit Unit
ID int ID int
Plugin *Plugin Plugin *Plugin `json:"-" yaml:"-"`
StreamPath string StreamPath string
Args url.Values Args url.Values
TimeoutTimer *time.Timer TimeoutTimer *time.Timer `json:"-" yaml:"-"`
io.Closer io.Closer `json:"-" yaml:"-"`
} }
// func (ps *PubSubBase) Stop(reason error) { // func (ps *PubSubBase) Stop(reason error) {