WHEP 구조화 이전 백업

This commit is contained in:
Diallo Han
2024-08-30 10:23:57 +09:00
parent 9074bcfe08
commit f1e586776e
19 changed files with 944 additions and 454 deletions

2
go.mod
View File

@@ -1,4 +1,4 @@
module mrw-clone
module liveflow
go 1.21

View File

@@ -11,8 +11,8 @@ import (
"github.com/bluenviron/gohlslib/pkg/playlist"
"github.com/labstack/echo/v4"
"mrw-clone/log"
"mrw-clone/media/hlshub"
"liveflow/log"
"liveflow/media/hlshub"
)
const (

View File

@@ -2,6 +2,8 @@ package log
import (
"context"
"fmt"
"runtime"
"github.com/sirupsen/logrus"
)
@@ -10,6 +12,46 @@ type ctxKey string
const loggerKey ctxKey = "logger"
type SkipHook struct {
skip int
}
func (hook *SkipHook) Levels() []logrus.Level {
return logrus.AllLevels
}
func (hook *SkipHook) Fire(entry *logrus.Entry) error {
pc, file, line, ok := runtime.Caller(hook.skip)
if ok {
entry.Caller = &runtime.Frame{
PC: pc,
File: file,
Line: line,
Function: runtime.FuncForPC(pc).Name(),
}
}
return nil
}
func SetLevel(ctx context.Context, level logrus.Level) {
logrus.SetLevel(level)
//getLogger(ctx).Logger.SetLevel(level)
}
func Init() {
//logrus.AddHook(&SkipHook{skip: 7})
}
func SetFormatter(ctx context.Context, formatter logrus.Formatter) {
logrus.SetFormatter(formatter)
//getLogger(ctx).Logger.SetFormatter(formatter)
}
func SetCaller(ctx context.Context, flag bool) {
//logrus.SetReportCaller(flag)
//getLogger(ctx).Logger.SetReportCaller(flag)
}
// 컨텍스트에서 로거를 가져오는 헬퍼 함수
func getLogger(ctx context.Context) *logrus.Entry {
logger, ok := ctx.Value(loggerKey).(*logrus.Entry)
@@ -25,71 +67,78 @@ func WithFields(ctx context.Context, fields map[string]interface{}) context.Cont
logger := getLogger(ctx).WithFields(fields)
return context.WithValue(ctx, loggerKey, logger)
}
func CallerFileLine() string {
_, file, line, ok := runtime.Caller(3)
if ok {
return fmt.Sprintf("%s:%d", file, line)
}
return ""
}
func CallerFunc() string {
pc, _, _, ok := runtime.Caller(3)
if ok {
return runtime.FuncForPC(pc).Name()
}
return ""
}
func getLoggerWithStack(ctx context.Context) *logrus.Entry {
return getLogger(ctx).WithFields(logrus.Fields{
"file": CallerFileLine(),
"func": CallerFunc(),
})
}
func Info(ctx context.Context, args ...interface{}) {
getLogger(ctx).Info(args...)
getLoggerWithStack(ctx).Info(args...)
}
func Infof(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Infof(format, args...)
getLoggerWithStack(ctx).Infof(format, args...)
}
func Debug(ctx context.Context, args ...interface{}) {
getLogger(ctx).Debug(args...)
getLoggerWithStack(ctx).Debug(args...)
}
func Debugf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Debugf(format, args...)
getLoggerWithStack(ctx).Debugf(format, args...)
}
func Warn(ctx context.Context, args ...interface{}) {
getLogger(ctx).Warn(args...)
getLoggerWithStack(ctx).Warn(args...)
}
func Warnf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Warnf(format, args...)
getLoggerWithStack(ctx).Warnf(format, args...)
}
func Error(ctx context.Context, args ...interface{}) {
getLogger(ctx).Error(args...)
getLoggerWithStack(ctx).Error(args...)
}
func Errorf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Errorf(format, args...)
getLoggerWithStack(ctx).Errorf(format, args...)
}
func Fatal(ctx context.Context, args ...interface{}) {
getLogger(ctx).Fatal(args...)
getLoggerWithStack(ctx).Fatal(args...)
}
func Fatalf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Fatalf(format, args...)
getLoggerWithStack(ctx).Fatalf(format, args...)
}
func Panic(ctx context.Context, args ...interface{}) {
getLogger(ctx).Panic(args...)
getLoggerWithStack(ctx).Panic(args...)
}
func Panicf(ctx context.Context, format string, args ...interface{}) {
getLogger(ctx).Panicf(format, args...)
getLoggerWithStack(ctx).Panicf(format, args...)
}
func Print(ctx context.Context, args ...interface{}) {
getLogger(ctx).Print(args...)
}
func SetLevel(ctx context.Context, level logrus.Level) {
logrus.SetLevel(level)
getLogger(ctx).Logger.SetLevel(level)
}
func SetFormatter(ctx context.Context, formatter logrus.Formatter) {
logrus.SetFormatter(formatter)
getLogger(ctx).Logger.SetFormatter(formatter)
}
func SetCaller(ctx context.Context, flag bool) {
logrus.SetReportCaller(flag)
getLogger(ctx).Logger.SetReportCaller(flag)
getLoggerWithStack(ctx).Print(args...)
}

48
main.go
View File

@@ -5,26 +5,37 @@ import (
"fmt"
"github.com/labstack/echo/v4"
"github.com/sirupsen/logrus"
"mrw-clone/httpsrv"
"mrw-clone/media/hlshub"
"mrw-clone/media/hub"
"mrw-clone/media/streamer/hls"
"mrw-clone/media/streamer/record/mp4"
"mrw-clone/media/streamer/rtmp"
"mrw-clone/media/streamer/whip"
"liveflow/httpsrv"
"liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub"
"liveflow/media/streamer/egress/hls"
"liveflow/media/streamer/egress/record/mp4"
"liveflow/media/streamer/egress/whep"
"liveflow/media/streamer/ingress/rtmp"
"liveflow/media/streamer/ingress/whip"
)
// RTMP 받으면 자동으로 HLS 서비스 동작, 녹화 서비스까지~?
func main() {
ctx := context.Background()
log.Init()
//log.SetCaller(ctx, true)
//log.SetFormatter(ctx, &logrus.JSONFormatter{
// TimestampFormat: "2006-01-02 15:04:05",
//})
ctx = log.WithFields(ctx, logrus.Fields{
"app": "liveflow",
})
log.Info(ctx, "liveflow is started")
hub := hub.NewHub()
// ingress
// Egress 서비스는 streamID 알림을 구독하여 처리 시작
go func() {
api := echo.New()
api.HideBanner = true
hlsHub := hlshub.NewHLSHub()
hlsHandler := httpsrv.NewHandler(hlsHub)
api.GET("/health", func(c echo.Context) error {
@@ -37,17 +48,30 @@ func main() {
go func() {
api.Start("0.0.0.0:8044")
}()
for streamID := range hub.SubscribeToStreamID() {
fmt.Printf("New streamID received: %s\n", streamID)
for source := range hub.SubscribeToStreamID() {
log.Infof(ctx, "New streamID received: %s", source.StreamID())
hls := hls.NewHLS(hls.HLSArgs{
Hub: hub,
HLSHub: hlsHub,
})
hls.Start(ctx, streamID)
err := hls.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start hls: %v", err)
}
mp4 := mp4.NewMP4(mp4.MP4Args{
Hub: hub,
})
mp4.Start(ctx, streamID)
err = mp4.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start mp4: %v", err)
}
whep := whep.NewWHEP(whep.WHEPArgs{
Hub: hub,
})
err = whep.Start(ctx, source)
if err != nil {
log.Errorf(ctx, "failed to start whip: %v", err)
}
}
}()

View File

@@ -7,6 +7,7 @@ import (
type FrameData struct {
H264Video *H264Video
AACAudio *AACAudio
OPUSAudio *OPUSAudio
//AudioCodecData *AudioCodecData
//MediaInfo *MediaInfo
}
@@ -45,6 +46,37 @@ func (h *H264Video) RawDTS() int64 {
}
}
type OPUSAudio struct {
PTS int64
DTS int64
AudioClockRate uint32
Data []byte
}
func (a *OPUSAudio) RawTimestamp() int64 {
if a.AudioClockRate == 0 {
return a.PTS
} else {
return int64(float64(a.PTS) / float64(a.AudioClockRate) * 1000)
}
}
func (a *OPUSAudio) RawPTS() int64 {
if a.AudioClockRate == 0 {
return a.PTS
} else {
return int64(float64(a.PTS) / float64(a.AudioClockRate/1000.0))
}
}
func (a *OPUSAudio) RawDTS() int64 {
if a.AudioClockRate == 0 {
return a.DTS
} else {
return int64(float64(a.DTS) / float64(a.AudioClockRate/1000.0))
}
}
type AACAudio struct {
Data []byte
MPEG4AudioConfigBytes []byte

View File

@@ -5,12 +5,32 @@ import (
"fmt"
"sync"
"time"
"liveflow/log"
)
type MediaType int
const (
Video MediaType = 1
Audio = 2
)
type MediaSpec struct {
MediaType MediaType
CodecType string
}
type Source interface {
Name() string
MediaSpecs() []MediaSpec
StreamID() string
}
// Hub 구조체: streamID별로 독립적으로 데이터를 관리하고, Pub/Sub 메커니즘을 지원합니다.
type Hub struct {
streams map[string][]chan *FrameData // 각 streamID에 대한 채널을 저장
notifyChan chan string // streamID가 결정되었을 때 노티하는 채널
notifyChan chan Source // streamID가 결정되었을 때 노티하는 채널
mu sync.RWMutex // 동시성을 위한 Mutex
}
@@ -18,11 +38,12 @@ type Hub struct {
func NewHub() *Hub {
return &Hub{
streams: make(map[string][]chan *FrameData),
notifyChan: make(chan string, 1024), // 버퍼 크기를 조절할 수 있습니다.
notifyChan: make(chan Source, 1024), // 버퍼 크기를 조절할 수 있습니다.
}
}
func (h *Hub) Notify(streamID string) {
func (h *Hub) Notify(ctx context.Context, streamID Source) {
log.Info(ctx, "Notify", streamID.Name(), streamID.MediaSpecs())
h.notifyChan <- streamID
}
@@ -71,7 +92,7 @@ func (h *Hub) Subscribe(streamID string) <-chan *FrameData {
}
// SubscribeToStreamID : 스트림 ID가 결정되었을 때 이를 구독하는 채널을 반환합니다.
func (h *Hub) SubscribeToStreamID() <-chan string {
func (h *Hub) SubscribeToStreamID() <-chan Source {
return h.notifyChan
}

View File

@@ -9,10 +9,17 @@ import (
"github.com/bluenviron/gohlslib"
"github.com/bluenviron/gohlslib/pkg/codecs"
"github.com/deepch/vdk/codec/h264parser"
"github.com/sirupsen/logrus"
"mrw-clone/log"
"mrw-clone/media/hlshub"
"mrw-clone/media/hub"
"liveflow/log"
"liveflow/media/hlshub"
"liveflow/media/hub"
"liveflow/media/streamer/fields"
)
var (
ErrNotContainAudioOrVideo = errors.New("media spec does not contain audio or video")
ErrUnsupportedCodec = errors.New("unsupported codec")
)
type HLS struct {
@@ -33,8 +40,37 @@ func NewHLS(args HLSArgs) *HLS {
}
}
func (h *HLS) Start(ctx context.Context, streamID string) {
sub := h.hub.Subscribe(streamID)
func (h *HLS) Start(ctx context.Context, source hub.Source) error {
containsAudio := false
containsVideo := false
audioCodec := ""
videoCodec := ""
for _, spec := range source.MediaSpecs() {
if spec.MediaType == hub.Audio {
containsAudio = true
audioCodec = spec.CodecType
}
if spec.MediaType == hub.Video {
containsVideo = true
videoCodec = spec.CodecType
}
}
if !containsVideo || !containsAudio {
return ErrNotContainAudioOrVideo
}
if audioCodec != "aac" {
return ErrUnsupportedCodec
}
if videoCodec != "h264" {
return ErrUnsupportedCodec
}
ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(),
})
log.Info(ctx, "start hls")
log.Info(ctx, "view url: ", "http://localhost:8044/hls/"+source.StreamID()+"/master.m3u8")
sub := h.hub.Subscribe(source.StreamID())
go func() {
for data := range sub {
if data.AACAudio != nil {
@@ -43,7 +79,7 @@ func (h *HLS) Start(ctx context.Context, streamID string) {
if err != nil {
log.Error(ctx, err)
}
h.hlsHub.StoreMuxer(streamID, "pass", muxer)
h.hlsHub.StoreMuxer(source.StreamID(), "pass", muxer)
err = muxer.Start()
if err != nil {
log.Error(ctx, err)
@@ -67,8 +103,9 @@ func (h *HLS) Start(ctx context.Context, streamID string) {
}
}
}
fmt.Println("@@@ [HLS] end of streamID: ", streamID)
fmt.Println("@@@ [HLS] end of streamID: ", source.StreamID())
}()
return nil
}
func (h *HLS) makeMuxer(extraData []byte) (*gohlslib.Muxer, error) {

View File

@@ -8,10 +8,17 @@ import (
"os"
"github.com/deepch/vdk/codec/aacparser"
"github.com/sirupsen/logrus"
gomp4 "github.com/yapingcat/gomedia/go-mp4"
"mrw-clone/log"
"mrw-clone/media/hub"
"liveflow/log"
"liveflow/media/hub"
"liveflow/media/streamer/fields"
)
var (
ErrNotContainAudioOrVideo = errors.New("media spec does not contain audio or video")
ErrUnsupportedCodec = errors.New("unsupported codec")
)
type cacheWriterSeeker struct {
@@ -87,8 +94,37 @@ func NewMP4(args MP4Args) *MP4 {
}
}
func (h *MP4) Start(ctx context.Context, streamID string) error {
sub := h.hub.Subscribe(streamID)
func (h *MP4) Start(ctx context.Context, source hub.Source) error {
containsAudio := false
containsVideo := false
audioCodec := ""
videoCodec := ""
for _, spec := range source.MediaSpecs() {
if spec.MediaType == hub.Audio {
containsAudio = true
audioCodec = spec.CodecType
}
if spec.MediaType == hub.Video {
containsVideo = true
videoCodec = spec.CodecType
}
}
if !containsVideo || !containsAudio {
return ErrNotContainAudioOrVideo
}
if audioCodec != "aac" {
return ErrUnsupportedCodec
}
if videoCodec != "h264" {
return ErrUnsupportedCodec
}
ctx = log.WithFields(ctx, logrus.Fields{
fields.StreamID: source.StreamID(),
fields.SourceName: source.Name(),
})
log.Info(ctx, "start mp4")
sub := h.hub.Subscribe(source.StreamID())
go func() {
var err error

View File

@@ -0,0 +1,32 @@
package whep
import (
"context"
"liveflow/media/hub"
)
type WHEPArgs struct {
Hub *hub.Hub
}
// whip
type WHEP struct {
hub *hub.Hub
}
func NewWHEP(args WHEPArgs) *WHEP {
return &WHEP{
hub: args.Hub,
}
}
func (w *WHEP) Start(ctx context.Context, source hub.Source) error {
sub := w.hub.Subscribe(source.StreamID())
go func() {
for data := range sub {
_ = data
}
}()
return nil
}

View File

@@ -0,0 +1,6 @@
package fields
const (
StreamID = "liveflow_stream_id"
SourceName = "liveflow_source_name"
)

View File

@@ -16,8 +16,8 @@ import (
"github.com/yutopp/go-rtmp"
rtmpmsg "github.com/yutopp/go-rtmp/message"
"mrw-clone/log"
"mrw-clone/media/hub"
"liveflow/log"
"liveflow/media/hub"
)
type Handler struct {
@@ -32,6 +32,21 @@ type Handler struct {
sps []byte
pps []byte
hasSPS bool
mediaSpecs []hub.MediaSpec
notifiedSource bool
}
func (h *Handler) Name() string {
return "rtmp"
}
func (h *Handler) MediaSpecs() []hub.MediaSpec {
return h.mediaSpecs
}
func (h *Handler) StreamID() string {
return h.streamID
}
func (h *Handler) OnServe(conn *rtmp.Conn) {
@@ -48,7 +63,8 @@ func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCre
}
func (h *Handler) OnPublish(_ *rtmp.StreamContext, timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error {
log.Infof(context.Background(), "OnPublish: %#v", cmd)
ctx := context.Background()
log.Infof(ctx, "OnPublish: %#v", cmd)
// (example) Reject a connection when PublishingName is empty
if cmd.PublishingName == "" {
@@ -74,8 +90,21 @@ func (h *Handler) OnPublish(_ *rtmp.StreamContext, timestamp uint32, cmd *rtmpms
h.flvEnc = enc
h.streamID = cmd.PublishingName
h.hub.Notify(cmd.PublishingName)
h.mediaSpecs = []hub.MediaSpec{
{
MediaType: hub.Video,
CodecType: "h264",
},
{
MediaType: hub.Audio,
CodecType: "aac",
},
}
if !h.notifiedSource && len(h.mediaSpecs) == 2 {
h.hub.Notify(ctx, h)
h.notifiedSource = true
}
return nil
}
@@ -160,6 +189,8 @@ func (h *Handler) OnVideo(timestamp uint32, payload io.Reader) error {
if err != nil {
return err
}
if videoData.CodecID == flvtag.CodecIDAVC {
}
// FLV 바디 데이터를 처리하고 대응하는 작업 수행
return h.processVideoData(ctx, timestamp, videoData)

View File

@@ -2,13 +2,15 @@ package rtmp
import (
"context"
"fmt"
"io"
"net"
"github.com/sirupsen/logrus"
"github.com/yutopp/go-rtmp"
"mrw-clone/log"
"mrw-clone/media/hub"
"liveflow/log"
"liveflow/media/hub"
)
const (
@@ -33,7 +35,8 @@ func NewRTMP(args RTMPArgs) *RTMP {
}
func (r *RTMP) Serve(ctx context.Context) error {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1930")
port := 1930
tcpAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}
@@ -51,7 +54,6 @@ func (r *RTMP) Serve(ctx context.Context) error {
//ControlState: rtmp.StreamControlStateConfig{
// DefaultBandwidthWindowSize: 6 * 1024 * 1024 / 8,
//},
//Logger: nil,
SkipHandshakeVerification: false,
IgnoreMessagesOnNotExistStream: false,
IgnoreMessagesOnNotExistStreamThreshold: 0,
@@ -63,6 +65,10 @@ func (r *RTMP) Serve(ctx context.Context) error {
}
},
})
ctx = log.WithFields(ctx, logrus.Fields{
"port": port,
})
log.Info(ctx, "RTMP server started")
if err := srv.Serve(listener); err != nil {
log.Errorf(ctx, "Failed: %+v", err)
}

View File

@@ -0,0 +1,342 @@
package whip
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/labstack/echo/v4"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"liveflow/log"
"liveflow/media/hub"
)
var (
ErrMissingTrack = fmt.Errorf("missing track")
ErrTrackWaitTimeOut = fmt.Errorf("track wait timeout")
)
type WebRTCHandler struct {
hub *hub.Hub
pc *webrtc.PeerConnection
streamID string
timestampGen TimestampGenerator[int64]
tracks map[string][]*webrtc.TrackLocalStaticRTP
videoTrack *webrtc.TrackLocalStaticRTP
audioTrack *webrtc.TrackLocalStaticRTP
notifiedSource bool
mediaArgs []hub.MediaSpec
expectedTrackCount int
}
type WebRTCHandlerArgs struct {
Hub *hub.Hub
PeerConnection *webrtc.PeerConnection
StreamID string
Tracks map[string][]*webrtc.TrackLocalStaticRTP
ExpectedTrackCount int
}
func NewWebRTCHandler(hub *hub.Hub, args *WebRTCHandlerArgs) *WebRTCHandler {
videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
if err != nil {
panic(err)
}
audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
panic(err)
}
ret := &WebRTCHandler{
hub: hub,
streamID: args.StreamID,
timestampGen: TimestampGenerator[int64]{},
pc: args.PeerConnection,
tracks: args.Tracks,
expectedTrackCount: args.ExpectedTrackCount,
videoTrack: videoTrack,
audioTrack: audioTrack,
}
if _, ok := ret.tracks[args.StreamID]; !ok {
ret.tracks[args.StreamID] = []*webrtc.TrackLocalStaticRTP{videoTrack, audioTrack}
}
return ret
}
func (w *WebRTCHandler) StreamID() string {
return w.streamID
}
func (w *WebRTCHandler) Name() string {
return "webrtc"
}
func (w *WebRTCHandler) MediaSpecs() []hub.MediaSpec {
var ret []hub.MediaSpec
for _, arg := range w.mediaArgs {
ret = append(ret, arg)
}
return ret
}
func (w *WebRTCHandler) WaitTrackArgs(ctx context.Context, timeout time.Duration, trackArgCh <-chan TrackArgs) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
select {
case <-ctx.Done():
if len(w.mediaArgs) == 0 {
return ErrMissingTrack
}
return ErrTrackWaitTimeOut
case args := <-trackArgCh:
fmt.Println("add track args: ", args)
audioSplits := strings.Split(args.MimeType, "audio/")
videoSplits := strings.Split(args.MimeType, "video/")
if len(audioSplits) > 1 {
w.mediaArgs = append(w.mediaArgs, hub.MediaSpec{
MediaType: hub.Audio,
CodecType: audioSplits[1],
})
}
if len(videoSplits) > 1 {
w.mediaArgs = append(w.mediaArgs, hub.MediaSpec{
MediaType: hub.Video,
CodecType: videoSplits[1],
})
}
if len(w.mediaArgs) == w.expectedTrackCount {
w.hub.Notify(ctx, w)
w.notifiedSource = true
return nil
}
}
}
}
func (w *WebRTCHandler) OnICEConnectionStateChange(connectionState webrtc.ICEConnectionState, trackArgCh <-chan TrackArgs) {
ctx := context.Background()
switch connectionState {
case webrtc.ICEConnectionStateConnected:
fmt.Println("ICE Connection State Connected")
go func() {
err := w.WaitTrackArgs(ctx, 3*time.Second, trackArgCh)
if err != nil {
log.Error(ctx, err, "failed to wait track args")
return
}
}()
case webrtc.ICEConnectionStateDisconnected:
w.OnClose(ctx)
//delete(w.tracks, streamKey)
fmt.Println("ICE Connection State Disconnected")
case webrtc.ICEConnectionStateFailed:
fmt.Println("ICE Connection State Failed")
_ = w.pc.Close()
}
}
type TrackArgs struct {
MimeType string
ClockRate uint32
Channels uint16
}
func (w *WebRTCHandler) OnTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver, trackArgCh chan<- TrackArgs) {
ctx := context.Background()
fmt.Printf("Track has started, of type %s %s\n", track.Kind(), track.Codec().MimeType)
var videoPackets []*rtp.Packet
var audioPackets []*rtp.Packet
var videoPacketsQueue [][]*rtp.Packet
var audioPacketsQueue [][]*rtp.Packet
currentVideoTimestamp := uint32(0)
currentAudioTimestamp := uint32(0)
trackArgCh <- TrackArgs{
MimeType: track.Codec().MimeType,
ClockRate: track.Codec().ClockRate,
Channels: track.Codec().Channels,
}
for {
pkt, _, err := track.ReadRTP()
if err != nil {
log.Error(ctx, err, "failed to read rtp")
break
}
switch track.Kind() {
case webrtc.RTPCodecTypeVideo:
//fmt.Println("timestamp: ", pkt.Timestamp)
if len(videoPackets) > 0 && currentVideoTimestamp != pkt.Timestamp {
videoPacketsQueue = append(videoPacketsQueue, videoPackets)
videoPackets = nil
}
videoPackets = append(videoPackets, pkt)
currentVideoTimestamp = pkt.Timestamp
if pkt.Marker {
videoPacketsQueue = append(videoPacketsQueue, videoPackets)
videoPackets = nil
}
//fmt.Println("frame len: ", len(h264Bytes))
if err = w.videoTrack.WriteRTP(pkt); err != nil {
panic(err)
}
case webrtc.RTPCodecTypeAudio:
if len(audioPackets) > 0 && currentAudioTimestamp != pkt.Timestamp {
audioPacketsQueue = append(audioPacketsQueue, audioPackets)
audioPackets = nil
}
audioPackets = append(audioPackets, pkt)
currentAudioTimestamp = pkt.Timestamp
if pkt.Marker {
audioPacketsQueue = append(audioPacketsQueue, audioPackets)
audioPackets = nil
}
if err = w.audioTrack.WriteRTP(pkt); err != nil {
panic(err)
}
}
if len(videoPacketsQueue) > 0 || len(audioPacketsQueue) > 0 {
if !w.notifiedSource {
fmt.Println("not yet notified source")
}
}
if w.notifiedSource {
for _, videoPackets := range videoPacketsQueue {
w.OnVideo(ctx, videoPackets)
}
videoPacketsQueue = nil
for _, audioPackets := range audioPacketsQueue {
w.OnAudio(ctx, track.Codec().ClockRate, audioPackets)
}
audioPacketsQueue = nil
}
}
}
func (w *WebRTCHandler) OnClose(ctx context.Context) error {
w.hub.Unpublish(w.streamID)
fmt.Println("OnClose")
return nil
}
func (w *WebRTCHandler) OnVideo(ctx context.Context, packets []*rtp.Packet) error {
var h264RTPParser = &codecs.H264Packet{}
payload := make([]byte, 0)
for _, pkt := range packets {
if len(pkt.Payload) == 0 {
continue
}
b, err := h264RTPParser.Unmarshal(pkt.Payload)
if err != nil {
log.Error(ctx, err, "failed to unmarshal h264")
}
payload = append(payload, b...)
}
if len(payload) == 0 {
return nil
}
pts := w.timestampGen.GetTimestamp(int64(packets[0].Timestamp))
w.hub.Publish(w.streamID, &hub.FrameData{
H264Video: &hub.H264Video{
PTS: pts,
DTS: pts,
VideoClockRate: 90000,
Data: payload,
SPS: nil,
PPS: nil,
SliceType: 0,
CodecData: nil,
},
AACAudio: nil,
})
return nil
}
func (w *WebRTCHandler) OnAudio(ctx context.Context, clockRate uint32, packets []*rtp.Packet) error {
var opusRTPParser = &codecs.OpusPacket{}
payload := make([]byte, 0)
for _, pkt := range packets {
if len(pkt.Payload) == 0 {
continue
}
b, err := opusRTPParser.Unmarshal(pkt.Payload)
if err != nil {
log.Error(ctx, err, "failed to unmarshal opus")
}
payload = append(payload, b...)
}
if len(payload) == 0 {
return nil
}
pts := w.timestampGen.GetTimestamp(int64(packets[0].Timestamp))
w.hub.Publish(w.streamID, &hub.FrameData{
OPUSAudio: &hub.OPUSAudio{
PTS: pts,
DTS: pts,
AudioClockRate: clockRate,
Data: payload,
},
})
return nil
}
func (r *WHIP) whepHandler(c echo.Context) error {
// Read the offer from HTTP Request
offer, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
streamKey, err := r.bearerToken(c)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create a new RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfiguration)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
var rtpSenders []*webrtc.RTPSender
for _, track := range r.tracks[streamKey] {
sender, err := peerConnection.AddTrack(track)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
rtpSenders = append(rtpSenders, sender)
}
// Read incoming RTCP packets
go func() {
rtcpBuf := make([]byte, 1500)
for {
for _, rtpSender := range rtpSenders {
n, _, rtcpErr := rtpSender.Read(rtcpBuf)
if rtcpErr != nil {
return
}
fmt.Println("rtcpBuf: len:", n)
}
}
}()
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateFailed {
_ = peerConnection.Close()
}
})
// Send answer via HTTP Response
return writeAnswer3(c, peerConnection, offer, "/whep")
}

View File

@@ -0,0 +1,260 @@
package whip
import (
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/labstack/echo/v4"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/intervalpli"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
"liveflow/media/hub"
)
var (
errNoStreamKey = echo.NewHTTPError(http.StatusUnauthorized, "No stream key provided")
)
type WHIP struct {
hub *hub.Hub
tracks map[string][]*webrtc.TrackLocalStaticRTP
}
type WHIPArgs struct {
Hub *hub.Hub
}
var (
//videoTrack *webrtc.TrackLocalStaticRTP
//audioTrack *webrtc.TrackLocalStaticRTP
peerConnectionConfiguration = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
)
func NewWHIP(args WHIPArgs) *WHIP {
return &WHIP{
hub: args.Hub,
tracks: make(map[string][]*webrtc.TrackLocalStaticRTP),
}
}
func (r *WHIP) Serve() {
whipServer := echo.New()
whipServer.HideBanner = true
whipServer.Static("/", ".")
whipServer.POST("/whip", r.whipHandler)
whipServer.POST("/whep", r.whepHandler)
//whipServer.PATCH("/whip", whipHandler)
whipServer.Start(":5555")
}
func (r *WHIP) bearerToken(c echo.Context) (string, error) {
bearerToken := c.Request().Header.Get("Authorization")
if len(bearerToken) == 0 {
return "", errNoStreamKey
}
authHeaderParts := strings.Split(bearerToken, " ")
if len(authHeaderParts) != 2 {
return "", errNoStreamKey
}
return authHeaderParts[1], nil
}
func (r *WHIP) whipHandler(c echo.Context) error {
// Read the offer from HTTP Request
offer, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Parse the SDP
parsedSDP := sdp.SessionDescription{}
if err := parsedSDP.Unmarshal([]byte(offer)); err != nil {
panic(err)
}
// Count the number of media tracks
trackCount := 0
for _, media := range parsedSDP.MediaDescriptions {
//for _, attr := range media.Attributes {
// if attr.Key == "rtpmap" {
// rtpmap, err := parseRTPMAP(attr)
// if err != nil {
// return c.JSON(http.StatusInternalServerError, err.Error())
// }
// fmt.Printf("RTPMAP: %+v\n", rtpmap)
// }
//}
if media.MediaName.Media == "audio" || media.MediaName.Media == "video" {
trackCount++
}
}
streamKey, err := r.bearerToken(c)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
fmt.Println("streamkey: ", streamKey)
// Create a MediaEngine object to configure the supported codec
m := &webrtc.MediaEngine{}
// Setup the codecs you want to use.
if err = m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
PayloadType: 96,
}, webrtc.RTPCodecTypeVideo); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
if err = m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "", RTCPFeedback: nil},
PayloadType: 111,
}, webrtc.RTPCodecTypeAudio); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create a InterceptorRegistry
i := &interceptor.Registry{}
// Register a intervalpli factory
intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
i.Add(intervalPliFactory)
// Use the default set of Interceptors
if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create the API object with the MediaEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
// Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Allow us to receive 1 video track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Allow us to receive 1 audio track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
whipHandler := NewWebRTCHandler(r.hub, &WebRTCHandlerArgs{
Hub: r.hub,
PeerConnection: peerConnection,
StreamID: streamKey,
Tracks: r.tracks,
ExpectedTrackCount: trackCount,
})
trackArgCh := make(chan TrackArgs)
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
whipHandler.OnICEConnectionStateChange(connectionState, trackArgCh)
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
for _, t := range receiver.GetParameters().Codecs {
fmt.Println("Codec: ", t.MimeType)
}
whipHandler.OnTrack(track, receiver, trackArgCh)
})
// Send answer via HTTP Response
return writeAnswer3(c, peerConnection, offer, "/whip")
}
func writeAnswer3(c echo.Context, peerConnection *webrtc.PeerConnection, offer []byte, path string) error {
// Set the handler for ICE connection state
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(offer)}); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create channel that is blocked until ICE Gathering is complete
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Block until ICE Gathering is complete, disabling trickle ICE
<-gatherComplete
// WHIP+WHEP expects a Location header and a HTTP Status Code of 201
c.Response().Header().Add("Location", path)
c.Response().WriteHeader(http.StatusCreated)
// Write Answer with Candidates as HTTP Response
return c.String(http.StatusOK, peerConnection.LocalDescription().SDP)
}
type RTPMAP struct {
PayloadType webrtc.PayloadType
MimeType string
CodecName string
ClockRate uint32
Channels uint16
}
func parseRTPMAP(attr sdp.Attribute) (RTPMAP, error) {
const (
attributeRtpMap = "rtpmap"
attributeValueIndex = 2
fmtpIndexCodec = 0
fmtpIndexClockRate = 1
fmtpIndexChannels = 2
)
var (
InvalidSDPAttribute = fmt.Errorf("invalid SDP attribute")
)
if attr.Key != attributeRtpMap || attr.Value == "" {
return RTPMAP{}, InvalidSDPAttribute
}
var rtpmap RTPMAP
r := strings.Split(attr.Value, " ")
if len(r) >= 1 {
pt, err := strconv.Atoi(r[0])
if err != nil {
return RTPMAP{}, InvalidSDPAttribute
}
rtpmap.PayloadType = webrtc.PayloadType(pt)
}
if len(r) >= attributeValueIndex {
rtpmap.MimeType = r[1]
r2 := strings.Split(rtpmap.MimeType, "/")
if len(r2) >= fmtpIndexCodec+1 {
rtpmap.CodecName = r2[fmtpIndexCodec]
}
if len(r2) >= fmtpIndexClockRate+1 {
if clockRate, err := strconv.Atoi(r2[fmtpIndexClockRate]); err == nil {
rtpmap.ClockRate = uint32(clockRate)
}
}
if len(r2) >= fmtpIndexChannels+1 {
if channels, err := strconv.Atoi(r2[fmtpIndexChannels]); err == nil {
rtpmap.Channels = uint16(channels)
}
}
}
return rtpmap, nil
}

View File

@@ -3,7 +3,7 @@ package streamer
import (
"context"
"mrw-clone/media/hub"
"liveflow/media/hub"
)
type Streamer interface {

View File

@@ -1 +0,0 @@
package whip

View File

@@ -1,385 +0,0 @@
package whip
import (
"context"
"fmt"
"io"
"net/http"
"strings"
"github.com/labstack/echo/v4"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/intervalpli"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/pion/webrtc/v3"
"mrw-clone/log"
"mrw-clone/media/hub"
)
var (
errNoStreamKey = echo.NewHTTPError(http.StatusUnauthorized, "No stream key provided")
)
type WHIP struct {
hub *hub.Hub
tracks map[string][]*webrtc.TrackLocalStaticRTP
}
type WHIPArgs struct {
Hub *hub.Hub
}
var (
//videoTrack *webrtc.TrackLocalStaticRTP
//audioTrack *webrtc.TrackLocalStaticRTP
peerConnectionConfiguration = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{
URLs: []string{"stun:stun.l.google.com:19302"},
},
},
}
)
func NewWHIP(args WHIPArgs) *WHIP {
return &WHIP{
hub: args.Hub,
tracks: make(map[string][]*webrtc.TrackLocalStaticRTP),
}
}
func (r *WHIP) Serve() {
whipServer := echo.New()
whipServer.Static("/", ".")
whipServer.POST("/whip", r.whipHandler)
whipServer.POST("/whep", r.whepHandler)
//whipServer.PATCH("/whip", whipHandler)
whipServer.Start(":5555")
}
func (r *WHIP) bearerToken(c echo.Context) (string, error) {
bearerToken := c.Request().Header.Get("Authorization")
if len(bearerToken) == 0 {
return "", errNoStreamKey
}
authHeaderParts := strings.Split(bearerToken, " ")
if len(authHeaderParts) != 2 {
return "", errNoStreamKey
}
return authHeaderParts[1], nil
}
func (r *WHIP) whipHandler(c echo.Context) error {
// Read the offer from HTTP Request
offer, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
streamKey, err := r.bearerToken(c)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
fmt.Println("streamkey: ", streamKey)
// Create a MediaEngine object to configure the supported codec
m := &webrtc.MediaEngine{}
// Setup the codecs you want to use.
if err = m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, SDPFmtpLine: "", RTCPFeedback: nil},
PayloadType: 96,
}, webrtc.RTPCodecTypeVideo); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
if err = m.RegisterCodec(webrtc.RTPCodecParameters{
RTPCodecCapability: webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus, ClockRate: 48000, Channels: 2, SDPFmtpLine: "", RTCPFeedback: nil},
PayloadType: 111,
}, webrtc.RTPCodecTypeAudio); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create a InterceptorRegistry
i := &interceptor.Registry{}
// Register a intervalpli factory
intervalPliFactory, err := intervalpli.NewReceiverInterceptor()
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
i.Add(intervalPliFactory)
// Use the default set of Interceptors
if err = webrtc.RegisterDefaultInterceptors(m, i); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create the API object with the MediaEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithInterceptorRegistry(i))
// Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(peerConnectionConfiguration)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Allow us to receive 1 video track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Allow us to receive 1 audio track
if _, err = peerConnection.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
whipHandler := NewWebRTCHandler(r.hub, &WebRTCHandlerArgs{
Hub: r.hub,
PeerConnection: peerConnection,
StreamID: streamKey,
Tracks: r.tracks,
})
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
whipHandler.OnICEConnectionStateChange(connectionState)
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
})
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
whipHandler.OnTrack(track, receiver)
})
// Send answer via HTTP Response
return writeAnswer3(c, peerConnection, offer, "/whip")
}
type WebRTCHandler struct {
hub *hub.Hub
pc *webrtc.PeerConnection
streamID string
timestampGen TimestampGenerator[int64]
tracks map[string][]*webrtc.TrackLocalStaticRTP
videoTrack *webrtc.TrackLocalStaticRTP
audioTrack *webrtc.TrackLocalStaticRTP
}
type WebRTCHandlerArgs struct {
Hub *hub.Hub
PeerConnection *webrtc.PeerConnection
StreamID string
Tracks map[string][]*webrtc.TrackLocalStaticRTP
}
func NewWebRTCHandler(hub *hub.Hub, args *WebRTCHandlerArgs) *WebRTCHandler {
//ctx := context.Background()
videoTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeH264}, "video", "pion")
if err != nil {
panic(err)
}
// Add Audio Track that is being written to from WHIP Session
audioTrack, err := webrtc.NewTrackLocalStaticRTP(webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeOpus}, "audio", "pion")
if err != nil {
panic(err)
}
//if _, ok := r.tracks[streamKey]; !ok {
// r.tracks[streamKey] = []*webrtc.TrackLocalStaticRTP{videoTrack, audioTrack}
//}
ret := &WebRTCHandler{}
ret.hub = hub
ret.streamID = args.StreamID
ret.timestampGen = TimestampGenerator[int64]{}
ret.pc = args.PeerConnection
ret.tracks = args.Tracks
if _, ok := ret.tracks[args.StreamID]; !ok {
ret.tracks[args.StreamID] = []*webrtc.TrackLocalStaticRTP{videoTrack, audioTrack}
}
ret.videoTrack = videoTrack
ret.audioTrack = audioTrack
return ret
}
func (w *WebRTCHandler) OnICEConnectionStateChange(connectionState webrtc.ICEConnectionState) {
ctx := context.Background()
switch connectionState {
case webrtc.ICEConnectionStateConnected:
w.hub.Notify(w.streamID)
fmt.Println("ICE Connection State Connected")
case webrtc.ICEConnectionStateDisconnected:
w.OnClose(ctx)
//delete(w.tracks, streamKey)
fmt.Println("ICE Connection State Disconnected")
case webrtc.ICEConnectionStateFailed:
fmt.Println("ICE Connection State Failed")
_ = w.pc.Close()
}
}
func (w *WebRTCHandler) OnTrack(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
ctx := context.Background()
fmt.Printf("Track has started, of type %s %s\n", track.Kind(), track.Codec().MimeType)
var videoPackets []*rtp.Packet
var audioPackets []*rtp.Packet
currentVideoTimestamp := uint32(0)
currentAudioTimestamp := uint32(0)
for {
pkt, _, err := track.ReadRTP()
if err != nil {
log.Error(ctx, err, "failed to read rtp")
break
}
switch track.Kind() {
case webrtc.RTPCodecTypeVideo:
//fmt.Println("timestamp: ", pkt.Timestamp)
if len(videoPackets) > 0 && currentVideoTimestamp != pkt.Timestamp {
w.OnVideo(ctx, videoPackets)
videoPackets = nil
}
videoPackets = append(videoPackets, pkt)
currentVideoTimestamp = pkt.Timestamp
if pkt.Marker {
w.OnVideo(ctx, videoPackets)
videoPackets = nil
}
//fmt.Println("frame len: ", len(h264Bytes))
if err = w.videoTrack.WriteRTP(pkt); err != nil {
panic(err)
}
case webrtc.RTPCodecTypeAudio:
if len(audioPackets) > 0 && currentAudioTimestamp != pkt.Timestamp {
w.OnAudio(ctx, audioPackets)
audioPackets = nil
}
audioPackets = append(audioPackets, pkt)
currentAudioTimestamp = pkt.Timestamp
if pkt.Marker {
w.OnAudio(ctx, audioPackets)
audioPackets = nil
}
if err = w.audioTrack.WriteRTP(pkt); err != nil {
panic(err)
}
}
}
}
func (w *WebRTCHandler) OnClose(ctx context.Context) error {
w.hub.Unpublish(w.streamID)
fmt.Println("OnClose")
return nil
}
func (w *WebRTCHandler) OnVideo(ctx context.Context, packets []*rtp.Packet) error {
var h264RTPParser = &codecs.H264Packet{}
payload := make([]byte, 0)
for _, pkt := range packets {
if len(pkt.Payload) == 0 {
continue
}
b, err := h264RTPParser.Unmarshal(pkt.Payload)
if err != nil {
log.Error(ctx, err, "failed to unmarshal h264")
}
payload = append(payload, b...)
}
if len(payload) == 0 {
return nil
}
pts := w.timestampGen.GetTimestamp(int64(packets[0].Timestamp))
w.hub.Publish(w.streamID, &hub.FrameData{
H264Video: &hub.H264Video{
PTS: pts,
DTS: pts,
VideoClockRate: 90000,
Data: payload,
SPS: nil,
PPS: nil,
SliceType: 0,
CodecData: nil,
},
AACAudio: nil,
})
return nil
}
func (w *WebRTCHandler) OnAudio(ctx context.Context, packets []*rtp.Packet) error {
return nil
}
func (r *WHIP) whepHandler(c echo.Context) error {
// Read the offer from HTTP Request
offer, err := io.ReadAll(c.Request().Body)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
streamKey, err := r.bearerToken(c)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create a new RTCPeerConnection
peerConnection, err := webrtc.NewPeerConnection(peerConnectionConfiguration)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
var rtpSenders []*webrtc.RTPSender
for _, track := range r.tracks[streamKey] {
sender, err := peerConnection.AddTrack(track)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
rtpSenders = append(rtpSenders, sender)
}
// Read incoming RTCP packets
go func() {
rtcpBuf := make([]byte, 1500)
for {
for _, rtpSender := range rtpSenders {
if _, _, rtcpErr := rtpSender.Read(rtcpBuf); rtcpErr != nil {
return
}
}
}
}()
peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) {
fmt.Printf("ICE Connection State has changed: %s\n", connectionState.String())
if connectionState == webrtc.ICEConnectionStateFailed {
_ = peerConnection.Close()
}
})
// Send answer via HTTP Response
return writeAnswer3(c, peerConnection, offer, "/whep")
}
func writeAnswer3(c echo.Context, peerConnection *webrtc.PeerConnection, offer []byte, path string) error {
// Set the handler for ICE connection state
if err := peerConnection.SetRemoteDescription(webrtc.SessionDescription{Type: webrtc.SDPTypeOffer, SDP: string(offer)}); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Create channel that is blocked until ICE Gathering is complete
gatherComplete := webrtc.GatheringCompletePromise(peerConnection)
// Create answer
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
} else if err = peerConnection.SetLocalDescription(answer); err != nil {
return c.JSON(http.StatusInternalServerError, err.Error())
}
// Block until ICE Gathering is complete, disabling trickle ICE
<-gatherComplete
// WHIP+WHEP expects a Location header and a HTTP Status Code of 201
c.Response().Header().Add("Location", path)
c.Response().WriteHeader(http.StatusCreated)
// Write Answer with Candidates as HTTP Response
return c.String(http.StatusOK, peerConnection.LocalDescription().SDP)
}