Merge pull request #69 from singchia/feat/example-rtmp

examples: add rtmp for realtime message proxy
This commit is contained in:
singchia
2024-06-21 16:30:35 +08:00
committed by GitHub
5 changed files with 176 additions and 0 deletions

22
examples/rtmp/Makefile Normal file
View File

@@ -0,0 +1,22 @@
PREFIX?=/usr
BINDIR?=$(PREFIX)/bin
GOHOSTOS?=$(shell go env GOHOSTOS)
GOARCH?=$(shell go env GOARCH)
.PHONY: all
all: rtmp_service rtmp_edge
.PHONY: clean
clean:
rm rtmp_service rtmp_edge
.PHONY: rtmp_service
rtmp_service: service/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_service service/*.go
.PHONY: rtmp_edge
rtmp_edge: edge/*.go
CGO_ENABLED=0 GOOS=$(GOHOSTOS) GOARCH=$(GOARCH) \
go build -trimpath -ldflags "-s -w" -o ./bin/rtmp_edge edge/*.go

View File

@@ -0,0 +1,62 @@
package main
import (
"fmt"
"io"
"net"
"sync"
"github.com/singchia/frontier/api/dataplane/v1/edge"
"github.com/spf13/pflag"
)
func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30012", "address to dial")
name := pflag.String("name", "alice", "user name to join chatroom")
listen := pflag.String("listen", "127.0.0.1:1935", "rtmp port to proxy")
pflag.Parse()
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
cli, err := edge.NewNoRetryEdge(dialer, edge.OptionEdgeMeta([]byte(*name)))
if err != nil {
fmt.Println("new edge err:", err)
return
}
for {
ln, err := net.Listen("tcp", *listen)
if err != nil {
return
}
for {
netconn, err := ln.Accept()
if err != nil {
fmt.Printf("accept err: %s\n", err)
break
}
go func() {
st, err := cli.OpenStream("rtmp")
if err != nil {
fmt.Printf("open stream err: %s\n", err)
return
}
wg := new(sync.WaitGroup)
wg.Add(2)
go func() {
defer wg.Done()
io.Copy(st, netconn)
netconn.Close()
st.Close()
}()
go func() {
defer wg.Done()
io.Copy(netconn, st)
netconn.Close()
st.Close()
}()
wg.Wait()
}()
}
}
}

View File

@@ -0,0 +1,86 @@
package main
import (
"fmt"
"net"
"sync"
"github.com/singchia/frontier/api/dataplane/v1/service"
"github.com/singchia/joy4/av/avutil"
"github.com/singchia/joy4/av/pktque"
"github.com/singchia/joy4/av/pubsub"
"github.com/singchia/joy4/format"
"github.com/singchia/joy4/format/rtmp"
"github.com/spf13/pflag"
)
func init() {
format.RegisterAll()
}
func main() {
network := pflag.String("network", "tcp", "network to dial")
address := pflag.String("address", "127.0.0.1:30011", "address to dial")
pflag.Parse()
// service
dialer := func() (net.Conn, error) {
return net.Dial(*network, *address)
}
svc, err := service.NewService(dialer, service.OptionServiceName("rtmp"))
if err != nil {
fmt.Println("new service err:", err)
return
}
// rtmp service
rtmpserver := &rtmp.Server{}
l := &sync.RWMutex{}
type Channel struct {
que *pubsub.Queue
}
channels := map[string]*Channel{}
rtmpserver.HandlePlay = func(conn *rtmp.Conn) {
fmt.Println(conn.URL.Path)
l.RLock()
ch := channels[conn.URL.Path]
l.RUnlock()
if ch != nil {
cursor := ch.que.Latest()
filters := pktque.Filters{}
demuxer := &pktque.FilterDemuxer{
Filter: filters,
Demuxer: cursor,
}
avutil.CopyFile(conn, demuxer)
}
}
rtmpserver.HandlePublish = func(conn *rtmp.Conn) {
l.Lock()
ch := channels[conn.URL.Path]
if ch == nil {
ch = &Channel{}
ch.que = pubsub.NewQueue()
channels[conn.URL.Path] = ch
} else {
ch = nil
}
l.Unlock()
if ch == nil {
return
}
avutil.CopyFile(ch.que, conn)
l.Lock()
delete(channels, conn.URL.Path)
l.Unlock()
ch.que.Close()
}
rtmpserver.Serve(svc)
}

2
go.mod
View File

@@ -14,6 +14,7 @@ require (
github.com/rabbitmq/amqp091-go v1.9.0
github.com/singchia/geminio v1.1.7-rc.1
github.com/singchia/go-timer/v2 v2.2.1
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6
github.com/soheilhy/cmux v0.1.5
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.4
@@ -33,6 +34,7 @@ require (
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/tidwall/btree v1.4.2 // indirect
github.com/tidwall/gjson v1.14.3 // indirect

4
go.sum
View File

@@ -123,6 +123,8 @@ github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-sqlite3 v1.14.17 h1:mCRHCLDUBXgpKAqIKsaAaAsrAlbkeomtRFKXh2L6YIM=
github.com/mattn/go-sqlite3 v1.14.17/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369 h1:Yp0zFEufLz0H7jzffb4UPXijavlyqlYeOg7dcyVUNnQ=
github.com/nareix/joy4 v0.0.0-20200507095837-05a4ffbb5369/go.mod h1:aFJ1ZwLjvHN4yEzE5Bkz8rD8/d8Vlj3UIuvz2yfET7I=
github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70=
github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
@@ -149,6 +151,8 @@ github.com/singchia/geminio v1.1.7-rc.1/go.mod h1:LkgZj4Ddja97vP7NWQk7TffFLZAosH
github.com/singchia/go-timer/v2 v2.0.3/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/go-timer/v2 v2.2.1 h1:gJucmL99fkuNzGk2AfNPFpa1X3/4+aGO21KkjFAG624=
github.com/singchia/go-timer/v2 v2.2.1/go.mod h1:PgkEQc6io8slCUiT5rHzWKU4/P2HXHWk3WWfijZXAf4=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6 h1:B9MVqDiyqKAjHmYYFNjOPYHqhml8rA1ogKs8rYTfZ00=
github.com/singchia/joy4 v0.0.0-20240621074108-53a2b0132ec6/go.mod h1:apGwjKmzM7JlKFbd/KANpq6T8Y5Ntr8Jjhq1BmKU/FA=
github.com/singchia/yafsm v1.0.1 h1:TTDSX7SBCr2YNdv/DZ76LjTer0rYwm7IPt24odNymUs=
github.com/singchia/yafsm v1.0.1/go.mod h1:fSWQl6DCzqc51DhLfwHr3gN2FhGmOEjTAQ2AOKDSBtY=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=