mirror of
				https://github.com/AlexxIT/go2rtc.git
				synced 2025-10-31 19:53:02 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			147 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			147 lines
		
	
	
		
			2.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package rtmp
 | |
| 
 | |
| import (
 | |
| 	"encoding/base64"
 | |
| 	"encoding/binary"
 | |
| 	"github.com/AlexxIT/go2rtc/pkg/h264"
 | |
| 	"github.com/AlexxIT/go2rtc/pkg/streamer"
 | |
| 	"github.com/deepch/vdk/av"
 | |
| 	"github.com/deepch/vdk/codec/h264parser"
 | |
| 	"github.com/deepch/vdk/format/rtmp"
 | |
| 	"github.com/pion/rtp"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| type Client struct {
 | |
| 	streamer.Element
 | |
| 
 | |
| 	URI string
 | |
| 
 | |
| 	medias []*streamer.Media
 | |
| 	tracks []*streamer.Track
 | |
| 
 | |
| 	conn   *rtmp.Conn
 | |
| 	closed bool
 | |
| 
 | |
| 	receive int
 | |
| }
 | |
| 
 | |
| func NewClient(uri string) *Client {
 | |
| 	return &Client{URI: uri}
 | |
| }
 | |
| 
 | |
| func (c *Client) Dial() (err error) {
 | |
| 	c.conn, err = rtmp.Dial(c.URI)
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	// important to get SPS/PPS
 | |
| 	streams, err := c.conn.Streams()
 | |
| 	if err != nil {
 | |
| 		return
 | |
| 	}
 | |
| 
 | |
| 	for _, stream := range streams {
 | |
| 		switch stream.Type() {
 | |
| 		case av.H264:
 | |
| 			cd := stream.(h264parser.CodecData)
 | |
| 			fmtp := "sprop-parameter-sets=" +
 | |
| 				base64.StdEncoding.EncodeToString(cd.RecordInfo.SPS[0]) + "," +
 | |
| 				base64.StdEncoding.EncodeToString(cd.RecordInfo.PPS[0])
 | |
| 
 | |
| 			codec := &streamer.Codec{
 | |
| 				Name:        streamer.CodecH264,
 | |
| 				ClockRate:   90000,
 | |
| 				FmtpLine:    fmtp,
 | |
| 				PayloadType: h264.PayloadTypeAVC,
 | |
| 			}
 | |
| 
 | |
| 			media := &streamer.Media{
 | |
| 				Kind:      streamer.KindVideo,
 | |
| 				Direction: streamer.DirectionSendonly,
 | |
| 				Codecs:    []*streamer.Codec{codec},
 | |
| 			}
 | |
| 			c.medias = append(c.medias, media)
 | |
| 
 | |
| 			track := &streamer.Track{
 | |
| 				Codec: codec, Direction: media.Direction,
 | |
| 			}
 | |
| 			c.tracks = append(c.tracks, track)
 | |
| 
 | |
| 		case av.AAC:
 | |
| 			panic("not implemented")
 | |
| 		default:
 | |
| 			panic("unsupported codec")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	c.Fire(streamer.StateReady)
 | |
| 
 | |
| 	return
 | |
| }
 | |
| 
 | |
| func (c *Client) Handle() (err error) {
 | |
| 	defer c.Fire(streamer.StateNull)
 | |
| 
 | |
| 	c.Fire(streamer.StatePlaying)
 | |
| 
 | |
| 	for {
 | |
| 		var pkt av.Packet
 | |
| 		pkt, err = c.conn.ReadPacket()
 | |
| 		if err != nil {
 | |
| 			if c.closed {
 | |
| 				return nil
 | |
| 			}
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		c.receive += len(pkt.Data)
 | |
| 
 | |
| 		track := c.tracks[int(pkt.Idx)]
 | |
| 
 | |
| 		timestamp := uint32(pkt.Time / time.Duration(track.Codec.ClockRate))
 | |
| 
 | |
| 		var payloads [][]byte
 | |
| 		if track.Codec.Name == streamer.CodecH264 {
 | |
| 			payloads = splitAVC(pkt.Data)
 | |
| 		} else {
 | |
| 			payloads = [][]byte{pkt.Data}
 | |
| 		}
 | |
| 
 | |
| 		for _, payload := range payloads {
 | |
| 			packet := &rtp.Packet{
 | |
| 				Header:  rtp.Header{Timestamp: timestamp},
 | |
| 				Payload: payload,
 | |
| 			}
 | |
| 			_ = track.WriteRTP(packet)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (c *Client) Close() error {
 | |
| 	if c.conn == nil {
 | |
| 		return nil
 | |
| 	}
 | |
| 	c.closed = true
 | |
| 	return c.conn.Close()
 | |
| }
 | |
| 
 | |
| func splitAVC(data []byte) [][]byte {
 | |
| 	var nals [][]byte
 | |
| 	for {
 | |
| 		// get AVC length
 | |
| 		size := int(binary.BigEndian.Uint32(data))
 | |
| 
 | |
| 		// check if multiple items in one packet
 | |
| 		if size+4 < len(data) {
 | |
| 			nals = append(nals, data[:size+4])
 | |
| 			data = data[size+4:]
 | |
| 		} else {
 | |
| 			nals = append(nals, data)
 | |
| 			break
 | |
| 		}
 | |
| 	}
 | |
| 	return nals
 | |
| }
 | 
