mirror of
https://github.com/erroot/plugin-transform.git
synced 2025-09-27 03:06:31 +08:00
132 lines
3.3 KiB
Go
132 lines
3.3 KiB
Go
package transform
|
||
|
||
import (
|
||
"encoding/hex"
|
||
"fmt"
|
||
"log"
|
||
|
||
"go.uber.org/zap"
|
||
. "m7s.live/engine/v4"
|
||
"m7s.live/engine/v4/codec"
|
||
"m7s.live/engine/v4/track"
|
||
)
|
||
|
||
type TransformSubscriber struct {
|
||
Subscriber
|
||
task *TransformTask
|
||
}
|
||
|
||
func (s *TransformSubscriber) Delete() {
|
||
s.Stop(zap.String("reason", "for restart"))
|
||
}
|
||
|
||
func sliceAppend(s1 []byte, s2 []byte) []byte {
|
||
l1 := len(s1)
|
||
l2 := len(s2)
|
||
|
||
if cap(s1) < l2+l1 {
|
||
return s1
|
||
}
|
||
|
||
for i := 0; i < len(s2); i++ {
|
||
s1[l1+i] = s2[i]
|
||
}
|
||
|
||
return s1
|
||
}
|
||
|
||
func (s *TransformSubscriber) OnEvent(event any) {
|
||
|
||
//获取转码流发布者
|
||
t := s.task
|
||
|
||
//s.Stream.Path
|
||
switch v := event.(type) {
|
||
case *track.Video:
|
||
fmt.Println("=====> write track.Video to publisher")
|
||
if s.Video != nil {
|
||
return
|
||
}
|
||
switch v.CodecID {
|
||
case codec.CodecID_H264:
|
||
fmt.Println("=====> CodecID_H264 on sub:", v.PayloadType)
|
||
// vt := p.VideoTrack
|
||
// vt = track.NewH264(p.Stream, v.PayloadType)
|
||
// p.VideoTrack = vt
|
||
log.Printf("pipe in SPS:%d, [%v]\n",
|
||
len(v.ParamaterSets[0]), hex.EncodeToString(v.ParamaterSets[0]))
|
||
|
||
log.Printf("pipe in PPS:%d, [%v]\n",
|
||
len(v.ParamaterSets[1]), hex.EncodeToString(v.ParamaterSets[1]))
|
||
|
||
//2023/04/02 17:07:51 pipe in SPS:35, [6764001fac2ca4014016ec04400000fa000030d43800001e848000186a02ef2e0fa489]
|
||
//2023/04/02 17:07:51 pipe in PPS:4, [68eb8f2c]
|
||
nal := []byte{0, 0, 0, 1}
|
||
//SPS
|
||
if len(v.ParamaterSets[0]) > 0 {
|
||
//vt.WriteSliceBytes(v.ParamaterSets[0])
|
||
|
||
t.writeToFFPipe0(append(nal, v.ParamaterSets[0]...))
|
||
}
|
||
//PPS:
|
||
if len(v.ParamaterSets[1]) > 0 {
|
||
//vt.WriteSliceBytes(v.ParamaterSets[1])
|
||
t.writeToFFPipe0(append(nal, v.ParamaterSets[1]...))
|
||
}
|
||
case codec.CodecID_H265:
|
||
fmt.Println("=====> CodecID_H265 on sub")
|
||
// vt := p.VideoTrack
|
||
// vt = track.NewH265(p.Stream, v.PayloadType)
|
||
// p.VideoTrack = vt
|
||
// //VPS
|
||
// if len(v.ParamaterSets[0]) > 0 {
|
||
// vt.WriteSliceBytes(v.ParamaterSets[0])
|
||
// }
|
||
// //SPS
|
||
// if len(v.ParamaterSets[1]) > 0 {
|
||
// vt.WriteSliceBytes(v.ParamaterSets[1])
|
||
// }
|
||
// //PPS:
|
||
// if len(v.ParamaterSets[2]) > 0 {
|
||
// vt.WriteSliceBytes(v.ParamaterSets[2])
|
||
// }
|
||
}
|
||
s.AddTrack(v)
|
||
case *track.Audio:
|
||
if s.Audio != nil {
|
||
return
|
||
}
|
||
fmt.Println("=====> write *track.Audio to publisher")
|
||
//p.VideoTrack.WriteAnnexB(v.PTS, v.DTS, v.GetAnnexB()[0])
|
||
s.AddTrack(v)
|
||
case VideoFrame:
|
||
//fmt.Println("=====> write VideoFrame to publisher")
|
||
firstFrame := v.GetAnnexB()
|
||
// log.Printf("pipe in PTS:%d,DTS:%d buf num:%d\n",
|
||
// v.PTS, v.DTS, len(firstFrame))
|
||
|
||
for _, buf := range firstFrame {
|
||
// log.Printf("pipe in PTS:%d,DTS:%d buf len:%d,\n",
|
||
// v.PTS, v.DTS, len(buf))
|
||
//s.debugPrintfNal(buf, "on sub frame")
|
||
//p.VideoTrack.WriteAnnexB(v.PTS, v.DTS, buf)
|
||
t.writeToFFPipe0(buf)
|
||
}
|
||
case VideoRTP:
|
||
fmt.Println("=====> on subscribe VideoRTP")
|
||
//p.WritePacketRTP(s.videoTrack, v.Packet)
|
||
//p.VideoTrack.WriteRTPPack(v.Packet)
|
||
case AudioRTP:
|
||
fmt.Println("=====> on subscribe AudioRTP to")
|
||
//s.stream.WritePacketRTP(s.audioTrack, v.Packet)
|
||
//p.AudioTrack.WriteRTPPack(v.Packet)
|
||
case ISubscriber:
|
||
//代表订阅成功事件,v就是p
|
||
fmt.Println("=====> begain Subscriber sucess")
|
||
default:
|
||
s.Subscriber.OnEvent(event)
|
||
|
||
//fmt.Println("TransformSubscriber OnEvent:%T", v)
|
||
}
|
||
}
|