improve performance

This commit is contained in:
aler9
2020-11-16 00:08:50 +01:00
parent 8cf24e0a7d
commit c2712da906
8 changed files with 112 additions and 210 deletions

View File

@@ -5,6 +5,7 @@ import (
"os"
"os/exec"
"strconv"
"sync/atomic"
"testing"
"time"
@@ -58,56 +59,6 @@ func (c *container) wait() int {
return int(code)
}
func TestDialRead(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
} {
t.Run(proto, func(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"})
require.NoError(t, err)
defer cnt1.close()
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
dialer := func() Dialer {
if proto == "udp" {
return Dialer{}
}
return Dialer{StreamProtocol: StreamProtocolTCP}
}()
conn, err := dialer.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
id, typ, _, err := conn.ReadFrame()
require.NoError(t, err)
require.Equal(t, 0, id)
require.Equal(t, StreamTypeRtp, typ)
conn.Close()
_, _, _, err = conn.ReadFrame()
require.Error(t, err)
})
}
}
func TestDialReadParallel(t *testing.T) {
for _, proto := range []string{
"udp",
@@ -144,27 +95,35 @@ func TestDialReadParallel(t *testing.T) {
conn, err := dialer.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
var firstFrame int32
frameRecv := make(chan struct{})
readerDone := make(chan struct{})
go func() {
defer close(readerDone)
for {
_, _, _, err := conn.ReadFrame()
if err != nil {
break
}
conn.OnFrame(func(id int, typ StreamType, content []byte, err error) {
if err != nil {
close(readerDone)
return
}
}()
time.Sleep(1 * time.Second)
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
})
<-frameRecv
conn.Close()
<-readerDone
readerDone = make(chan struct{})
conn.OnFrame(func(id int, typ StreamType, content []byte, err error) {
require.Error(t, err)
close(readerDone)
})
<-readerDone
})
}
}
func TestDialReadRedirect(t *testing.T) {
func TestDialReadRedirectParallel(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{
"paths:\n" +
" path1:\n" +
@@ -193,62 +152,24 @@ func TestDialReadRedirect(t *testing.T) {
conn, err := DialRead("rtsp://localhost:8554/path1")
require.NoError(t, err)
defer conn.Close()
_, _, _, err = conn.ReadFrame()
require.NoError(t, err)
}
var firstFrame int32
frameRecv := make(chan struct{})
readerDone := make(chan struct{})
conn.OnFrame(func(id int, typ StreamType, content []byte, err error) {
if err != nil {
close(readerDone)
return
}
func TestDialReadPause(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
} {
t.Run(proto, func(t *testing.T) {
cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"})
require.NoError(t, err)
defer cnt1.close()
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
})
time.Sleep(1 * time.Second)
cnt2, err := newContainer("ffmpeg", "publish", []string{
"-re",
"-stream_loop", "-1",
"-i", "/emptyvideo.ts",
"-c", "copy",
"-f", "rtsp",
"-rtsp_transport", "udp",
"rtsp://localhost:8554/teststream",
})
require.NoError(t, err)
defer cnt2.close()
time.Sleep(1 * time.Second)
dialer := func() Dialer {
if proto == "udp" {
return Dialer{}
}
return Dialer{StreamProtocol: StreamProtocolTCP}
}()
conn, err := dialer.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
defer conn.Close()
_, _, _, err = conn.ReadFrame()
require.NoError(t, err)
_, err = conn.Pause()
require.NoError(t, err)
_, err = conn.Play()
require.NoError(t, err)
_, _, _, err = conn.ReadFrame()
require.NoError(t, err)
})
}
<-frameRecv
conn.Close()
<-readerDone
}
func TestDialReadPauseParallel(t *testing.T) {
@@ -287,30 +208,50 @@ func TestDialReadPauseParallel(t *testing.T) {
conn, err := dialer.DialRead("rtsp://localhost:8554/teststream")
require.NoError(t, err)
firstFrame := int32(0)
frameRecv := make(chan struct{})
readerDone := make(chan struct{})
go func() {
defer close(readerDone)
for {
_, _, _, err := conn.ReadFrame()
if err != nil {
break
}
conn.OnFrame(func(id int, typ StreamType, content []byte, err error) {
if err != nil {
close(readerDone)
return
}
}()
time.Sleep(1 * time.Second)
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
})
<-frameRecv
_, err = conn.Pause()
require.NoError(t, err)
<-readerDone
_, err = conn.Play()
require.NoError(t, err)
firstFrame = int32(0)
frameRecv = make(chan struct{})
readerDone = make(chan struct{})
conn.OnFrame(func(id int, typ StreamType, content []byte, err error) {
if err != nil {
close(readerDone)
return
}
if atomic.SwapInt32(&firstFrame, 1) == 0 {
close(frameRecv)
}
})
<-frameRecv
conn.Close()
<-readerDone
})
}
}
func TestDialPublish(t *testing.T) {
func TestDialPublishSerial(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",
@@ -476,7 +417,7 @@ func TestDialPublishParallel(t *testing.T) {
}
}
func TestDialPublishPause(t *testing.T) {
func TestDialPublishPauseSerial(t *testing.T) {
for _, proto := range []string{
"udp",
"tcp",