webrtc 拉流

This commit is contained in:
yangjiechina
2024-03-28 18:14:54 +08:00
parent 7694237dc7
commit 1ec5c201b5
9 changed files with 351 additions and 4 deletions

57
api.go
View File

@@ -2,13 +2,17 @@ package main
import (
"context"
"encoding/json"
"github.com/gorilla/mux"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/flv"
"github.com/yangjiechina/live-server/rtc"
"github.com/yangjiechina/live-server/stream"
"io"
"net"
"net/http"
"strings"
"sync"
"time"
)
@@ -16,7 +20,10 @@ func startApiServer(addr string) {
r := mux.NewRouter()
r.HandleFunc("/live/flv/{source}", onFLV)
r.HandleFunc("/live/hls/{source}", onHLS)
r.HandleFunc("/live/rtc/{source}", onRtc)
r.HandleFunc("/rtc.html", func(writer http.ResponseWriter, request *http.Request) {
http.ServeFile(writer, request, "./rtc.html")
})
http.Handle("/", r)
srv := &http.Server{
@@ -96,3 +103,51 @@ func onHLS(w http.ResponseWriter, r *http.Request) {
}
}
func onRtc(w http.ResponseWriter, r *http.Request) {
v := struct {
Type string `json:"type"`
SDP string `json:"sdp"`
}{}
data, err := io.ReadAll(r.Body)
if err != nil {
panic(err)
}
if err := json.Unmarshal(data, &v); err != nil {
panic(err)
}
sinkId := stream.SinkId(123)
split := strings.Split(r.URL.Path, "/")
group := sync.WaitGroup{}
group.Add(1)
sink := rtc.NewSink(sinkId, split[len(split)-1], v.SDP, func(sdp string) {
response := struct {
Type string `json:"type"`
SDP string `json:"sdp"`
}{
Type: "answer",
SDP: sdp,
}
marshal, err := json.Marshal(response)
if err != nil {
panic(err)
}
w.Header().Set("Content-Type", "application/json")
w.Write(marshal)
group.Done()
})
sink.Play(sink, func() {
}, func(state utils.HookState) {
group.Done()
})
group.Wait()
}

View File

@@ -29,6 +29,7 @@
"webrtc": {
"port": 8000,
"public_ip": "192.168.31.123",
"transport": "UDP"
},

33
go.mod
View File

@@ -2,8 +2,37 @@ module github.com/yangjiechina/live-server
require github.com/yangjiechina/avformat v0.0.0
require github.com/gorilla/mux v1.8.1
require (
github.com/gorilla/mux v1.8.1
github.com/pion/webrtc/v3 v3.2.29
)
replace github.com/yangjiechina/avformat => ../avformat
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.1 // indirect
github.com/pion/datachannel v1.5.5 // indirect
github.com/pion/dtls/v2 v2.2.7 // indirect
github.com/pion/ice/v2 v2.3.13 // indirect
github.com/pion/interceptor v0.1.25 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.12 // indirect
github.com/pion/rtp v1.8.3 // indirect
github.com/pion/sctp v1.8.12 // indirect
github.com/pion/sdp/v3 v3.0.8 // indirect
github.com/pion/srtp/v2 v2.0.18 // indirect
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.3 // indirect
github.com/pion/turn/v2 v2.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/sys v0.16.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/yangjiechina/avformat => ../../avformat
go 1.19

View File

@@ -3,6 +3,7 @@ package main
import (
"github.com/yangjiechina/live-server/flv"
"github.com/yangjiechina/live-server/hls"
"github.com/yangjiechina/live-server/rtc"
"github.com/yangjiechina/live-server/rtsp"
"net"
"net/http"
@@ -35,11 +36,12 @@ func CreateTransStream(source stream.ISource, protocol stream.Protocol, streams
return flv.NewHttpTransStream()
} else if stream.ProtocolRtsp == protocol {
trackFormat := source.Id() + "?track=%d"
return rtsp.NewTransStream(net.IPAddr{
IP: rtspAddr.IP,
Zone: rtspAddr.Zone,
}, trackFormat)
} else if stream.ProtocolRtc == protocol {
return rtc.NewTransStream()
}
return nil

101
rtc.html Normal file
View File

@@ -0,0 +1,101 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>RTC Player</title>
</head>
<style>
#dialog {
position: fixed;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
background-color: #fff;
padding: 10px;
border-radius: 4px;
z-index: 9999;
}
.modal-footer {
display: flex;
justify-content: space-between;
margin-top: 10px;
}
.modal-footer button {
cursor: pointer;
margin-right: 10px;
}
</style>
<body>
<div style="margin-top: 10px; margin-left: 10px;">
<input style="width: 100px;" id="url" type="text" value="live/rtc/_mystream"/>
<button onclick="play()"> 播放</button>
</div>
<div id="dialog" style="display:none;">
<p id="call_title">确实要执行操作吗?</p>
<div class="modal-footer">
<button id="confirmBtn">接听</button>
<button id="cancelBtn">拒绝</button>
</div>
</div>
<div style="margin-top: 10px;">
<div style="float: left">
<video id="videoview" width="310" autoplay muted controls ></video>
</div>
</div>
</body>
</html>
<script>
async function play() {
let remote_view = document.getElementById("videoview");
let url = document.getElementById("url").value;
let pc = new RTCPeerConnection(null);
// pc.addTransceiver("audio", {direction: "recvonly"});
pc.addTransceiver("video", {direction: "recvonly"});
let offer = await pc.createOffer();
await pc.setLocalDescription(offer)
var data = {
type: "offer",
sdp: offer.sdp,
}
var stream = new MediaStream();
remote_view.srcObject = stream
pc.ontrack = function (event) {
if (event.streams.length === 0) {
return
}
stream.addTrack(event.track)
// remote_view.srcObject = event.streams[0]
// remote_view.play()
}
pc.onicegatheringstatechange = function (event) {
console.log("ice state:" + pc.iceConnectionState)
}
pc.onicecandidate = function (event) {
console.log("ice candidate:" + event.candidate)
}
fetch(url, {
method: 'POST',
body: JSON.stringify(data),
headers: new Headers({
'Content-Type': 'application/json',
}),
}).then((res) => res.json())
.then((data) => {
console.log("拉流响应:" + data["sdp"])
pc.setRemoteDescription({type: 'answer', sdp: data["sdp"]})
})
}
</script>

50
rtc/rtc_sink.go Normal file
View File

@@ -0,0 +1,50 @@
package rtc
import (
"github.com/pion/webrtc/v3"
"github.com/pion/webrtc/v3/pkg/media"
"github.com/yangjiechina/live-server/stream"
"time"
)
type sink struct {
stream.SinkImpl
offer string
answer string
peer *webrtc.PeerConnection
tracks []*webrtc.TrackLocalStaticSample
state webrtc.ICEConnectionState
cb func(sdp string)
}
func NewSink(id stream.SinkId, sourceId string, offer string, cb func(sdp string)) stream.ISink {
return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolRtc}, offer, "", nil, nil, webrtc.ICEConnectionStateNew, cb}
}
func (s *sink) setTrackCount(count int) {
s.tracks = make([]*webrtc.TrackLocalStaticSample, count)
}
func (s *sink) addTrack(index int, track *webrtc.TrackLocalStaticSample) error {
s.tracks[index] = track
return nil
}
func (s *sink) SendHeader(data []byte) error {
s.cb(string(data))
return nil
}
func (s *sink) input(index int, data []byte, ts uint32) error {
if s.tracks[index] == nil {
return nil
}
return s.tracks[index].WriteSample(media.Sample{
Data: data,
Duration: time.Duration(ts) * time.Millisecond,
})
}

100
rtc/rtc_stream.go Normal file
View File

@@ -0,0 +1,100 @@
package rtc
import (
"github.com/pion/webrtc/v3"
"github.com/yangjiechina/avformat/utils"
"github.com/yangjiechina/live-server/stream"
)
type transStream struct {
stream.TransStreamImpl
}
func NewTransStream() stream.ITransStream {
t := &transStream{}
t.Init()
return t
}
func (t *transStream) Input(packet utils.AVPacket) error {
if utils.AVMediaTypeAudio == packet.MediaType() {
} else if utils.AVMediaTypeVideo == packet.MediaType() {
for _, iSink := range t.Sinks {
sink_ := iSink.(*sink)
if sink_.state < webrtc.ICEConnectionStateConnected {
continue
}
if packet.KeyFrame() {
extra, err := t.TransStreamImpl.Tracks[packet.Index()].AnnexBExtraData()
if err != nil {
return err
}
sink_.input(packet.Index(), extra, 0)
}
sink_.input(packet.Index(), packet.AnnexBPacketData(), 40)
}
}
return nil
}
func (t *transStream) AddSink(sink_ stream.ISink) error {
//创建PeerConnection
var videoTrack *webrtc.TrackLocalStaticSample
rtcSink := sink_.(*sink)
rtcSink.setTrackCount(len(t.Tracks))
connection, err := webrtc.NewPeerConnection(webrtc.Configuration{})
connection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
})
for index, track := range t.Tracks {
if utils.AVCodecIdH264 != track.CodecId() {
continue
}
videoTrack, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
if err != nil {
panic(err)
}
if _, err := connection.AddTransceiverFromTrack(videoTrack, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionSendonly}); err != nil {
panic(err)
}
if _, err = connection.AddTrack(videoTrack); err != nil {
panic(err)
}
rtcSink.addTrack(index, videoTrack)
}
if err = connection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: rtcSink.offer}); err != nil {
return err
}
complete := webrtc.GatheringCompletePromise(connection)
answer, err := connection.CreateAnswer(nil)
if err != nil {
panic(err)
} else if err = connection.SetLocalDescription(answer); err != nil {
panic(err)
}
<-complete
connection.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
rtcSink.state = state
})
rtcSink.peer = connection
rtcSink.SendHeader([]byte(connection.LocalDescription().SDP))
return t.TransStreamImpl.AddSink(sink_)
}
func (t *transStream) WriteHeader() error {
return nil
}

View File

@@ -3,6 +3,7 @@ package stream
import (
"bytes"
"encoding/json"
"github.com/yangjiechina/avformat/utils"
"net/http"
"time"
)
@@ -35,6 +36,12 @@ func NewPublishHookEventInfo(stream, remoteAddr string, protocol SourceType) eve
return eventInfo{stream: stream, protocol: sourceTypeToStr(protocol), remoteAddr: remoteAddr}
}
type HookHandler interface {
Play(sink ISink, success func(), failure func(state utils.HookState))
PlayDone(sink ISink, success func(), failure func(state utils.HookState))
}
type HookSession interface {
send(url string, body interface{}, success func(response *http.Response), failure func(response *http.Response, err error)) error

View File

@@ -10,6 +10,8 @@ import (
type SinkId interface{}
type ISink interface {
HookHandler
Id() SinkId
Input(data []byte) error