From f4eab4cf516c5198f3f0b9a8c4c5019cd1d03ffb Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Thu, 18 Apr 2024 09:01:09 +0800 Subject: [PATCH] feat: add webrtc --- example/default/main.go | 1 + go.mod | 19 ++ go.sum | 95 ++++++++ pkg/avframe.go | 2 +- pkg/config/http.go | 6 +- pkg/error.go | 1 + pkg/port.go | 83 +++++++ pkg/unit.go | 6 + pkg/util/mem.go | 10 +- plugin.go | 20 +- plugin/hdl/index.go | 3 +- plugin/rtmp/index.go | 3 +- plugin/rtmp/pkg/const.go | 4 +- plugin/rtp/index.go | 2 + plugin/rtp/pkg/audio.go | 112 +++++++++ plugin/rtp/pkg/video.go | 33 +++ plugin/webrtc/index.go | 398 ++++++++++++++++++++++++++++++++ plugin/webrtc/pkg/config.go | 97 ++++++++ plugin/webrtc/pkg/connection.go | 30 +++ plugin/webrtc/publish.html | 112 +++++++++ plugin/webrtc/subscribe.html | 91 ++++++++ publisher.go | 21 +- server.go | 9 +- subscriber.go | 12 +- 24 files changed, 1145 insertions(+), 25 deletions(-) create mode 100644 pkg/port.go create mode 100644 plugin/rtp/index.go create mode 100644 plugin/rtp/pkg/audio.go create mode 100644 plugin/rtp/pkg/video.go create mode 100644 plugin/webrtc/index.go create mode 100644 plugin/webrtc/pkg/config.go create mode 100644 plugin/webrtc/pkg/connection.go create mode 100644 plugin/webrtc/publish.html create mode 100644 plugin/webrtc/subscribe.html diff --git a/example/default/main.go b/example/default/main.go index ed1053c..ef31209 100644 --- a/example/default/main.go +++ b/example/default/main.go @@ -6,6 +6,7 @@ import ( "m7s.live/m7s/v5" _ "m7s.live/m7s/v5/plugin/debug" _ "m7s.live/m7s/v5/plugin/hdl" + _ "m7s.live/m7s/v5/plugin/webrtc" _ "m7s.live/m7s/v5/plugin/rtmp" ) diff --git a/go.mod b/go.mod index 74dfcbc..782d179 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ toolchain go1.22.1 require ( github.com/cnotch/ipchub v1.1.0 github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1 + github.com/pion/interceptor v0.1.29 github.com/q191201771/naza v0.30.48 github.com/quic-go/quic-go v0.42.0 google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de @@ -16,7 +17,23 @@ require ( require ( github.com/go-ole/go-ole v1.2.6 // indirect + github.com/google/uuid v1.6.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect + github.com/pion/datachannel v1.5.6 // indirect + github.com/pion/dtls/v2 v2.2.10 // indirect + github.com/pion/ice/v3 v3.0.6 // indirect + github.com/pion/logging v0.2.2 // indirect + github.com/pion/mdns/v2 v2.0.7 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.14 // indirect + github.com/pion/rtp v1.8.5 // indirect + github.com/pion/sctp v1.8.16 // indirect + github.com/pion/sdp/v3 v3.0.9 // indirect + github.com/pion/srtp/v3 v3.0.1 // indirect + github.com/pion/stun/v2 v2.0.0 // indirect + github.com/pion/transport/v2 v2.2.4 // indirect + github.com/pion/transport/v3 v3.0.2 // indirect + github.com/pion/turn/v3 v3.0.2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect @@ -35,6 +52,8 @@ require ( github.com/gorilla/websocket v1.5.1 github.com/mcuadros/go-defaults v1.2.0 github.com/onsi/ginkgo/v2 v2.9.5 // indirect + github.com/phsym/console-slog v0.3.1 + github.com/pion/webrtc/v4 v4.0.0-beta.17 github.com/shirou/gopsutil/v3 v3.24.3 go.uber.org/mock v0.4.0 // indirect golang.org/x/crypto v0.22.0 // indirect diff --git a/go.sum b/go.sum index 63dca77..385e12a 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= @@ -42,6 +44,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1: github.com/kelindar/process v0.0.0-20170730150328-69a29e249ec3/go.mod h1:+lTCLnZFXOkqwD8sLPl6u4erAc0cP8wFegQHfipz7KE= github.com/kelindar/rate v1.0.0/go.mod h1:AjT4G+hTItNwt30lucEGZIz8y7Uk5zPho6vurIZ+1Es= github.com/kelindar/tcp v1.0.0/go.mod h1:JB5hj1cshLU60XrLij2BBxW3JQ4hOye8vqbyvuKb52k= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= @@ -59,8 +62,49 @@ github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3Ro github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/phsym/console-slog v0.3.1 h1:Fuzcrjr40xTc004S9Kni8XfNsk+qrptQmyR+wZw9/7A= +github.com/phsym/console-slog v0.3.1/go.mod h1:oJskjp/X6e6c0mGpfP8ELkfKUsrkDifYRAqJQgmdDS0= +github.com/pion/datachannel v1.5.6 h1:1IxKJntfSlYkpUj8LlYRSWpYiTTC02nUrOE8T3DqGeg= +github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNIVb/NfGW4= +github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= +github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA= +github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= +github.com/pion/ice/v3 v3.0.6 h1:UC5vZCMhmve7yv+Y6E5eTnRTl+t9LLtmeBYQ9038Zm8= +github.com/pion/ice/v3 v3.0.6/go.mod h1:4eMTUKQEjC1fGQGB6qUzy2ux9Pc1v9EsO3hNaii+kXI= +github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= +github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= +github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= +github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.12/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= +github.com/pion/rtcp v1.2.14 h1:KCkGV3vJ+4DAJmvP0vaQShsb0xkRfWkO540Gy102KyE= +github.com/pion/rtcp v1.2.14/go.mod h1:sn6qjxvnwyAkkPzPULIbVqSKI5Dv54Rv7VG0kNxh9L4= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= +github.com/pion/rtp v1.8.3/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/rtp v1.8.5 h1:uYzINfaK+9yWs7r537z/Rc1SvT8ILjBcmDOpJcTB+OU= +github.com/pion/rtp v1.8.5/go.mod h1:pBGHaFt/yW7bf1jjWAoUjpSNoDnw98KTMg+jWWvziqU= +github.com/pion/sctp v1.8.13/go.mod h1:YKSgO/bO/6aOMP9LCie1DuD7m+GamiK2yIiPM6vH+GA= +github.com/pion/sctp v1.8.16 h1:PKrMs+o9EMLRvFfXq59WFsC+V8mN1wnKzqrv+3D/gYY= +github.com/pion/sctp v1.8.16/go.mod h1:P6PbDVA++OJMrVNg2AL3XtYHV4uD6dvfyOovCgMs0PE= +github.com/pion/sdp/v3 v3.0.9 h1:pX++dCHoHUwq43kuwf3PyJfHlwIj4hXA7Vrifiq0IJY= +github.com/pion/sdp/v3 v3.0.9/go.mod h1:B5xmvENq5IXJimIO4zfp6LAe1fD9N+kFv+V/1lOdz8M= +github.com/pion/srtp/v3 v3.0.1 h1:AkIQRIZ+3tAOJMQ7G301xtrD1vekQbNeRO7eY1K8ZHk= +github.com/pion/srtp/v3 v3.0.1/go.mod h1:3R3a1qIOIxBkVTLGFjafKK6/fJoTdQDhcC67HOyMbJ8= +github.com/pion/stun/v2 v2.0.0 h1:A5+wXKLAypxQri59+tmQKVs7+l6mMM+3d+eER9ifRU0= +github.com/pion/stun/v2 v2.0.0/go.mod h1:22qRSh08fSEttYUmJZGlriq9+03jtVmXNODgLccj8GQ= +github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g= +github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo= +github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0= +github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0= +github.com/pion/transport/v3 v3.0.2 h1:r+40RJR25S9w3jbA6/5uEPTzcdn7ncyU44RWCbHkLg4= +github.com/pion/transport/v3 v3.0.2/go.mod h1:nIToODoOlb5If2jF9y2Igfx3PFYWfuXi37m0IlWa/D0= +github.com/pion/turn/v3 v3.0.2 h1:iBonAIIKRwkVUJBFiFd/kSjytP7FlX0HwCyBDJPRDdU= +github.com/pion/turn/v3 v3.0.2/go.mod h1:vw0Dz420q7VYAF3J4wJKzReLHIo2LGp4ev8nXQexYsc= +github.com/pion/webrtc/v4 v4.0.0-beta.17 h1:KdAbozM+lQ3Dz1NJ0JATRDQ4W02WUhWwIkvjyBRODL0= +github.com/pion/webrtc/v4 v4.0.0-beta.17/go.mod h1:I/Z0MFtc6Ok7mN7kZmA1xqU7KA9ycZZx/6eXz5+yD+4= github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -87,6 +131,7 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= @@ -94,42 +139,91 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= +golang.org/x/crypto v0.12.0/go.mod h1:NF0Gs7EO5K4qLn+Ylc+fih8BSTeIjAP05siRnAh98yw= +golang.org/x/crypto v0.18.0/go.mod h1:R0j02AL6hcrfOiy9T4ZYp/rcWeMxM3L6QYxlOuEG1mg= +golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db h1:D/cFflL63o2KSLJIwjlcIt8PR064j/xsmdEJL/YvY/o= golang.org/x/exp v0.0.0-20221205204356-47842c84f3db/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU= golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.14.0/go.mod h1:PpSgVXXLK0OxS0F31C1/tv6XNguvCrnXIDrFMspZIUI= +golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= +golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.11.0/go.mod h1:zC9APTIj3jG3FdV/Ons+XE1riIZXG4aZ4GTHiPZJPIU= +golang.org/x/term v0.16.0/go.mod h1:yn7UURbUtPyrVJPGPq404EukNFxcm/foM+bV/bfcDsY= +golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.12.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.9.1 h1:8WMNJAz3zrtPmnYC7ISf5dEn3MT0gY7jBJfw27yrrLo= golang.org/x/tools v0.9.1/go.mod h1:owI94Op576fPu3cIGQeHs3joujW/2Oc6MtlxbF5dfNc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de h1:F6qOa9AZTYJXOUEr4jDysRDLrm4PHePlge4v4TGAlxY= google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de/go.mod h1:VUhTRKeHn9wwcdrk73nvdC9gF178Tzhmt/qyaFcPLSo= @@ -142,6 +236,7 @@ google.golang.org/grpc v1.63.2/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDom google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/pkg/avframe.go b/pkg/avframe.go index a8d4740..3cf32f6 100644 --- a/pkg/avframe.go +++ b/pkg/avframe.go @@ -35,7 +35,7 @@ type ( GetSize() int Recycle() IsIDR() bool - Print() string + String() string } Nalu = [][]byte diff --git a/pkg/config/http.go b/pkg/config/http.go index f12db94..aab30a7 100644 --- a/pkg/config/http.go +++ b/pkg/config/http.go @@ -66,9 +66,11 @@ func (config *HTTP) Handle(path string, f http.Handler) { if strings.HasSuffix(path, "/") { path += "{streamPath=**}" } - mux.HandlePath("GET", path, func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { + handler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) { f.ServeHTTP(w, r) - }) + } + mux.HandlePath("GET", path, handler) + mux.HandlePath("POST", path, handler) } } diff --git a/pkg/error.go b/pkg/error.go index 979ccc9..30b2650 100644 --- a/pkg/error.go +++ b/pkg/error.go @@ -14,4 +14,5 @@ var ( ErrSubscribeTimeout = errors.New("subscribe timeout") ErrRestart = errors.New("restart") ErrInterrupt = errors.New("interrupt") + ErrUnsupportCodec = errors.New("unsupport codec") ) diff --git a/pkg/port.go b/pkg/port.go new file mode 100644 index 0000000..f18c77c --- /dev/null +++ b/pkg/port.go @@ -0,0 +1,83 @@ +package pkg + +import ( + "strconv" + "strings" +) + +type ( + TCPPort int + UDPPort int + TCPRangePort [2]int + UDPRangePort [2]int + Port struct { + Protocol string + Ports [2]int + } + IPort interface { + IsTCP() bool + IsUDP() bool + IsRange() bool + } +) + +func (p Port) String() string { + if p.Ports[0] == p.Ports[1] { + return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + } + return p.Protocol + ":" + strconv.Itoa(p.Ports[0]) + "-" + strconv.Itoa(p.Ports[1]) +} + +func (p Port) IsTCP() bool { + return p.Protocol == "tcp" +} + +func (p Port) IsUDP() bool { + return p.Protocol == "udp" +} + +func (p Port) IsRange() bool { + return p.Ports[0] != p.Ports[1] +} + +func ParsePort2(conf string) (ret any, err error) { + var port Port + port, err = ParsePort(conf) + if err != nil { + return + } + if port.IsTCP() { + if port.IsRange() { + return TCPRangePort(port.Ports), nil + } + return TCPPort(port.Ports[0]), nil + } + if port.IsRange() { + return UDPRangePort(port.Ports), nil + } + return UDPPort(port.Ports[0]), nil +} + +func ParsePort(conf string) (ret Port, err error) { + var port string + var min, max int + ret.Protocol, port, _ = strings.Cut(conf, ":") + if r := strings.Split(port, "-"); len(r) == 2 { + min, err = strconv.Atoi(r[0]) + if err != nil { + return + } + max, err = strconv.Atoi(r[1]) + if err != nil { + return + } + if min < max { + ret.Ports[0], ret.Ports[1] = min, max + } else { + ret.Ports[0], ret.Ports[1] = max, min + } + } else if p, err := strconv.Atoi(port); err == nil { + ret.Ports[0], ret.Ports[1] = p, p + } + return +} diff --git a/pkg/unit.go b/pkg/unit.go index 4441d1f..8c6d284 100644 --- a/pkg/unit.go +++ b/pkg/unit.go @@ -6,6 +6,8 @@ import ( "time" ) +const TraceLevel = slog.Level(-8) + type Unit struct { StartTime time.Time *slog.Logger `json:"-" yaml:"-"` @@ -13,6 +15,10 @@ type Unit struct { context.CancelCauseFunc `json:"-" yaml:"-"` } +func (unit *Unit) Trace(msg string, fields ...any) { + unit.Log(unit.Context, TraceLevel, msg, fields...) +} + func (unit *Unit) IsStopped() bool { select { case <-unit.Done(): diff --git a/pkg/util/mem.go b/pkg/util/mem.go index 69d8c99..771ad17 100644 --- a/pkg/util/mem.go +++ b/pkg/util/mem.go @@ -62,7 +62,7 @@ func (ma *MemoryAllocator) Malloc(size int) (memory []byte) { } func (ma *MemoryAllocator) Free2(start, end int) bool { - if start < 0 || end > ma.Size { + if start < 0 || end > ma.Size || start >= end { return false } for e := ma.blocks.Front(); e != nil; e = e.Next() { @@ -148,3 +148,11 @@ func (r *RecyclableMemory) Recycle() { } r.mem = r.mem[:0] } + +func (r *RecyclableMemory) RecycleBack(n int) { + l := len(r.mem) + end := &r.mem[l-1] + start := *end - n + r.Free2(r.mem[l-3], start, *end) + *end = start +} diff --git a/plugin.go b/plugin.go index 7f0ee16..2e45d0e 100644 --- a/plugin.go +++ b/plugin.go @@ -26,7 +26,10 @@ type PluginMeta struct { } func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { - instance := reflect.New(plugin.Type).Interface().(IPlugin) + instance, ok := reflect.New(plugin.Type).Interface().(IPlugin) + if !ok { + panic("plugin must implement IPlugin") + } defaults.SetDefaults(instance) p := reflect.ValueOf(instance).Elem().FieldByName("Plugin").Addr().Interface().(*Plugin) p.handler = instance @@ -38,7 +41,6 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { p.Warn("disabled by env") return } - s.Plugins = append(s.Plugins, p) p.Config.Parse(p.GetCommonConf()) p.Config.Parse(instance, strings.ToUpper(plugin.Name)) for _, fname := range MergeConfigs { @@ -70,7 +72,13 @@ func (plugin *PluginMeta) Init(s *Server, userConfig map[string]any) { p.assign() } p.Info("init", "version", plugin.Version) - instance.OnInit() + err := instance.OnInit() + if err != nil { + p.Error("init", "error", err) + p.Stop(err) + return + } + s.Plugins = append(s.Plugins, p) p.Start() } @@ -79,7 +87,7 @@ type iPlugin interface { } type IPlugin interface { - OnInit() + OnInit() error OnEvent(any) } @@ -200,8 +208,8 @@ func (p *Plugin) Start() { } } -func (p *Plugin) OnInit() { - +func (p *Plugin) OnInit() error { + return nil } func (p *Plugin) onEvent(event any) { diff --git a/plugin/hdl/index.go b/plugin/hdl/index.go index 48b80d3..33331b8 100644 --- a/plugin/hdl/index.go +++ b/plugin/hdl/index.go @@ -17,10 +17,11 @@ type HDLPlugin struct { m7s.Plugin } -func (p *HDLPlugin) OnInit() { +func (p *HDLPlugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { go p.Pull(streamPath, url, NewHDLPuller()) } + return nil } var _ = m7s.InstallPlugin[HDLPlugin]() diff --git a/plugin/rtmp/index.go b/plugin/rtmp/index.go index 4a15015..c7a9478 100644 --- a/plugin/rtmp/index.go +++ b/plugin/rtmp/index.go @@ -18,10 +18,11 @@ type RTMPPlugin struct { var _ = m7s.InstallPlugin[RTMPPlugin](m7s.DefaultYaml(`tcp: listenaddr: :1935`)) -func (p *RTMPPlugin) OnInit() { +func (p *RTMPPlugin) OnInit() error { for streamPath, url := range p.GetCommonConf().PullOnStart { go p.Pull(streamPath, url, &Client{}) } + return nil } func (p *RTMPPlugin) OnPull(puller *m7s.Puller) { diff --git a/plugin/rtmp/pkg/const.go b/plugin/rtmp/pkg/const.go index fdff9bb..b86d166 100644 --- a/plugin/rtmp/pkg/const.go +++ b/plugin/rtmp/pkg/const.go @@ -27,10 +27,10 @@ func (avcc *RTMPData) GetSize() int { } func (avcc *RTMPData) MarshalJSON() ([]byte, error) { - return []byte(fmt.Sprintf(`{"Timestamp":%d,"Size":%d,"Data":"%s"}`, avcc.Timestamp, avcc.Length, avcc.Print())), nil + return []byte(fmt.Sprintf(`{"Timestamp":%d,"Size":%d,"Data":"%s"}`, avcc.Timestamp, avcc.Length, avcc.String())), nil } -func (avcc *RTMPData) Print() string { +func (avcc *RTMPData) String() string { return fmt.Sprintf("% 02X", avcc.Buffers.Buffers[0][:5]) } diff --git a/plugin/rtp/index.go b/plugin/rtp/index.go new file mode 100644 index 0000000..b9608e0 --- /dev/null +++ b/plugin/rtp/index.go @@ -0,0 +1,2 @@ +package plugin_rtp + diff --git a/plugin/rtp/pkg/audio.go b/plugin/rtp/pkg/audio.go new file mode 100644 index 0000000..586383d --- /dev/null +++ b/plugin/rtp/pkg/audio.go @@ -0,0 +1,112 @@ +package rtp + +import ( + "time" + + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" + . "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" +) + +type RTPData struct { + *webrtc.RTPCodecParameters + rtp.Packet + util.RecyclableMemory +} + +func (r *RTPData) GetTimestamp() time.Duration { + return time.Duration(r.Timestamp) * time.Second / time.Duration(r.ClockRate) +} + +func (r *RTPData) GetSize() int { + return r.Packet.MarshalSize() +} + +func (r *RTPData) IsIDR() bool { + return false +} + +type ( + RTPCtx struct { + webrtc.RTPCodecParameters + } + RTPG711Ctx struct { + RTPCtx + } + RTPOPUSCtx struct { + RTPCtx + } + RTPAACCtx struct { + RTPCtx + } +) + +func (r *RTPCtx) GetRTPCodecCapability() webrtc.RTPCodecCapability { + return r.RTPCodecCapability +} + +func (r *RTPCtx) GetSequenceFrame() IAVFrame { + return nil +} + +func (r *RTPData) DecodeConfig(track *AVTrack) error { + switch r.MimeType { + case webrtc.MimeTypeOpus: + track.Codec = codec.FourCC_OPUS + var ctx RTPOPUSCtx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypePCMA: + track.Codec = codec.FourCC_ALAW + var ctx RTPG711Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypePCMU: + track.Codec = codec.FourCC_ULAW + var ctx RTPG711Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypeH264: + track.Codec = codec.FourCC_H264 + var ctx RTPH264Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypeVP9: + track.Codec = codec.FourCC_VP9 + var ctx RTPVP9Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypeAV1: + track.Codec = codec.FourCC_AV1 + var ctx RTPAV1Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case webrtc.MimeTypeH265: + track.Codec = codec.FourCC_H265 + var ctx RTPH265Ctx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + case "audio/MPEG4-GENERIC", "audio/AAC": + track.Codec = codec.FourCC_MP4A + var ctx RTPAACCtx + ctx.RTPCodecParameters = *r.RTPCodecParameters + track.ICodecCtx = &ctx + default: + return ErrUnsupportCodec + } + return nil +} + +type RTPAudio struct { + RTPData +} + +func (r *RTPAudio) FromRaw(*AVTrack, any) error { + panic("unimplemented") +} + +func (r *RTPAudio) ToRaw(*AVTrack) (any, error) { + return r.Payload, nil +} diff --git a/plugin/rtp/pkg/video.go b/plugin/rtp/pkg/video.go new file mode 100644 index 0000000..69df232 --- /dev/null +++ b/plugin/rtp/pkg/video.go @@ -0,0 +1,33 @@ +package rtp + +import . "m7s.live/m7s/v5/pkg" + +type ( + RTPH264Ctx struct { + RTPCtx + SPS [][]byte + PPS [][]byte + } + RTPH265Ctx struct { + RTPH264Ctx + VPS [][]byte + } + RTPAV1Ctx struct { + RTPCtx + } + RTPVP9Ctx struct { + RTPCtx + } +) + +type RTPVideo struct { + RTPData +} + +func (r *RTPVideo) FromRaw(*AVTrack, any) error { + panic("unimplemented") +} + +func (r *RTPVideo) ToRaw(*AVTrack) (any, error) { + return r.Payload, nil +} diff --git a/plugin/webrtc/index.go b/plugin/webrtc/index.go new file mode 100644 index 0000000..cc7541c --- /dev/null +++ b/plugin/webrtc/index.go @@ -0,0 +1,398 @@ +package plugin_webrtc + +import ( + _ "embed" + "errors" + "fmt" + "io" + "net" + "net/http" + "regexp" + "strings" + "time" + + "github.com/pion/interceptor" + "github.com/pion/rtcp" + . "github.com/pion/webrtc/v4" + "m7s.live/m7s/v5" + . "m7s.live/m7s/v5/pkg" + "m7s.live/m7s/v5/pkg/codec" + "m7s.live/m7s/v5/pkg/util" + rtp "m7s.live/m7s/v5/plugin/rtp/pkg" + . "m7s.live/m7s/v5/plugin/webrtc/pkg" +) + +var ( + //go:embed publish.html + publishHTML []byte + + //go:embed subscribe.html + subscribeHTML []byte + reg_level = regexp.MustCompile("profile-level-id=(4.+f)") + _ = m7s.InstallPlugin[WebRTCPlugin]() +) + +type WebRTCPlugin struct { + m7s.Plugin + ICEServers []ICEServer `desc:"ice服务器配置"` + PublicIP string `desc:"公网IP"` + PublicIPv6 string `desc:"公网IPv6"` + Port string `default:"tcp:9000" desc:"监听端口"` + PLI time.Duration `default:"2s" desc:"发送PLI请求间隔"` // 视频流丢包后,发送PLI请求 + EnableOpus bool `default:"true" desc:"是否启用opus编码"` // 是否启用opus编码 + EnableVP9 bool `default:"false" desc:"是否启用vp9编码"` // 是否启用vp9编码 + EnableAv1 bool `default:"true" desc:"是否启用av1编码"` // 是否启用av1编码 + EnableDC bool `default:"true" desc:"是否启用DataChannel"` // 在不支持编码格式的情况下是否启用DataChannel传输 + m MediaEngine + s SettingEngine + api *API +} + +func (p *WebRTCPlugin) OnInit() (err error) { + if len(p.ICEServers) > 0 { + for i := range p.ICEServers { + b, _ := p.ICEServers[i].MarshalJSON() + p.ICEServers[i].UnmarshalJSON(b) + } + } + RegisterCodecs(&p.m) + if p.EnableOpus { + p.m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeTypeOpus, 48000, 2, "minptime=10;useinbandfec=1", nil}, + PayloadType: 111, + }, RTPCodecTypeAudio) + } + if p.EnableVP9 { + p.m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeTypeVP9, 90000, 0, "", nil}, + PayloadType: 100, + }, RTPCodecTypeVideo) + } + if p.EnableAv1 { + p.m.RegisterCodec(RTPCodecParameters{ + RTPCodecCapability: RTPCodecCapability{MimeTypeAV1, 90000, 0, "profile=2;level-idx=8;tier=1", nil}, + PayloadType: 45, + }, RTPCodecTypeVideo) + } + i := &interceptor.Registry{} + if p.PublicIP != "" { + ips := []string{p.PublicIP} + if p.PublicIPv6 != "" { + ips = append(ips, p.PublicIPv6) + } + p.s.SetNAT1To1IPs(ips, ICECandidateTypeHost) + } + ports, err := ParsePort2(p.Port) + if err != nil { + p.Error("webrtc port config error", "error", err, "port", p.Port) + return err + } + + switch v := ports.(type) { + case TCPPort: + tcpport := int(v) + tcpl, err := net.ListenTCP("tcp", &net.TCPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: tcpport, + }) + if err != nil { + p.Error("webrtc listener tcp", "error", err) + } + p.Info("webrtc start listen", "port", tcpport) + p.s.SetICETCPMux(NewICETCPMux(nil, tcpl, 4096)) + p.s.SetNetworkTypes([]NetworkType{NetworkTypeTCP4, NetworkTypeTCP6}) + case UDPRangePort: + p.s.SetEphemeralUDPPortRange(uint16(v[0]), uint16(v[1])) + case UDPPort: + // 创建共享WEBRTC端口 默认9000 + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{ + IP: net.IP{0, 0, 0, 0}, + Port: int(v), + }) + if err != nil { + p.Error("webrtc listener udp", "error", err) + return err + } + p.Info("webrtc start listen", "port", v) + p.s.SetICEUDPMux(NewICEUDPMux(nil, udpListener)) + p.s.SetNetworkTypes([]NetworkType{NetworkTypeUDP4, NetworkTypeUDP6}) + } + if err = RegisterDefaultInterceptors(&p.m, i); err != nil { + return err + } + p.api = NewAPI(WithMediaEngine(&p.m), + WithInterceptorRegistry(i), WithSettingEngine(p.s)) + return +} + +func (*WebRTCPlugin) Test_Publish(w http.ResponseWriter, r *http.Request) { + w.Write(publishHTML) +} +func (*WebRTCPlugin) Test_ScreenShare(w http.ResponseWriter, r *http.Request) { + w.Write(publishHTML) +} +func (*WebRTCPlugin) Test_Subscribe(w http.ResponseWriter, r *http.Request) { + w.Write(subscribeHTML) +} + +// https://datatracker.ietf.org/doc/html/draft-ietf-wish-whip +func (conf *WebRTCPlugin) Push_(w http.ResponseWriter, r *http.Request) { + streamPath := r.URL.Path[len("/push/"):] + rawQuery := r.URL.RawQuery + auth := r.Header.Get("Authorization") + if strings.HasPrefix(auth, "Bearer ") { + auth = auth[len("Bearer "):] + if rawQuery != "" { + rawQuery += "&bearer=" + auth + } else { + rawQuery = "bearer=" + auth + } + conf.Info("push", "stream", streamPath, "bearer", auth) + } + w.Header().Set("Content-Type", "application/sdp") + w.Header().Set("Location", "/webrtc/api/stop/push/"+streamPath) + if rawQuery != "" { + streamPath += "?" + rawQuery + } + bytes, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + var conn Connection + conn.SDP = string(bytes) + if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{ + ICEServers: conf.ICEServers, + }); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + var publisher *m7s.Publisher + if publisher, err = conf.Publish(streamPath, conn.PeerConnection); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + conn.OnTrack(func(track *TrackRemote, receiver *RTPReceiver) { + publisher.Info("OnTrack", "kind", track.Kind().String(), "payloadType", uint8(track.Codec().PayloadType)) + var n int + var err error + if codecP := track.Codec(); track.Kind() == RTPCodecTypeAudio { + mem := util.NewScalableMemoryAllocator(1460 * 100) + for { + var packet rtp.RTPAudio + packet.RTPCodecParameters = &codecP + packet.ScalableMemoryAllocator = mem + buf := packet.Malloc(1460) + if n, _, err = track.Read(buf); err == nil { + err = packet.Unmarshal(buf[:n]) + packet.RecycleBack(1460 - n) + } + if err != nil { + return + } + publisher.WriteAudio(&packet) + } + } else { + var lastPLISent time.Time + mem := util.NewScalableMemoryAllocator(1460 * 100) + for { + if time.Since(lastPLISent) > conf.PLI { + if rtcpErr := conn.WriteRTCP([]rtcp.Packet{&rtcp.PictureLossIndication{MediaSSRC: uint32(track.SSRC())}}); rtcpErr != nil { + publisher.Error("writeRTCP", "error", rtcpErr) + return + } + lastPLISent = time.Now() + } + var packet rtp.RTPVideo + packet.RTPCodecParameters = &codecP + packet.ScalableMemoryAllocator = mem + buf := packet.Malloc(1460) + if n, _, err = track.Read(buf); err == nil { + err = packet.Unmarshal(buf[:n]) + packet.RecycleBack(1460 - n) + } + if err != nil { + return + } + publisher.WriteVideo(&packet) + } + } + }) + conn.OnICECandidate(func(ice *ICECandidate) { + if ice != nil { + publisher.Info(ice.ToJSON().Candidate) + } + }) + conn.OnDataChannel(func(d *DataChannel) { + publisher.Info("OnDataChannel", "label", d.Label()) + d.OnMessage(func(msg DataChannelMessage) { + conn.SDP = string(msg.Data[1:]) + publisher.Debug("dc message", "sdp", conn.SDP) + if err := conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil { + return + } + if answer, err := conn.GetAnswer(); err == nil { + d.SendText(answer) + } else { + return + } + switch msg.Data[0] { + case '0': + publisher.Stop(errors.New("stop by remote")) + case '1': + + } + }) + }) + conn.OnConnectionStateChange(func(state PeerConnectionState) { + publisher.Info("Connection State has changed:" + state.String()) + switch state { + case PeerConnectionStateConnected: + + case PeerConnectionStateDisconnected, PeerConnectionStateFailed, PeerConnectionStateClosed: + publisher.Stop(errors.New("connection state:" + state.String())) + } + }) + if err := conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + if answer, err := conn.GetAnswer(); err == nil { + w.WriteHeader(http.StatusCreated) + fmt.Fprint(w, answer) + } else { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } +} + +func (conf *WebRTCPlugin) Play_(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/sdp") + streamPath := r.URL.Path[len("/play/"):] + rawQuery := r.URL.RawQuery + var conn Connection + bytes, err := io.ReadAll(r.Body) + defer func() { + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + } + }() + if err != nil { + return + } + conn.SDP = string(bytes) + if conn.PeerConnection, err = conf.api.NewPeerConnection(Configuration{ + ICEServers: conf.ICEServers, + }); err != nil { + return + } + var suber *m7s.Subscriber + if rawQuery != "" { + streamPath += "?" + rawQuery + } + if suber, err = conf.Subscribe(streamPath, conn.PeerConnection); err != nil { + return + } + var useDC bool + var audioTLSRTP, videoTLSRTP *TrackLocalStaticRTP + var audioSender, videoSender *RTPSender + if suber.Publisher != nil { + if vt := suber.Publisher.VideoTrack; vt != nil { + if vt.Codec == codec.FourCC_H265 { + useDC = true + } else { + ctx := vt.ICodecCtx.(interface { + GetRTPCodecCapability() RTPCodecCapability + }) + videoTLSRTP, err = NewTrackLocalStaticRTP(ctx.GetRTPCodecCapability(), vt.Codec.String(), suber.StreamPath) + if err != nil { + return + } + videoSender, err = conn.PeerConnection.AddTrack(videoTLSRTP) + if err != nil { + return + } + go func() { + rtcpBuf := make([]byte, 1500) + for { + if n, _, rtcpErr := videoSender.Read(rtcpBuf); rtcpErr != nil { + suber.Warn("rtcp read error", "error", rtcpErr) + return + } else { + if p, err := rtcp.Unmarshal(rtcpBuf[:n]); err == nil { + for _, pp := range p { + switch pp.(type) { + case *rtcp.PictureLossIndication: + // fmt.Println("PictureLossIndication") + } + } + } + } + } + }() + } + } + if at := suber.Publisher.AudioTrack; at != nil { + if at.Codec == codec.FourCC_MP4A { + useDC = true + } else { + ctx := at.ICodecCtx.(interface { + GetRTPCodecCapability() RTPCodecCapability + }) + audioTLSRTP, err = NewTrackLocalStaticRTP(ctx.GetRTPCodecCapability(), at.Codec.String(), suber.StreamPath) + if err != nil { + return + } + audioSender, err = conn.PeerConnection.AddTrack(audioTLSRTP) + if err != nil { + return + } + } + } + } + + if conf.EnableDC && useDC { + dc, err := conn.CreateDataChannel(suber.StreamPath, nil) + if err != nil { + return + } + go func() { + // suber.Handle(m7s.SubscriberHandler{ + // OnAudio: func(audio *rtmp.RTMPAudio) error { + // }, + // OnVideo: func(video *rtmp.RTMPVideo) error { + // }, + // }) + dc.Close() + }() + } else { + if audioSender == nil { + suber.SubAudio = false + } + if videoSender == nil { + suber.SubVideo = false + } + go suber.Handle(m7s.SubscriberHandler{ + OnAudio: func(frame *rtp.RTPAudio) error { + return audioTLSRTP.WriteRTP(&frame.Packet) + }, + OnVideo: func(frame *rtp.RTPVideo) error { + return videoTLSRTP.WriteRTP(&frame.Packet) + }, + }) + } + conn.OnICECandidate(func(ice *ICECandidate) { + if ice != nil { + suber.Info(ice.ToJSON().Candidate) + } + }) + if err = conn.SetRemoteDescription(SessionDescription{Type: SDPTypeOffer, SDP: conn.SDP}); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + if sdp, err := conn.GetAnswer(); err == nil { + w.Write([]byte(sdp)) + } else { + http.Error(w, err.Error(), http.StatusBadRequest) + } +} diff --git a/plugin/webrtc/pkg/config.go b/plugin/webrtc/pkg/config.go new file mode 100644 index 0000000..578b5a8 --- /dev/null +++ b/plugin/webrtc/pkg/config.go @@ -0,0 +1,97 @@ +package webrtc + +import ( + . "github.com/pion/webrtc/v4" +) + +func RegisterCodecs(m *MediaEngine) error { + for _, codec := range []RTPCodecParameters{ + { + RTPCodecCapability: RTPCodecCapability{MimeTypePCMU, 8000, 0, "", nil}, + PayloadType: 0, + }, + { + RTPCodecCapability: RTPCodecCapability{MimeTypePCMA, 8000, 0, "", nil}, + PayloadType: 8, + }, + } { + if err := m.RegisterCodec(codec, RTPCodecTypeAudio); err != nil { + return err + } + } + videoRTCPFeedback := []RTCPFeedback{{"goog-remb", ""}, {"ccm", "fir"}, {"nack", ""}, {"nack", "pli"}} + for _, codec := range []RTPCodecParameters{ + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=96", nil}, + // PayloadType: 97, + // }, + + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=98", nil}, + // PayloadType: 99, + // }, + + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=100", nil}, + // PayloadType: 101, + // }, + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42001f", videoRTCPFeedback}, + PayloadType: 102, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=102", nil}, + // PayloadType: 121, + // }, + + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", videoRTCPFeedback}, + PayloadType: 127, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil}, + // PayloadType: 120, + // }, + + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", videoRTCPFeedback}, + PayloadType: 125, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=125", nil}, + // PayloadType: 107, + // }, + + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42e01f", videoRTCPFeedback}, + PayloadType: 108, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=108", nil}, + // PayloadType: 109, + // }, + + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=0;profile-level-id=42001f", videoRTCPFeedback}, + PayloadType: 127, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=127", nil}, + // PayloadType: 120, + // }, + + { + RTPCodecCapability: RTPCodecCapability{MimeTypeH264, 90000, 0, "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=640032", videoRTCPFeedback}, + PayloadType: 123, + }, + // { + // RTPCodecCapability: RTPCodecCapability{"video/rtx", 90000, 0, "apt=123", nil}, + // PayloadType: 118, + // }, + } { + if err := m.RegisterCodec(codec, RTPCodecTypeVideo); err != nil { + return err + } + } + return nil +} diff --git a/plugin/webrtc/pkg/connection.go b/plugin/webrtc/pkg/connection.go new file mode 100644 index 0000000..8751e44 --- /dev/null +++ b/plugin/webrtc/pkg/connection.go @@ -0,0 +1,30 @@ +package webrtc + +import ( + . "github.com/pion/webrtc/v4" +) + +type Connection struct { + *PeerConnection + SDP string + // LocalSDP *sdp.SessionDescription +} + +func (IO *Connection) GetAnswer() (string, error) { + // Sets the LocalDescription, and starts our UDP listeners + answer, err := IO.CreateAnswer(nil) + if err != nil { + return "", err + } + // IO.LocalSDP, err = answer.Unmarshal() + // if err != nil { + // return "", err + // } + gatherComplete := GatheringCompletePromise(IO.PeerConnection) + if err := IO.SetLocalDescription(answer); err != nil { + return "", err + } + <-gatherComplete + return IO.LocalDescription().SDP, nil +} + diff --git a/plugin/webrtc/publish.html b/plugin/webrtc/publish.html new file mode 100644 index 0000000..3d646b0 --- /dev/null +++ b/plugin/webrtc/publish.html @@ -0,0 +1,112 @@ + + + + + + + + 测试WebRTC推流 + + + + + +
+
+  
+
+  
+
+ + + + \ No newline at end of file diff --git a/plugin/webrtc/subscribe.html b/plugin/webrtc/subscribe.html new file mode 100644 index 0000000..c7fa742 --- /dev/null +++ b/plugin/webrtc/subscribe.html @@ -0,0 +1,91 @@ + + + + + + + + 测试WebRTC拉流 + + + + + +
+  
+
+  
+
+ + + + \ No newline at end of file diff --git a/publisher.go b/publisher.go index a5dec12..8793610 100644 --- a/publisher.go +++ b/publisher.go @@ -136,7 +136,7 @@ func (p *Publisher) writeAV(t *AVTrack, data IAVFrame) { } frame.Timestamp = max(1, p.baseTs+ts) p.lastTs = frame.Timestamp - p.Debug("write", "seq", frame.Sequence) + p.Trace("write", "seq", frame.Sequence) t.Step() p.speedControl(p.Publish.Speed, p.lastTs) } @@ -216,13 +216,24 @@ func (p *Publisher) WriteData(data IDataFrame) (err error) { return } +func (p *Publisher) createTransTrack(dataType reflect.Type) (t *AVTrack) { + p.Lock() + defer p.Unlock() + t = &AVTrack{} + t.Logger = p.Logger.With("track", "audio") + t.Init(256) + p.TransTrack[dataType] = t + return t +} + func (p *Publisher) GetAudioTrack(dataType reflect.Type) (t *AVTrack) { p.RLock() - defer p.RUnlock() if t, ok := p.TransTrack[dataType]; ok { + p.RUnlock() return t } - return + p.RUnlock() + return p.createTransTrack(dataType) } func (p *Publisher) GetVideoTrack(dataType reflect.Type) (t *AVTrack) { @@ -263,7 +274,7 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) { snap.Wrap = &pb.Wrap{ Timestamp: uint32(v.Wrap.GetTimestamp()), Size: uint32(v.Wrap.GetSize()), - Data: v.Wrap.Print(), + Data: v.Wrap.String(), } } ret.VideoTrack = append(ret.VideoTrack, &snap) @@ -280,7 +291,7 @@ func (p *Publisher) SnapShot() (ret *pb.StreamSnapShot) { snap.Wrap = &pb.Wrap{ Timestamp: uint32(v.Wrap.GetTimestamp()), Size: uint32(v.Wrap.GetSize()), - Data: v.Wrap.Print(), + Data: v.Wrap.String(), } } ret.AudioTrack = append(ret.AudioTrack, &snap) diff --git a/server.go b/server.go index 366340b..eb32c20 100644 --- a/server.go +++ b/server.go @@ -17,6 +17,7 @@ import ( "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/mcuadros/go-defaults" + "github.com/phsym/console-slog" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "gopkg.in/yaml.v3" @@ -137,7 +138,13 @@ func (s *Server) run(ctx context.Context, conf any) (err error) { } var lv slog.LevelVar lv.UnmarshalText([]byte(s.LogLevel)) - slog.SetLogLoggerLevel(lv.Level()) + if s.LogLevel == "trace" { + lv.Set(TraceLevel) + } + s.Logger = slog.New( + console.NewHandler(os.Stdout, &console.HandlerOptions{Level: lv.Level()}), + ) + // slog.SetLogLoggerLevel(lv.Level()) s.registerHandler() if httpConf.ListenAddrTLS != "" { diff --git a/subscriber.go b/subscriber.go index 1191a82..4f6eacf 100644 --- a/subscriber.go +++ b/subscriber.go @@ -117,7 +117,7 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { } sendVideoFrame := func() (err error) { lastSentVF = videoFrame - s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.Print(), "size", videoFrame.Wrap.GetSize()) + s.Debug("send video frame", "seq", videoFrame.Sequence, "data", videoFrame.Wrap.String(), "size", videoFrame.Wrap.GetSize()) res := vh.Call([]reflect.Value{reflect.ValueOf(videoFrame.Wrap)}) if len(res) > 0 && !res[0].IsNil() { if err = res[0].Interface().(error); err != ErrInterrupt { @@ -144,8 +144,10 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { // fmt.Println("video", s.VideoReader.Track.PreFrame().Sequence-frame.Sequence) if videoFrame.Wrap.IsIDR() && vr.DecConfChanged() { vr.LastCodecCtx = vr.Track.ICodecCtx - s.Debug("video codec changed", "data", vr.Track.ICodecCtx.GetSequenceFrame().Print()) - vh.Call([]reflect.Value{reflect.ValueOf(vr.Track.ICodecCtx.GetSequenceFrame())}) + if seqFrame := vr.Track.ICodecCtx.GetSequenceFrame(); seqFrame != nil { + s.Debug("video codec changed", "data", seqFrame.String()) + vh.Call([]reflect.Value{reflect.ValueOf(seqFrame)}) + } } if ar != nil { if audioFrame != nil { @@ -194,8 +196,8 @@ func (s *Subscriber) Handle(handler SubscriberHandler) { // fmt.Println("audio", s.AudioReader.Track.PreFrame().Sequence-frame.Sequence) if ar.DecConfChanged() { ar.LastCodecCtx = ar.Track.ICodecCtx - if sf := ar.Track.ICodecCtx.GetSequenceFrame(); sf != nil { - ah.Call([]reflect.Value{reflect.ValueOf(sf)}) + if seqFrame := ar.Track.ICodecCtx.GetSequenceFrame(); seqFrame != nil { + ah.Call([]reflect.Value{reflect.ValueOf(seqFrame)}) } } if vr != nil && videoFrame != nil {