插件开发

插件的定义

所谓的插件,没有什么固定的规则,只需要完成安装操作即可。插件可以实现任意的功能扩展,最常见的是实现某种传输协议用来推流或者拉流

插件的安装

下面是内置插件jessica的源码,代表了典型的插件安装

package jessica

import (
	. "github.com/langhuihui/monibuca/monica"
	"log"
	"net/http"
)

var config = new(ListenerConfig)

func init() {
	InstallPlugin(&PluginConfig{
		Name:   "Jessica",
		Type:   PLUGIN_SUBSCRIBER,
		Config: config,
		Run:    run,
	})
}
func run() {
	log.Printf("server Jessica start at %s", config.ListenAddr)
	log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(WsHandler)))
}

当主程序读取配置文件完成解析后,会调用各个插件的Run函数,上面代码中执行了一个http的端口监听

开发订阅者插件

所谓订阅者就是用来从流媒体服务器接收音视频流的程序,例如RTMP协议执行play命令后、http-flv请求响应程序、websocket响应程序。内置插件中录制flv程序也是一个特殊的订阅者。 下面是http-flv插件的源码,供参考

package HDL

import (
	. "github.com/langhuihui/monibuca/monica"
	"github.com/langhuihui/monibuca/monica/avformat"
	"github.com/langhuihui/monibuca/monica/pool"
	"log"
	"net/http"
	"strings"
)

var config = new(ListenerConfig)

func init() {
	InstallPlugin(&PluginConfig{
		Name:   "HDL",
		Type:   PLUGIN_SUBSCRIBER,
		Config: config,
		Run:    run,
	})
}

func run() {
	log.Printf("HDL start at %s", config.ListenAddr)
	log.Fatal(http.ListenAndServe(config.ListenAddr, http.HandlerFunc(HDLHandler)))
}

func HDLHandler(w http.ResponseWriter, r *http.Request) {
	sign := r.URL.Query().Get("sign")
	if err := AuthHooks.Trigger(sign); err != nil {
		w.WriteHeader(403)
		return
	}
	stringPath := strings.TrimLeft(r.RequestURI, "/")
	if strings.HasSuffix(stringPath, ".flv") {
		stringPath = strings.TrimRight(stringPath, ".flv")
	}
	if _, ok := AllRoom.Load(stringPath); ok {
		//atomic.AddInt32(&hdlId, 1)
		w.Header().Set("Transfer-Encoding", "chunked")
		w.Header().Set("Content-Type", "video/x-flv")
		w.Write(avformat.FLVHeader)
		p := OutputStream{
			Sign: sign,
			SendHandler: func(packet *pool.SendPacket) error {
				return avformat.WriteFLVTag(w, packet)
			},
			SubscriberInfo: SubscriberInfo{
				ID: r.RemoteAddr, Type: "FLV",
			},
		}
		p.Play(stringPath)
	} else {
		w.WriteHeader(404)
	}
}

其中,核心逻辑就是创建OutputStream对象,每一个订阅者需要提供SendHandler函数,用来接收来自发布者广播出来的音视频数据。 最后调用该对象的Play函数进行播放。请注意:Play函数会阻塞当前goroutine。

开发发布者插件

所谓发布者,就是提供音视频数据的程序,例如接收来自OBS、ffmpeg的推流的程序。内置插件中,集群功能里面有一个特殊的发布者,它接收来自源服务器的音视频数据,然后在本服务器中广播音视频。 以此为例,我们需要提供一个结构体定义来表示特定的发布者:

type Receiver struct {
	InputStream
	io.Reader
	*bufio.Writer
}

其中InputStream 是固定的,必须包含,且必须以组合继承的方式定义。其余的成员则是任意的。 发布者的发布动作需要特定条件的触发,例如在集群插件中,当本服务器有订阅者订阅了某个流,而该流并没有发布者的时候就会触发向源服务器拉流的函数:

func PullUpStream(streamPath string) {
	addr, err := net.ResolveTCPAddr("tcp", config.Master)
	if MayBeError(err) {
		return
	}
	conn, err := net.DialTCP("tcp", nil, addr)
	if MayBeError(err) {
		return
	}
	brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
	p := &Receiver{
		Reader: conn,
		Writer: brw.Writer,
	}
	if p.Publish(streamPath, p) {
		p.WriteByte(MSG_SUBSCRIBE)
		p.WriteString(streamPath)
		p.WriteByte(0)
		p.Flush()
		for _, v := range p.Subscribers {
			p.Auth(v)
		}
	} else {
		return
	}
	defer p.Cancel()
	for {
		cmd, err := brw.ReadByte()
		if MayBeError(err) {
			return
		}
		switch cmd {
		case MSG_AUDIO:
			if audio, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
				p.PushAudio(audio)
			}
		case MSG_VIDEO:
			if video, err := p.readAVPacket(avformat.FLV_TAG_TYPE_VIDEO); err == nil && len(video.Payload) > 2 {
				tmp := video.Payload[0]         // 第一个字节保存着视频的相关信息.
				video.VideoFrameType = tmp >> 4 // 帧类型 4Bit, H264一般为1或者2
				p.PushVideo(video)
			}
		case MSG_AUTH:
			cmd, err = brw.ReadByte()
			if MayBeError(err) {
				return
			}
			bytes, err := brw.ReadBytes(0)
			if MayBeError(err) {
				return
			}
			subId := strings.Split(string(bytes[0:len(bytes)-1]), ",")[0]
			if v, ok := p.Subscribers[subId]; ok {
				if cmd != 1 {
					v.Cancel()
				}
			}
		}
	}
}

正在该函数中会向源服务器建立tcp连接,然后发送特定命令表示需要拉流,当我们接收到源服务器的数据的时候,就调用PushVideo和PushAudio函数来广播音视频。

核心逻辑是调用InputStream的Publish以及PushVideo、PushAudio函数

开发钩子插件

钩子插件就是在服务器的关键逻辑处插入的函数调用,方便扩展服务器的功能,比如对连接进行验证,或者触发一些特殊的发布者。 目前提供的钩子包括

  • 当发布者开始发布时 OnPublishHooks.AddHook(onPublish) 例如:
func onPublish(r *Room) {
	for _, v := range r.Subscribers {
		if err := CheckSign(v.Sign); err != nil {
			v.Cancel()
		}
	}
}

此时可以访问房间里面的订阅者,对其进行验证。

  • 当有订阅者订阅了某个流时,OnSubscribeHooks.AddHook(onSubscribe) 例如:
func onSubscribe(s *OutputStream) {
	if s.Publisher == nil {
		go PullUpStream(s.StreamPath)
	}
}

拉取源服务器的流