feat: add webrtc

This commit is contained in:
langhuihui
2024-04-18 09:01:09 +08:00
parent fc0d4de942
commit f4eab4cf51
24 changed files with 1145 additions and 25 deletions

View File

@@ -6,6 +6,7 @@ import (
"m7s.live/m7s/v5"
_ "m7s.live/m7s/v5/plugin/debug"
_ "m7s.live/m7s/v5/plugin/hdl"
_ "m7s.live/m7s/v5/plugin/webrtc"
_ "m7s.live/m7s/v5/plugin/rtmp"
)

19
go.mod
View File

@@ -7,6 +7,7 @@ toolchain go1.22.1
require (
github.com/cnotch/ipchub v1.1.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/pion/interceptor v0.1.29
github.com/q191201771/naza v0.30.48
github.com/quic-go/quic-go v0.42.0
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de
@@ -16,7 +17,23 @@ require (
require (
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/ice/v3 v3.0.6 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.14 // indirect
github.com/pion/rtp v1.8.5 // indirect
github.com/pion/sctp v1.8.16 // indirect
github.com/pion/sdp/v3 v3.0.9 // indirect
github.com/pion/srtp/v3 v3.0.1 // indirect
github.com/pion/stun/v2 v2.0.0 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/transport/v3 v3.0.2 // indirect
github.com/pion/turn/v3 v3.0.2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
@@ -35,6 +52,8 @@ require (
github.com/gorilla/websocket v1.5.1
github.com/mcuadros/go-defaults v1.2.0
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
github.com/phsym/console-slog v0.3.1
github.com/pion/webrtc/v4 v4.0.0-beta.17
github.com/shirou/gopsutil/v3 v3.24.3
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.22.0 // indirect

95
go.sum
View File

@@ -32,6 +32,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
@@ -42,6 +44,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE=
github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es=
github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
@@ -59,8 +62,49 @@ github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3Ro
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
github.com/phsym/console-slog v0.3.1 h1:Fuzcrjr40xTc004S9Kni8XfNsk+qrptQmyR+wZw9/7A=
github.com/phsym/console-slog v0.3.1/go.mod h1:oJskjp/X6e6c0mGpfP8ELkfKUsrkDifYRAqJQgmdDS0=
github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg=
github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNIVb/NfGW4=
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v3 v3.0.6 h1:UC5vZCMhmve7yv+Y6E5eTnRTl+t9LLtmeBYQ9038Zm8=
github.com/pion/ice/v3 v3.0.6/go.mod h1:4eMTUKQEjC1fGQGB6qUzy2ux9Pc1v9EsO3hNaii+kXI=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE=
github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/rtp v1.8.5 h1:uYzINfaK+9yWs7r537z/Rc1SvT8ILjBcmDOpJcTB+OU=
github.com/pion/rtp v1.8.5/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU=
github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA=
github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY=
github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE=
github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY=
github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M=
github.com/pion/srtp/v3 v3.0.1 h1:AkIQRIZ+3tAOJMQ7G301xtrD1vekQbNeRO7eY1K8ZHk=
github.com/pion/srtp/v3 v3.0.1/go.mod h1:3R3a1qIOIxBkVTLGFjafKK6/fJoTdQDhcC67HOyMbJ8=
github.com/pion/stun/v2 v2.0.0 h1:A5+wXKLAypxQri59+tmQKVs7+l6mMM+3d+eER9ifRU0=
github.com/pion/stun/v2 v2.0.0/go.mod h1:22qRSh08fSEttYUmJZGlriq9+03jtVmXNODgLccj8GQ=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4=
github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0=
github.com/pion/turn/v3 v3.0.2 h1:iBonAIIKRwkVUJBFiFd/kSjytP7FlX0HwCyBDJPRDdU=
github.com/pion/turn/v3 v3.0.2/go.mod h1:vw0Dz420q7VYAF3J4wJKzReLHIo2LGp4ev8nXQexYsc=
github.com/pion/webrtc/v4 v4.0.0-beta.17 h1:KdAbozM+lQ3Dz1NJ0JATRDQ4W02WUhWwIkvjyBRODL0=
github.com/pion/webrtc/v4 v4.0.0-beta.17/go.mod h1:I/Z0MFtc6Ok7mN7kZmA1xqU7KA9ycZZx/6eXz5+yD+4=
github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
@@ -87,6 +131,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
@@ -94,42 +139,91 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA
github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI=
github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk=
github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE=
golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw=
golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o=
golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU=
golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo=
golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY=
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo=
@@ -142,6 +236,7 @@ google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDom
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=

View File

@@ -35,7 +35,7 @@ type (
GetSize() int
Recycle()
IsIDR() bool
Print() string
String() string
}
Nalu = [][]byte

View File

@@ -66,9 +66,11 @@ func (config *HTTP) Handle(path string, f http.Handler) {
if strings.HasSuffix(path, "/") {
path += "{streamPath=**}"
}
mux.HandlePath("GET", path, func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
handler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
f.ServeHTTP(w, r)
})
}
mux.HandlePath("GET", path, handler)
mux.HandlePath("POST", path, handler)
}
}

View File

@@ -14,4 +14,5 @@ var (
ErrSubscribeTimeout = errors.New("subscribe timeout")
ErrRestart = errors.New("restart")
ErrInterrupt = errors.New("interrupt")
ErrUnsupportCodec = errors.New("unsupport codec")
)

83
pkg/port.go Normal file
View File

@@ -0,0 +1,83 @@
package pkg
import (
"strconv"
"strings"
)
type (
TCPPort int
UDPPort int
TCPRangePort [2]int
UDPRangePort [2]int
Port struct {
Protocol string
Ports [2]int
}
IPort interface {
IsTCP() bool
IsUDP() bool
IsRange() bool
}
)
func (p Port) String() string {
if p.Ports[0] == p.Ports[1] {
return p.Protocol + ":" + strconv.Itoa(p.Ports[0])
}
return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1])
}
func (p Port) IsTCP() bool {
return p.Protocol == "tcp"
}
func (p Port) IsUDP() bool {
return p.Protocol == "udp"
}
func (p Port) IsRange() bool {
return p.Ports[0] != p.Ports[1]
}
func ParsePort2(conf string) (ret any, err error) {
var port Port
port, err = ParsePort(conf)
if err != nil {
return
}
if port.IsTCP() {
if port.IsRange() {
return TCPRangePort(port.Ports), nil
}
return TCPPort(port.Ports[0]), nil
}
if port.IsRange() {
return UDPRangePort(port.Ports), nil
}
return UDPPort(port.Ports[0]), nil
}
func ParsePort(conf string) (ret Port, err error) {
var port string
var min, max int
ret.Protocol, port, _ = strings.Cut(conf, ":")
if r := strings.Split(port, "-"); len(r) == 2 {
min, err = strconv.Atoi(r[0])
if err != nil {
return
}
max, err = strconv.Atoi(r[1])
if err != nil {
return
}
if min < max {
ret.Ports[0], ret.Ports[1] = min, max
} else {
ret.Ports[0], ret.Ports[1] = max, min
}
} else if p, err := strconv.Atoi(port); err == nil {
ret.Ports[0], ret.Ports[1] = p, p
}
return
}

View File

@@ -6,6 +6,8 @@ import (
"time"
)
const TraceLevel = slog.Level(-8)
type Unit struct {
StartTime time.Time
*slog.Logger `json:"-" yaml:"-"`
@@ -13,6 +15,10 @@ type Unit struct {
context.CancelCauseFunc `json:"-" yaml:"-"`
}
func (unit *Unit) Trace(msg string, fields ...any) {
unit.Log(unit.Context, TraceLevel, msg, fields...)
}
func (unit *Unit) IsStopped() bool {
select {
case <-unit.Done():

View File

@@ -62,7 +62,7 @@ func (ma *MemoryAllocator) Malloc(size int) (memory []byte) {
}
func (ma *MemoryAllocator) Free2(start, end int) bool {
if start < 0 || end > ma.Size {
if start < 0 || end > ma.Size || start >= end {
return false
}
for e := ma.blocks.Front(); e != nil; e = e.Next() {
@@ -148,3 +148,11 @@ func (r *RecyclableMemory) Recycle() {
}
r.mem = r.mem[:0]
}
func (r *RecyclableMemory) RecycleBack(n int) {
l := len(r.mem)
end := &r.mem[l-1]
start := *end - n
r.Free2(r.mem[l-3], start, *end)
*end = start
}

View File

@@ -26,7 +26,10 @@ type PluginMeta struct {
}
func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
instance := reflect.New(plugin.Type).Interface().(IPlugin)
instance, ok := reflect.New(plugin.Type).Interface().(IPlugin)
if !ok {
panic("plugin must implement IPlugin")
}
defaults.SetDefaults(instance)
p := reflect.ValueOf(instance).Elem().FieldByName("Plugin").Addr().Interface().(*Plugin)
p.handler = instance
@@ -38,7 +41,6 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
p.Warn("disabled by env")
return
}
s.Plugins = append(s.Plugins, p)
p.Config.Parse(p.GetCommonConf())
p.Config.Parse(instance, strings.ToUpper(plugin.Name))
for _, fname := range MergeConfigs {
@@ -70,7 +72,13 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) {
p.assign()
}
p.Info("init", "version", plugin.Version)
instance.OnInit()
err := instance.OnInit()
if err != nil {
p.Error("init", "error", err)
p.Stop(err)
return
}
s.Plugins = append(s.Plugins, p)
p.Start()
}
@@ -79,7 +87,7 @@ type iPlugin interface {
}
type IPlugin interface {
OnInit()
OnInit() error
OnEvent(any)
}
@@ -200,8 +208,8 @@ func (p *Plugin) Start() {
}
}
func (p *Plugin) OnInit() {
func (p *Plugin) OnInit() error {
return nil
}
func (p *Plugin) onEvent(event any) {

View File

@@ -17,10 +17,11 @@ type HDLPlugin struct {
m7s.Plugin
}
func (p *HDLPlugin) OnInit() {
func (p *HDLPlugin) OnInit() error {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.Pull(streamPath, url, NewHDLPuller())
}
return nil
}
var _ = m7s.InstallPlugin[HDLPlugin]()

View File

@@ -18,10 +18,11 @@ type RTMPPlugin struct {
var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp:
listenaddr: :1935`))
func (p *RTMPPlugin) OnInit() {
func (p *RTMPPlugin) OnInit() error {
for streamPath, url := range p.GetCommonConf().PullOnStart {
go p.Pull(streamPath, url, &Client{})
}
return nil
}
func (p *RTMPPlugin) OnPull(puller *m7s.Puller) {

View File

@@ -27,10 +27,10 @@ func (avcc *RTMPData) GetSize() int {
}
func (avcc *RTMPData) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`{"Timestamp":%d,"Size":%d,"Data":"%s"}`, avcc.Timestamp, avcc.Length, avcc.Print())), nil
return []byte(fmt.Sprintf(`{"Timestamp":%d,"Size":%d,"Data":"%s"}`, avcc.Timestamp, avcc.Length, avcc.String())), nil
}
func (avcc *RTMPData) Print() string {
func (avcc *RTMPData) String() string {
return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5])
}

2
plugin/rtp/index.go Normal file
View File

@@ -0,0 +1,2 @@
package plugin_rtp

112
plugin/rtp/pkg/audio.go Normal file
View File

@@ -0,0 +1,112 @@
package rtp
import (
"time"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
)
type RTPData struct {
*webrtc.RTPCodecParameters
rtp.Packet
util.RecyclableMemory
}
func (r *RTPData) GetTimestamp() time.Duration {
return time.Duration(r.Timestamp) * time.Second / time.Duration(r.ClockRate)
}
func (r *RTPData) GetSize() int {
return r.Packet.MarshalSize()
}
func (r *RTPData) IsIDR() bool {
return false
}
type (
RTPCtx struct {
webrtc.RTPCodecParameters
}
RTPG711Ctx struct {
RTPCtx
}
RTPOPUSCtx struct {
RTPCtx
}
RTPAACCtx struct {
RTPCtx
}
)
func (r *RTPCtx) GetRTPCodecCapability() webrtc.RTPCodecCapability {
return r.RTPCodecCapability
}
func (r *RTPCtx) GetSequenceFrame() IAVFrame {
return nil
}
func (r *RTPData) DecodeConfig(track *AVTrack) error {
switch r.MimeType {
case webrtc.MimeTypeOpus:
track.Codec = codec.FourCC_OPUS
var ctx RTPOPUSCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypePCMA:
track.Codec = codec.FourCC_ALAW
var ctx RTPG711Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypePCMU:
track.Codec = codec.FourCC_ULAW
var ctx RTPG711Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypeH264:
track.Codec = codec.FourCC_H264
var ctx RTPH264Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypeVP9:
track.Codec = codec.FourCC_VP9
var ctx RTPVP9Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypeAV1:
track.Codec = codec.FourCC_AV1
var ctx RTPAV1Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case webrtc.MimeTypeH265:
track.Codec = codec.FourCC_H265
var ctx RTPH265Ctx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
case "audio/MPEG4-GENERIC", "audio/AAC":
track.Codec = codec.FourCC_MP4A
var ctx RTPAACCtx
ctx.RTPCodecParameters = *r.RTPCodecParameters
track.ICodecCtx = &ctx
default:
return ErrUnsupportCodec
}
return nil
}
type RTPAudio struct {
RTPData
}
func (r *RTPAudio) FromRaw(*AVTrack, any) error {
panic("unimplemented")
}
func (r *RTPAudio) ToRaw(*AVTrack) (any, error) {
return r.Payload, nil
}

33
plugin/rtp/pkg/video.go Normal file
View File

@@ -0,0 +1,33 @@
package rtp
import . "m7s.live/m7s/v5/pkg"
type (
RTPH264Ctx struct {
RTPCtx
SPS [][]byte
PPS [][]byte
}
RTPH265Ctx struct {
RTPH264Ctx
VPS [][]byte
}
RTPAV1Ctx struct {
RTPCtx
}
RTPVP9Ctx struct {
RTPCtx
}
)
type RTPVideo struct {
RTPData
}
func (r *RTPVideo) FromRaw(*AVTrack, any) error {
panic("unimplemented")
}
func (r *RTPVideo) ToRaw(*AVTrack) (any, error) {
return r.Payload, nil
}

398
plugin/webrtc/index.go Normal file
View File

@@ -0,0 +1,398 @@
package plugin_webrtc
import (
_ "embed"
"errors"
"fmt"
"io"
"net"
"net/http"
"regexp"
"strings"
"time"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
. "github.com/pion/webrtc/v4"
"m7s.live/m7s/v5"
. "m7s.live/m7s/v5/pkg"
"m7s.live/m7s/v5/pkg/codec"
"m7s.live/m7s/v5/pkg/util"
rtp "m7s.live/m7s/v5/plugin/rtp/pkg"
. "m7s.live/m7s/v5/plugin/webrtc/pkg"
)
var (
//go:embed publish.html
publishHTML []byte
//go:embed subscribe.html
subscribeHTML []byte
reg_level = regexp.MustCompile("profile-level-id=(4.+f)")
_ = m7s.InstallPlugin[WebRTCPlugin]()
)
type WebRTCPlugin struct {
m7s.Plugin
ICEServers []ICEServer `desc:"ice服务器配置"`
PublicIP string `desc:"公网IP"`
PublicIPv6 string `desc:"公网IPv6"`
Port string `default:"tcp:9000" desc:"监听端口"`
PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后发送PLI请求
EnableOpus bool `default:"true" desc:"是否启用opus编码"` // 是否启用opus编码
EnableVP9 bool `default:"false" desc:"是否启用vp9编码"` // 是否启用vp9编码
EnableAv1 bool `default:"true" desc:"是否启用av1编码"` // 是否启用av1编码
EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输
m MediaEngine
s SettingEngine
api *API
}
func (p *WebRTCPlugin) OnInit() (err error) {
if len(p.ICEServers) > 0 {
for i := range p.ICEServers {
b, _ := p.ICEServers[i].MarshalJSON()
p.ICEServers[i].UnmarshalJSON(b)
}
}
RegisterCodecs(&p.m)
if p.EnableOpus {
p.m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil},
PayloadType: 111,
}, RTPCodecTypeAudio)
}
if p.EnableVP9 {
p.m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "", nil},
PayloadType: 100,
}, RTPCodecTypeVideo)
}
if p.EnableAv1 {
p.m.RegisterCodec(RTPCodecParameters{
RTPCodecCapability: RTPCodecCapability{MimeTypeAV1, 90000, 0, "profile=2;level-idx=8;tier=1", nil},
PayloadType: 45,
}, RTPCodecTypeVideo)
}
i := &interceptor.Registry{}
if p.PublicIP != "" {
ips := []string{p.PublicIP}
if p.PublicIPv6 != "" {
ips = append(ips, p.PublicIPv6)
}
p.s.SetNAT1To1IPs(ips, ICECandidateTypeHost)
}
ports, err := ParsePort2(p.Port)
if err != nil {
p.Error("webrtc port config error", "error", err, "port", p.Port)
return err
}
switch v := ports.(type) {
case TCPPort:
tcpport := int(v)
tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{
IP: net.IP{0, 0, 0, 0},
Port: tcpport,
})
if err != nil {
p.Error("webrtc listener tcp", "error", err)
}
p.Info("webrtc start listen", "port", tcpport)
p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6})
case UDPRangePort:
p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1]))
case UDPPort:
// 创建共享WEBRTC端口 默认9000
udpListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IP{0, 0, 0, 0},
Port: int(v),
})
if err != nil {
p.Error("webrtc listener udp", "error", err)
return err
}
p.Info("webrtc start listen", "port", v)
p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener))
p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6})
}
if err = RegisterDefaultInterceptors(&p.m, i); err != nil {
return err
}
p.api = NewAPI(WithMediaEngine(&p.m),
WithInterceptorRegistry(i), WithSettingEngine(p.s))
return
}
func (*WebRTCPlugin) Test_Publish(w http.ResponseWriter, r *http.Request) {
w.Write(publishHTML)
}
func (*WebRTCPlugin) Test_ScreenShare(w http.ResponseWriter, r *http.Request) {
w.Write(publishHTML)
}
func (*WebRTCPlugin) Test_Subscribe(w http.ResponseWriter, r *http.Request) {
w.Write(subscribeHTML)
}
// https://datatracker.ietf.org/doc/html/draft-ietf-wish-whip
func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) {
streamPath := r.URL.Path[len("/push/"):]
rawQuery := r.URL.RawQuery
auth := r.Header.Get("Authorization")
if strings.HasPrefix(auth, "Bearer ") {
auth = auth[len("Bearer "):]
if rawQuery != "" {
rawQuery += "&bearer=" + auth
} else {
rawQuery = "bearer=" + auth
}
conf.Info("push", "stream", streamPath, "bearer", auth)
}
w.Header().Set("Content-Type", "application/sdp")
w.Header().Set("Location", "/webrtc/api/stop/push/"+streamPath)
if rawQuery != "" {
streamPath += "?" + rawQuery
}
bytes, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var conn Connection
conn.SDP = string(bytes)
if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
ICEServers: conf.ICEServers,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var publisher *m7s.Publisher
if publisher, err = conf.Publish(streamPath, conn.PeerConnection); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
conn.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) {
publisher.Info("OnTrack", "kind", track.Kind().String(), "payloadType", uint8(track.Codec().PayloadType))
var n int
var err error
if codecP := track.Codec(); track.Kind() == RTPCodecTypeAudio {
mem := util.NewScalableMemoryAllocator(1460 * 100)
for {
var packet rtp.RTPAudio
packet.RTPCodecParameters = &codecP
packet.ScalableMemoryAllocator = mem
buf := packet.Malloc(1460)
if n, _, err = track.Read(buf); err == nil {
err = packet.Unmarshal(buf[:n])
packet.RecycleBack(1460 - n)
}
if err != nil {
return
}
publisher.WriteAudio(&packet)
}
} else {
var lastPLISent time.Time
mem := util.NewScalableMemoryAllocator(1460 * 100)
for {
if time.Since(lastPLISent) > conf.PLI {
if rtcpErr := conn.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil {
publisher.Error("writeRTCP", "error", rtcpErr)
return
}
lastPLISent = time.Now()
}
var packet rtp.RTPVideo
packet.RTPCodecParameters = &codecP
packet.ScalableMemoryAllocator = mem
buf := packet.Malloc(1460)
if n, _, err = track.Read(buf); err == nil {
err = packet.Unmarshal(buf[:n])
packet.RecycleBack(1460 - n)
}
if err != nil {
return
}
publisher.WriteVideo(&packet)
}
}
})
conn.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
publisher.Info(ice.ToJSON().Candidate)
}
})
conn.OnDataChannel(func(d *DataChannel) {
publisher.Info("OnDataChannel", "label", d.Label())
d.OnMessage(func(msg DataChannelMessage) {
conn.SDP = string(msg.Data[1:])
publisher.Debug("dc message", "sdp", conn.SDP)
if err := conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil {
return
}
if answer, err := conn.GetAnswer(); err == nil {
d.SendText(answer)
} else {
return
}
switch msg.Data[0] {
case '0':
publisher.Stop(errors.New("stop by remote"))
case '1':
}
})
})
conn.OnConnectionStateChange(func(state PeerConnectionState) {
publisher.Info("Connection State has changed:" + state.String())
switch state {
case PeerConnectionStateConnected:
case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed:
publisher.Stop(errors.New("connection state:" + state.String()))
}
})
if err := conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if answer, err := conn.GetAnswer(); err == nil {
w.WriteHeader(http.StatusCreated)
fmt.Fprint(w, answer)
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
}
func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/sdp")
streamPath := r.URL.Path[len("/play/"):]
rawQuery := r.URL.RawQuery
var conn Connection
bytes, err := io.ReadAll(r.Body)
defer func() {
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}()
if err != nil {
return
}
conn.SDP = string(bytes)
if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{
ICEServers: conf.ICEServers,
}); err != nil {
return
}
var suber *m7s.Subscriber
if rawQuery != "" {
streamPath += "?" + rawQuery
}
if suber, err = conf.Subscribe(streamPath, conn.PeerConnection); err != nil {
return
}
var useDC bool
var audioTLSRTP, videoTLSRTP *TrackLocalStaticRTP
var audioSender, videoSender *RTPSender
if suber.Publisher != nil {
if vt := suber.Publisher.VideoTrack; vt != nil {
if vt.Codec == codec.FourCC_H265 {
useDC = true
} else {
ctx := vt.ICodecCtx.(interface {
GetRTPCodecCapability() RTPCodecCapability
})
videoTLSRTP, err = NewTrackLocalStaticRTP(ctx.GetRTPCodecCapability(), vt.Codec.String(), suber.StreamPath)
if err != nil {
return
}
videoSender, err = conn.PeerConnection.AddTrack(videoTLSRTP)
if err != nil {
return
}
go func() {
rtcpBuf := make([]byte, 1500)
for {
if n, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil {
suber.Warn("rtcp read error", "error", rtcpErr)
return
} else {
if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil {
for _, pp := range p {
switch pp.(type) {
case *rtcp.PictureLossIndication:
// fmt.Println("PictureLossIndication")
}
}
}
}
}
}()
}
}
if at := suber.Publisher.AudioTrack; at != nil {
if at.Codec == codec.FourCC_MP4A {
useDC = true
} else {
ctx := at.ICodecCtx.(interface {
GetRTPCodecCapability() RTPCodecCapability
})
audioTLSRTP, err = NewTrackLocalStaticRTP(ctx.GetRTPCodecCapability(), at.Codec.String(), suber.StreamPath)
if err != nil {
return
}
audioSender, err = conn.PeerConnection.AddTrack(audioTLSRTP)
if err != nil {
return
}
}
}
}
if conf.EnableDC && useDC {
dc, err := conn.CreateDataChannel(suber.StreamPath, nil)
if err != nil {
return
}
go func() {
// suber.Handle(m7s.SubscriberHandler{
// OnAudio: func(audio *rtmp.RTMPAudio) error {
// },
// OnVideo: func(video *rtmp.RTMPVideo) error {
// },
// })
dc.Close()
}()
} else {
if audioSender == nil {
suber.SubAudio = false
}
if videoSender == nil {
suber.SubVideo = false
}
go suber.Handle(m7s.SubscriberHandler{
OnAudio: func(frame *rtp.RTPAudio) error {
return audioTLSRTP.WriteRTP(&frame.Packet)
},
OnVideo: func(frame *rtp.RTPVideo) error {
return videoTLSRTP.WriteRTP(&frame.Packet)
},
})
}
conn.OnICECandidate(func(ice *ICECandidate) {
if ice != nil {
suber.Info(ice.ToJSON().Candidate)
}
})
if err = conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if sdp, err := conn.GetAnswer(); err == nil {
w.Write([]byte(sdp))
} else {
http.Error(w, err.Error(), http.StatusBadRequest)
}
}

View File

@@ -0,0 +1,97 @@
package webrtc
import (
. "github.com/pion/webrtc/v4"
)
func RegisterCodecs(m *MediaEngine) error {
for _, codec := range []RTPCodecParameters{
{
RTPCodecCapability: RTPCodecCapability{MimeTypePCMU, 8000, 0, "", nil},
PayloadType: 0,
},
{
RTPCodecCapability: RTPCodecCapability{MimeTypePCMA, 8000, 0, "", nil},
PayloadType: 8,
},
} {
if err := m.RegisterCodec(codec, RTPCodecTypeAudio); err != nil {
return err
}
}
videoRTCPFeedback := []RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}}
for _, codec := range []RTPCodecParameters{
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil},
// PayloadType: 97,
// },
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil},
// PayloadType: 99,
// },
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=100", nil},
// PayloadType: 101,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", videoRTCPFeedback},
PayloadType: 102,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil},
// PayloadType: 121,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", videoRTCPFeedback},
PayloadType: 127,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil},
// PayloadType: 120,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", videoRTCPFeedback},
PayloadType: 125,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=125", nil},
// PayloadType: 107,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", videoRTCPFeedback},
PayloadType: 108,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=108", nil},
// PayloadType: 109,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", videoRTCPFeedback},
PayloadType: 127,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil},
// PayloadType: 120,
// },
{
RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032", videoRTCPFeedback},
PayloadType: 123,
},
// {
// RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=123", nil},
// PayloadType: 118,
// },
} {
if err := m.RegisterCodec(codec, RTPCodecTypeVideo); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,30 @@
package webrtc
import (
. "github.com/pion/webrtc/v4"
)
type Connection struct {
*PeerConnection
SDP string
// LocalSDP *sdp.SessionDescription
}
func (IO *Connection) GetAnswer() (string, error) {
// Sets the LocalDescription, and starts our UDP listeners
answer, err := IO.CreateAnswer(nil)
if err != nil {
return "", err
}
// IO.LocalSDP, err = answer.Unmarshal()
// if err != nil {
// return "", err
// }
gatherComplete := GatheringCompletePromise(IO.PeerConnection)
if err := IO.SetLocalDescription(answer); err != nil {
return "", err
}
<-gatherComplete
return IO.LocalDescription().SDP, nil
}

112
plugin/webrtc/publish.html Normal file
View File

@@ -0,0 +1,112 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>测试WebRTC推流</title>
</head>
<body>
<video id="video" width="640" height="480" autoplay muted>
</video>
<!-- <button id="sw" onclick="action()" type="button" style="width:100px;height:30px;display: block;">unpublish</button> -->
<div id="camera"></div>
<pre>
<code id="remoteSdp">
</code>
</pre>
</body>
<script>
let action = () => { console.log('action not set'); };
const screenshare = location.pathname.endsWith("screenshare");
(async () => {
const searchParams = new URLSearchParams(location.search);
const $camera = document.getElementById('camera');
navigator.mediaDevices.enumerateDevices().then((devices) => {
devices.forEach((device) => {
console.log(device.kind + ": " + device.label + " id = " + device.deviceId);
if (device.kind == 'videoinput' && !screenshare) {
const a = document.createElement('a');
a.href = location.pathname + '?videoinput=' + device.deviceId;
a.innerHTML = device.label;
$camera.appendChild(a);
$camera.appendChild(document.createElement('br'));
}
});
});
const mediaStream = await (screenshare ? navigator.mediaDevices.getDisplayMedia() : navigator.mediaDevices.getUserMedia({
video: searchParams.get('videoinput') ? {
deviceId: searchParams.get('videoinput'),
} : true,
audio: true,
}));
searchParams.delete('videoinput');
document.getElementById('video').srcObject = mediaStream;
const pc = new RTCPeerConnection();
pc.oniceconnectionstatechange = () => {
console.log('oniceconnectionstatechange', pc.iceConnectionState);
};
pc.onicecandidate = (e) => {
console.log('onicecandidate', e.candidate);
};
const streamPath = searchParams.get('streamPath') || 'live/webrtc';
searchParams.delete('streamPath');
mediaStream.id = streamPath;
mediaStream.getTracks().forEach((t) => {
pc.addTrack(t, mediaStream);
});
// const videoTransceiver = pc.addTransceiver(mediaStream.getVideoTracks()[0], { direction: 'sendonly' });
// const audioTransceiver = pc.addTransceiver(mediaStream.getAudioTracks()[0], { direction: 'sendonly' });
// const dc = pc.createDataChannel('sdp');
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
const result = await fetch(
`/webrtc/push/${streamPath}${location.search ? `?${searchParams.toString()}` : ''}`,
{
method: 'POST',
mode: 'cors',
cache: 'no-cache',
credentials: 'include',
redirect: 'follow',
referrerPolicy: 'no-referrer',
headers: { 'Content-Type': 'application/sdp' },
body: offer.sdp,
},
);
const remoteSdp = await result.text();
document.getElementById('remoteSdp').innerText = remoteSdp;
await pc.setRemoteDescription(
new RTCSessionDescription({ type: 'answer', sdp: remoteSdp }),
);
// dc.onmessage = async (e) => {
// await pc.setRemoteDescription(
// new RTCSessionDescription({ type: 'answer', sdp: e.data }),
// );
// };
// const publish = async () => {
// videoTransceiver.direction = 'sendonly';
// audioTransceiver.direction = 'sendonly';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('1' + offer.sdp);
// action = unpublish;
// document.getElementById('sw').innerText = 'unpublish';
// };
// const unpublish = async () => {
// videoTransceiver.direction = 'inactive';
// audioTransceiver.direction = 'inactive';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('0' + offer.sdp);
// action = publish;
// document.getElementById('sw').innerText = 'publish';
// };
// action = unpublish;
})()
</script>
</html>

View File

@@ -0,0 +1,91 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>测试WebRTC拉流</title>
</head>
<body>
<video id="video" width="640" height="480" autoplay muted controls>
</video>
<!-- <button id="sw" onclick="action()" type="button" style="width:100px;height:30px;display: block;">unpublish</button> -->
<pre>
<code id="remoteSdp">
</code>
</pre>
</body>
<script>
let action = () => { console.log('action not set'); };
(async () => {
const pc = new RTCPeerConnection();
pc.ontrack = (e) => {
console.log('ontrack', e);
if (e.streams.length === 0) return;
document.getElementById('video').srcObject = e.streams[0];
document.getElementById('video').play();
};
pc.oniceconnectionstatechange = () => {
console.log('oniceconnectionstatechange', pc.iceConnectionState);
};
pc.onicecandidate = (e) => {
console.log('onicecandidate', e.candidate);
};
pc.addTransceiver('video', { direction: 'recvonly' });
pc.addTransceiver('audio', { direction: 'recvonly' });
// const dc = pc.createDataChannel('sdp');
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
const searchParams = new URLSearchParams(location.search);
const streamPath = searchParams.get('streamPath') || 'live/webrtc';
searchParams.delete('streamPath')
const result = await fetch(
`/webrtc/play/${streamPath}${location.search?`?${searchParams.toString()}`:''}`,
{
method: 'POST',
mode: 'cors',
cache: 'no-cache',
credentials: 'include',
redirect: 'follow',
referrerPolicy: 'no-referrer',
headers: { 'Content-Type': 'application/sdp' },
body: offer.sdp,
},
);
const remoteSdp = await result.text();
document.getElementById('remoteSdp').innerText = remoteSdp;
await pc.setRemoteDescription(
new RTCSessionDescription({ type: 'answer', sdp: remoteSdp }),
);
// dc.onmessage = async (e) => {
// await pc.setRemoteDescription(
// new RTCSessionDescription({ type: 'answer', sdp: e.data }),
// );
// };
// const publish = async () => {
// videoTransceiver.direction = 'sendonly';
// audioTransceiver.direction = 'sendonly';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('1' + offer.sdp);
// action = unpublish;
// document.getElementById('sw').innerText = 'unpublish';
// };
// const unpublish = async () => {
// videoTransceiver.direction = 'inactive';
// audioTransceiver.direction = 'inactive';
// const offer = await pc.createOffer();
// await pc.setLocalDescription(offer);
// dc.send('0' + offer.sdp);
// action = publish;
// document.getElementById('sw').innerText = 'publish';
// };
// action = unpublish;
})()
</script>
</html>

View File

@@ -136,7 +136,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) {
}
frame.Timestamp = max(1, p.baseTs+ts)
p.lastTs = frame.Timestamp
p.Debug("write", "seq", frame.Sequence)
p.Trace("write", "seq", frame.Sequence)
t.Step()
p.speedControl(p.Publish.Speed, p.lastTs)
}
@@ -216,13 +216,24 @@ func (p *Publisher) WriteData(data IDataFrame) (err error) {
return
}
func (p *Publisher) createTransTrack(dataType reflect.Type) (t *AVTrack) {
p.Lock()
defer p.Unlock()
t = &AVTrack{}
t.Logger = p.Logger.With("track", "audio")
t.Init(256)
p.TransTrack[dataType] = t
return t
}
func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) {
p.RLock()
defer p.RUnlock()
if t, ok := p.TransTrack[dataType]; ok {
p.RUnlock()
return t
}
return
p.RUnlock()
return p.createTransTrack(dataType)
}
func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) {
@@ -263,7 +274,7 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.Print(),
Data: v.Wrap.String(),
}
}
ret.VideoTrack = append(ret.VideoTrack, &snap)
@@ -280,7 +291,7 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) {
snap.Wrap = &pb.Wrap{
Timestamp: uint32(v.Wrap.GetTimestamp()),
Size: uint32(v.Wrap.GetSize()),
Data: v.Wrap.Print(),
Data: v.Wrap.String(),
}
}
ret.AudioTrack = append(ret.AudioTrack, &snap)

View File

@@ -17,6 +17,7 @@ import (
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/mcuadros/go-defaults"
"github.com/phsym/console-slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gopkg.in/yaml.v3"
@@ -137,7 +138,13 @@ func (s *Server) run(ctx context.Context, conf any) (err error) {
}
var lv slog.LevelVar
lv.UnmarshalText([]byte(s.LogLevel))
slog.SetLogLoggerLevel(lv.Level())
if s.LogLevel == "trace" {
lv.Set(TraceLevel)
}
s.Logger = slog.New(
console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}),
)
// slog.SetLogLoggerLevel(lv.Level())
s.registerHandler()
if httpConf.ListenAddrTLS != "" {

View File

@@ -117,7 +117,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
}
sendVideoFrame := func() (err error) {
lastSentVF = videoFrame
s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.Print(), "size", videoFrame.Wrap.GetSize())
s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize())
res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)})
if len(res) > 0 && !res[0].IsNil() {
if err = res[0].Interface().(error); err != ErrInterrupt {
@@ -144,8 +144,10 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
// fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence)
if videoFrame.Wrap.IsIDR() && vr.DecConfChanged() {
vr.LastCodecCtx = vr.Track.ICodecCtx
s.Debug("video codec changed", "data", vr.Track.ICodecCtx.GetSequenceFrame().Print())
vh.Call([]reflect.Value{reflect.ValueOf(vr.Track.ICodecCtx.GetSequenceFrame())})
if seqFrame := vr.Track.ICodecCtx.GetSequenceFrame(); seqFrame != nil {
s.Debug("video codec changed", "data", seqFrame.String())
vh.Call([]reflect.Value{reflect.ValueOf(seqFrame)})
}
}
if ar != nil {
if audioFrame != nil {
@@ -194,8 +196,8 @@ func (s *Subscriber) Handle(handler SubscriberHandler) {
// fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence)
if ar.DecConfChanged() {
ar.LastCodecCtx = ar.Track.ICodecCtx
if sf := ar.Track.ICodecCtx.GetSequenceFrame(); sf != nil {
ah.Call([]reflect.Value{reflect.ValueOf(sf)})
if seqFrame := ar.Track.ICodecCtx.GetSequenceFrame(); seqFrame != nil {
ah.Call([]reflect.Value{reflect.ValueOf(seqFrame)})
}
}
if vr != nil && videoFrame != nil {