开源版本,视频直播功能,删除录像相关代码

This commit is contained in:
gx_ma
2024-07-12 14:45:34 +08:00
parent 67534a1f16
commit d6cda401fd
7 changed files with 4 additions and 684 deletions

View File

@@ -1,93 +0,0 @@
package com.fastbee.sip.handler.req.message.response.cmdType;
import com.fastbee.sip.domain.SipDevice;
import com.fastbee.sip.handler.req.ReqAbstractHandler;
import com.fastbee.sip.handler.req.message.IMessageHandler;
import com.fastbee.sip.handler.req.message.response.ResponseMessageHandler;
import com.fastbee.sip.model.RecordItem;
import com.fastbee.sip.model.RecordList;
import com.fastbee.sip.server.RecordCacheManager;
import com.fastbee.sip.service.ISipCacheService;
import com.fastbee.sip.util.SipUtil;
import com.fastbee.sip.util.XmlUtil;
import lombok.extern.slf4j.Slf4j;
import org.dom4j.DocumentException;
import org.dom4j.Element;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.sip.InvalidArgumentException;
import javax.sip.RequestEvent;
import javax.sip.SipException;
import java.text.ParseException;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@Component
public class RecordInfoHandler extends ReqAbstractHandler implements InitializingBean, IMessageHandler {
@Autowired
private ResponseMessageHandler responseMessageHandler;
@Autowired
private ISipCacheService sipCacheService;
@Autowired
private RecordCacheManager recordCacheManager;
@Override
public void handlerCmdType(RequestEvent evt, SipDevice device, Element element) {
try {
// 回复200 OK
responseAck(evt);
Element rootElement = getRootElement(evt);
String deviceId = rootElement.element("DeviceID").getText();
String sn = XmlUtil.getText(rootElement, "SN");
String sumNum = XmlUtil.getText(rootElement, "SumNum");
String recordkey = deviceId + ":" + sn;
int recordnum = 0;
RecordList recordList = recordCacheManager.get(recordkey);
recordList.setDeviceID(deviceId);
Element recordListElement = rootElement.element("RecordList");
if (recordListElement == null || sumNum == null || sumNum.equals("")) {
log.info("无录像数据");
} else {
Iterator<Element> recordListIterator = recordListElement.elementIterator();
if (recordListIterator != null) {
while (recordListIterator.hasNext()) {
Element itemRecord = recordListIterator.next();
Element recordElement = itemRecord.element("DeviceID");
if (recordElement == null) {
continue;
}
RecordItem item = new RecordItem();
item.setStart(SipUtil.ISO8601Totimestamp(XmlUtil.getText(itemRecord, "StartTime")));
item.setEnd(SipUtil.ISO8601Totimestamp(XmlUtil.getText(itemRecord, "EndTime")));
recordList.addItem(item);
recordnum++;
}
}
}
log.info("getSumNum:{}", recordList.getSumNum());
if (recordList.getSumNum() + recordnum == Integer.parseInt(sumNum)) {
//时间合并 保存到redia
recordList.mergeItems();
log.info("mergeItems recordList:{}", recordList);
recordCacheManager.remove(recordkey);
sipCacheService.setRecordList(recordkey, recordList);
//发布设备property到emqx
} else {
recordList.setSumNum(recordList.getSumNum() + recordnum);
recordCacheManager.put(recordkey, recordList);
}
} catch (DocumentException | SipException | InvalidArgumentException | ParseException e) {
e.printStackTrace();
}
}
@Override
public void afterPropertiesSet() throws Exception {
String cmdType = "RecordInfo";
responseMessageHandler.addHandler(cmdType, this);
}
}

View File

@@ -1,41 +0,0 @@
package com.fastbee.sip.server;
import com.fastbee.sip.model.RecordList;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@Component
public class RecordCacheManager {
private final ConcurrentHashMap<String, RecordList> recordMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ReentrantLock> lockMap = new ConcurrentHashMap<>();
public void put(String key,RecordList list){
recordMap.putIfAbsent(key, list);
}
public RecordList get(String key){
RecordList ret = recordMap.get(key);
if (ret == null) {
ret = new RecordList();
recordMap.putIfAbsent(key, ret);
}
return ret;
}
public void remove(String key) {
recordMap.remove(key);
lockMap.remove(key);
}
public void addlock(String key){
lockMap.put(key,new ReentrantLock());
}
public ReentrantLock getlock(String key){
return lockMap.get(key);
}
}

View File

@@ -1,30 +0,0 @@
package com.fastbee.sip.service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fastbee.sip.model.RecordItem;
import com.fastbee.sip.model.RecordList;
import com.fastbee.sip.model.Stream;
import org.springframework.web.bind.annotation.RequestParam;
import java.util.List;
public interface IRecordService {
RecordList listDevRecord(String deviceId, String channelId, String startTime, String endTime);
List<RecordItem> listRecord(String channelId, String sn);
JSONObject listServerRecord(String recordApi, Integer pageNum, Integer pageSize);
JSONArray listServerRecordByDate(String recordApi, Integer year, Integer month, String app, String stream);
JSONObject listServerRecordByStream(String recordApi, Integer pageNum, Integer pageSize, String app);
JSONObject listServerRecordByApp(String recordApi, Integer pageNum, Integer pageSize);
JSONObject listServerRecordByFile(String recordApi, Integer pageNum, Integer pageSize, String app, String stream, String startTime, String endTime);
JSONObject listServerRecordByDevice(Integer pageNum, Integer pageSize, String deviceId, String channelId, String startTime, String endTime);
boolean startRecord(String stream);
boolean stopRecord(String stream);
boolean isRecording(String stream);
JSONObject getMp4RecordFile(String stream,String period);
Stream download(String deviceId, String channelId,
String startTime, String endTime, int downloadSpeed);
Stream playRecord(String deviceId, String channelId);
}

View File

@@ -1,192 +0,0 @@
package com.fastbee.sip.service.impl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fastbee.common.core.domain.entity.SysUser;
import com.fastbee.common.core.redis.RedisCache;
import com.fastbee.common.core.redis.RedisKeyBuilder;
import com.fastbee.sip.domain.MediaServer;
import com.fastbee.sip.domain.SipDevice;
import com.fastbee.sip.model.RecordItem;
import com.fastbee.sip.model.RecordList;
import com.fastbee.sip.model.Stream;
import com.fastbee.sip.model.VideoSessionInfo;
import com.fastbee.sip.server.ISipCmd;
import com.fastbee.sip.server.MessageInvoker;
import com.fastbee.sip.server.RecordCacheManager;
import com.fastbee.sip.service.*;
import com.fastbee.sip.util.RecordApiUtils;
import com.fastbee.sip.util.SipUtil;
import com.fastbee.sip.util.ZlmApiUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import static com.fastbee.common.utils.SecurityUtils.getLoginUser;
@Slf4j
@Service
public class RecordServiceImpl implements IRecordService {
@Autowired
private MessageInvoker messageInvoker;
@Autowired
private RedisCache redisCache;
@Autowired
private RecordCacheManager recordCacheManager;
@Autowired
private ISipDeviceService sipDeviceService;
@Autowired
private IMediaServerService mediaServerService;
@Autowired
private IZmlHookService zmlHookService;
@Autowired
private ZlmApiUtils zlmApiUtils;
@Autowired
private RecordApiUtils recordApiUtils;
@Autowired
private ISipCmd sipCmd;
@Autowired
private IPlayService playService;
@Override
public RecordList listDevRecord(String deviceId, String channelId, String start, String end) {
SipDevice dev = sipDeviceService.selectSipDeviceBySipId(deviceId);
if (dev != null) {
String sn = String.valueOf((int) ((Math.random() * 9 + 1) * 100000));
String recordkey = channelId + ":" + sn;
recordCacheManager.addlock(recordkey);
messageInvoker.recordInfoQuery(dev, sn, channelId, SipUtil.timestampToDate(start), SipUtil.timestampToDate(end));
String catchkey = RedisKeyBuilder.buildSipRecordinfoCacheKey(recordkey);
return (RecordList) messageInvoker.getExecResult(catchkey, SipUtil.DEFAULT_EXEC_TIMEOUT);
}
return null;
}
@Override
public List<RecordItem> listRecord(String channelId, String sn) {
String recordkey = channelId + ":" + sn;
String catchkey = RedisKeyBuilder.buildSipRecordinfoCacheKey(recordkey);
List<RecordItem> items = redisCache.getCacheList(catchkey);
if (items.size() > 1) {
items.sort(Comparator.naturalOrder());
}
return items;
}
@Override
public JSONObject listServerRecord(String recordApi, Integer pageNum, Integer pageSize) {
return recordApiUtils.getRecordlist(recordApi, pageNum, pageSize, null).getJSONObject("data");
}
@Override
public JSONArray listServerRecordByDate(String recordApi, Integer year, Integer month, String app, String stream) {
return recordApiUtils.getRecordDatelist(recordApi, year, month, app, stream, null).getJSONArray("data");
}
@Override
public JSONObject listServerRecordByStream(String recordApi, Integer pageNum, Integer pageSize, String app) {
return recordApiUtils.getRecordStreamlist(recordApi, pageNum, pageSize, app, null).getJSONObject("data");
}
@Override
public JSONObject listServerRecordByApp(String recordApi, Integer pageNum, Integer pageSize) {
return recordApiUtils.getRecordApplist(recordApi, pageNum, pageSize, null).getJSONObject("data");
}
@Override
public JSONObject listServerRecordByFile(String recordApi, Integer pageNum, Integer pageSize, String app, String stream, String startTime, String endTime) {
return recordApiUtils.getRecordFilelist(recordApi, pageNum, pageSize, app, stream, startTime, endTime, null).getJSONObject("data");
}
@Override
public JSONObject listServerRecordByDevice(Integer pageNum, Integer pageSize, String deviceId, String channelId, String startTime, String endTime) {
String stream = "gb_play_" + deviceId + "_" + channelId;
MediaServer mediaServer = mediaServerService.selectMediaServerBydeviceSipId(deviceId);
String recordApi = "";
if (mediaServer != null && Objects.equals(mediaServer.getProtocol(), "http")) {
recordApi = "http://" + mediaServer.getIp() + ":" + mediaServer.getRecordPort();
} else if (mediaServer != null && Objects.equals(mediaServer.getProtocol(), "https")) {
recordApi = "https://" + mediaServer.getDomain() + ":" + mediaServer.getRecordPort();
}
JSONObject obj = recordApiUtils.getRecordFilelist(recordApi, pageNum, pageSize, "rtp",
stream, startTime, endTime, null);
if (obj != null) {
obj = obj.getJSONObject("data");
obj.put("recordApi", recordApi);
log.info("obj:{}", obj);
}
return obj;
}
@Override
public boolean startRecord(String stream) {
SysUser user = getLoginUser().getUser();
//缓存zlm服务器配置
MediaServer media = mediaServerService.selectMediaServerBytenantId(user.getUserId());
if (media != null) {
return zlmApiUtils.startRecord(media, "1", "live", stream).getBoolean("result");
}
return false;
}
@Override
public boolean stopRecord(String stream) {
SysUser user = getLoginUser().getUser();
MediaServer media = mediaServerService.selectMediaServerBytenantId(user.getUserId());
;
if (media != null) {
return zlmApiUtils.stopRecord(media, "1", "live", stream).getBoolean("result");
}
return false;
}
@Override
public boolean isRecording(String stream) {
SysUser user = getLoginUser().getUser();
MediaServer media = mediaServerService.selectMediaServerBytenantId(user.getUserId());
;
if (media != null) {
return zlmApiUtils.isRecording(media, "1", "live", stream).getBoolean("status");
}
return false;
}
@Override
public JSONObject getMp4RecordFile(String stream, String period) {
SysUser user = getLoginUser().getUser();
MediaServer media = mediaServerService.selectMediaServerBytenantId(user.getUserId());
if (media != null) {
return zlmApiUtils.getMp4RecordFile(media, period, "live", stream).getJSONObject("data");
}
return null;
}
@Override
public Stream download(String deviceId, String channelId, String startTime, String endTime, int downloadSpeed) {
SipDevice dev = sipDeviceService.selectSipDeviceBySipId(deviceId);
VideoSessionInfo info = sipCmd.downloadStreamCmd(dev, channelId, startTime, endTime, downloadSpeed);
return zmlHookService.updateStream(info);
}
@Override
public Stream playRecord(String deviceId, String channelId) {
return playService.play(deviceId, channelId, true);
}
}

View File

@@ -1,170 +0,0 @@
package com.fastbee.sip.util;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.stereotype.Component;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.net.ConnectException;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class RecordApiUtils {
public interface RequestCallback {
void run(JSONObject response);
}
private OkHttpClient getClient() {
OkHttpClient.Builder httpClientBuilder = new OkHttpClient.Builder();
return httpClientBuilder.build();
}
public JSONObject sendGet(String recoreUrl, String api, Map<String, Object> param, RequestCallback callback) {
OkHttpClient client = getClient();
StringBuilder stringBuffer = new StringBuilder();
stringBuffer.append(String.format("%s/%s", recoreUrl, api));
JSONObject responseJSON = null;
if (param != null && param.keySet().size() > 0) {
stringBuffer.append("?");
int index = 1;
for (String key : param.keySet()) {
if (param.get(key) != null) {
stringBuffer.append(key).append("=").append(param.get(key));
if (index < param.size()) {
stringBuffer.append("&");
}
}
index++;
}
}
String url = stringBuffer.toString();
Request request = new Request.Builder()
.get()
.url(url)
.build();
if (callback == null) {
try {
Response response = client.newCall(request).execute();
if (response.isSuccessful()) {
ResponseBody responseBody = response.body();
if (responseBody != null) {
String responseStr = responseBody.string();
responseJSON = JSON.parseObject(responseStr);
}
} else {
response.close();
Objects.requireNonNull(response.body()).close();
}
} catch (ConnectException e) {
log.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
log.error("请检查media配置并确认Assist已启动...");
} catch (IOException e) {
log.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
} else {
client.newCall(request).enqueue(new Callback() {
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) {
if (response.isSuccessful()) {
try {
String responseStr = Objects.requireNonNull(response.body()).string();
callback.run(JSON.parseObject(responseStr));
} catch (IOException e) {
log.error(String.format("[ %s ]请求失败: %s", url, e.getMessage()));
}
} else {
response.close();
Objects.requireNonNull(response.body()).close();
}
}
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
log.error(String.format("连接Assist失败: %s, %s", e.getCause().getMessage(), e.getMessage()));
log.info("请检查media配置并确认Assist已启动...");
}
});
}
return responseJSON;
}
public JSONObject fileDuration(String recoreUrl, String app, String stream, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("recordIng", true);
return sendGet(recoreUrl, "zlm/record//file/duration", param, callback);
}
public JSONObject getInfo(String recoreUrl, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
return sendGet(recoreUrl, "zlm/record//info", param, callback);
}
public JSONObject getRecordlist(String recoreUrl, Integer pageNum, Integer pageSize, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("pageNum", pageNum);
param.put("pageSize", pageSize);
return sendGet(recoreUrl, "zlm/record/list", param, callback);
}
public JSONObject getRecordDatelist(String recoreUrl, Integer year, Integer month, String app, String stream, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
if(year != null) {
param.put("year", year);
}
if(year != null) {
param.put("month", month);
}
param.put("app", app);
param.put("stream", stream);
return sendGet(recoreUrl, "zlm/record/date/list", param, callback);
}
public JSONObject getRecordStreamlist(String recoreUrl, Integer pageNum, Integer pageSize, String app, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("page", pageNum);
param.put("count", pageSize);
param.put("app", app);
return sendGet(recoreUrl, "zlm/record/stream/list", param, callback);
}
public JSONObject getRecordApplist(String recoreUrl, Integer pageNum, Integer pageSize, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("page", pageNum);
param.put("count", pageSize);
return sendGet(recoreUrl, "zlm/record/app/list", param, callback);
}
public JSONObject getRecordFilelist(String recoreUrl, Integer pageNum, Integer pageSize, String app, String stream, String startTime, String endTime, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("page", pageNum);
param.put("count", pageSize);
param.put("app", app);
param.put("stream", stream);
param.put("startTime", startTime);
param.put("endTime", endTime);
return sendGet(recoreUrl, "zlm/record/file/list", param, callback);
}
public JSONObject addStreamCallInfo(String recoreUrl, String app, String stream, String callId, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("app", app);
param.put("stream", stream);
param.put("callId", callId);
return sendGet(recoreUrl, "zlm/record/addStreamCallInfo", param, callback);
}
public JSONObject uploadOss(String recoreUrl, String file, RequestCallback callback) {
Map<String, Object> param = new HashMap<>();
param.put("resourcePath", file);
return sendGet(recoreUrl, "file/upload", param, callback);
}
}