mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-27 01:15:52 +08:00
feat: use MemoryAllocator
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
global:
|
||||
loglevel: debug
|
||||
rtmp:
|
||||
chunksize: 65535
|
||||
subscribe:
|
||||
# submode: 1
|
||||
subaudio: false
|
@@ -4,8 +4,9 @@ import (
|
||||
"context"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
_ "m7s.live/m7s/v5/plugin/rtmp"
|
||||
_ "m7s.live/m7s/v5/plugin/debug"
|
||||
_ "m7s.live/m7s/v5/plugin/hdl"
|
||||
_ "m7s.live/m7s/v5/plugin/rtmp"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
BIN
favicon.ico
Normal file
BIN
favicon.ico
Normal file
Binary file not shown.
After Width: | Height: | Size: 3.7 KiB |
21
go.mod
21
go.mod
@@ -1,24 +1,35 @@
|
||||
module m7s.live/m7s/v5
|
||||
|
||||
go 1.22
|
||||
|
||||
toolchain go1.22.1
|
||||
|
||||
require github.com/quic-go/quic-go v0.42.0
|
||||
|
||||
require (
|
||||
github.com/go-ole/go-ole v1.2.6 // indirect
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
|
||||
github.com/shoenig/go-m1cpu v0.1.6 // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.12 // indirect
|
||||
github.com/tklauser/numcpus v0.6.1 // indirect
|
||||
github.com/yusufpapurcu/wmi v1.2.4 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/bluenviron/mediacommon v1.9.2
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
|
||||
github.com/logrusorgru/aurora/v4 v4.0.0
|
||||
github.com/gorilla/websocket v1.5.1
|
||||
github.com/mcuadros/go-defaults v1.2.0
|
||||
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
|
||||
github.com/shirou/gopsutil/v3 v3.24.3
|
||||
go.uber.org/mock v0.4.0 // indirect
|
||||
golang.org/x/crypto v0.4.0 // indirect
|
||||
golang.org/x/crypto v0.14.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db // indirect
|
||||
golang.org/x/mod v0.11.0 // indirect
|
||||
golang.org/x/net v0.10.0 // indirect
|
||||
golang.org/x/sync v0.6.0
|
||||
golang.org/x/sys v0.8.0 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sys v0.18.0 // indirect
|
||||
golang.org/x/tools v0.9.1 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
68
go.sum
68
go.sum
@@ -4,44 +4,94 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR
|
||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
|
||||
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
|
||||
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
|
||||
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
|
||||
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
|
||||
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
|
||||
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
|
||||
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
|
||||
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
|
||||
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
|
||||
github.com/logrusorgru/aurora/v4 v4.0.0 h1:sRjfPpun/63iADiSvGGjgA1cAYegEWMPCJdUpJYn9JA=
|
||||
github.com/logrusorgru/aurora/v4 v4.0.0/go.mod h1:lP0iIa2nrnT/qoFXcOZSrZQpJ1o6n2CUf/hyHi2Q4ZQ=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
|
||||
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
|
||||
github.com/mcuadros/go-defaults v1.2.0 h1:FODb8WSf0uGaY8elWJAkoLL0Ri6AlZ1bFlenk56oZtc=
|
||||
github.com/mcuadros/go-defaults v1.2.0/go.mod h1:WEZtHEVIGYVDqkKSWBdWKUVdRyKlMfulPaGDWIVeCWY=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
|
||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
|
||||
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=
|
||||
github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k=
|
||||
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
|
||||
github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
|
||||
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
|
||||
github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM=
|
||||
github.com/quic-go/quic-go v0.42.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
|
||||
github.com/shirou/gopsutil/v3 v3.24.3 h1:eoUGJSmdfLzJ3mxIhmOAhgKEKgQkeOwKpz1NbhVnuPE=
|
||||
github.com/shirou/gopsutil/v3 v3.24.3/go.mod h1:JpND7O217xa72ewWz9zN2eIIkPWsDN/3pl0H8Qt0uwg=
|
||||
github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM=
|
||||
github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ=
|
||||
github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU=
|
||||
github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
|
||||
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
|
||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
|
||||
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
|
||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU=
|
||||
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
|
||||
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
|
||||
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
|
||||
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
|
||||
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
|
||||
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
|
||||
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
|
||||
golang.org/x/crypto v0.4.0 h1:UVQgzMY87xqpKNgb+kDsll2Igd33HszWHFLmpaRMq/8=
|
||||
golang.org/x/crypto v0.4.0/go.mod h1:3quD/ATkf6oY+rnes5c3ExXTbLc8mueNue5/DoinL80=
|
||||
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
|
||||
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
|
||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
|
||||
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
||||
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
|
||||
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
|
||||
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
|
||||
golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ=
|
||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
|
||||
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
|
||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
|
||||
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
|
||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
|
||||
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
|
||||
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
|
||||
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
|
@@ -93,6 +93,7 @@ type IAVFrame interface {
|
||||
ToRaw(*AVTrack) (any, error)
|
||||
FromRaw(*AVTrack, any) error
|
||||
GetTimestamp() time.Duration
|
||||
GetSize() int
|
||||
Recycle()
|
||||
IsIDR() bool
|
||||
Print() string
|
||||
|
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -70,13 +69,11 @@ func (c *Subscribe) GetSubscribeConfig() *Subscribe {
|
||||
}
|
||||
|
||||
type Pull struct {
|
||||
RePull int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉,高于0 的数代表最大重拉次数
|
||||
EnableRegexp bool `desc:"是否启用正则表达式"` // 是否启用正则表达式
|
||||
PullOnStart map[string]string `desc:"启动时拉流的列表"` // 启动时拉流的列表
|
||||
PullOnSub map[string]string `desc:"订阅时自动拉流的列表"` // 订阅时自动拉流的列表
|
||||
Proxy string `desc:"代理地址"` // 代理地址
|
||||
PullOnSubLocker sync.RWMutex `yaml:"-" json:"-"`
|
||||
PullOnStartLocker sync.RWMutex `yaml:"-" json:"-"`
|
||||
RePull int `desc:"断开后自动重试次数,0:不重试,-1:无限重试"` // 断开后自动重拉,0 表示不自动重拉,-1 表示无限重拉,高于0 的数代表最大重拉次数
|
||||
EnableRegexp bool `desc:"是否启用正则表达式"` // 是否启用正则表达式
|
||||
PullOnStart map[string]string `desc:"启动时拉流的列表"` // 启动时拉流的列表
|
||||
PullOnSub map[string]string `desc:"订阅时自动拉流的列表"` // 订阅时自动拉流的列表
|
||||
Proxy string `desc:"代理地址"` // 代理地址
|
||||
}
|
||||
|
||||
func (p *Pull) GetPullConfig() *Pull {
|
||||
@@ -84,8 +81,8 @@ func (p *Pull) GetPullConfig() *Pull {
|
||||
}
|
||||
|
||||
func (p *Pull) CheckPullOnStart(streamPath string) string {
|
||||
p.PullOnStartLocker.RLock()
|
||||
defer p.PullOnStartLocker.RUnlock()
|
||||
// p.PullOnStartLocker.RLock()
|
||||
// defer p.PullOnStartLocker.RUnlock()
|
||||
if p.PullOnStart == nil {
|
||||
return ""
|
||||
}
|
||||
@@ -107,8 +104,8 @@ func (p *Pull) CheckPullOnStart(streamPath string) string {
|
||||
}
|
||||
|
||||
func (p *Pull) CheckPullOnSub(streamPath string) string {
|
||||
p.PullOnSubLocker.RLock()
|
||||
defer p.PullOnSubLocker.RUnlock()
|
||||
// p.PullOnSubLocker.RLock()
|
||||
// defer p.PullOnSubLocker.RUnlock()
|
||||
if p.PullOnSub == nil {
|
||||
return ""
|
||||
}
|
||||
|
@@ -141,10 +141,14 @@ func (b *Buffer) Read(buf []byte) (n int, err error) {
|
||||
|
||||
func (b *Buffer) ReadN(n int) Buffer {
|
||||
l := b.Len()
|
||||
if n > l {
|
||||
n = l
|
||||
}
|
||||
r := (*b)[:n]
|
||||
*b = (*b)[n:l]
|
||||
return r
|
||||
}
|
||||
|
||||
func (b *Buffer) ReadFloat64() float64 {
|
||||
return math.Float64frombits(b.ReadUint64())
|
||||
}
|
||||
|
@@ -153,12 +153,11 @@ func (buffers *Buffers) ReadBytes(n int) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (buffers *Buffers) WriteNTo(n int, result *net.Buffers) (actual int) {
|
||||
for actual = n; buffers.Length > 0; buffers.move0() {
|
||||
level0 := buffers.GetLevel0()
|
||||
for actual = n; buffers.Length > 0 && n > 0; buffers.move0() {
|
||||
level1 := buffers.GetLevel1()
|
||||
remain1 := len(level1)
|
||||
if remain1 > n {
|
||||
*result = append(*result, level0[buffers.offset1:buffers.offset1+n])
|
||||
*result = append(*result, level1[:n])
|
||||
buffers.move1(n)
|
||||
return actual
|
||||
}
|
||||
@@ -182,5 +181,9 @@ func (buffers *Buffers) ReadBE(n int) (num int, err error) {
|
||||
func (buffers *Buffers) ToBytes() []byte {
|
||||
ret := make([]byte, buffers.Length)
|
||||
buffers.Read(ret)
|
||||
buffers.offset0 = 0
|
||||
buffers.offset1 = 0
|
||||
buffers.Offset = 0
|
||||
buffers.Length = 0
|
||||
return ret
|
||||
}
|
||||
|
236
pkg/util/list.go
Normal file
236
pkg/util/list.go
Normal file
@@ -0,0 +1,236 @@
|
||||
package util
|
||||
|
||||
// Copyright 2009 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package list implements a doubly linked list.
|
||||
//
|
||||
// To iterate over a list (where l is a *List[T]):
|
||||
//
|
||||
// for e := l.Front(); e != nil; e = e.Next() {
|
||||
// // do something with e.Value
|
||||
// }
|
||||
//
|
||||
// Element[T] is an element of a linked list.
|
||||
type Element[T any] struct {
|
||||
// Next and previous pointers in the doubly-linked list of elements.
|
||||
// To simplify the implementation, internally a list l is implemented
|
||||
// as a ring, such that &l.root is both the next element of the last
|
||||
// list element (l.Back()) and the previous element of the first list
|
||||
// element (l.Front()).
|
||||
next, prev *Element[T]
|
||||
|
||||
// The list to which this element belongs.
|
||||
list *List[T]
|
||||
|
||||
// The value stored with this element.
|
||||
Value T
|
||||
}
|
||||
|
||||
// Next returns the next list element or nil.
|
||||
func (e *Element[T]) Next() *Element[T] {
|
||||
if p := e.next; e.list != nil && p != &e.list.root {
|
||||
return p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Prev returns the previous list element or nil.
|
||||
func (e *Element[T]) Prev() *Element[T] {
|
||||
if p := e.prev; e.list != nil && p != &e.list.root {
|
||||
return p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// List[T] represents a doubly linked list.
|
||||
// The zero value for List[T] is an empty list ready to use.
|
||||
type List[T any] struct {
|
||||
root Element[T] // sentinel list element, only &root, root.prev, and root.next are used
|
||||
len int // current list length excluding (this) sentinel element
|
||||
}
|
||||
|
||||
// Init initializes or clears list l.
|
||||
func (l *List[T]) Init() *List[T] {
|
||||
l.root.next = &l.root
|
||||
l.root.prev = &l.root
|
||||
l.len = 0
|
||||
return l
|
||||
}
|
||||
|
||||
// New returns an initialized list.
|
||||
func NewList[T any]() *List[T] { return new(List[T]).Init() }
|
||||
|
||||
// Len returns the number of elements of list l.
|
||||
// The complexity is O(1).
|
||||
func (l *List[T]) Len() int { return l.len }
|
||||
|
||||
// Front returns the first element of list l or nil if the list is empty.
|
||||
func (l *List[T]) Front() *Element[T] {
|
||||
if l.len == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.root.next
|
||||
}
|
||||
|
||||
// Back returns the last element of list l or nil if the list is empty.
|
||||
func (l *List[T]) Back() *Element[T] {
|
||||
if l.len == 0 {
|
||||
return nil
|
||||
}
|
||||
return l.root.prev
|
||||
}
|
||||
|
||||
// lazyInit lazily initializes a zero List[T] value.
|
||||
func (l *List[T]) lazyInit() {
|
||||
if l.root.next == nil {
|
||||
l.Init()
|
||||
}
|
||||
}
|
||||
|
||||
// insert inserts e after at, increments l.len, and returns e.
|
||||
func (l *List[T]) insert(e, at *Element[T]) *Element[T] {
|
||||
e.prev = at
|
||||
e.next = at.next
|
||||
e.prev.next = e
|
||||
e.next.prev = e
|
||||
e.list = l
|
||||
l.len++
|
||||
return e
|
||||
}
|
||||
|
||||
// insertValue is a convenience wrapper for insert(&Element[T]{Value: v}, at).
|
||||
func (l *List[T]) insertValue(v T, at *Element[T]) *Element[T] {
|
||||
return l.insert(&Element[T]{Value: v}, at)
|
||||
}
|
||||
|
||||
// remove removes e from its list, decrements l.len
|
||||
func (l *List[T]) remove(e *Element[T]) {
|
||||
e.prev.next = e.next
|
||||
e.next.prev = e.prev
|
||||
e.next = nil // avoid memory leaks
|
||||
e.prev = nil // avoid memory leaks
|
||||
e.list = nil
|
||||
l.len--
|
||||
}
|
||||
|
||||
// move moves e to next to at.
|
||||
func (l *List[T]) move(e, at *Element[T]) {
|
||||
if e == at {
|
||||
return
|
||||
}
|
||||
e.prev.next = e.next
|
||||
e.next.prev = e.prev
|
||||
|
||||
e.prev = at
|
||||
e.next = at.next
|
||||
e.prev.next = e
|
||||
e.next.prev = e
|
||||
}
|
||||
|
||||
// Remove removes e from l if e is an element of list l.
|
||||
// It returns the element value e.Value.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) Remove(e *Element[T]) any {
|
||||
if e.list == l {
|
||||
// if e.list == l, l must have been initialized when e was inserted
|
||||
// in l or l == nil (e is a zero Element[T]) and l.remove will crash
|
||||
l.remove(e)
|
||||
}
|
||||
return e.Value
|
||||
}
|
||||
|
||||
// PushFront inserts a new element e with value v at the front of list l and returns e.
|
||||
func (l *List[T]) PushFront(v T) *Element[T] {
|
||||
l.lazyInit()
|
||||
return l.insertValue(v, &l.root)
|
||||
}
|
||||
|
||||
// PushBack inserts a new element e with value v at the back of list l and returns e.
|
||||
func (l *List[T]) PushBack(v T) *Element[T] {
|
||||
l.lazyInit()
|
||||
return l.insertValue(v, l.root.prev)
|
||||
}
|
||||
|
||||
// InsertBefore inserts a new element e with value v immediately before mark and returns e.
|
||||
// If mark is not an element of l, the list is not modified.
|
||||
// The mark must not be nil.
|
||||
func (l *List[T]) InsertBefore(v T, mark *Element[T]) *Element[T] {
|
||||
if mark.list != l {
|
||||
return nil
|
||||
}
|
||||
// see comment in List[T].Remove about initialization of l
|
||||
return l.insertValue(v, mark.prev)
|
||||
}
|
||||
|
||||
// InsertAfter inserts a new element e with value v immediately after mark and returns e.
|
||||
// If mark is not an element of l, the list is not modified.
|
||||
// The mark must not be nil.
|
||||
func (l *List[T]) InsertAfter(v T, mark *Element[T]) *Element[T] {
|
||||
if mark.list != l {
|
||||
return nil
|
||||
}
|
||||
// see comment in List[T].Remove about initialization of l
|
||||
return l.insertValue(v, mark)
|
||||
}
|
||||
|
||||
// MoveToFront moves element e to the front of list l.
|
||||
// If e is not an element of l, the list is not modified.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) MoveToFront(e *Element[T]) {
|
||||
if e.list != l || l.root.next == e {
|
||||
return
|
||||
}
|
||||
// see comment in List[T].Remove about initialization of l
|
||||
l.move(e, &l.root)
|
||||
}
|
||||
|
||||
// MoveToBack moves element e to the back of list l.
|
||||
// If e is not an element of l, the list is not modified.
|
||||
// The element must not be nil.
|
||||
func (l *List[T]) MoveToBack(e *Element[T]) {
|
||||
if e.list != l || l.root.prev == e {
|
||||
return
|
||||
}
|
||||
// see comment in List[T].Remove about initialization of l
|
||||
l.move(e, l.root.prev)
|
||||
}
|
||||
|
||||
// MoveBefore moves element e to its new position before mark.
|
||||
// If e or mark is not an element of l, or e == mark, the list is not modified.
|
||||
// The element and mark must not be nil.
|
||||
func (l *List[T]) MoveBefore(e, mark *Element[T]) {
|
||||
if e.list != l || e == mark || mark.list != l {
|
||||
return
|
||||
}
|
||||
l.move(e, mark.prev)
|
||||
}
|
||||
|
||||
// MoveAfter moves element e to its new position after mark.
|
||||
// If e or mark is not an element of l, or e == mark, the list is not modified.
|
||||
// The element and mark must not be nil.
|
||||
func (l *List[T]) MoveAfter(e, mark *Element[T]) {
|
||||
if e.list != l || e == mark || mark.list != l {
|
||||
return
|
||||
}
|
||||
l.move(e, mark)
|
||||
}
|
||||
|
||||
// PushBackList inserts a copy of another list at the back of list l.
|
||||
// The lists l and other may be the same. They must not be nil.
|
||||
func (l *List[T]) PushBackList(other *List[T]) {
|
||||
l.lazyInit()
|
||||
for i, e := other.Len(), other.Front(); i > 0; i, e = i-1, e.Next() {
|
||||
l.insertValue(e.Value, l.root.prev)
|
||||
}
|
||||
}
|
||||
|
||||
// PushFrontList inserts a copy of another list at the front of list l.
|
||||
// The lists l and other may be the same. They must not be nil.
|
||||
func (l *List[T]) PushFrontList(other *List[T]) {
|
||||
l.lazyInit()
|
||||
for i, e := other.Len(), other.Back(); i > 0; i, e = i-1, e.Prev() {
|
||||
l.insertValue(e.Value, &l.root)
|
||||
}
|
||||
}
|
107
pkg/util/mem.go
Normal file
107
pkg/util/mem.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"unsafe"
|
||||
)
|
||||
|
||||
type Block [2]int
|
||||
|
||||
type MemoryAllocator struct {
|
||||
start int64
|
||||
memory []byte
|
||||
Size int
|
||||
blocks *List[Block]
|
||||
}
|
||||
|
||||
func NewMemoryAllocator(size int) (ret *MemoryAllocator) {
|
||||
ret = &MemoryAllocator{
|
||||
Size: size,
|
||||
memory: make([]byte, size),
|
||||
blocks: NewList[Block](),
|
||||
}
|
||||
ret.start = int64(uintptr(unsafe.Pointer(&ret.memory[0])))
|
||||
ret.blocks.PushBack(Block{0, size})
|
||||
return
|
||||
}
|
||||
|
||||
func (ma *MemoryAllocator) Malloc2(size int) (memory []byte, start, end int) {
|
||||
for be := ma.blocks.Front(); be != nil; be = be.Next() {
|
||||
start, end = be.Value[0], be.Value[1]
|
||||
if e := start + size; end >= e {
|
||||
memory = ma.memory[start:e]
|
||||
if be.Value[0] = e; end == e {
|
||||
ma.blocks.Remove(be)
|
||||
}
|
||||
end = e
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ma *MemoryAllocator) Malloc(size int) (memory []byte) {
|
||||
memory, _, _ = ma.Malloc2(size)
|
||||
return
|
||||
}
|
||||
|
||||
func (ma *MemoryAllocator) Make(size int) (memory []byte) {
|
||||
memory = ma.Malloc(size)
|
||||
if memory == nil {
|
||||
return make([]byte, size)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (ma *MemoryAllocator) Free2(start, end int) {
|
||||
if start < 0 || end > ma.Size {
|
||||
return
|
||||
}
|
||||
for e := ma.blocks.Front(); e != nil; e = e.Next() {
|
||||
block := e.Value
|
||||
if block[1] == start {
|
||||
block[1] = end
|
||||
return
|
||||
}
|
||||
if block[0] == end {
|
||||
block[0] = start
|
||||
return
|
||||
}
|
||||
if end > block[0] {
|
||||
ma.blocks.InsertBefore(Block{start, end}, e)
|
||||
return
|
||||
}
|
||||
}
|
||||
ma.blocks.PushBack(Block{start, end})
|
||||
}
|
||||
|
||||
func (ma *MemoryAllocator) Free(mem []byte) {
|
||||
ptr := uintptr(unsafe.Pointer(&mem[:1][0]))
|
||||
start := int(int64(ptr) - ma.start)
|
||||
end := start + len(mem)
|
||||
ma.Free2(start, end)
|
||||
}
|
||||
|
||||
type RecyclableMemory struct {
|
||||
*MemoryAllocator
|
||||
mem []int
|
||||
}
|
||||
|
||||
func (r *RecyclableMemory) Malloc(size int) (memory []byte) {
|
||||
ret, start, end := r.Malloc2(size)
|
||||
if ret == nil {
|
||||
return make([]byte, size)
|
||||
}
|
||||
if lastI := len(r.mem) - 1; lastI > 0 && r.mem[lastI] == start {
|
||||
r.mem[lastI] = end
|
||||
} else {
|
||||
r.mem = append(r.mem, start, end)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func (r *RecyclableMemory) Recycle() {
|
||||
for i := 0; i < len(r.mem); i += 2 {
|
||||
r.Free2(r.mem[i], r.mem[i+1])
|
||||
}
|
||||
r.mem = r.mem[:0]
|
||||
}
|
25
pkg/util/mem_test.go
Normal file
25
pkg/util/mem_test.go
Normal file
@@ -0,0 +1,25 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMem(t *testing.T) {
|
||||
t.Run(t.Name(), func(t *testing.T) {
|
||||
mem := NewMemoryAllocator(1024)
|
||||
b1 := mem.Malloc(512)
|
||||
b2 := mem.Malloc(256)
|
||||
b3 := mem.Malloc(256)
|
||||
mem.Free(b2)
|
||||
mem.Free(b3)
|
||||
b2 = mem.Malloc(512)
|
||||
if b2 == nil {
|
||||
t.Fail()
|
||||
}
|
||||
mem.Free(b2)
|
||||
mem.Free(b1)
|
||||
if mem.Malloc(1024) == nil {
|
||||
t.Fail()
|
||||
}
|
||||
})
|
||||
}
|
@@ -1,9 +1,5 @@
|
||||
package util
|
||||
|
||||
import (
|
||||
"net"
|
||||
)
|
||||
|
||||
type Pool[T any] struct {
|
||||
pool []T
|
||||
}
|
||||
@@ -27,25 +23,16 @@ func (p *Pool[T]) Put(t T) {
|
||||
p.pool = append(p.pool, t)
|
||||
}
|
||||
|
||||
func (p *Pool[T]) Puts(t []T) {
|
||||
p.pool = append(p.pool, t...)
|
||||
}
|
||||
|
||||
type IPool[T any] interface {
|
||||
Get() T
|
||||
Put(T)
|
||||
Clear()
|
||||
}
|
||||
|
||||
type RecyclableMemory struct {
|
||||
IPool[[]byte]
|
||||
Data net.Buffers
|
||||
}
|
||||
|
||||
func (r *RecyclableMemory) Recycle() {
|
||||
if r.IPool != nil {
|
||||
for _, b := range r.Data {
|
||||
r.Put(b)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type BytesPool struct {
|
||||
Pool[[]byte]
|
||||
ItemSize int
|
||||
@@ -59,7 +46,7 @@ func (bp *BytesPool) GetN(size int) []byte {
|
||||
if ret == nil {
|
||||
return make([]byte, size)
|
||||
}
|
||||
return ret
|
||||
return ret[:size]
|
||||
}
|
||||
|
||||
func (bp *BytesPool) Put(b []byte) {
|
||||
|
12
plugin.go
12
plugin.go
@@ -241,6 +241,14 @@ func (p *Plugin) Publish(streamPath string, options ...any) (publisher *Publishe
|
||||
return
|
||||
}
|
||||
|
||||
func (p *Plugin) Pull(streamPath string, url string, options ...any) (puller *Puller, err error) {
|
||||
puller = &Puller{Pull: p.config.Pull}
|
||||
puller.RemoteURL = url
|
||||
puller.StreamPath = streamPath
|
||||
puller.Init(p, streamPath, options...)
|
||||
return puller, sendPromiseToServer(p.server, puller)
|
||||
}
|
||||
|
||||
func (p *Plugin) Subscribe(streamPath string, options ...any) (subscriber *Subscriber, err error) {
|
||||
subscriber = &Subscriber{Subscribe: p.config.Subscribe}
|
||||
subscriber.Init(p, streamPath, options...)
|
||||
@@ -286,10 +294,10 @@ func (p *Plugin) handle(pattern string, handler http.Handler) {
|
||||
}
|
||||
handler = p.logHandler(handler)
|
||||
p.GetCommonConf().Handle(pattern, handler)
|
||||
if p.server != nil {
|
||||
if p.server != p.handler {
|
||||
pattern = "/" + strings.ToLower(p.Meta.Name) + pattern
|
||||
p.Debug("http handle added to server", "pattern", pattern)
|
||||
p.server.GetCommonConf().Handle(pattern, handler)
|
||||
}
|
||||
// apiList = append(apiList, pattern)
|
||||
p.server.apiList = append(p.server.apiList, pattern)
|
||||
}
|
||||
|
290
plugin/debug/chart.go
Normal file
290
plugin/debug/chart.go
Normal file
@@ -0,0 +1,290 @@
|
||||
package plugin_debug
|
||||
|
||||
import (
|
||||
"embed"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/shirou/gopsutil/v3/cpu"
|
||||
"github.com/shirou/gopsutil/v3/process"
|
||||
)
|
||||
|
||||
//go:embed static/*
|
||||
var staticFS embed.FS
|
||||
var staticFSHandler = http.FileServer(http.FS(staticFS))
|
||||
|
||||
type update struct {
|
||||
Ts int64
|
||||
BytesAllocated uint64
|
||||
GcPause uint64
|
||||
CPUUser float64
|
||||
CPUSys float64
|
||||
Block int
|
||||
Goroutine int
|
||||
Heap int
|
||||
Mutex int
|
||||
Threadcreate int
|
||||
}
|
||||
|
||||
type consumer struct {
|
||||
id uint
|
||||
c chan update
|
||||
}
|
||||
|
||||
type server struct {
|
||||
consumers []consumer
|
||||
consumersMutex sync.RWMutex
|
||||
}
|
||||
|
||||
type SimplePair struct {
|
||||
Ts uint64
|
||||
Value uint64
|
||||
}
|
||||
|
||||
type CPUPair struct {
|
||||
Ts uint64
|
||||
User float64
|
||||
Sys float64
|
||||
}
|
||||
|
||||
type PprofPair struct {
|
||||
Ts uint64
|
||||
Block int
|
||||
Goroutine int
|
||||
Heap int
|
||||
Mutex int
|
||||
Threadcreate int
|
||||
}
|
||||
|
||||
type DataStorage struct {
|
||||
BytesAllocated []SimplePair
|
||||
GcPauses []SimplePair
|
||||
CPUUsage []CPUPair
|
||||
Pprof []PprofPair
|
||||
}
|
||||
|
||||
const (
|
||||
maxCount int = 86400
|
||||
)
|
||||
|
||||
var (
|
||||
data DataStorage
|
||||
lastPause uint32
|
||||
mutex sync.RWMutex
|
||||
lastConsumerID uint
|
||||
s server
|
||||
upgrader = websocket.Upgrader{
|
||||
ReadBufferSize: 1024,
|
||||
WriteBufferSize: 1024,
|
||||
}
|
||||
prevSysTime float64
|
||||
prevUserTime float64
|
||||
myProcess *process.Process
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
||||
myProcess, _ = process.NewProcess(int32(os.Getpid()))
|
||||
|
||||
// preallocate arrays in data, helps save on reallocations caused by append()
|
||||
// when maxCount is large
|
||||
data.BytesAllocated = make([]SimplePair, 0, maxCount)
|
||||
data.GcPauses = make([]SimplePair, 0, maxCount)
|
||||
data.CPUUsage = make([]CPUPair, 0, maxCount)
|
||||
data.Pprof = make([]PprofPair, 0, maxCount)
|
||||
|
||||
go s.gatherData()
|
||||
}
|
||||
|
||||
func (s *server) gatherData() {
|
||||
timer := time.Tick(time.Second)
|
||||
|
||||
for now := range timer {
|
||||
nowUnix := now.Unix()
|
||||
|
||||
var ms runtime.MemStats
|
||||
runtime.ReadMemStats(&ms)
|
||||
|
||||
u := update{
|
||||
Ts: nowUnix * 1000,
|
||||
Block: pprof.Lookup("block").Count(),
|
||||
Goroutine: pprof.Lookup("goroutine").Count(),
|
||||
Heap: pprof.Lookup("heap").Count(),
|
||||
Mutex: pprof.Lookup("mutex").Count(),
|
||||
Threadcreate: pprof.Lookup("threadcreate").Count(),
|
||||
}
|
||||
data.Pprof = append(data.Pprof, PprofPair{
|
||||
uint64(nowUnix) * 1000,
|
||||
u.Block,
|
||||
u.Goroutine,
|
||||
u.Heap,
|
||||
u.Mutex,
|
||||
u.Threadcreate,
|
||||
})
|
||||
|
||||
cpuTimes, err := myProcess.Times()
|
||||
if err != nil {
|
||||
cpuTimes = &cpu.TimesStat{}
|
||||
}
|
||||
|
||||
if prevUserTime != 0 {
|
||||
u.CPUUser = cpuTimes.User - prevUserTime
|
||||
u.CPUSys = cpuTimes.System - prevSysTime
|
||||
data.CPUUsage = append(data.CPUUsage, CPUPair{uint64(nowUnix) * 1000, u.CPUUser, u.CPUSys})
|
||||
}
|
||||
|
||||
prevUserTime = cpuTimes.User
|
||||
prevSysTime = cpuTimes.System
|
||||
|
||||
mutex.Lock()
|
||||
|
||||
bytesAllocated := ms.Alloc
|
||||
u.BytesAllocated = bytesAllocated
|
||||
data.BytesAllocated = append(data.BytesAllocated, SimplePair{uint64(nowUnix) * 1000, bytesAllocated})
|
||||
if lastPause == 0 || lastPause != ms.NumGC {
|
||||
gcPause := ms.PauseNs[(ms.NumGC+255)%256]
|
||||
u.GcPause = gcPause
|
||||
data.GcPauses = append(data.GcPauses, SimplePair{uint64(nowUnix) * 1000, gcPause})
|
||||
lastPause = ms.NumGC
|
||||
}
|
||||
|
||||
if len(data.BytesAllocated) > maxCount {
|
||||
data.BytesAllocated = data.BytesAllocated[len(data.BytesAllocated)-maxCount:]
|
||||
}
|
||||
|
||||
if len(data.GcPauses) > maxCount {
|
||||
data.GcPauses = data.GcPauses[len(data.GcPauses)-maxCount:]
|
||||
}
|
||||
|
||||
mutex.Unlock()
|
||||
|
||||
s.sendToConsumers(u)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) sendToConsumers(u update) {
|
||||
s.consumersMutex.RLock()
|
||||
defer s.consumersMutex.RUnlock()
|
||||
|
||||
for _, c := range s.consumers {
|
||||
c.c <- u
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) removeConsumer(id uint) {
|
||||
s.consumersMutex.Lock()
|
||||
defer s.consumersMutex.Unlock()
|
||||
|
||||
var consumerID uint
|
||||
var consumerFound bool
|
||||
|
||||
for i, c := range s.consumers {
|
||||
if c.id == id {
|
||||
consumerFound = true
|
||||
consumerID = uint(i)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if consumerFound {
|
||||
s.consumers = append(s.consumers[:consumerID], s.consumers[consumerID+1:]...)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *server) addConsumer() consumer {
|
||||
s.consumersMutex.Lock()
|
||||
defer s.consumersMutex.Unlock()
|
||||
|
||||
lastConsumerID++
|
||||
|
||||
c := consumer{
|
||||
id: lastConsumerID,
|
||||
c: make(chan update),
|
||||
}
|
||||
|
||||
s.consumers = append(s.consumers, c)
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (s *server) dataFeedHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
lastPing time.Time
|
||||
lastPong time.Time
|
||||
)
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
conn.SetPongHandler(func(s string) error {
|
||||
lastPong = time.Now()
|
||||
return nil
|
||||
})
|
||||
|
||||
// read and discard all messages
|
||||
go func(c *websocket.Conn) {
|
||||
for {
|
||||
if _, _, err := c.NextReader(); err != nil {
|
||||
c.Close()
|
||||
break
|
||||
}
|
||||
}
|
||||
}(conn)
|
||||
|
||||
c := s.addConsumer()
|
||||
|
||||
defer func() {
|
||||
s.removeConsumer(c.id)
|
||||
conn.Close()
|
||||
}()
|
||||
|
||||
var i uint
|
||||
|
||||
for u := range c.c {
|
||||
conn.WriteJSON(u)
|
||||
i++
|
||||
|
||||
if i%10 == 0 {
|
||||
if diff := lastPing.Sub(lastPong); diff > time.Second*60 {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
if err := conn.WriteControl(websocket.PingMessage, nil, now.Add(time.Second)); err != nil {
|
||||
return
|
||||
}
|
||||
lastPing = now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func dataHandler(w http.ResponseWriter, r *http.Request) {
|
||||
mutex.RLock()
|
||||
defer mutex.RUnlock()
|
||||
|
||||
if e := r.ParseForm(); e != nil {
|
||||
log.Print("error parsing form")
|
||||
return
|
||||
}
|
||||
|
||||
callback := r.FormValue("callback")
|
||||
|
||||
fmt.Fprintf(w, "%v(", callback)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
encoder := json.NewEncoder(w)
|
||||
encoder.Encode(data)
|
||||
|
||||
fmt.Fprint(w, ")")
|
||||
}
|
68
plugin/debug/index.go
Normal file
68
plugin/debug/index.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package plugin_debug
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
)
|
||||
|
||||
var _ = m7s.InstallPlugin[DebugPlugin]()
|
||||
|
||||
type DebugPlugin struct {
|
||||
m7s.Plugin
|
||||
ChartPeriod time.Duration `default:"1s" desc:"图表更新周期"`
|
||||
}
|
||||
|
||||
type WriteToFile struct {
|
||||
header http.Header
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (w *WriteToFile) Header() http.Header {
|
||||
// return w.w.Header()
|
||||
return w.header
|
||||
}
|
||||
|
||||
// func (w *WriteToFile) Write(p []byte) (int, error) {
|
||||
// // w.w.Write(p)
|
||||
// return w.Writer.Write(p)
|
||||
// }
|
||||
func (w *WriteToFile) WriteHeader(statusCode int) {
|
||||
// w.w.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) Pprof_Trace(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path = "/debug" + r.URL.Path
|
||||
pprof.Trace(w, r)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) Pprof_profile(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path = "/debug" + r.URL.Path
|
||||
pprof.Profile(w, r)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/pprof" {
|
||||
http.Redirect(w, r, "/debug/pprof/", http.StatusFound)
|
||||
return
|
||||
}
|
||||
r.URL.Path = "/debug" + r.URL.Path
|
||||
pprof.Index(w, r)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) Charts_(w http.ResponseWriter, r *http.Request) {
|
||||
r.URL.Path = "/static" + strings.TrimPrefix(r.URL.Path, "/charts")
|
||||
staticFSHandler.ServeHTTP(w, r)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) Charts_data(w http.ResponseWriter, r *http.Request) {
|
||||
dataHandler(w, r)
|
||||
}
|
||||
|
||||
func (p *DebugPlugin) Charts_datafeed(w http.ResponseWriter, r *http.Request) {
|
||||
s.dataFeedHandler(w, r)
|
||||
}
|
18
plugin/debug/static/index.html
Normal file
18
plugin/debug/static/index.html
Normal file
@@ -0,0 +1,18 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
<title></title>
|
||||
<meta charset="utf-8" />
|
||||
<script src="jquery-2.1.4.min.js"></script>
|
||||
<script src="moment.min.js"></script>
|
||||
<script src="plotly-1.51.3.min.js"></script>
|
||||
</head>
|
||||
<body>
|
||||
<div id="container1" style="min-width: 310px; height: 400px; margin: 0 auto"></div>
|
||||
<div id="container2" style="min-width: 310px; height: 400px; margin: 0 auto"></div>
|
||||
<div id="container3" style="min-width: 310px; height: 400px; margin: 0 auto"></div>
|
||||
<div id="container4" style="min-width: 310px; height: 400px; margin: 0 auto"></div>
|
||||
<script src="main.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
|
4
plugin/debug/static/jquery-2.1.4.min.js
vendored
Normal file
4
plugin/debug/static/jquery-2.1.4.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
122
plugin/debug/static/main.js
Normal file
122
plugin/debug/static/main.js
Normal file
@@ -0,0 +1,122 @@
|
||||
var chart1;
|
||||
var chart2;
|
||||
var chart3;
|
||||
var chart4;
|
||||
|
||||
function stackedArea(traces) {
|
||||
for(var i=1; i<traces.length; i++) {
|
||||
for(var j=0; j<(Math.min(traces[i]['y'].length, traces[i-1]['y'].length)); j++) {
|
||||
traces[i]['y'][j] += traces[i-1]['y'][j];
|
||||
}
|
||||
}
|
||||
return traces;
|
||||
}
|
||||
|
||||
$(function () {
|
||||
|
||||
$.getJSON('/debug/charts/data?callback=?', function (data) {
|
||||
var pDataChart1 = [{x: [], y: [], type: "scattergl"}];
|
||||
|
||||
for (i = 0; i < data.GcPauses.length; i++) {
|
||||
var d = moment(data.GcPauses[i].Ts).format('YYYY-MM-DD HH:mm:ss');
|
||||
pDataChart1[0].x.push(d);
|
||||
pDataChart1[0].y.push(data.GcPauses[i].Value);
|
||||
}
|
||||
|
||||
chart1 = Plotly.newPlot('container1', pDataChart1, {
|
||||
title: "GC Pauses",
|
||||
xaxis: {
|
||||
type: "date"
|
||||
},
|
||||
yaxis: {
|
||||
title: "Nanoseconds"
|
||||
}
|
||||
});
|
||||
|
||||
var pDataChart2 = [{x: [], y: [], type: "scattergl"}];
|
||||
|
||||
for (i = 0; i < data.BytesAllocated.length; i++) {
|
||||
var d = moment(data.BytesAllocated[i].Ts).format('YYYY-MM-DD HH:mm:ss');
|
||||
pDataChart2[0].x.push(d);
|
||||
pDataChart2[0].y.push(data.BytesAllocated[i].Value);
|
||||
}
|
||||
|
||||
chart2 = Plotly.newPlot('container2', pDataChart2, {
|
||||
title: "Memory Allocated",
|
||||
xaxis: {
|
||||
type: "date"
|
||||
},
|
||||
yaxis: {
|
||||
title: "Bytes"
|
||||
}
|
||||
});
|
||||
|
||||
var pDataChart3 = [
|
||||
{x: [], y: [], fill: 'tozeroy', name: 'sys', hoverinfo: 'none', type: "scattergl"},
|
||||
{x: [], y: [], fill: 'tonexty', name: 'user', hoverinfo: 'none', type: "scattergl"}
|
||||
];
|
||||
|
||||
for (i = 0; i < data.CPUUsage.length; i++) {
|
||||
var d = moment(data.CPUUsage[i].Ts).format('YYYY-MM-DD HH:mm:ss');
|
||||
pDataChart3[0].x.push(d);
|
||||
pDataChart3[1].x.push(d);
|
||||
pDataChart3[0].y.push(data.CPUUsage[i].Sys);
|
||||
pDataChart3[1].y.push(data.CPUUsage[i].User);
|
||||
}
|
||||
|
||||
pDataChart3 = stackedArea(pDataChart3);
|
||||
|
||||
chart3 = Plotly.newPlot('container3', pDataChart3, {
|
||||
title: "CPU Usage",
|
||||
xaxis: {
|
||||
type: "date"
|
||||
},
|
||||
yaxis: {
|
||||
title: "Seconds"
|
||||
}
|
||||
});
|
||||
|
||||
var pprofList = ["Block", "Goroutine", "Heap", "Mutex", "Threadcreate"];
|
||||
var pDataChart4 = []
|
||||
for (i = 0; i < pprofList.length; i++) {
|
||||
pDataChart4.push({x: [], y: [], name: pprofList[i].toLowerCase()})
|
||||
}
|
||||
|
||||
for (i = 0; i < data.Pprof.length; i++) {
|
||||
var d = moment(data.Pprof[i].Ts).format('YYYY-MM-DD HH:mm:ss');
|
||||
for (j = 0; j < pprofList.length; j++) {
|
||||
pDataChart4[j].x.push(d);
|
||||
pDataChart4[j].y.push(data.Pprof[i][pprofList[j]])
|
||||
}
|
||||
}
|
||||
|
||||
chart4 = Plotly.newPlot('container4', pDataChart4, {
|
||||
title: "PPROF",
|
||||
xaxis: {
|
||||
type: "date",
|
||||
},
|
||||
yaxis: {
|
||||
title: "Count"
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
function wsurl() {
|
||||
var l = window.location;
|
||||
return ((l.protocol === "https:") ? "wss://" : "ws://") + l.hostname + (((l.port != 80) && (l.port != 443)) ? ":" + l.port : "") + "/debug/charts/datafeed";
|
||||
}
|
||||
|
||||
ws = new WebSocket(wsurl());
|
||||
ws.onopen = function () {
|
||||
ws.onmessage = function (evt) {
|
||||
var data = JSON.parse(evt.data);
|
||||
var d = moment(data.Ts).format('YYYY-MM-DD HH:mm:ss');
|
||||
if (data.GcPause != 0) {
|
||||
Plotly.extendTraces('container1', {x:[[d]],y:[[data.GcPause]]}, [0], 86400);
|
||||
}
|
||||
Plotly.extendTraces('container2', {x:[[d]],y:[[data.BytesAllocated]]}, [0], 86400);
|
||||
Plotly.extendTraces('container3', {x:[[d], [d]],y:[[data.CPUSys], [data.CPUUser]]}, [0, 1], 86400);
|
||||
Plotly.extendTraces('container4', {x:[[d], [d], [d], [d], [d]],y:[[data.Block], [data.Goroutine], [data.Heap], [data.Mutex], [data.Threadcreate]]}, [0, 1, 2, 3, 4], 86400);
|
||||
}
|
||||
};
|
||||
})
|
7
plugin/debug/static/moment.min.js
vendored
Normal file
7
plugin/debug/static/moment.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
7
plugin/debug/static/plotly-1.51.3.min.js
vendored
Normal file
7
plugin/debug/static/plotly-1.51.3.min.js
vendored
Normal file
File diff suppressed because one or more lines are too long
@@ -41,7 +41,8 @@ func (a *AnnexB) IsIDR() bool {
|
||||
|
||||
// ToRaw implements pkg.IAVFrame.
|
||||
func (a *AnnexB) ToRaw(*pkg.AVTrack) (any, error) {
|
||||
return a.Data, nil
|
||||
// return a.Data, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type DemoPlugin struct {
|
||||
|
@@ -1,6 +1,7 @@
|
||||
package plugin_hdl
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -8,7 +9,6 @@ import (
|
||||
"time"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
. "m7s.live/m7s/v5/plugin/hdl/pkg"
|
||||
rtmp "m7s.live/m7s/v5/plugin/rtmp/pkg"
|
||||
)
|
||||
@@ -52,7 +52,7 @@ func (p *HDLPlugin) WriteFlvHeader(sub *m7s.Subscriber, w io.Writer) {
|
||||
// }
|
||||
// amf.Marshal(metaData)
|
||||
// 写入FLV头
|
||||
w.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9, 0, 0, 0, 0})
|
||||
w.Write([]byte{'F', 'L', 'V', 0x01, flags, 0, 0, 0, 9})
|
||||
// codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_SCRIPT, 0, amf.Buffer)
|
||||
}
|
||||
|
||||
@@ -71,46 +71,42 @@ func (p *HDLPlugin) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Transfer-Encoding", "identity")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
wto := p.GetCommonConf().WriteTimeout
|
||||
var gotFlvTag func(tag *net.Buffers) error
|
||||
var gotFlvTag func() error
|
||||
var b [15]byte
|
||||
var flv net.Buffers
|
||||
if hijacker, ok := w.(http.Hijacker); ok && wto > 0 {
|
||||
conn, _, _ := hijacker.Hijack()
|
||||
conn.SetWriteDeadline(time.Now().Add(wto))
|
||||
sub.Closer = conn
|
||||
p.WriteFlvHeader(sub, conn)
|
||||
gotFlvTag = func(tag *net.Buffers) (err error) {
|
||||
gotFlvTag = func() (err error) {
|
||||
conn.SetWriteDeadline(time.Now().Add(wto))
|
||||
_, err = tag.WriteTo(conn)
|
||||
_, err = flv.WriteTo(conn)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
w.(http.Flusher).Flush()
|
||||
p.WriteFlvHeader(sub, w)
|
||||
gotFlvTag = func(tag *net.Buffers) (err error) {
|
||||
_, err = tag.WriteTo(w)
|
||||
gotFlvTag = func() (err error) {
|
||||
_, err = flv.WriteTo(w)
|
||||
return
|
||||
}
|
||||
}
|
||||
b := util.Buffer(make([]byte, 0, 15))
|
||||
var flv net.Buffers
|
||||
rtmpData2FlvTag := func(data *rtmp.RTMPData) error {
|
||||
dataSize := uint32(data.Length)
|
||||
b[5], b[6], b[7] = byte(dataSize>>16), byte(dataSize>>8), byte(dataSize)
|
||||
b[8], b[9], b[10], b[11] = byte(data.Timestamp>>16), byte(data.Timestamp>>8), byte(data.Timestamp), byte(data.Timestamp>>24)
|
||||
flv = append(append(flv, b[:]), data.Buffers.Buffers...)
|
||||
defer binary.BigEndian.PutUint32(b[:4], dataSize+11)
|
||||
return gotFlvTag()
|
||||
}
|
||||
sub.Handle(func(audio *rtmp.RTMPAudio) error {
|
||||
b.Reset()
|
||||
b.WriteByte(FLV_TAG_TYPE_AUDIO)
|
||||
dataSize := audio.Length
|
||||
b.WriteUint24(uint32(dataSize))
|
||||
b.WriteUint24(audio.Timestamp)
|
||||
b.WriteByte(byte(audio.Timestamp >> 24))
|
||||
b.WriteUint24(0)
|
||||
flv = append(append(append(flv, b), audio.Buffers.Buffers...), util.PutBE(b.Malloc(4), dataSize+11))
|
||||
return gotFlvTag(&flv)
|
||||
b[4] = FLV_TAG_TYPE_AUDIO
|
||||
return rtmpData2FlvTag(&audio.RTMPData)
|
||||
}, func(video *rtmp.RTMPVideo) error {
|
||||
b.Reset()
|
||||
b.WriteByte(FLV_TAG_TYPE_VIDEO)
|
||||
dataSize := video.Length
|
||||
b.WriteUint24(uint32(dataSize))
|
||||
b.WriteUint24(video.Timestamp)
|
||||
b.WriteByte(byte(video.Timestamp >> 24))
|
||||
b.WriteUint24(0)
|
||||
flv = append(append(append(flv, b), video.Buffers.Buffers...), util.PutBE(b.Malloc(4), dataSize+11))
|
||||
return gotFlvTag(&flv)
|
||||
b[4] = FLV_TAG_TYPE_VIDEO
|
||||
return rtmpData2FlvTag(&video.RTMPData)
|
||||
})
|
||||
flv = append(flv, b[:4])
|
||||
gotFlvTag()
|
||||
}
|
||||
|
@@ -6,6 +6,7 @@ import (
|
||||
"net"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
. "m7s.live/m7s/v5/plugin/rtmp/pkg"
|
||||
)
|
||||
|
||||
@@ -145,6 +146,9 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
delete(receivers, cmd.StreamId)
|
||||
err = receiver.Response(cmd.TransactionId, NetStream_Publish_BadName, Level_Error)
|
||||
} else {
|
||||
if nc.ByteChunkPool.Size != 1<<20 {
|
||||
nc.ByteChunkPool = util.NewMemoryAllocator(1 << 20)
|
||||
}
|
||||
receivers[cmd.StreamId] = receiver
|
||||
err = receiver.BeginPublish(cmd.TransactionId)
|
||||
}
|
||||
@@ -175,12 +179,16 @@ func (p *RTMPPlugin) OnTCPConnect(conn *net.TCPConn) {
|
||||
case RTMP_MSG_AUDIO:
|
||||
if r, ok := receivers[msg.MessageStreamID]; ok {
|
||||
r.WriteAudio(&RTMPAudio{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.MemoryAllocator = nc.ByteChunkPool
|
||||
} else {
|
||||
logger.Warn("ReceiveAudio", "MessageStreamID", msg.MessageStreamID)
|
||||
}
|
||||
case RTMP_MSG_VIDEO:
|
||||
if r, ok := receivers[msg.MessageStreamID]; ok {
|
||||
r.WriteVideo(&RTMPVideo{msg.AVData})
|
||||
msg.AVData = RTMPData{}
|
||||
msg.AVData.MemoryAllocator = nc.ByteChunkPool
|
||||
} else {
|
||||
logger.Warn("ReceiveVideo", "MessageStreamID", msg.MessageStreamID)
|
||||
}
|
||||
|
@@ -45,8 +45,7 @@ func (avcc *RTMPAudio) DecodeConfig(track *AVTrack) error {
|
||||
ctx.FrameLengthFlag = (b1 >> 2) & 0x01
|
||||
ctx.DependsOnCoreCoder = (b1 >> 1) & 0x01
|
||||
ctx.ExtensionFlag = b1 & 0x01
|
||||
ctx.SequenceFrame = &RTMPAudio{}
|
||||
ctx.SequenceFrame.ReadFromBytes(avcc.ToBytes())
|
||||
ctx.SequenceFrame = avcc
|
||||
track.ICodecCtx = &ctx
|
||||
}
|
||||
}
|
||||
|
@@ -25,6 +25,10 @@ type RTMPData struct {
|
||||
util.RecyclableMemory
|
||||
}
|
||||
|
||||
func (avcc *RTMPData) GetSize() int {
|
||||
return avcc.Length
|
||||
}
|
||||
|
||||
func (avcc *RTMPData) Print() string {
|
||||
return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5])
|
||||
}
|
||||
|
@@ -66,14 +66,18 @@ var (
|
||||
|
||||
// C2 S2 : 参考C1 S1
|
||||
|
||||
func ReadBuf(r io.Reader, length int) (buf []byte) {
|
||||
buf = make([]byte, length)
|
||||
io.ReadFull(r, buf)
|
||||
func (nc *NetConnection) ReadBuf(length int) (buf []byte, err error) {
|
||||
buf = nc.ByteChunkPool.Make(length)
|
||||
_, err = io.ReadFull(nc.Reader, buf)
|
||||
return
|
||||
}
|
||||
|
||||
func (nc *NetConnection) Handshake() error {
|
||||
C0C1 := ReadBuf(nc.Reader, C1S1_SIZE+1)
|
||||
C0C1, err := nc.ReadBuf(C1S1_SIZE + 1)
|
||||
defer nc.ByteChunkPool.Free(C0C1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if C0C1[0] != RTMP_HANDSHAKE_VERSION {
|
||||
return errors.New("C0 Error")
|
||||
}
|
||||
@@ -92,7 +96,8 @@ func (nc *NetConnection) Handshake() error {
|
||||
}
|
||||
|
||||
func (client *NetConnection) ClientHandshake() (err error) {
|
||||
C0C1 := make([]byte, C1S1_SIZE+1)
|
||||
C0C1 := client.ByteChunkPool.Make(C1S1_SIZE + 1)
|
||||
defer client.ByteChunkPool.Free(C0C1)
|
||||
C0C1[0] = RTMP_HANDSHAKE_VERSION
|
||||
if _, err = client.Write(C0C1); err == nil {
|
||||
// read S0 S1
|
||||
@@ -109,13 +114,19 @@ func (client *NetConnection) ClientHandshake() (err error) {
|
||||
}
|
||||
|
||||
func (nc *NetConnection) simple_handshake(C1 []byte) error {
|
||||
S0S1 := make([]byte, C1S1_SIZE+1)
|
||||
S0S1 := nc.ByteChunkPool.Make(C1S1_SIZE + 1)
|
||||
S0S1[0] = RTMP_HANDSHAKE_VERSION
|
||||
util.PutBE(S0S1[1:5], time.Now().Unix()&0xFFFFFFFF)
|
||||
copy(S0S1[5:], "Monibuca")
|
||||
nc.Write(S0S1)
|
||||
nc.Write(C1) // S2
|
||||
if C2 := ReadBuf(nc.Reader, C1S1_SIZE); bytes.Compare(C2[8:], S0S1[9:]) != 0 {
|
||||
nc.ByteChunkPool.Free(S0S1)
|
||||
C2, err := nc.ReadBuf(C1S1_SIZE)
|
||||
defer nc.ByteChunkPool.Free(C2)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if bytes.Compare(C2[8:], S0S1[9:]) != 0 {
|
||||
return errors.New("C2 Error")
|
||||
}
|
||||
return nil
|
||||
@@ -170,8 +181,8 @@ func (nc *NetConnection) complex_handshake(C1 []byte) error {
|
||||
|
||||
buffer := net.Buffers{[]byte{RTMP_HANDSHAKE_VERSION}, S1, S2_Random, S2_Digest}
|
||||
buffer.WriteTo(nc)
|
||||
|
||||
ReadBuf(nc.Reader, 1536)
|
||||
b, _ := nc.ReadBuf(1536)
|
||||
nc.ByteChunkPool.Free(b)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -2,7 +2,6 @@ package rtmp
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"runtime"
|
||||
|
||||
"m7s.live/m7s/v5"
|
||||
@@ -37,7 +36,7 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
|
||||
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
|
||||
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
|
||||
// 当Chunk Type为0时(即Chunk12),
|
||||
if av.lastAbs > 0 {
|
||||
if av.lastAbs == 0 {
|
||||
av.SetTimestamp(frame.Timestamp)
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_12, &av.chunkHeader)
|
||||
} else {
|
||||
@@ -50,18 +49,21 @@ func (av *AVSender) sendFrame(frame *RTMPData) (err error) {
|
||||
// return errors.New("sequence is not equal")
|
||||
// }
|
||||
r := frame.Buffers
|
||||
chunk := net.Buffers{av.chunkHeader}
|
||||
av.writeSeqNum += uint32(av.chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &chunk))
|
||||
for r.Length > 0 {
|
||||
item := util.Buffer(av.byte16Pool.GetN(16))
|
||||
defer av.byte16Pool.Put(item)
|
||||
// item := util.Buffer(make([]byte, 16))
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_1, &item)
|
||||
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
|
||||
chunk = append(chunk, item)
|
||||
av.writeSeqNum += uint32(item.Len() + r.WriteNTo(av.WriteChunkSize, &chunk))
|
||||
chunkHeader := av.chunkHeader
|
||||
av.chunk = append(av.chunk, chunkHeader)
|
||||
// var buffer util.Buffer = r.ToBytes()
|
||||
av.writeSeqNum += uint32(chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &av.chunk))
|
||||
if r.Length > 0 {
|
||||
defer av.mem.Recycle()
|
||||
for r.Length > 0 {
|
||||
chunkHeader = av.mem.Malloc(5)
|
||||
av.WriteTo(RTMP_CHUNK_HEAD_1, &chunkHeader)
|
||||
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
|
||||
av.chunk = append(av.chunk, chunkHeader)
|
||||
av.writeSeqNum += uint32(chunkHeader.Len() + r.WriteNTo(av.WriteChunkSize, &av.chunk))
|
||||
}
|
||||
}
|
||||
_, err = chunk.WriteTo(av.Conn)
|
||||
_, err = av.chunk.WriteTo(av.Conn)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -69,6 +71,7 @@ type RTMPSender struct {
|
||||
*m7s.Subscriber
|
||||
NetStream
|
||||
audio, video AVSender
|
||||
mem util.RecyclableMemory
|
||||
}
|
||||
|
||||
func (r *RTMPSender) Init() {
|
||||
@@ -80,6 +83,7 @@ func (r *RTMPSender) Init() {
|
||||
r.video.MessageTypeID = RTMP_MSG_VIDEO
|
||||
r.audio.MessageStreamID = r.StreamID
|
||||
r.video.MessageStreamID = r.StreamID
|
||||
r.mem.MemoryAllocator = r.ByteChunkPool
|
||||
}
|
||||
|
||||
// func (rtmp *RTMPSender) OnEvent(event any) {
|
||||
|
@@ -61,8 +61,8 @@ type NetConnection struct {
|
||||
AppName string
|
||||
tmpBuf util.Buffer //用来接收/发送小数据,复用内存
|
||||
chunkHeader util.Buffer
|
||||
byteChunkPool util.BytesPool
|
||||
byte16Pool util.BytesPool
|
||||
ByteChunkPool *util.MemoryAllocator
|
||||
chunk net.Buffers
|
||||
writing atomic.Bool // false 可写,true 不可写
|
||||
}
|
||||
|
||||
@@ -76,6 +76,7 @@ func NewNetConnection(conn net.Conn) *NetConnection {
|
||||
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
|
||||
tmpBuf: make(util.Buffer, 4),
|
||||
chunkHeader: make(util.Buffer, 0, 16),
|
||||
ByteChunkPool: util.NewMemoryAllocator(2048),
|
||||
}
|
||||
}
|
||||
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
|
||||
@@ -138,7 +139,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
|
||||
if !ok {
|
||||
chunk = &Chunk{}
|
||||
conn.incommingChunks[ChunkStreamID] = chunk
|
||||
chunk.AVData.IPool = &conn.byteChunkPool
|
||||
chunk.AVData.MemoryAllocator = conn.ByteChunkPool
|
||||
}
|
||||
|
||||
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
|
||||
@@ -146,19 +147,16 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
|
||||
}
|
||||
msgLen := int(chunk.MessageLength)
|
||||
|
||||
needRead := conn.readChunkSize
|
||||
if unRead := msgLen - chunk.AVData.Length; unRead < needRead {
|
||||
needRead = unRead
|
||||
mem := chunk.AVData.Malloc(conn.readChunkSize)
|
||||
if unRead := msgLen - chunk.AVData.Length; unRead < conn.readChunkSize {
|
||||
mem = mem[:unRead]
|
||||
}
|
||||
// mem := make([]byte, needRead)
|
||||
mem := conn.byteChunkPool.GetN(needRead)
|
||||
if n, err := conn.ReadFull(mem); err != nil {
|
||||
conn.byteChunkPool.Put(mem)
|
||||
chunk.AVData.Recycle()
|
||||
return nil, err
|
||||
} else {
|
||||
conn.readSeqNum += uint32(n)
|
||||
}
|
||||
chunk.AVData.Data = append(chunk.AVData.Data, mem)
|
||||
if chunk.AVData.ReadFromBytes(mem); chunk.AVData.Length == msgLen {
|
||||
chunk.ChunkHeader.ExtendTimestamp += chunk.ChunkHeader.Timestamp
|
||||
msg = chunk
|
||||
@@ -169,9 +167,6 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
|
||||
err = GetRtmpMessage(msg, msg.AVData.ToBytes())
|
||||
msg.AVData.Recycle()
|
||||
}
|
||||
conn.incommingChunks[ChunkStreamID] = &Chunk{
|
||||
ChunkHeader: chunk.ChunkHeader,
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
@@ -36,8 +36,7 @@ func (avcc *RTMPVideo) DecodeConfig(track *AVTrack) error {
|
||||
ctx.NalulenSize = int(info.LengthSizeMinusOne&3 + 1)
|
||||
ctx.SPS = info.SequenceParameterSetNALUnit
|
||||
ctx.PPS = info.PictureParameterSetNALUnit
|
||||
ctx.SequenceFrame = &RTMPVideo{}
|
||||
ctx.SequenceFrame.ReadFromBytes(avcc.ToBytes())
|
||||
ctx.SequenceFrame = avcc
|
||||
track.ICodecCtx = &ctx
|
||||
}
|
||||
case "h265":
|
||||
|
@@ -90,8 +90,9 @@ func (p *Publisher) AddSubscriber(subscriber *Subscriber) (err error) {
|
||||
}
|
||||
|
||||
func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
|
||||
t.Value.Wrap = data
|
||||
t.Value.Timestamp = data.GetTimestamp()
|
||||
frame := &t.Value
|
||||
frame.Wrap = data
|
||||
frame.Timestamp = data.GetTimestamp()
|
||||
t.Step()
|
||||
if t.Value.Wrap != nil {
|
||||
t.Value.Wrap.Recycle()
|
||||
|
23
puller.go
Normal file
23
puller.go
Normal file
@@ -0,0 +1,23 @@
|
||||
package m7s
|
||||
|
||||
import "m7s.live/m7s/v5/pkg/config"
|
||||
|
||||
type PullHandler interface {
|
||||
Connect() error
|
||||
OnConnected()
|
||||
Disconnect()
|
||||
Pull() error
|
||||
Reconnect() bool
|
||||
}
|
||||
|
||||
type Puller struct {
|
||||
Publisher
|
||||
PullHandler
|
||||
config.Pull
|
||||
RemoteURL string // 远程服务器地址(用于推拉)
|
||||
ReConnectCount int //重连次数
|
||||
}
|
||||
|
||||
func (p *Puller) Start() error {
|
||||
return nil
|
||||
}
|
55
server.go
55
server.go
@@ -2,6 +2,7 @@ package m7s
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -19,12 +20,17 @@ import (
|
||||
"m7s.live/m7s/v5/pkg/util"
|
||||
)
|
||||
|
||||
var Version = "v5.0.0"
|
||||
var (
|
||||
MergeConfigs = []string{"Publish", "Subscribe", "HTTP"}
|
||||
ExecPath = os.Args[0]
|
||||
ExecDir = filepath.Dir(ExecPath)
|
||||
serverIndexG atomic.Uint32
|
||||
Version = "v5.0.0"
|
||||
MergeConfigs = []string{"Publish", "Subscribe", "HTTP"}
|
||||
ExecPath = os.Args[0]
|
||||
ExecDir = filepath.Dir(ExecPath)
|
||||
serverIndexG atomic.Uint32
|
||||
DefaultServer = NewServer()
|
||||
serverMeta = PluginMeta{
|
||||
Name: "Global",
|
||||
Version: Version,
|
||||
}
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
@@ -33,21 +39,26 @@ type Server struct {
|
||||
eventChan chan any
|
||||
Plugins []*Plugin
|
||||
Streams map[string]*Publisher
|
||||
Pulls map[string]*Puller
|
||||
Waiting map[string][]*Subscriber
|
||||
Publishers []*Publisher
|
||||
Subscribers []*Subscriber
|
||||
Pullers []*Puller
|
||||
pidG int
|
||||
sidG int
|
||||
apiList []string
|
||||
}
|
||||
|
||||
var DefaultServer = NewServer()
|
||||
|
||||
func NewServer() *Server {
|
||||
return &Server{
|
||||
func NewServer() (s *Server) {
|
||||
s = &Server{
|
||||
Streams: make(map[string]*Publisher),
|
||||
Waiting: make(map[string][]*Subscriber),
|
||||
eventChan: make(chan any, 10),
|
||||
}
|
||||
s.handler = s
|
||||
s.server = s
|
||||
s.Meta = &serverMeta
|
||||
return
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, conf any) error {
|
||||
@@ -59,7 +70,6 @@ func (s *Server) Run(ctx context.Context, conf any) (err error) {
|
||||
s.Context, s.CancelCauseFunc = context.WithCancelCause(ctx)
|
||||
s.config.HTTP.ListenAddrTLS = ":8443"
|
||||
s.config.HTTP.ListenAddr = ":8080"
|
||||
s.handler = s
|
||||
s.Info("start")
|
||||
|
||||
var cg map[string]map[string]any
|
||||
@@ -170,6 +180,20 @@ func (s *Server) eventLoop() {
|
||||
continue
|
||||
}
|
||||
event = v.Value
|
||||
case *util.Promise[*Puller]:
|
||||
err := s.OnPublish(&v.Value.Publisher)
|
||||
if err != nil {
|
||||
v.Fulfill(err)
|
||||
} else {
|
||||
if _, ok := s.Pulls[v.Value.StreamPath]; ok {
|
||||
v.Fulfill(ErrStreamExist)
|
||||
} else {
|
||||
s.Pulls[v.Value.StreamPath] = v.Value
|
||||
s.Pullers = append(s.Pullers, v.Value)
|
||||
v.Fulfill(nil)
|
||||
event = v.Value
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, plugin := range s.Plugins {
|
||||
if plugin.Disabled {
|
||||
@@ -258,4 +282,15 @@ func (s *Server) OnSubscribe(subscriber *Subscriber) error {
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/favicon.ico" {
|
||||
http.ServeFile(w, r, "favicon.ico")
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "Monibuca Engine %s StartTime:%s\n", Version, s.StartTime)
|
||||
for _, plugin := range s.Plugins {
|
||||
fmt.Fprintf(w, "Plugin %s Version:%s\n", plugin.Meta.Name, plugin.Meta.Version)
|
||||
}
|
||||
for _, api := range s.apiList {
|
||||
fmt.Fprintf(w, "%s\n", api)
|
||||
}
|
||||
}
|
||||
|
@@ -119,7 +119,7 @@ func (s *Subscriber) Handle(audioHandler, videoHandler any) {
|
||||
}
|
||||
sendVideoFrame := func() {
|
||||
lastSentVF = videoFrame
|
||||
s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.Print())
|
||||
s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.Print(), "size", videoFrame.Wrap.GetSize())
|
||||
res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
|
||||
if len(res) > 0 && !res[0].IsNil() {
|
||||
s.Stop(res[0].Interface().(error))
|
||||
|
Reference in New Issue
Block a user