From 0a70915c8c0ad942dc7454468193150602af5e4d Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 27 Sep 2020 15:35:47 +0200 Subject: [PATCH] implement publishing (#4) --- README.md | 3 + connclient.go | 323 ++++++++++++++++++----------- connclient_test.go | 307 ++++++++++++++++++++++----- connclientudpl.go | 22 +- connserver.go | 2 +- examples/publish-tcp.go | 121 +++++++++++ examples/publish-udp.go | 117 +++++++++++ examples/read-tcp.go | 5 +- examples/read-udp.go | 6 +- go.mod | 1 + go.sum | 3 + testimages/gstreamer/Dockerfile | 16 ++ testimages/gstreamer/emptyvideo.ts | Bin 0 -> 65048 bytes testimages/gstreamer/start.sh | 3 + track.go | 37 ++++ utils.go | 23 ++ 16 files changed, 807 insertions(+), 182 deletions(-) create mode 100644 examples/publish-tcp.go create mode 100644 examples/publish-udp.go create mode 100644 testimages/gstreamer/Dockerfile create mode 100644 testimages/gstreamer/emptyvideo.ts create mode 100644 testimages/gstreamer/start.sh diff --git a/README.md b/README.md index 02872bd6..b831653d 100644 --- a/README.md +++ b/README.md @@ -9,12 +9,15 @@ RTSP 1.0 library for the Go programming language, written for [rtsp-simple-serve Features: * Read streams via TCP or UDP +* Publish streams via TCP or UDP * Provides primitives, a class for building clients (`ConnClient`) and a class for building servers (`ConnServer`) ## Examples * [read-tcp](examples/read-tcp.go) * [read-udp](examples/read-udp.go) +* [publish-tcp](examples/publish-tcp.go) +* [publish-udp](examples/publish-udp.go) ## Documentation diff --git a/connclient.go b/connclient.go index ec56fbec..f06b9c0b 100644 --- a/connclient.go +++ b/connclient.go @@ -29,6 +29,14 @@ const ( clientUDPFrameReadBufferSize = 2048 ) +type connClientState int + +const ( + connClientStateInitial connClientState = iota + connClientStateReading + connClientStatePublishing +) + // ConnClientConf allows to configure a ConnClient. type ConnClientConf struct { // target address in format hostname:port @@ -66,6 +74,7 @@ type ConnClient struct { session string cseq int auth *authClient + state connClientState streamUrl *url.URL streamProtocol *StreamProtocol rtcpReceivers map[int]*RtcpReceiver @@ -73,7 +82,6 @@ type ConnClient struct { udpRtpListeners map[int]*connClientUDPListener udpRtcpListeners map[int]*connClientUDPListener tcpFrames *multiFrame - playing bool receiverReportTerminate chan struct{} receiverReportDone chan struct{} @@ -116,7 +124,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { // Close closes all the ConnClient resources. func (c *ConnClient) Close() error { - if c.playing { + if c.state == connClientStateReading { c.Do(&Request{ Method: TEARDOWN, Url: c.streamUrl, @@ -142,50 +150,22 @@ func (c *ConnClient) Close() error { return err } +// CloseUDPListeners closes any open UDP listener. +func (c *ConnClient) CloseUDPListeners() { + for _, l := range c.udpRtpListeners { + l.close() + } + + for _, l := range c.udpRtcpListeners { + l.close() + } +} + // NetConn returns the underlying net.Conn. func (c *ConnClient) NetConn() net.Conn { return c.nconn } -// ReadFrame reads an InterleavedFrame. -func (c *ConnClient) ReadFrame() (*InterleavedFrame, error) { - c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - frame := c.tcpFrames.next() - err := frame.Read(c.br) - if err != nil { - return nil, err - } - - c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - - return frame, nil -} - -// ReadFrameUDP reads an UDP frame. -func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) { - if streamType == StreamTypeRtp { - buf, err := c.udpRtpListeners[track.Id].read() - if err != nil { - return nil, err - } - - atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) - c.rtcpReceivers[track.Id].OnFrame(streamType, buf) - - return buf, nil - } - - buf, err := c.udpRtcpListeners[track.Id].read() - if err != nil { - return nil, err - } - - atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) - c.rtcpReceivers[track.Id].OnFrame(streamType, buf) - - return buf, nil -} - func (c *ConnClient) readFrameOrResponse() (interface{}, error) { c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) b, err := c.br.ReadByte() @@ -206,6 +186,60 @@ func (c *ConnClient) readFrameOrResponse() (interface{}, error) { return ReadResponse(c.br) } +// ReadFrameTCP reads an InterleavedFrame. +// This can't be used when recording. +func (c *ConnClient) ReadFrameTCP() (*InterleavedFrame, error) { + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + frame := c.tcpFrames.next() + err := frame.Read(c.br) + if err != nil { + return nil, err + } + + c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + + return frame, nil +} + +// ReadFrameUDP reads an UDP frame. +func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) { + var buf []byte + var err error + if streamType == StreamTypeRtp { + buf, err = c.udpRtpListeners[track.Id].read() + if err != nil { + return nil, err + } + } else { + buf, err = c.udpRtcpListeners[track.Id].read() + if err != nil { + return nil, err + } + } + + atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) + + c.rtcpReceivers[track.Id].OnFrame(streamType, buf) + + return buf, nil +} + +// WriteFrameTCP writes an interleaved frame. +// this can't be used when playing. +func (c *ConnClient) WriteFrameTCP(frame *InterleavedFrame) error { + c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) + return frame.Write(c.bw) +} + +// WriteFrameUDP writes an UDP frame. +func (c *ConnClient) WriteFrameUDP(track *Track, streamType StreamType, content []byte) error { + if streamType == StreamTypeRtp { + return c.udpRtpListeners[track.Id].write(content) + } + + return c.udpRtcpListeners[track.Id].write(content) +} + // Do writes a Request and reads a Response. func (c *ConnClient) Do(req *Request) (*Response, error) { if req.Header == nil { @@ -274,19 +308,12 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { return res, nil } -// this can't be exported -// otherwise there's a race condition with the rtcp receiver report routine -func (c *ConnClient) writeFrame(frame *InterleavedFrame) error { - c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) - return frame.Write(c.bw) -} - // Options writes an OPTIONS request and reads a response, that contains // the methods allowed by the server. Since this method is not implemented by // every RTSP server, the function does not fail if the returned code is StatusNotFound. func (c *ConnClient) Options(u *url.URL) (*Response, error) { - if c.playing { - return nil, fmt.Errorf("can't be called when playing") + if c.state != connClientStateInitial { + return nil, fmt.Errorf("can't be called when reading or publishing") } res, err := c.Do(&Request{ @@ -304,18 +331,16 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) { } if res.StatusCode != StatusOK && res.StatusCode != StatusNotFound { - return nil, fmt.Errorf("OPTIONS: bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } return res, nil } -// Describe writes a DESCRIBE request, that means that we want to obtain the SDP -// document that describes the tracks available in the given URL. It then -// reads a Response. +// Describe writes a DESCRIBE request and reads a Response. func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) { - if c.playing { - return nil, nil, fmt.Errorf("can't be called when playing") + if c.state != connClientStateInitial { + return nil, nil, fmt.Errorf("can't be called when reading or publishing") } res, err := c.Do(&Request{ @@ -330,16 +355,16 @@ func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) { } if res.StatusCode != StatusOK { - return nil, nil, fmt.Errorf("DESCRIBE: bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } contentType, ok := res.Header["Content-Type"] if !ok || len(contentType) != 1 { - return nil, nil, fmt.Errorf("DESCRIBE: Content-Type not provided") + return nil, nil, fmt.Errorf("Content-Type not provided") } if contentType[0] != "application/sdp" { - return nil, nil, fmt.Errorf("DESCRIBE: wrong Content-Type, expected application/sdp") + return nil, nil, fmt.Errorf("wrong Content-Type, expected application/sdp") } tracks, err := ReadTracks(res.Content) @@ -419,19 +444,18 @@ func (c *ConnClient) setup(u *url.URL, track *Track, ht *HeaderTransport) (*Resp } if res.StatusCode != StatusOK { - return nil, fmt.Errorf("SETUP: bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } return res, nil } -// SetupUDP writes a SETUP request, that means that we want to read -// a given track with the UDP transport. It then reads a Response. +// SetupUDP writes a SETUP request and reads a Response. // If rtpPort and rtcpPort are zero, they will be chosen automatically. -func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, +func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort int, rtcpPort int) (*Response, error) { - if c.playing { - return nil, fmt.Errorf("can't be called when playing") + if c.state != connClientStateInitial { + return nil, fmt.Errorf("can't be called when reading or publishing") } if c.streamUrl != nil && *u != *c.streamUrl { @@ -442,10 +466,6 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, return nil, fmt.Errorf("cannot setup tracks with different protocols") } - if _, ok := c.rtcpReceivers[track.Id]; ok { - return nil, fmt.Errorf("track has already been setup") - } - if (rtpPort == 0 && rtcpPort != 0) || (rtpPort != 0 && rtcpPort == 0) { return nil, fmt.Errorf("rtpPort and rtcpPort must be both zero or non-zero") @@ -457,12 +477,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, rtpListener, rtcpListener, err := func() (*connClientUDPListener, *connClientUDPListener, error) { if rtpPort != 0 { - rtpListener, err := newConnClientUDPListener(c, rtpPort) + rtpListener, err := newConnClientUDPListener(c.conf, rtpPort) if err != nil { return nil, nil, err } - rtcpListener, err := newConnClientUDPListener(c, rtcpPort) + rtcpListener, err := newConnClientUDPListener(c.conf, rtcpPort) if err != nil { rtpListener.close() return nil, nil, err @@ -477,12 +497,12 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort = rtpPort + 1 - rtpListener, err := newConnClientUDPListener(c, rtpPort) + rtpListener, err := newConnClientUDPListener(c.conf, rtpPort) if err != nil { continue } - rtcpListener, err := newConnClientUDPListener(c, rtcpPort) + rtcpListener, err := newConnClientUDPListener(c.conf, rtcpPort) if err != nil { rtpListener.close() continue @@ -503,6 +523,15 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, return &ret }(), ClientPorts: &[2]int{rtpPort, rtcpPort}, + Mode: func() *string { + var v string + if mode == SetupModeRecord { + v = "record" + } else { + v = "play" + } + return &v + }(), }) if err != nil { rtpListener.close() @@ -514,39 +543,43 @@ func (c *ConnClient) SetupUDP(u *url.URL, track *Track, rtpPort int, if err != nil { rtpListener.close() rtcpListener.close() - return nil, fmt.Errorf("SETUP: transport header: %s", err) + return nil, fmt.Errorf("transport header: %s", err) } if th.ServerPorts == nil { rtpListener.close() rtcpListener.close() - return nil, fmt.Errorf("SETUP: server ports not provided") + return nil, fmt.Errorf("server ports not provided") } c.streamUrl = u streamProtocol := StreamProtocolUDP c.streamProtocol = &streamProtocol - c.rtcpReceivers[track.Id] = NewRtcpReceiver() - v := time.Now().Unix() - c.udpLastFrameTimes[track.Id] = &v + if mode == SetupModePlay { + c.rtcpReceivers[track.Id] = NewRtcpReceiver() - rtpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP - rtpListener.publisherPort = (*th.ServerPorts)[0] + v := time.Now().Unix() + c.udpLastFrameTimes[track.Id] = &v + } + + rtpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone + rtpListener.remotePort = (*th.ServerPorts)[0] c.udpRtpListeners[track.Id] = rtpListener - rtcpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP - rtcpListener.publisherPort = (*th.ServerPorts)[1] + rtcpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtcpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone + rtcpListener.remotePort = (*th.ServerPorts)[1] c.udpRtcpListeners[track.Id] = rtcpListener return res, nil } -// SetupTCP writes a SETUP request, that means that we want to read -// a given track with the TCP transport. It then reads a Response. -func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) { - if c.playing { - return nil, fmt.Errorf("can't be called when playing") +// SetupTCP writes a SETUP request and reads a Response. +func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Response, error) { + if c.state != connClientStateInitial { + return nil, fmt.Errorf("can't be called when reading or publishing") } if c.streamUrl != nil && *u != *c.streamUrl { @@ -557,10 +590,6 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) { return nil, fmt.Errorf("cannot setup tracks with different protocols") } - if _, ok := c.rtcpReceivers[track.Id]; ok { - return nil, fmt.Errorf("track has already been setup") - } - interleavedIds := [2]int{(track.Id * 2), (track.Id * 2) + 1} res, err := c.setup(u, track, &HeaderTransport{ Protocol: StreamProtocolTCP, @@ -569,6 +598,15 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) { return &ret }(), InterleavedIds: &interleavedIds, + Mode: func() *string { + var v string + if mode == SetupModeRecord { + v = "record" + } else { + v = "play" + } + return &v + }(), }) if err != nil { return nil, err @@ -576,29 +614,32 @@ func (c *ConnClient) SetupTCP(u *url.URL, track *Track) (*Response, error) { th, err := ReadHeaderTransport(res.Header["Transport"]) if err != nil { - return nil, fmt.Errorf("SETUP: transport header: %s", err) + return nil, fmt.Errorf("transport header: %s", err) } - if th.InterleavedIds == nil || (*th.InterleavedIds)[0] != interleavedIds[0] || + if th.InterleavedIds == nil || + (*th.InterleavedIds)[0] != interleavedIds[0] || (*th.InterleavedIds)[1] != interleavedIds[1] { - return nil, fmt.Errorf("SETUP: transport header does not have interleaved ids %v (%s)", + return nil, fmt.Errorf("transport header does not have interleaved ids %v (%s)", interleavedIds, res.Header["Transport"]) } c.streamUrl = u streamProtocol := StreamProtocolTCP c.streamProtocol = &streamProtocol - c.rtcpReceivers[track.Id] = NewRtcpReceiver() + + if mode == SetupModePlay { + c.rtcpReceivers[track.Id] = NewRtcpReceiver() + } return res, nil } -// Play writes a PLAY request, that means that we want to start the stream. -// It then reads a Response. This function can be called only after SetupUDP() -// or SetupTCP(). +// Play writes a PLAY request and reads a Response +// This function can be called only after SetupUDP() or SetupTCP(). func (c *ConnClient) Play(u *url.URL) (*Response, error) { - if c.playing { - return nil, fmt.Errorf("can't be called when playing") + if c.state != connClientStateInitial { + return nil, fmt.Errorf("can't be called when reading or publishing") } if c.streamUrl == nil { @@ -655,26 +696,16 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } - c.playing = true + c.state = connClientStateReading // open the firewall by sending packets to every channel if *c.streamProtocol == StreamProtocolUDP { for trackId := range c.udpRtpListeners { - c.udpRtpListeners[trackId].pc.WriteTo( - []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}, - &net.UDPAddr{ - IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP, - Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, - Port: c.udpRtpListeners[trackId].publisherPort, - }) + c.udpRtpListeners[trackId].write( + []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) - c.udpRtcpListeners[trackId].pc.WriteTo( - []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}, - &net.UDPAddr{ - IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP, - Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, - Port: c.udpRtcpListeners[trackId].publisherPort, - }) + c.udpRtcpListeners[trackId].write( + []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) } } @@ -696,14 +727,10 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { frame := c.rtcpReceivers[trackId].Report() if *c.streamProtocol == StreamProtocolUDP { - c.udpRtcpListeners[trackId].pc.WriteTo(frame, &net.UDPAddr{ - IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP, - Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, - Port: c.udpRtcpListeners[trackId].publisherPort, - }) + c.udpRtcpListeners[trackId].write(frame) } else { - c.writeFrame(&InterleavedFrame{ + c.WriteFrameTCP(&InterleavedFrame{ TrackId: trackId, StreamType: StreamTypeRtcp, Content: frame, @@ -777,3 +804,57 @@ func (c *ConnClient) LoopUDP(u *url.URL) error { } } } + +// Announce writes an ANNOUNCE request and reads a Response. +func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) { + if c.streamUrl != nil { + fmt.Errorf("announce has already been sent with another url url") + } + + res, err := c.Do(&Request{ + Method: ANNOUNCE, + Url: u, + Header: Header{ + "Content-Type": HeaderValue{"application/sdp"}, + }, + Content: tracks.Write(), + }) + if err != nil { + return nil, err + } + + if res.StatusCode != StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + c.streamUrl = u + + return res, nil +} + +// Record writes a RECORD request and reads a Response. +func (c *ConnClient) Record(u *url.URL) (*Response, error) { + if c.state != connClientStateInitial { + return nil, fmt.Errorf("can't be called when reading or publishing") + } + + if *u != *c.streamUrl { + return nil, fmt.Errorf("must be called with the same url used for Announce()") + } + + res, err := c.Do(&Request{ + Method: RECORD, + Url: u, + }) + if err != nil { + return nil, err + } + + if res.StatusCode != StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + c.state = connClientStatePublishing + + return nil, nil +} diff --git a/connclient_test.go b/connclient_test.go index 6db5b053..0a489745 100644 --- a/connclient_test.go +++ b/connclient_test.go @@ -1,6 +1,8 @@ package gortsplib import ( + "fmt" + "net" "net/url" "os" "os/exec" @@ -8,6 +10,7 @@ import ( "testing" "time" + "github.com/pion/rtp" "github.com/stretchr/testify/require" ) @@ -56,52 +59,6 @@ func (c *container) wait() int { return int(code) } -func TestConnClientReadTCP(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) - require.NoError(t, err) - defer cnt1.close() - - time.Sleep(1 * time.Second) - - cnt2, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "/emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "udp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt2.close() - - time.Sleep(1 * time.Second) - - u, err := url.Parse("rtsp://localhost:8554/teststream") - require.NoError(t, err) - - conn, err := NewConnClient(ConnClientConf{Host: u.Host}) - require.NoError(t, err) - defer conn.Close() - - _, err = conn.Options(u) - require.NoError(t, err) - - tracks, _, err := conn.Describe(u) - require.NoError(t, err) - - for _, track := range tracks { - _, err := conn.SetupTCP(u, track) - require.NoError(t, err) - } - - _, err = conn.Play(u) - require.NoError(t, err) - - _, err = conn.ReadFrame() - require.NoError(t, err) -} - func TestConnClientReadUDP(t *testing.T) { cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) require.NoError(t, err) @@ -137,15 +94,269 @@ func TestConnClientReadUDP(t *testing.T) { require.NoError(t, err) for _, track := range tracks { - _, err := conn.SetupUDP(u, track, 0, 0) + _, err := conn.SetupUDP(u, SetupModePlay, track, 0, 0) require.NoError(t, err) } _, err = conn.Play(u) require.NoError(t, err) - go conn.LoopUDP(u) + loopDone := make(chan struct{}) + defer func() { <-loopDone }() + + go func() { + defer close(loopDone) + conn.LoopUDP(u) + }() _, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp) require.NoError(t, err) + + conn.CloseUDPListeners() +} + +func TestConnClientReadTCP(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "udp", + "rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt2.close() + + time.Sleep(1 * time.Second) + + u, err := url.Parse("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + conn, err := NewConnClient(ConnClientConf{Host: u.Host}) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.Options(u) + require.NoError(t, err) + + tracks, _, err := conn.Describe(u) + require.NoError(t, err) + + for _, track := range tracks { + _, err := conn.SetupTCP(u, SetupModePlay, track) + require.NoError(t, err) + } + + _, err = conn.Play(u) + require.NoError(t, err) + + _, err = conn.ReadFrameTCP() + require.NoError(t, err) +} + +func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) { + var sps []byte + var pps []byte + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + return nil, nil, err + } + + packet := &rtp.Packet{} + err = packet.Unmarshal(buf[:n]) + if err != nil { + return nil, nil, err + } + + // require h264 + if packet.PayloadType != 96 { + return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96", + packet.PayloadType) + } + + // switch by NALU type + switch packet.Payload[0] & 0x1F { + case 0x07: // sps + sps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + + case 0x08: // pps + pps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + } + } +} + +func TestConnClientPublishUDP(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + u, err := url.Parse("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + conn, err := NewConnClient(ConnClientConf{Host: u.Host}) + require.NoError(t, err) + + publishDone := make(chan struct{}) + defer func() { <-publishDone }() + defer conn.Close() + + go func() { + defer close(publishDone) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + sps, pps, err := getH264SPSandPPS(pc) + require.NoError(t, err) + + _, err = conn.Options(u) + require.NoError(t, err) + + track := NewTrackH264(0, sps, pps) + + _, err = conn.Announce(u, Tracks{track}) + require.NoError(t, err) + + _, err = conn.SetupUDP(u, SetupModeRecord, track, 0, 0) + require.NoError(t, err) + + _, err = conn.Record(u) + require.NoError(t, err) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameUDP(track, StreamTypeRtp, buf[:n]) + if err != nil { + break + } + } + }() + + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) +} + +func TestConnClientPublishTCP(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + u, err := url.Parse("rtsp://localhost:8554/teststream") + require.NoError(t, err) + + conn, err := NewConnClient(ConnClientConf{Host: u.Host}) + require.NoError(t, err) + + publishDone := make(chan struct{}) + defer func() { <-publishDone }() + defer conn.Close() + + go func() { + defer close(publishDone) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + sps, pps, err := getH264SPSandPPS(pc) + require.NoError(t, err) + + _, err = conn.Options(u) + require.NoError(t, err) + + track := NewTrackH264(0, sps, pps) + + _, err = conn.Announce(u, Tracks{track}) + require.NoError(t, err) + + _, err = conn.SetupTCP(u, SetupModeRecord, track) + require.NoError(t, err) + + _, err = conn.Record(u) + require.NoError(t, err) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameTCP(&InterleavedFrame{ + TrackId: track.Id, + StreamType: StreamTypeRtp, + Content: buf[:n], + }) + if err != nil { + break + } + } + }() + + time.Sleep(1 * time.Second) + + cnt3, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "udp", + "-i", "rtsp://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) + require.NoError(t, err) + defer cnt3.close() + + code := cnt3.wait() + require.Equal(t, 0, code) } diff --git a/connclientudpl.go b/connclientudpl.go index e9e914b7..e705c748 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -7,20 +7,21 @@ import ( type connClientUDPListener struct { pc net.PacketConn - publisherIp net.IP - publisherPort int + remoteIp net.IP + remoteZone string + remotePort int udpFrameReadBuf *MultiBuffer } -func newConnClientUDPListener(c *ConnClient, port int) (*connClientUDPListener, error) { - pc, err := c.conf.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) +func newConnClientUDPListener(conf ConnClientConf, port int) (*connClientUDPListener, error) { + pc, err := conf.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) if err != nil { return nil, err } return &connClientUDPListener{ pc: pc, - udpFrameReadBuf: NewMultiBuffer(c.conf.ReadBufferCount, 2048), + udpFrameReadBuf: NewMultiBuffer(conf.ReadBufferCount, 2048), }, nil } @@ -38,10 +39,19 @@ func (l *connClientUDPListener) read() ([]byte, error) { uaddr := addr.(*net.UDPAddr) - if !l.publisherIp.Equal(uaddr.IP) || l.publisherPort != uaddr.Port { + if !l.remoteIp.Equal(uaddr.IP) || l.remotePort != uaddr.Port { continue } return buf[:n], nil } } + +func (l *connClientUDPListener) write(buf []byte) error { + _, err := l.pc.WriteTo(buf, &net.UDPAddr{ + IP: l.remoteIp, + Zone: l.remoteZone, + Port: l.remotePort, + }) + return err +} diff --git a/connserver.go b/connserver.go index 4c985f49..8a949016 100644 --- a/connserver.go +++ b/connserver.go @@ -106,7 +106,7 @@ func (s *ConnServer) WriteResponse(res *Response) error { } // WriteFrame writes an InterleavedFrame. -func (s *ConnServer) WriteFrame(frame *InterleavedFrame) error { +func (s *ConnServer) WriteFrameTCP(frame *InterleavedFrame) error { s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) return frame.Write(s.bw) } diff --git a/examples/publish-tcp.go b/examples/publish-tcp.go new file mode 100644 index 00000000..cfb9e8e9 --- /dev/null +++ b/examples/publish-tcp.go @@ -0,0 +1,121 @@ +// +build ignore + +package main + +import ( + "fmt" + "net" + "net/url" + + "github.com/aler9/gortsplib" + "github.com/pion/rtp" +) + +func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) { + var sps []byte + var pps []byte + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + return nil, nil, err + } + + packet := &rtp.Packet{} + err = packet.Unmarshal(buf[:n]) + if err != nil { + return nil, nil, err + } + + // require h264 + if packet.PayloadType != 96 { + return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96", + packet.PayloadType) + } + + // switch by NALU type + switch packet.Payload[0] & 0x1F { + case 0x07: // sps + sps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + + case 0x08: // pps + pps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + } + } +} + +func main() { + pc, err := net.ListenPacket("udp4", "127.0.0.1:9000") + if err != nil { + panic(err) + } + defer pc.Close() + + fmt.Println("Waiting for a rtp/h264 stream on port 9000 - you can send one with gstreamer:\n" + + "gst-launch-1.0 filesrc location=video.mp4 ! qtdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=9000") + + sps, pps, err := getH264SPSandPPS(pc) + if err != nil { + panic(err) + } + + fmt.Println("stream is ok") + + u, err := url.Parse("rtsp://localhost:8554/mystream") + if err != nil { + panic(err) + } + + conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host}) + if err != nil { + panic(err) + } + defer conn.Close() + + _, err = conn.Options(u) + if err != nil { + panic(err) + } + + track := gortsplib.NewTrackH264(0, sps, pps) + + _, err = conn.Announce(u, gortsplib.Tracks{track}) + if err != nil { + panic(err) + } + + _, err = conn.SetupTCP(u, gortsplib.SetupModeRecord, track) + if err != nil { + panic(err) + } + + _, err = conn.Record(u) + if err != nil { + panic(err) + } + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameTCP(&gortsplib.InterleavedFrame{ + TrackId: track.Id, + StreamType: gortsplib.StreamTypeRtp, + Content: buf[:n], + }) + if err != nil { + break + } + } +} diff --git a/examples/publish-udp.go b/examples/publish-udp.go new file mode 100644 index 00000000..aa0ed402 --- /dev/null +++ b/examples/publish-udp.go @@ -0,0 +1,117 @@ +// +build ignore + +package main + +import ( + "fmt" + "net" + "net/url" + + "github.com/aler9/gortsplib" + "github.com/pion/rtp" +) + +func getH264SPSandPPS(pc net.PacketConn) ([]byte, []byte, error) { + var sps []byte + var pps []byte + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + return nil, nil, err + } + + packet := &rtp.Packet{} + err = packet.Unmarshal(buf[:n]) + if err != nil { + return nil, nil, err + } + + // require h264 + if packet.PayloadType != 96 { + return nil, nil, fmt.Errorf("wrong payload type '%d', expected 96", + packet.PayloadType) + } + + // switch by NALU type + switch packet.Payload[0] & 0x1F { + case 0x07: // sps + sps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + + case 0x08: // pps + pps = append([]byte(nil), packet.Payload...) + if sps != nil && pps != nil { + return sps, pps, nil + } + } + } +} + +func main() { + pc, err := net.ListenPacket("udp4", "127.0.0.1:9000") + if err != nil { + panic(err) + } + defer pc.Close() + + fmt.Println("Waiting for a rtp/h264 stream on port 9000 - you can send one with gstreamer:\n" + + "gst-launch-1.0 filesrc location=video.mp4 ! qtdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=9000") + + sps, pps, err := getH264SPSandPPS(pc) + if err != nil { + panic(err) + } + + fmt.Println("stream is ok") + + u, err := url.Parse("rtsp://localhost:8554/mystream") + if err != nil { + panic(err) + } + + conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host}) + if err != nil { + panic(err) + } + defer conn.Close() + + _, err = conn.Options(u) + if err != nil { + panic(err) + } + + track := gortsplib.NewTrackH264(0, sps, pps) + + _, err = conn.Announce(u, gortsplib.Tracks{track}) + if err != nil { + panic(err) + } + + _, err = conn.SetupUDP(u, gortsplib.SetupModeRecord, track, 0, 0) + if err != nil { + panic(err) + } + + _, err = conn.Record(u) + if err != nil { + panic(err) + } + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + if err != nil { + break + } + + err = conn.WriteFrameUDP(track, gortsplib.StreamTypeRtp, buf[:n]) + if err != nil { + break + } + } +} diff --git a/examples/read-tcp.go b/examples/read-tcp.go index 5e22fc2e..05608d3e 100644 --- a/examples/read-tcp.go +++ b/examples/read-tcp.go @@ -32,7 +32,7 @@ func main() { } for _, track := range tracks { - _, err := conn.SetupTCP(u, track) + _, err := conn.SetupTCP(u, gortsplib.SetupModePlay, track) if err != nil { panic(err) } @@ -44,9 +44,8 @@ func main() { } for { - frame, err := conn.ReadFrame() + frame, err := conn.ReadFrameTCP() if err != nil { - conn.Close() fmt.Println("connection is closed (%s)", err) break } diff --git a/examples/read-udp.go b/examples/read-udp.go index 506854a8..c4c409af 100644 --- a/examples/read-udp.go +++ b/examples/read-udp.go @@ -33,7 +33,7 @@ func main() { } for _, track := range tracks { - _, err := conn.SetupUDP(u, track, 0, 0) + _, err := conn.SetupUDP(u, gortsplib.SetupModePlay, track, 0, 0) if err != nil { panic(err) } @@ -45,6 +45,8 @@ func main() { } var wg sync.WaitGroup + defer wg.Wait() + defer conn.CloseUDPListeners() // read RTP frames for _, track := range tracks { @@ -83,7 +85,5 @@ func main() { } err = conn.LoopUDP(u) - conn.Close() - wg.Wait() fmt.Println("connection is closed (%s)", err) } diff --git a/go.mod b/go.mod index 87f223eb..5e15d084 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,6 @@ go 1.12 require ( github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 github.com/pion/rtcp v1.2.3 + github.com/pion/rtp v1.6.0 github.com/stretchr/testify v1.6.1 ) diff --git a/go.sum b/go.sum index 928e5140..fce70483 100644 --- a/go.sum +++ b/go.sum @@ -2,10 +2,13 @@ github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625 h1:A3upkpYzceQT github.com/aler9/sdp-dirty/v3 v3.0.0-20200919115950-f1abc664f625/go.mod h1:5bO/aUQr9m3OasDatNNcVqKAgs7r5hgGXmszWHaC6mI= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/randutil v0.0.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA= github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I= +github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk= +github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/testimages/gstreamer/Dockerfile b/testimages/gstreamer/Dockerfile new file mode 100644 index 00000000..46a70444 --- /dev/null +++ b/testimages/gstreamer/Dockerfile @@ -0,0 +1,16 @@ +FROM amd64/alpine:3.12 + +RUN apk add --no-cache \ + gstreamer-tools \ + gst-plugins-good \ + gst-plugins-bad \ + && apk add --no-cache \ + -X http://dl-cdn.alpinelinux.org/alpine/edge/testing \ + gst-rtsp-server + +COPY emptyvideo.ts / + +COPY start.sh / +RUN chmod +x /start.sh + +ENTRYPOINT [ "/start.sh" ] diff --git a/testimages/gstreamer/emptyvideo.ts b/testimages/gstreamer/emptyvideo.ts new file mode 100644 index 0000000000000000000000000000000000000000..6693c037aa8cc233dd7a4f44803bef78b8cc53a5 GIT binary patch literal 65048 zcmeI54OmrGy2sZN5D*X+AL0kd*$Rq@3WA?Wx>HkcINE60loPXoqmm#9$JEJ5PgyfH zI@JtQQ<8SlzO6||r@GP2GckHct;Vt@i|%-?G)|+LmDN}%_g#4QU9ZnMP2SeKd!p++ z4Z`67zxO@=wf5Tkw--#AAd=X{$8#9>GuF=A8E;#aIyIShz2uVWTKD|eS?;>)D$CvZ z1xr^<+}-}|`rniZEQztF5<1u!Kh7@L61nx;h4`g3Q)U8tm#30D*aPf1>m%Q!JwN@5 zFa45l%_pXqEI)cCW7o&uz!>8P8C$iZTLFvx&?i{*o0fbm_PBgIzk=m%J}_Zfmx1r@ zn8a8FiMI8Hhg4} zYly47rq1mu7%{?CH+)RNC|ABUx~O8zX!yj`q40!Z(`QbeGNj05UNI4#S>Y~+PfV<- zU0&s$=Wz|s&o3M@JikCbdx6JOJ9gNxrBgT)OP3D4uCl^iQ)Mk0T2nWFnEXjY7kH|x z;CVH*p30g<^iiJw9Z;I z-(6fV(pA2ouBO^5g)c2|dFtF%Rh9Md-sokcE6P3aPWj^M;(R~=_GGK#$2E)G#ls8o z3JP5Fta?vrZT-T^TKTLq-(+!ZY0bQO^=?n`kl`-Rf;zWV0pD!6tE#4Ep|t=$DzzUR z?y9eXp{*XscP*;x_(A2B)s{!XR4($k>#8hx5?d)b`V#?V7RUp?|dW+jHOK zVk7^yH$R`6`}~zh|2%w&3_bcUBKh9ee^Jr%jhDt{2OsADl<&A{0;xr`@o)te)hk5>bLG`z&>(5`(~!SvtA+X zE=5Mc969QWB*xYk-9TLdB<^5v%1z8R1dDZZ29KSYa?N|QtIQdv-&g}7`g%20PXPAO zKLB?BTyl1JF%f=3w*cUO=tKDHFJ5+I*4IUk=#mKRV@iR&b+r@hXR;WqU{#dC;%+b% zJYR2lCPIB*ukVqKz&`e>B*vpRJHn0|f45gK=sspIzgC0&K48CaCa_9i z&g8uq^3E4$)3e(%FUou3Q+|cd*FUI}cbS=sWZoM}9AU?;-^~Ss{$uusb@DF9>Uf#= zr#*!1h&cUL1BiEXMQq-?8S-AGpLaRG=gYjm)kw%5n)kGtfM76gQ3>mJPfpX$dx6aR zsb+fiFnLcoWGZ~V{ym+%%ghXiymxUOb%Y(IC#}sZ81x^rAJNTwA>_Twq*Ow7MBL{& z9}u^B6|s5mahkl#`Mn78-evwcLiW(S_jK?%nU zRKkY4H1pma=194IkJz}0o;^(7jUW3JKHsoiC-3(9J!0oxN7zw%`ZsXFp#PZtX`Q@F zcDa6!c<%%uJ0c!5%K+jAu87TbO*DD8*YA-D8HDVic^^0?AQ+5WRKkY0H1jU6&&c(A zWd0<2_Aq(RdBRlqe8WkdyxZ&d$XQj6u%q-0-02kz`j6Sa(9L_{WzhHQ8YtNj@}R%^ z0ePoaA)D(?1p~jb{q&=EeckI~39;b$L?`xqq7!?*BJg~X_cWd5`36^6g2DWzUg>l7 zeD0s!$f902{n=~d_ijCUZ}#>>k9_vP`T?aAm*ntucQwC1cGcM#x%{oqlHP5u4V=Dd zNAMF__cIo?06yS5<(qi<@D};zcKFiU;Q#l@54OS^={}SO-gZW*0kI1H0%6X<^W=E6Fu=Q|gf*FUwW8~f=@?bnU zI~0KMhurKF42BWHC-NRq5k*e0pE2+y|2_U@KxW?;+Qb(tAWzZ6;(7F2Ya=1MeZ)0f;S+g}uHzdWVMpm1wbT?0 z`j6SQyob!Y^d4QynhDtvaq&n?Fu+#CWZpy0@6vm8z4<61duZM-%rpdpaf?dO_8v0t z(tC7$G?kt`Oy0-b;Z^uV-lOZ@agMN~^j!F;UohxDX4mo_GVjuRbp4=$kR1`53j#p= zs9zD2c@H_ikA}QQC$Ax756%1d89u>a+@cb+y@$-Z^d8Y8w$ZbP$$RmOT;UUWkLW8~ z9AQW48Q)?G2K~qETHZtEU3!n`6_JGOhS`dqn?XHa&ZoyqlkR6+V&oh<?GbJ=GT)b*%9)kS9t}SW||6_%6r6s_ek!<-Xpn_dyklBs?PErmmW6Ym3}Fp zUg>l99;;b8Y!KQ1^xmzX@*Zwdes4KE|fWLS~+Kltw!=CNz zf4|H{e3OsIEtls_;5}mAZlIiJI%-6d&1Jpd&;&D%iXg#$kB;*u*zXZ@x`~_}3PAWX zm<9NU4TP`hJ?!(QSfkYm_A?m^Rv@nfPp|hYcp~o+J2~DFcHH>2TrlW9W+(9;u?va_ z*%9&OB{12}Sj!bLx%Y_u`CLNw;Brm{0-5)SeR4HDd$0t1#LB#1_K>OY zHNA&C_mKNNV)t!!gdMm31+QSxf6PweJz`JnCuB#&Ke)jM#4mUiF}e5Xmfl9l9-8+# zZc8v2x2Obd?_u9B*6qSEm-{{9 z0+ICWVe(#HYbtzA?;$fI*YEL>d5*B7^tf;J3I_eh>?GbJe&B3Ec0{~zs!uS$R>b7q zBmUo35VD8ny>gf(7>rv~g0}at?^}wm-bBwHChzkf@GE>x?_sasBQb6rIUM)_`hVFK?3g~@1K#s=PDtyikuw^K=?J+ z8-l?wLin2A!#;0Hn9$$^`_voIonvfk4FF$Ds2H1+2%zMcBUGDejK5{c5 zduZO59yFkYodHw=nfK`agZ=dEVe-ComRI2ud5`WZ+8klW>8bJy2K~qETHZq*o22*X zet$M0J0e~=I3U>aD!(Eo^B(d(4!Pf>`->%n?4f!8NwN=0IBB60w7rMD-=q8AJoM~g z@_zj-T;UUWkHpwUN7zw%ezL(74Em4RwY-PSyYwE3dCi3Ei1?;!EFj)sDq=G4(XoDq zyeG~$O2{6Xckg6FFc`O}1a0qO-?x-lpGwajChses@hW^G?~!=dI7iq~dc6DmfmEHhT6jdB5ohSNKHUBdK4DBkVXmk)~kKf6T7sJ!IbH@3~64B$ALF5&!%}OR%3t znu?grd)Vvuq{=)(_RzfF`j`PF+z~(}ka>@!HM8m2!{q(eOs~Qx@*YXQUEv5jPR~fc zV965)@d5^U% zK6s^HwA3qo?%w14evhw`9r5#MdXWDD4en9O_F=S|{2n+Vy1i!fBeuX8QIV8o&lw7rLY-XxB+ z(z6Fkut%)S`>)pd6+V&oNKTA*gdMkjI~NT4kJ(AQNAmC@LUu&F&SL=acCLuYy+`up za|zi)^M21&0l{G0q7ulwNAj}O^z32se)r3!!q@a3a(?~xp6BV-TF`+aLI!C>5?612UCz274xDx01?Oy2Kf zeub~;J>;4|?)ON^Epdb$r>8d;4Em4RwY-NMtI~Uw7rMD-y^ke1wDJ1 zyg%snD|}7wA?J6w-y_vr;|M!S&%-OZV9Xo49>MUcRI*uUp0?W(=x>`(y0-%{%n z42BWHC-NR?KRMw9`WJknL zH^F4P<&>$2$-IZWe@5OfmUg&`kUh8vLnUnAXb1))7L}mwJ?!(Q9^D$~*@Gq6BUa{p zdk?R|*YqCtc~g%eO^&dm);~SWFBtS6vuk+|`@E^g%vM5nMEu;bfM9^Fh{?T24^KQH zduZODeccBoG+C$wGVjsj?jm~jFnND=8dvy4-lNCA&vk?yr>D#m4Em4RNxVmocUBX! zBVxZ{3HDr>sffwEhn(N#evfpvnUFm+?=Qp{P{OeQDuK*ikQrMbgbWH-q)58vWMpV z#j!rYVBDe-w7rMD-y{7g4?TOBy!)Tz3ZKY(q`%ha2s=v8i@Qw0p#PX%%X`SYOYf2X zX)_@^BL34Y77*_;6)~Cju-ETBdmSZY56%0oYC|v>x2Obd?_uA!)U!C1o;^(7|9H@= z@QJ)f&mWC*gdL@4*Ga!%(0|OXufn{uyvHlA8t_U#6Hu@8 zxqFZE`#t`Dc#mFJ%%+@YI%ak>*}NVJhbEYDR0K)9#{?nR#m93P_cPYc+Zk_Ll{z(< zcfI72>RR{w*jetn>nh9L`2|Z?Ox$fBY`!J`!u&3OM^~@qE6Do}6{6PfJ7PiWUo}wc zHJy!p&eyAPlN0P`UZ7wF@*4KOYvAQ&JodPJtLbdyaaHbB>Gi^1N7zy0Ut7ingYIK? z5@*xvzfTadBjN+2;6MS`ikRHl^zNEL$Q~TasD!^{1_XoI29-eOYmTz92K~qETF%Bk=j&b5K*)}W->LKg@nc>^Oy+Ft zbH3icY$9Y2&HLLkEx};iq7t;7jeXA7dwVNAdzic*c+s!$iJVRE1M!Zqqx8J}4i^mi zkJ(9_P4BiMLUu$PXf%NM9j=JUolT#txrFSYdH?H;0l{G0q7ulQO`nTb)3b-k`#YbS z3SZON$oXCFRq5m2>I!VSURy_hRva%YnF8@CLlk|6*8r>N$%bo#@G5vB?~(amD;#0RjsM6m7<3=AYk3d*ys0nW zM97YaPi}$9_T!KIikQrM$m_szuS(zSy@c$+MHnjKv)}jxgAt2LAoCu5r<|Z?50+q$ zSef_F;<>^n@*aH`W;nu*T7NRf6b$;0*-5-d-(O53WJkoOS}nl`Zfd)@kO zsUl<#&HER73@Bks0F|KaJ?!(QzWW>K*~8@hi;KJpU(!Nr<_RzeypYlNotrjYQ%zI>6MfB`p z^4`9fD|{mFk+piRBkU+W{5DfC=s#vB@g7+ZuO?(i#L?3%!2w$llX;Jh^*hY(S-UqA zvWMiIN97wrj$2d$nfJ(ge?L8YXx@3mMz6vr@*b|FHb>Y|dZL>ALiQiCYk3cOY?9u? zRg_K0j)>!~4*+qKUlEge4}1OYno~l^9-8;qGN0fuZcz!^-oxIj;`*_No;^(7qu<~P zpU8W-?rU^}9i=Ds15l99_RO}e7C)a z{X4phUu~nDXF6tfG})wm4u>X~aa05ey@&Dp7IJnd0O6+`3;_NvAHvu49`^lW##@n2 zu%9`y1}lK;z&vHPso*uehrG5V_j?$p@*H7DjZdrg3J%@J>?Gc!pKCTDJ0k9r2a_#e zD`Il*(Qo1kLiXSy43*F;#exz(H&6-M-orj`>Njr_J$tYOd&J7Tr{CgN_?q5BuEpej zkAAoAb%Y(a{(dev^dGa6c#nRYPY|*r;;d2wi0|i$nB05x`*Q{%duZM>CkF(Faf?bI z^B(;^ofR!TU#XMNIBJ`oA}qkUcc-IgbUP zggbqx1TycD9lx5MJxtzn`kD$~(|g$K_v~St9bw1m8RZom`j6R3yhrx;_Y<-s;=!N# z1RprctBA?HNA{97LiW(S4|>Of5}FNEg0}at_j_dDlTFVaChvo0`W3#W_psOR*-a&m zu;cVx%LRx2V|Fd?A;+rp9@&5O5V9lUfDj~NeaKCw!q@a3_WC_%>QP77QF?|odIg96V|Fd?A=#z($f-)DWJkya z+x>vt=vBy+-lJD1_8z@Dx%bGqecV~zqwr!&aG2lJD}C*^KhQp$TRj6+r^;A+Hn2{T?~5uOVlL0ucU)t9^pQFhcm6-a}p|ko)9w{<+Nw z_A^J;Uv zM^-q(j#__VZ@=Ksf6T7sJvz>tU>!MN&n8NCggpL0Kyctz$Vz++Y(KsIf>q~m1dRyu z9R4`uUGDc7aBMFn`&o12j&23Kc!v)vc-TS}ka>@R;siZ=n7kJkafMIhJqC`*aD*L) zXSyk5|1rD1_kg?)ymAsHJ3^iuZwWqrx~Y(r_|VV0^d1ARucBlR$-DWn0TmnwpbE6T zhrCW8y~n`y4fO0`@@~%aDtt}vA=d=*_goG1H#x$N!ZUHDUvTI@X4mo_a(*8r^WNG@ z$&QexT@(-;xD~PzANqM84SCOviKk=_$@`QXpWrZTQ3Yh)BX@8SJ$snEPripMd?N3W z`@Ol2u%qxy*=h<7{m1P3-UITUyLdGvJ3^km*aGCOrb1TYBS(8ZBiHY_4Vx+1L-Ky< zm4@IjY*7Vd-Xr(v{q*c%@;+^kSK$+RkKDJ~9AQV{xwO?UIP@R0Yk3cOY?9t1_jEQP zJ0iYpV*rR-{fbz552O9`W0!|nnX_=q^^gZ6<^a1~zYj7>2-$;SK_$$%%_levTU3I! z_ptYS44UMjXAhJ2>8H8EC-NSH<~KURj#__Nx+ysHAG2$D51DuAJqF#@Ov#RrXXSeZ M2X2K-