From 2373e4accb28f914b83df28d76f77bb51e909b9c Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Thu, 8 Aug 2024 23:37:28 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=9B=9E=E6=94=BEseek?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api.go | 30 +++++++++- device.go | 4 +- log.go | 81 ------------------------- logrus.go | 148 ---------------------------------------------- record.go | 7 +++ sip_server.go | 2 +- stream.go | 25 ++++++-- stream_manager.go | 6 +- 8 files changed, 59 insertions(+), 244 deletions(-) delete mode 100644 logrus.go diff --git a/api.go b/api.go index 9270c26..b2de620 100644 --- a/api.go +++ b/api.go @@ -151,7 +151,7 @@ func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r return } - stream = &Stream{Id: streamId, Protocol: "28181", ByeRequest: nil} + stream = &Stream{Id: streamId, Protocol: "28181", DialogRequest: nil} if err := StreamManager.Add(stream); err != nil { w.WriteHeader(http.StatusOK) return @@ -243,7 +243,7 @@ func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r Sugar.Errorf("send ack error %s %s", err.Error(), ackRequest.String()) } else { inviteOk = true - bye = device.CreateByeRequestFromAnswer(res, false) + bye = device.CreateDialogRequestFromAnswer(res, false) } } else if res.StatusCode() > 299 { cancel() @@ -275,7 +275,7 @@ func (api *ApiServer) OnPlay(streamId, protocol string, w http.ResponseWriter, r } if stream.waitPublishStream() { - stream.ByeRequest = bye + stream.DialogRequest = bye w.WriteHeader(http.StatusOK) } else { SipUA.SendRequest(bye) @@ -423,6 +423,30 @@ func (api *ApiServer) OnSubscribePosition(w http.ResponseWriter, r *http.Request } func (api *ApiServer) OnSeekPlayback(w http.ResponseWriter, r *http.Request) { + v := struct { + StreamId string `json:"stream_id"` + Seconds int `json:"seconds"` + }{} + + if err := HttpDecodeJSONBody(w, r, &v); err != nil { + httpResponse2(w, err) + return + } + + stream := StreamManager.Find(v.StreamId) + if stream == nil || stream.DialogRequest == nil { + return + } + + seekRequest := stream.CreateRequestFromDialog(sip.INFO) + seq, _ := seekRequest.CSeq() + body := fmt.Sprintf(SeekBodyFormat, seq.SeqNo, v.Seconds) + seekRequest.SetBody(body, true) + seekRequest.RemoveHeader(RtspMessageType.Name()) + seekRequest.AppendHeader(&RtspMessageType) + + SipUA.SendRequest(seekRequest) + w.WriteHeader(http.StatusOK) } func (api *ApiServer) OnPTZControl(w http.ResponseWriter, r *http.Request) { diff --git a/device.go b/device.go index f362ae5..bad55ac 100644 --- a/device.go +++ b/device.go @@ -185,9 +185,9 @@ func (d *DBDevice) BuildDownloadRequest(channelId, ip string, port uint16, start return d.BuildInviteRequest("Download", channelId, ip, port, startTime, stopTime, setup, speed, ssrc) } -// CreateByeRequestFromAnswer 根据invite的应答创建Bye请求 +// CreateDialogRequestFromAnswer 根据invite的应答创建Dialog请求 // 应答的to头域需携带tag -func (d *DBDevice) CreateByeRequestFromAnswer(message sip.Response, uas bool) sip.Request { +func (d *DBDevice) CreateDialogRequestFromAnswer(message sip.Response, uas bool) sip.Request { from, _ := message.From() to, _ := message.To() id, _ := message.CallID() diff --git a/log.go b/log.go index abe55f7..d715472 100644 --- a/log.go +++ b/log.go @@ -1,12 +1,10 @@ package main import ( - "fmt" "github.com/natefinch/lumberjack" "go.uber.org/zap" "go.uber.org/zap/zapcore" "os" - "strings" ) var ( @@ -54,82 +52,3 @@ func getLogWriter(name string, maxSize, maxBackup, maxAge int, compress bool) za } return zapcore.AddSync(lumberJackLogger) } - -// Logger interface used as base logger throughout the library. -type Logger interface { - Print(args ...interface{}) - Printf(format string, args ...interface{}) - - Trace(args ...interface{}) - Tracef(format string, args ...interface{}) - - Debug(args ...interface{}) - Debugf(format string, args ...interface{}) - - Info(args ...interface{}) - Infof(format string, args ...interface{}) - - Warn(args ...interface{}) - Warnf(format string, args ...interface{}) - - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) - - Panic(args ...interface{}) - Panicf(format string, args ...interface{}) - - WithPrefix(prefix string) Logger - Prefix() string - - WithFields(fields Fields) Logger - Fields() Fields - - SetLevel(level Level) -} - -type Loggable interface { - Log() Logger -} - -type Fields map[string]interface{} - -func (fields Fields) String() string { - str := make([]string, 0) - - for k, v := range fields { - str = append(str, fmt.Sprintf("%s=%+v", k, v)) - } - - return strings.Join(str, " ") -} - -func (fields Fields) WithFields(newFields Fields) Fields { - allFields := make(Fields) - - for k, v := range fields { - allFields[k] = v - } - - for k, v := range newFields { - allFields[k] = v - } - - return allFields -} - -func AddFieldsFrom(logger Logger, values ...interface{}) Logger { - for _, value := range values { - switch v := value.(type) { - case Logger: - logger = logger.WithFields(v.Fields()) - case Loggable: - logger = logger.WithFields(v.Log().Fields()) - case interface{ Fields() Fields }: - logger = logger.WithFields(v.Fields()) - } - } - return logger -} diff --git a/logrus.go b/logrus.go deleted file mode 100644 index 2319f1c..0000000 --- a/logrus.go +++ /dev/null @@ -1,148 +0,0 @@ -package main - -import ( - "github.com/sirupsen/logrus" - prefixed "github.com/x-cray/logrus-prefixed-formatter" -) - -type LogrusLogger struct { - log logrus.Ext1FieldLogger - prefix string - fields Fields -} - -// Level type -type Level uint32 - -// These are the different logging levels. You can set the logging level to log -// on your instance of logger, obtained with `logrus.New()`. -const ( - // PanicLevel level, highest level of severity. Logs and then calls panic with the - // message passed to Debug, Info, ... - PanicLevel Level = iota - // FatalLevel level. Logs and then calls `logger.Exit(1)`. It will exit even if the - // logging level is set to Panic. - FatalLevel - // ErrorLevel level. Logs. Used for errors that should definitely be noted. - // Commonly used for hooks to send errors to an error tracking service. - ErrorLevel - // WarnLevel level. Non-critical entries that deserve eyes. - WarnLevel - // InfoLevel level. General operational entries about what's going on inside the - // application. - InfoLevel - // DebugLevel level. Usually only enabled when debugging. Very verbose logging. - DebugLevel - // TraceLevel level. Designates finer-grained informational events than the Debug. - TraceLevel -) - -func NewLogrusLogger(logrus logrus.Ext1FieldLogger, prefix string, fields Fields) *LogrusLogger { - return &LogrusLogger{ - log: logrus, - prefix: prefix, - fields: fields, - } -} - -func NewDefaultLogrusLogger() *LogrusLogger { - logger := logrus.New() - logger.Formatter = &prefixed.TextFormatter{ - FullTimestamp: true, - TimestampFormat: "2006-01-02 15:04:05.000", - } - - return NewLogrusLogger(logger, "main", nil) -} - -func (l *LogrusLogger) Print(args ...interface{}) { - l.prepareEntry().Print(args...) -} - -func (l *LogrusLogger) Printf(format string, args ...interface{}) { - l.prepareEntry().Printf(format, args...) -} - -func (l *LogrusLogger) Trace(args ...interface{}) { - l.prepareEntry().Trace(args...) -} - -func (l *LogrusLogger) Tracef(format string, args ...interface{}) { - l.prepareEntry().Tracef(format, args...) -} - -func (l *LogrusLogger) Debug(args ...interface{}) { - l.prepareEntry().Debug(args...) -} - -func (l *LogrusLogger) Debugf(format string, args ...interface{}) { - l.prepareEntry().Debugf(format, args...) -} - -func (l *LogrusLogger) Info(args ...interface{}) { - l.prepareEntry().Info(args...) -} - -func (l *LogrusLogger) Infof(format string, args ...interface{}) { - l.prepareEntry().Infof(format, args...) -} - -func (l *LogrusLogger) Warn(args ...interface{}) { - l.prepareEntry().Warn(args...) -} - -func (l *LogrusLogger) Warnf(format string, args ...interface{}) { - l.prepareEntry().Warnf(format, args...) -} - -func (l *LogrusLogger) Error(args ...interface{}) { - l.prepareEntry().Error(args...) -} - -func (l *LogrusLogger) Errorf(format string, args ...interface{}) { - l.prepareEntry().Errorf(format, args...) -} - -func (l *LogrusLogger) Fatal(args ...interface{}) { - l.prepareEntry().Fatal(args...) -} - -func (l *LogrusLogger) Fatalf(format string, args ...interface{}) { - l.prepareEntry().Fatalf(format, args...) -} - -func (l *LogrusLogger) Panic(args ...interface{}) { - l.prepareEntry().Panic(args...) -} - -func (l *LogrusLogger) Panicf(format string, args ...interface{}) { - l.prepareEntry().Panicf(format, args...) -} - -func (l *LogrusLogger) WithPrefix(prefix string) Logger { - return NewLogrusLogger(l.log, prefix, l.Fields()) -} - -func (l *LogrusLogger) Prefix() string { - return l.prefix -} - -func (l *LogrusLogger) WithFields(fields Fields) Logger { - return NewLogrusLogger(l.log, l.Prefix(), l.Fields().WithFields(fields)) -} - -func (l *LogrusLogger) Fields() Fields { - return l.fields -} - -func (l *LogrusLogger) prepareEntry() *logrus.Entry { - return l.log. - WithFields(logrus.Fields(l.Fields())). - WithField("prefix", l.Prefix()) -} - -func (l *LogrusLogger) SetLevel(level Level) { - if ll, ok := l.log.(*logrus.Logger); ok { - ll.SetLevel(logrus.Level(level)) - } -} diff --git a/record.go b/record.go index cdbd881..b476544 100644 --- a/record.go +++ b/record.go @@ -4,6 +4,7 @@ import ( "context" "encoding/xml" "fmt" + "github.com/ghettovoice/gosip/sip" ) const ( @@ -16,6 +17,12 @@ const ( "%s\r\n" + "%s\r\n" + "\r\n" + + SeekBodyFormat = "PLAY RTSP/1.0\r\n" + "CSeq: %d\r\n" + "Range: npt=%d-\r\n" +) + +var ( + RtspMessageType sip.ContentType = "application/RTSP" ) type QueryRecordInfoResponse struct { diff --git a/sip_server.go b/sip_server.go index 573b29e..97f0443 100644 --- a/sip_server.go +++ b/sip_server.go @@ -149,7 +149,7 @@ func (s *sipServer) OnInvite(req sip.Request, tx sip.ServerTransaction) { setToTag(response, toTag) session.Successful = true - session.ByeRequest = device.CreateByeRequestFromAnswer(response, true) + session.ByeRequest = device.CreateDialogRequestFromAnswer(response, true) id, _ := req.CallID() BroadcastManager.AddSessionWithCallId(id.Value(), session) diff --git a/stream.go b/stream.go index a5ebd88..827f0f3 100644 --- a/stream.go +++ b/stream.go @@ -7,9 +7,9 @@ import ( ) type Stream struct { - Id string //推流ID - Protocol string //推流协议 - ByeRequest sip.Request + Id string //推流ID + Protocol string //推流协议 + DialogRequest sip.Request publishEvent chan byte cancelFunc func() @@ -34,8 +34,21 @@ func (s *Stream) Close(sendBye bool) { s.cancelFunc() } - if sendBye && s.ByeRequest != nil { - SipUA.SendRequest(s.ByeRequest) - s.ByeRequest = nil + if sendBye && s.DialogRequest != nil { + SipUA.SendRequest(s.CreateRequestFromDialog(sip.BYE)) + s.DialogRequest = nil } } + +func (s *Stream) CreateRequestFromDialog(method sip.RequestMethod) sip.Request { + { + seq, _ := s.DialogRequest.CSeq() + seq.SeqNo++ + seq.MethodName = method + } + + request := s.DialogRequest.Clone().(sip.Request) + request.SetMethod(method) + + return request +} diff --git a/stream_manager.go b/stream_manager.go index 4a6fed4..56b45da 100644 --- a/stream_manager.go +++ b/stream_manager.go @@ -36,7 +36,7 @@ func (s *streamManager) AddWithCallId(stream *Stream) error { s.lock.Lock() defer s.lock.Unlock() - id, _ := stream.ByeRequest.CallID() + id, _ := stream.DialogRequest.CallID() if _, ok := s.streams[id.Value()]; ok { return fmt.Errorf("the stream %s has been exist", id.Value()) } @@ -71,8 +71,8 @@ func (s *streamManager) Remove(id string) (*Stream, error) { stream, ok := s.streams[id] delete(s.streams, id) - if ok && stream.ByeRequest != nil { - callID, _ := stream.ByeRequest.CallID() + if ok && stream.DialogRequest != nil { + callID, _ := stream.DialogRequest.CallID() delete(s.callIds, callID.Value()) return stream, nil }