diff --git a/goreleaser.yml b/goreleaser.yml index 62fb8a4..2866912 100644 --- a/goreleaser.yml +++ b/goreleaser.yml @@ -10,6 +10,8 @@ builds: - CGO_ENABLED=0 tags: - sqlite + - mysql + - postgres ldflags: - -s -w -X m7s.live/v5.Version={{.Tag}} goos: diff --git a/pkg/av1_parse_test.go b/pkg/av1_parse_test.go new file mode 100644 index 0000000..8ae250f --- /dev/null +++ b/pkg/av1_parse_test.go @@ -0,0 +1,274 @@ +package pkg + +import ( + "testing" + + "github.com/bluenviron/mediacommon/pkg/codecs/av1" + "github.com/langhuihui/gomem" + "m7s.live/v5/pkg/codec" +) + +// TestParseAV1OBUs tests the ParseAV1OBUs method +func TestParseAV1OBUs(t *testing.T) { + t.Run("empty reader", func(t *testing.T) { + sample := &BaseSample{} + mem := gomem.Memory{} + reader := mem.NewReader() + + err := sample.ParseAV1OBUs(&reader) + if err != nil { + t.Errorf("Expected no error for empty reader, got: %v", err) + } + }) + + t.Run("single OBU - Sequence Header", func(t *testing.T) { + sample := &BaseSample{} + + // Create a simple AV1 OBU (Sequence Header) + // OBU Header: type=1 (SEQUENCE_HEADER), extension_flag=0, has_size_field=1 + obuHeader := byte(0b00001010) // type=1, has_size=1 + obuSize := byte(4) // Size of OBU payload + payload := []byte{0x08, 0x0C, 0x00, 0x00} + + mem := gomem.Memory{} + mem.PushOne([]byte{obuHeader, obuSize}) + mem.PushOne(payload) + + reader := mem.NewReader() + err := sample.ParseAV1OBUs(&reader) + if err != nil { + t.Errorf("ParseAV1OBUs failed: %v", err) + } + + nalus := sample.Raw.(*Nalus) + if nalus.Count() != 1 { + t.Errorf("Expected 1 OBU, got %d", nalus.Count()) + } + }) + + t.Run("multiple OBUs", func(t *testing.T) { + sample := &BaseSample{} + + mem := gomem.Memory{} + + // First OBU - Temporal Delimiter + obuHeader1 := byte(0b00010010) // type=2 (TEMPORAL_DELIMITER), has_size=1 + obuSize1 := byte(0) + mem.PushOne([]byte{obuHeader1, obuSize1}) + + // Second OBU - Frame Header with some payload + obuHeader2 := byte(0b00011010) // type=3 (FRAME_HEADER), has_size=1 + obuSize2 := byte(3) + payload2 := []byte{0x01, 0x02, 0x03} + mem.PushOne([]byte{obuHeader2, obuSize2}) + mem.PushOne(payload2) + + reader := mem.NewReader() + err := sample.ParseAV1OBUs(&reader) + if err != nil { + t.Errorf("ParseAV1OBUs failed: %v", err) + } + + nalus := sample.Raw.(*Nalus) + if nalus.Count() != 2 { + t.Errorf("Expected 2 OBUs, got %d", nalus.Count()) + } + }) +} + +// TestGetOBUs tests the GetOBUs method +func TestGetOBUs(t *testing.T) { + t.Run("initialize empty OBUs", func(t *testing.T) { + sample := &BaseSample{} + obus := sample.GetOBUs() + + if obus == nil { + t.Error("GetOBUs should return non-nil OBUs") + } + + if sample.Raw != obus { + t.Error("Raw should be set to the returned OBUs") + } + }) + + t.Run("return existing OBUs", func(t *testing.T) { + existingOBUs := &OBUs{} + sample := &BaseSample{ + Raw: existingOBUs, + } + + obus := sample.GetOBUs() + if obus != existingOBUs { + t.Error("GetOBUs should return the existing OBUs") + } + }) +} + +// TestAV1OBUTypes tests all AV1 OBU type constants +func TestAV1OBUTypes(t *testing.T) { + tests := []struct { + name string + obuType int + expected int + }{ + {"SEQUENCE_HEADER", codec.AV1_OBU_SEQUENCE_HEADER, 1}, + {"TEMPORAL_DELIMITER", codec.AV1_OBU_TEMPORAL_DELIMITER, 2}, + {"FRAME_HEADER", codec.AV1_OBU_FRAME_HEADER, 3}, + {"TILE_GROUP", codec.AV1_OBU_TILE_GROUP, 4}, + {"METADATA", codec.AV1_OBU_METADATA, 5}, + {"FRAME", codec.AV1_OBU_FRAME, 6}, + {"REDUNDANT_FRAME_HEADER", codec.AV1_OBU_REDUNDANT_FRAME_HEADER, 7}, + {"TILE_LIST", codec.AV1_OBU_TILE_LIST, 8}, + {"PADDING", codec.AV1_OBU_PADDING, 15}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.obuType != tt.expected { + t.Errorf("OBU type %s: expected %d, got %d", tt.name, tt.expected, tt.obuType) + } + }) + } +} + +// TestAV1Integration tests the full integration of AV1 codec +func TestAV1Integration(t *testing.T) { + t.Run("create AV1 context and parse OBUs", func(t *testing.T) { + // Create AV1 codec context + ctx := &codec.AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + // Verify context properties + if ctx.GetInfo() != "AV1" { + t.Errorf("Expected 'AV1', got '%s'", ctx.GetInfo()) + } + + if ctx.FourCC() != codec.FourCC_AV1 { + t.Error("FourCC should be AV1") + } + + // Create a sample with OBUs + sample := &Sample{ + ICodecCtx: ctx, + BaseSample: &BaseSample{}, + } + + // Add some OBUs + obus := sample.GetOBUs() + obu := obus.GetNextPointer() + obu.PushOne([]byte{0x0A, 0x01, 0x02, 0x03}) + + // Verify OBU count + if obus.Count() != 1 { + t.Errorf("Expected 1 OBU, got %d", obus.Count()) + } + }) +} + +// TestAV1OBUHeaderParsing tests parsing of actual AV1 OBU headers +func TestAV1OBUHeaderParsing(t *testing.T) { + tests := []struct { + name string + headerByte byte + obuType uint + hasSize bool + }{ + { + name: "Sequence Header with size", + headerByte: 0b00001010, // type=1, has_size=1 + obuType: 1, + hasSize: true, + }, + { + name: "Frame with size", + headerByte: 0b00110010, // type=6, has_size=1 + obuType: 6, + hasSize: true, + }, + { + name: "Temporal Delimiter with size", + headerByte: 0b00010010, // type=2, has_size=1 + obuType: 2, + hasSize: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var header av1.OBUHeader + err := header.Unmarshal([]byte{tt.headerByte}) + if err != nil { + t.Fatalf("Failed to unmarshal OBU header: %v", err) + } + + if uint(header.Type) != tt.obuType { + t.Errorf("Expected OBU type %d, got %d", tt.obuType, header.Type) + } + + if header.HasSize != tt.hasSize { + t.Errorf("Expected HasSize %v, got %v", tt.hasSize, header.HasSize) + } + }) + } +} + +// BenchmarkParseAV1OBUs benchmarks the OBU parsing performance +func BenchmarkParseAV1OBUs(b *testing.B) { + // Prepare test data + mem := gomem.Memory{} + for i := 0; i < 10; i++ { + obuHeader := byte(0b00110010) // Frame OBU + obuSize := byte(10) + payload := make([]byte, 10) + for j := range payload { + payload[j] = byte(j) + } + mem.PushOne([]byte{obuHeader, obuSize}) + mem.PushOne(payload) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + sample := &BaseSample{} + reader := mem.NewReader() + _ = sample.ParseAV1OBUs(&reader) + } +} + +// TestOBUsReuseArray tests the reuse array functionality with OBUs +func TestOBUsReuseArray(t *testing.T) { + t.Run("reuse OBU memory", func(t *testing.T) { + obus := &OBUs{} + + // First allocation + obu1 := obus.GetNextPointer() + obu1.PushOne([]byte{1, 2, 3}) + + if obus.Count() != 1 { + t.Errorf("Expected count 1, got %d", obus.Count()) + } + + // Second allocation + obu2 := obus.GetNextPointer() + obu2.PushOne([]byte{4, 5, 6}) + + if obus.Count() != 2 { + t.Errorf("Expected count 2, got %d", obus.Count()) + } + + // Reset and reuse + obus.Reset() + if obus.Count() != 0 { + t.Errorf("Expected count 0 after reset, got %d", obus.Count()) + } + + // Reuse memory + obu3 := obus.GetNextPointer() + obu3.PushOne([]byte{7, 8, 9}) + + if obus.Count() != 1 { + t.Errorf("Expected count 1 after reuse, got %d", obus.Count()) + } + }) +} diff --git a/pkg/codec/av1_test.go b/pkg/codec/av1_test.go new file mode 100644 index 0000000..90ca510 --- /dev/null +++ b/pkg/codec/av1_test.go @@ -0,0 +1,187 @@ +package codec + +import ( + "testing" +) + +func TestAV1Ctx_GetInfo(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + info := ctx.GetInfo() + if info != "AV1" { + t.Errorf("Expected 'AV1', got '%s'", info) + } +} + +func TestAV1Ctx_GetBase(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + base := ctx.GetBase() + if base != ctx { + t.Error("GetBase should return itself") + } +} + +func TestAV1Ctx_Width(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + width := ctx.Width() + if width != 0 { + t.Errorf("Expected width 0, got %d", width) + } +} + +func TestAV1Ctx_Height(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + height := ctx.Height() + if height != 0 { + t.Errorf("Expected height 0, got %d", height) + } +} + +func TestAV1Ctx_FourCC(t *testing.T) { + ctx := &AV1Ctx{} + + fourcc := ctx.FourCC() + expected := FourCC_AV1 + if fourcc != expected { + t.Errorf("Expected %v, got %v", expected, fourcc) + } + + // Verify the actual FourCC string + if fourcc.String() != "av01" { + t.Errorf("Expected 'av01', got '%s'", fourcc.String()) + } +} + +func TestAV1Ctx_GetRecord(t *testing.T) { + configOBUs := []byte{0x0A, 0x0B, 0x00, 0x01, 0x02} + ctx := &AV1Ctx{ + ConfigOBUs: configOBUs, + } + + record := ctx.GetRecord() + if len(record) != len(configOBUs) { + t.Errorf("Expected record length %d, got %d", len(configOBUs), len(record)) + } + + for i, b := range record { + if b != configOBUs[i] { + t.Errorf("Byte mismatch at index %d: expected %02X, got %02X", i, configOBUs[i], b) + } + } +} + +func TestAV1Ctx_String(t *testing.T) { + tests := []struct { + name string + configOBUs []byte + expected string + }{ + { + name: "Standard config", + configOBUs: []byte{0x0A, 0x0B, 0x00}, + expected: "av01.0A0B00", + }, + { + name: "Different config", + configOBUs: []byte{0x08, 0x0C, 0x00}, + expected: "av01.080C00", + }, + { + name: "High profile config", + configOBUs: []byte{0x0C, 0x10, 0x00}, + expected: "av01.0C1000", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: tt.configOBUs, + } + + result := ctx.String() + if result != tt.expected { + t.Errorf("Expected '%s', got '%s'", tt.expected, result) + } + }) + } +} + +func TestAV1Ctx_EmptyConfigOBUs(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: []byte{}, + } + + // Should not panic when calling methods with empty ConfigOBUs + defer func() { + if r := recover(); r != nil { + t.Errorf("Panic occurred with empty ConfigOBUs: %v", r) + } + }() + + _ = ctx.GetInfo() + _ = ctx.GetBase() + _ = ctx.FourCC() + _ = ctx.GetRecord() + // Note: String() will panic with empty ConfigOBUs due to array indexing +} + +func TestAV1Ctx_NilConfigOBUs(t *testing.T) { + ctx := &AV1Ctx{ + ConfigOBUs: nil, + } + + // Should not panic for most methods + defer func() { + if r := recover(); r != nil { + t.Errorf("Panic occurred with nil ConfigOBUs: %v", r) + } + }() + + _ = ctx.GetInfo() + _ = ctx.GetBase() + _ = ctx.FourCC() + + record := ctx.GetRecord() + if record != nil { + t.Error("Expected nil record for nil ConfigOBUs") + } +} + +// Test AV1 OBU Type Constants +func TestAV1_OBUTypeConstants(t *testing.T) { + tests := []struct { + name string + obuType int + expected int + }{ + {"SEQUENCE_HEADER", AV1_OBU_SEQUENCE_HEADER, 1}, + {"TEMPORAL_DELIMITER", AV1_OBU_TEMPORAL_DELIMITER, 2}, + {"FRAME_HEADER", AV1_OBU_FRAME_HEADER, 3}, + {"TILE_GROUP", AV1_OBU_TILE_GROUP, 4}, + {"METADATA", AV1_OBU_METADATA, 5}, + {"FRAME", AV1_OBU_FRAME, 6}, + {"REDUNDANT_FRAME_HEADER", AV1_OBU_REDUNDANT_FRAME_HEADER, 7}, + {"TILE_LIST", AV1_OBU_TILE_LIST, 8}, + {"PADDING", AV1_OBU_PADDING, 15}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.obuType != tt.expected { + t.Errorf("Expected OBU type %d, got %d", tt.expected, tt.obuType) + } + }) + } +} diff --git a/pkg/format/av1_test.go b/pkg/format/av1_test.go new file mode 100644 index 0000000..4c23dda --- /dev/null +++ b/pkg/format/av1_test.go @@ -0,0 +1,254 @@ +package format + +import ( + "testing" + "time" + + "github.com/langhuihui/gomem" + "m7s.live/v5/pkg" + "m7s.live/v5/pkg/codec" +) + +func TestAV1Frame_CheckCodecChange(t *testing.T) { + // Test with nil codec context - should return error + t.Run("nil codec context", func(t *testing.T) { + frame := &AV1Frame{} + err := frame.CheckCodecChange() + if err != pkg.ErrUnsupportCodec { + t.Errorf("Expected ErrUnsupportCodec, got %v", err) + } + }) + + // Test with valid AV1 codec context + t.Run("valid codec context", func(t *testing.T) { + frame := &AV1Frame{ + Sample: pkg.Sample{ + ICodecCtx: &codec.AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + }, + }, + } + err := frame.CheckCodecChange() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + }) +} + +func TestAV1Frame_GetSize(t *testing.T) { + t.Run("empty OBUs", func(t *testing.T) { + frame := &AV1Frame{ + Sample: pkg.Sample{ + BaseSample: &pkg.BaseSample{ + Raw: &pkg.OBUs{}, + }, + }, + } + size := frame.GetSize() + if size != 0 { + t.Errorf("Expected size 0, got %d", size) + } + }) + + t.Run("with OBUs", func(t *testing.T) { + obus := &pkg.OBUs{} + + // Add first OBU + obu1 := obus.GetNextPointer() + obu1.PushOne([]byte{1, 2, 3, 4}) + + // Add second OBU + obu2 := obus.GetNextPointer() + obu2.PushOne([]byte{5, 6, 7, 8, 9}) + + frame := &AV1Frame{ + Sample: pkg.Sample{ + BaseSample: &pkg.BaseSample{ + Raw: obus, + }, + }, + } + + size := frame.GetSize() + expectedSize := 4 + 5 // Total bytes in both OBUs + if size != expectedSize { + t.Errorf("Expected size %d, got %d", expectedSize, size) + } + }) + + t.Run("non-OBUs raw data", func(t *testing.T) { + frame := &AV1Frame{ + Sample: pkg.Sample{ + BaseSample: &pkg.BaseSample{ + Raw: &gomem.Memory{}, + }, + }, + } + size := frame.GetSize() + if size != 0 { + t.Errorf("Expected size 0 for non-OBUs raw data, got %d", size) + } + }) +} + +func TestAV1Frame_Demux(t *testing.T) { + mem := gomem.Memory{} + mem.PushOne([]byte{1, 2, 3, 4, 5}) + + frame := &AV1Frame{ + Sample: pkg.Sample{ + RecyclableMemory: gomem.RecyclableMemory{ + Memory: mem, + }, + BaseSample: &pkg.BaseSample{}, + }, + } + + err := frame.Demux() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // After demux, Raw should point to the Memory + if frame.Sample.BaseSample.Raw != &frame.Sample.RecyclableMemory.Memory { + t.Error("Raw should point to Memory after Demux") + } +} + +func TestAV1Frame_Mux(t *testing.T) { + // Create source sample with OBUs + obus := &pkg.OBUs{} + + obu1 := obus.GetNextPointer() + obu1.PushOne([]byte{1, 2, 3}) + + obu2 := obus.GetNextPointer() + obu2.PushOne([]byte{4, 5, 6, 7}) + + ctx := &codec.AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + sourceSample := &pkg.Sample{ + ICodecCtx: ctx, + BaseSample: &pkg.BaseSample{ + Raw: obus, + Timestamp: time.Second, + CTS: 100 * time.Millisecond, + }, + } + + // Create destination frame + destFrame := &AV1Frame{ + Sample: pkg.Sample{ + BaseSample: &pkg.BaseSample{}, + }, + } + + // Perform mux + err := destFrame.Mux(sourceSample) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + // Verify codec context is set + if destFrame.ICodecCtx != ctx { + t.Error("Codec context not set correctly") + } + + // Verify data was copied + if destFrame.Memory.Size != 7 { // 3 + 4 bytes + t.Errorf("Expected memory size 7, got %d", destFrame.Memory.Size) + } +} + +func TestAV1Frame_String(t *testing.T) { + frame := &AV1Frame{ + Sample: pkg.Sample{ + ICodecCtx: &codec.AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + }, + BaseSample: &pkg.BaseSample{ + Timestamp: time.Second, + CTS: 100 * time.Millisecond, + }, + }, + } + + str := frame.String() + // Should contain AV1Frame, FourCC, Timestamp, and CTS + if len(str) == 0 { + t.Error("String() should not return empty string") + } + + // The string should contain key information + t.Logf("AV1Frame.String() output: %s", str) +} + +func TestAV1Frame_Workflow(t *testing.T) { + // Test the complete workflow: create -> demux -> mux + t.Run("complete workflow", func(t *testing.T) { + // Step 1: Create a frame with sample data + mem := gomem.Memory{} + mem.PushOne([]byte{1, 2, 3, 4, 5}) + + ctx := &codec.AV1Ctx{ + ConfigOBUs: []byte{0x0A, 0x0B, 0x00}, + } + + originalFrame := &AV1Frame{ + Sample: pkg.Sample{ + ICodecCtx: ctx, + RecyclableMemory: gomem.RecyclableMemory{ + Memory: mem, + }, + BaseSample: &pkg.BaseSample{ + Timestamp: time.Second, + CTS: 100 * time.Millisecond, + IDR: true, + }, + }, + } + + // Step 2: Demux + err := originalFrame.Demux() + if err != nil { + t.Fatalf("Demux failed: %v", err) + } + + // Step 3: Create OBUs for muxing + obus := &pkg.OBUs{} + obu := obus.GetNextPointer() + obu.PushOne([]byte{10, 20, 30}) + + sourceSample := &pkg.Sample{ + ICodecCtx: ctx, + BaseSample: &pkg.BaseSample{ + Raw: obus, + }, + } + + // Step 4: Mux into new frame + newFrame := &AV1Frame{ + Sample: pkg.Sample{ + BaseSample: &pkg.BaseSample{}, + }, + } + + err = newFrame.Mux(sourceSample) + if err != nil { + t.Fatalf("Mux failed: %v", err) + } + + // Step 5: Verify codec context + if newFrame.ICodecCtx != ctx { + t.Error("Codec context not preserved") + } + + // Step 6: Check codec change should not return error + err = newFrame.CheckCodecChange() + if err != nil { + t.Errorf("CheckCodecChange failed: %v", err) + } + }) +} diff --git a/pkg/format/raw.go b/pkg/format/raw.go index c60b8e5..1cd82f3 100644 --- a/pkg/format/raw.go +++ b/pkg/format/raw.go @@ -127,7 +127,7 @@ func (r *H26xFrame) GetSize() (ret int) { } func (h *H26xFrame) String() string { - return fmt.Sprintf("H26xFrame{FourCC: %s, Timestamp: %s, CTS: %s}", h.FourCC, h.Timestamp, h.CTS) + return fmt.Sprintf("H26xFrame{FourCC: %s, Timestamp: %s, CTS: %s}", h.FourCC(), h.Timestamp, h.CTS) } var _ pkg.IAVFrame = (*AV1Frame)(nil) @@ -168,5 +168,5 @@ func (a *AV1Frame) Mux(from *pkg.Sample) (err error) { } func (a *AV1Frame) String() string { - return fmt.Sprintf("AV1Frame{FourCC: %s, Timestamp: %s, CTS: %s}", a.FourCC, a.Timestamp, a.CTS) + return fmt.Sprintf("AV1Frame{FourCC: %s, Timestamp: %s, CTS: %s}", a.FourCC(), a.Timestamp, a.CTS) } diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index 1d8de83..5d04806 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -11,7 +11,7 @@ import ( sipgo "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" - "github.com/langhuihui/gotask" + task "github.com/langhuihui/gotask" m7s "m7s.live/v5" pkg "m7s.live/v5/pkg" "m7s.live/v5/pkg/util" @@ -141,9 +141,9 @@ func (d *Dialog) Start() (err error) { d.MediaPort = d.gb.tcpPort } else { if d.gb.MediaPort.Valid() { - select { - case d.MediaPort = <-d.gb.tcpPorts: - default: + var ok bool + d.MediaPort, ok = d.gb.tcpPB.Allocate() + if !ok { d.pullCtx.Fail("no available tcp port") return fmt.Errorf("no available tcp port") } @@ -156,9 +156,10 @@ func (d *Dialog) Start() (err error) { d.MediaPort = d.gb.udpPort } else { if d.gb.MediaPort.Valid() { - select { - case d.MediaPort = <-d.gb.udpPorts: - default: + var ok bool + d.MediaPort, ok = d.gb.udpPB.Allocate() + if !ok { + d.pullCtx.Fail("no available udp port") return fmt.Errorf("no available udp port") } } else { @@ -424,15 +425,20 @@ func (d *Dialog) GetKey() string { } func (d *Dialog) Dispose() { - if d.StreamMode == mrtp.StreamModeUDP { - if d.gb.udpPort == 0 { //多端口 - // 如果没有设置udp端口,则将MediaPort设置为0,表示不再使用 - d.gb.udpPorts <- d.MediaPort + switch d.StreamMode { + case mrtp.StreamModeUDP: + if d.gb.udpPort == 0 { //多端口模式 + // 回收端口,防止重复回收 + if !d.gb.udpPB.Release(d.MediaPort) { + d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "udp") + } } - } else if d.StreamMode == mrtp.StreamModeTCPPassive { - if d.gb.tcpPort == 0 { - // 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用 - d.gb.tcpPorts <- d.MediaPort + case mrtp.StreamModeTCPPassive: + if d.gb.tcpPort == 0 { //多端口模式 + // 回收端口,防止重复回收 + if !d.gb.tcpPB.Release(d.MediaPort) { + d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp") + } } } d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId) diff --git a/plugin/gb28181/forwarddialog.go b/plugin/gb28181/forwarddialog.go index fb0d4ad..37c0d0d 100644 --- a/plugin/gb28181/forwarddialog.go +++ b/plugin/gb28181/forwarddialog.go @@ -79,12 +79,9 @@ func (d *ForwardDialog) Start() (err error) { if device.StreamMode != mrtp.StreamModeTCPActive { if d.gb.MediaPort.Valid() { - select { - case d.MediaPort = <-d.gb.tcpPorts: - defer func() { - d.gb.tcpPorts <- d.MediaPort - }() - default: + var ok bool + d.MediaPort, ok = d.gb.tcpPB.Allocate() + if !ok { return fmt.Errorf("no available tcp port") } } else { @@ -283,6 +280,12 @@ func (d *ForwardDialog) Run() (err error) { // Dispose 释放会话资源 func (d *ForwardDialog) Dispose() { + // 回收端口(如果是多端口模式) + if d.MediaPort > 0 && d.gb.tcpPort == 0 { + if !d.gb.tcpPB.Release(d.MediaPort) { + d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp") + } + } if d.session != nil && d.session.InviteResponse != nil { err := d.session.Bye(d) if err != nil { diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index 9760133..601407b 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -47,30 +47,31 @@ type PositionConfig struct { type GB28181Plugin struct { pb.UnimplementedApiServer m7s.Plugin - Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 - Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 - Password string - Sip SipConfig - MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围 - Position PositionConfig - Parent string `desc:"父级设备"` - AutoMigrate bool `default:"true" desc:"自动迁移数据库结构并初始化根组织"` - ua *sipgo.UserAgent - server *sipgo.Server - devices task.WorkCollection[string, *Device] - dialogs util.Collection[string, *Dialog] - forwardDialogs util.Collection[uint32, *ForwardDialog] - platforms task.WorkCollection[string, *Platform] - tcpPorts chan uint16 - tcpPort uint16 + Serial string `default:"34020000002000000001" desc:"sip 服务 id"` //sip 服务器 id, 默认 34020000002000000001 + Realm string `default:"3402000000" desc:"sip 服务域"` //sip 服务器域,默认 3402000000 + Password string + Sip SipConfig + MediaPort util.Range[uint16] `default:"10001-20000" desc:"媒体端口范围"` //媒体端口范围 + Position PositionConfig + Parent string `desc:"父级设备"` + AutoMigrate bool `default:"true" desc:"自动迁移数据库结构并初始化根组织"` + ua *sipgo.UserAgent + server *sipgo.Server + devices task.WorkCollection[string, *Device] + dialogs util.Collection[string, *Dialog] + forwardDialogs util.Collection[uint32, *ForwardDialog] + platforms task.WorkCollection[string, *Platform] + tcpPort uint16 // 单端口模式下的 TCP 端口 + udpPort uint16 // 单端口模式下的 UDP 端口 + // 端口位图管理(多端口模式) + tcpPB PortBitmap + udpPB PortBitmap sipPorts []int SipIP string `desc:"sip发送命令的IP,一般是本地IP,多网卡时需要配置正确的IP"` MediaIP string `desc:"流媒体IP,用于接收流"` deviceRegisterManager task.WorkCollection[string, *DeviceRegisterQueueTask] Platforms []*gb28181.PlatformModel channels util.Collection[string, *Channel] - udpPorts chan uint16 - udpPort uint16 singlePorts util.Collection[uint32, *gb28181.SinglePortReader] } @@ -185,12 +186,9 @@ func (gb *GB28181Plugin) Start() (err error) { Collection: &gb.singlePorts, }) } else { - gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) - gb.udpPorts = make(chan uint16, gb.MediaPort.Size()) - for i := range gb.MediaPort.Size() { - gb.tcpPorts <- gb.MediaPort[0] + i - gb.udpPorts <- gb.MediaPort[0] + i - } + // 初始化位图 + gb.tcpPB.Init(gb.MediaPort[0], uint16(gb.MediaPort.Size())) + gb.udpPB.Init(gb.MediaPort[0], uint16(gb.MediaPort.Size())) } } else { gb.SetDescription("tcp", fmt.Sprintf("%d", gb.MediaPort[0])) @@ -849,15 +847,14 @@ func (gb *GB28181Plugin) OnInvite(req *sip.Request, tx sip.ServerTransaction) { mediaPort := uint16(0) if inviteInfo.StreamMode != mrtp.StreamModeTCPPassive { if gb.MediaPort.Valid() { - select { - case port := <-gb.tcpPorts: - mediaPort = port - gb.Debug("OnInvite", "action", "allocate port", "port", port) - default: + var ok bool + mediaPort, ok = gb.tcpPB.Allocate() + if !ok { gb.Error("OnInvite", "error", "no available port") _ = tx.Respond(sip.NewResponseFromRequest(req, sip.StatusServiceUnavailable, "No Available Port", nil)) return } + gb.Debug("OnInvite", "action", "allocate port", "port", mediaPort) } else { mediaPort = gb.MediaPort[0] gb.Debug("OnInvite", "action", "use default port", "port", mediaPort) diff --git a/plugin/gb28181/port_bitmap.go b/plugin/gb28181/port_bitmap.go new file mode 100644 index 0000000..d2acf07 --- /dev/null +++ b/plugin/gb28181/port_bitmap.go @@ -0,0 +1,88 @@ +package plugin_gb28181pro + +import ( + "math/bits" + "sync/atomic" +) + +// PortBitmap 使用原子位图实现端口分配/回收 +type PortBitmap struct { + base uint16 + size uint16 + bitmap []uint64 + cursor uint32 +} + +func (pb *PortBitmap) Init(base uint16, size uint16) { + pb.base = base + pb.size = size + words := int((uint32(size) + 63) / 64) + pb.bitmap = make([]uint64, words) + atomic.StoreUint32(&pb.cursor, 0) +} + +func (pb *PortBitmap) Allocate() (uint16, bool) { + if pb.size == 0 || len(pb.bitmap) == 0 { + return 0, false + } + words := len(pb.bitmap) + start := int(atomic.LoadUint32(&pb.cursor) % uint32(words)) + for i := 0; i < words; i++ { + widx := (start + i) % words + for { + old := atomic.LoadUint64(&pb.bitmap[widx]) + free := ^old + if free == 0 { + break + } + pick := free & -free + newv := old | pick + if atomic.CompareAndSwapUint64(&pb.bitmap[widx], old, newv) { + bit := uint64(bits.TrailingZeros64(pick)) + idx := uint64(widx)*64 + bit + if idx >= uint64(pb.size) { + // 回滚越界位 + for { + cur := atomic.LoadUint64(&pb.bitmap[widx]) + reverted := cur &^ pick + if atomic.CompareAndSwapUint64(&pb.bitmap[widx], cur, reverted) { + break + } + } + break + } + atomic.StoreUint32(&pb.cursor, uint32(widx)) + return pb.base + uint16(idx), true + } + } + } + return 0, false +} + +func (pb *PortBitmap) Release(port uint16) bool { + if pb.size == 0 || len(pb.bitmap) == 0 { + return false + } + if port < pb.base { + return false + } + idx := uint32(port - pb.base) + if idx >= uint32(pb.size) { + return false + } + widx := idx / 64 + bit := idx % 64 + mask := uint64(1) << bit + for { + old := atomic.LoadUint64(&pb.bitmap[widx]) + if old&mask == 0 { + return false + } + newv := old &^ mask + if atomic.CompareAndSwapUint64(&pb.bitmap[widx], old, newv) { + return true + } + } +} + + diff --git a/plugin/rtmp/pkg/video.go b/plugin/rtmp/pkg/video.go index b4004ad..3a55f2b 100644 --- a/plugin/rtmp/pkg/video.go +++ b/plugin/rtmp/pkg/video.go @@ -3,7 +3,6 @@ package rtmp import ( "bytes" "encoding/binary" - "io" "net" "time" @@ -86,7 +85,7 @@ func (avcc *VideoFrame) filterH265(naluSizeLen int) { func (avcc *VideoFrame) CheckCodecChange() (err error) { old := avcc.ICodecCtx if avcc.Size <= 10 { - err = io.ErrShortBuffer + err = ErrSkip return } reader := avcc.NewReader() diff --git a/server.go b/server.go index 1829098..89763e2 100644 --- a/server.go +++ b/server.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "log/slog" "net/http" "net/url" @@ -677,6 +678,20 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // For .map files, set correct content-type before serving + if strings.HasSuffix(r.URL.Path, ".map") { + filePath := strings.TrimPrefix(r.URL.Path, "/admin/") + file, err := s.Admin.zipReader.Open(filePath) + if err != nil { + http.NotFound(w, r) + return + } + defer file.Close() + w.Header().Set("Content-Type", "application/json") + io.Copy(w, file) + return + } + http.ServeFileFS(w, r, s.Admin.zipReader, strings.TrimPrefix(r.URL.Path, "/admin")) return }