Files
AI2API/wo2api.go
BlueSkyXN 363b4d62ff 0.1
2025-03-22 13:47:39 +08:00

2367 lines
68 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"path/filepath"
"regexp"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
)
// Config holds the settings parsed from the command-line flags.
type Config struct {
Port string // port the proxy server listens on
Address string // address the proxy server listens on
LogLevel string // log level
DevMode bool // development-mode flag
DiagnosticLevel string // diagnostic level: none, basic, full
SaveResponses bool // whether to save all responses
MaxRetries int // maximum number of retries
Timeout int // request timeout in seconds
}
// Hard-coded WoCloud API target URL and client ID, plus the version string
// this program reports in its startup log and on /health and /version.
const (
TargetURL = "https://panservice.mail.wo.cn"
ClientID = "1001000035"
Version = "1.1.0" // version number
)
// Log level names; these are the values the -log-level flag documents.
const (
LogLevelDebug = "debug"
LogLevelInfo = "info"
LogLevelWarn = "warn"
LogLevelError = "error"
)
// parseFlags registers the command-line flags on the default flag set, parses
// the process arguments and returns the populated Config.
//
// When -dev is set and the log level is not already debug, the log level is
// forced to debug and a notice is printed to stdout.
func parseFlags() *Config {
	var (
		port          = flag.String("port", "5858", "Port to listen on")
		address       = flag.String("address", "localhost", "Address to listen on")
		level         = flag.String("log-level", LogLevelInfo, "Log level (debug, info, warn, error)")
		devMode       = flag.Bool("dev", false, "Enable development mode with enhanced logging")
		diagLevel     = flag.String("diag", "none", "Diagnostic level: none, basic, full")
		saveResponses = flag.Bool("save-responses", false, "Save all responses for analysis")
		maxRetries    = flag.Int("max-retries", 3, "Maximum number of retries for failed requests")
		timeout       = flag.Int("timeout", 300, "Request timeout in seconds")
	)
	flag.Parse()

	cfg := &Config{
		Port:            *port,
		Address:         *address,
		LogLevel:        *level,
		DevMode:         *devMode,
		DiagnosticLevel: *diagLevel,
		SaveResponses:   *saveResponses,
		MaxRetries:      *maxRetries,
		Timeout:         *timeout,
	}

	// Development mode implies debug logging.
	if cfg.DevMode && cfg.LogLevel != LogLevelDebug {
		cfg.LogLevel = LogLevelDebug
		fmt.Println("开发模式已启用日志级别设置为debug")
	}
	return cfg
}
// Global configuration; main assigns it from parseFlags before serving.
var (
appConfig *Config
)
// Performance metrics. The int64 values are updated with sync/atomic.
var (
requestCounter int64 // incremented for every /v1/chat/completions request
successCounter int64 // incremented when a chat completion finishes without error
errorCounter int64 // incremented by every logError call
parseErrorCounter int64
avgResponseTime int64 // running SUM of elapsed milliseconds; readers divide by requestCounter to get the average
latencyHistogram [10]int64 // 100ms-wide buckets: 0-100ms, 100-200ms, ...; the last bucket collects everything >= 900ms
statusMetrics sync.Map // meant to count responses per status code (no use is visible in this part of the file)
)
// Logger state shared by the log* helpers; logger and logLevel are set by initLogger.
var (
logger *log.Logger
logLevel string
logMutex sync.Mutex // held around each logger.Printf call
)
// 日志初始化
func initLogger(level string) {
logger = log.New(os.Stdout, "[Wo2API] ", log.LstdFlags)
logLevel = level
}
// logDebug writes a "[DEBUG]" line, but only when the active level is debug.
func logDebug(format string, v ...interface{}) {
	if logLevel != LogLevelDebug {
		return
	}
	line := "[DEBUG] " + format
	logMutex.Lock()
	logger.Printf(line, v...)
	logMutex.Unlock()
}
// logInfo writes an "[INFO]" line when the active level is debug or info.
func logInfo(format string, v ...interface{}) {
	switch logLevel {
	case LogLevelDebug, LogLevelInfo:
		line := "[INFO] " + format
		logMutex.Lock()
		logger.Printf(line, v...)
		logMutex.Unlock()
	}
}
// logWarn writes a "[WARN]" line when the active level is debug, info or warn.
func logWarn(format string, v ...interface{}) {
	switch logLevel {
	case LogLevelDebug, LogLevelInfo, LogLevelWarn:
		line := "[WARN] " + format
		logMutex.Lock()
		logger.Printf(line, v...)
		logMutex.Unlock()
	}
}
// logError always writes an "[ERROR]" line, regardless of the active level,
// and then bumps the global error counter.
func logError(format string, v ...interface{}) {
	line := "[ERROR] " + format
	logMutex.Lock()
	logger.Printf(line, v...)
	logMutex.Unlock()

	// Every logged error counts towards the error metric.
	atomic.AddInt64(&errorCounter, 1)
}
// APIMessage is a single chat message in the OpenAI/DeepSeek format.
type APIMessage struct {
Role string `json:"role"`
Content interface{} `json:"content"` // interface{} so that content of any JSON type is accepted
}
// APIRequest is the OpenAI/DeepSeek chat-completion request body.
type APIRequest struct {
Model string `json:"model"`
Messages []APIMessage `json:"messages"`
Stream bool `json:"stream"`
Temperature float64 `json:"temperature,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
}
// WoCloudHistory is one entry of the conversation history in WoCloud's format.
type WoCloudHistory struct {
Query string `json:"query"`
RewriteQuery string `json:"rewriteQuery"`
UploadFileUrl string `json:"uploadFileUrl"`
Response string `json:"response"`
ReasoningContent string `json:"reasoningContent"`
State string `json:"state"`
Key string `json:"key"`
}
// WoCloudRequest is the request body in WoCloud's format.
type WoCloudRequest struct {
ModelId int `json:"modelId"`
Input string `json:"input"`
History []WoCloudHistory `json:"history"`
}
// WoCloudResponse is the WoCloud response format (enhanced version).
// It has a custom UnmarshalJSON method that tolerates alternative field names.
type WoCloudResponse struct {
Code int `json:"code"`
Message interface{} `json:"message"`
Response string `json:"response"`
ReasoningContent string `json:"reasoningContent"`
Finish int `json:"finish"`
// Additional fields for better compatibility
Content string `json:"content,omitempty"` // alternative field name that may carry the response text
Result string `json:"result,omitempty"` // alternative field name that may carry the response text
Think string `json:"think,omitempty"` // alternative field name that may carry the reasoning text
Extra map[string]interface{} `json:"-"` // never (un)marshalled directly; filled by UnmarshalJSON's fallback path
}
// UnmarshalJSON decodes a WoCloud response leniently.
//
// The payload is first decoded with the regular struct tags. If that fails
// (for example because "code" arrives as a string, or the payload starts with
// a UTF-8 BOM) the payload is decoded into a generic map and the key fields
// are recovered from it, matching keys case-insensitively and accepting
// several alias names. In that fallback path the raw map is kept in Extra.
//
// When several aliases are present the first one in the documented priority
// order wins (response > content > result, reasoningContent >
// reasoning_content > thinking > think, finish > done > completed), so the
// result no longer depends on map iteration order.
//
// An error is returned only when the payload cannot be decoded as a JSON
// object at all; it is the error of the initial typed decode.
func (r *WoCloudResponse) UnmarshalJSON(data []byte) error {
	// plainResponse has the same fields but no methods, so decoding into it
	// does not recurse into this method.
	type plainResponse WoCloudResponse

	var (
		std   plainResponse
		extra map[string]interface{}
	)

	if err := json.Unmarshal(data, &std); err != nil {
		var rawMap map[string]interface{}
		if mapErr := json.Unmarshal(data, &rawMap); mapErr != nil {
			// Strip a possible UTF-8 BOM and try once more.
			cleanData := bytes.TrimPrefix(data, []byte("\xef\xbb\xbf"))
			if cleanErr := json.Unmarshal(cleanData, &rawMap); cleanErr != nil {
				reqID := generateRequestID()
				logDebug("[reqID:%s] JSON解析失败: %v, 内容: %s", reqID, err, string(data[:min(len(data), 200)]))
				return err
			}
			// Only the BOM was in the way: rerun the typed decode on the
			// cleaned bytes so fields the map fallback does not copy (such as
			// message) are not lost. Type mismatches are tolerated here; the
			// map-based recovery below handles them.
			_ = json.Unmarshal(cleanData, &std)
		}

		// Sort the keys so that lookups are deterministic.
		keys := make([]string, 0, len(rawMap))
		for k := range rawMap {
			keys = append(keys, k)
		}
		sort.Strings(keys)

		// candidates returns the values whose lower-cased key equals one of
		// names, in the priority order of names.
		candidates := func(names ...string) []interface{} {
			var out []interface{}
			for _, name := range names {
				for _, k := range keys {
					if strings.ToLower(k) == name {
						out = append(out, rawMap[k])
					}
				}
			}
			return out
		}
		// toInt accepts JSON numbers and numeric strings.
		toInt := func(v interface{}) (int, bool) {
			switch n := v.(type) {
			case float64:
				return int(n), true
			case string:
				if i, convErr := strconv.Atoi(n); convErr == nil {
					return i, true
				}
			}
			return 0, false
		}
		// toFlag additionally maps true / "true" / "yes" to 1.
		toFlag := func(v interface{}) (int, bool) {
			if i, ok := toInt(v); ok {
				return i, true
			}
			switch b := v.(type) {
			case bool:
				if b {
					return 1, true
				}
			case string:
				if b == "true" || b == "yes" {
					return 1, true
				}
			}
			return 0, false
		}
		// firstString returns the first non-empty string among the aliases.
		firstString := func(names ...string) (string, bool) {
			for _, v := range candidates(names...) {
				if s, ok := v.(string); ok && s != "" {
					return s, true
				}
			}
			return "", false
		}

		for _, v := range candidates("code") {
			if code, ok := toInt(v); ok {
				std.Code = code
				break
			}
		}
		if s, ok := firstString("response", "content", "result"); ok {
			std.Response = s
		}
		if s, ok := firstString("reasoningcontent", "reasoning_content", "thinking", "think"); ok {
			std.ReasoningContent = s
		}
		for _, v := range candidates("finish", "done", "completed") {
			if finish, ok := toFlag(v); ok {
				std.Finish = finish
				break
			}
		}
		extra = rawMap
	}

	// Copy the decoded value back into the receiver.
	*r = WoCloudResponse(std)
	r.Extra = extra

	// Fall back to the alternative fields when the primary ones are empty.
	if r.Response == "" {
		if r.Content != "" {
			r.Response = r.Content
		} else if r.Result != "" {
			r.Response = r.Result
		}
	}
	if r.ReasoningContent == "" && r.Think != "" {
		r.ReasoningContent = r.Think
	}
	return nil
}
// StreamChunk is the DeepSeek streaming response chunk format, extended to
// carry reasoning_content.
type StreamChunk struct {
ID string `json:"id"`
Object string `json:"object"`
Created int64 `json:"created"`
Model string `json:"model"`
Choices []struct {
Index int `json:"index"`
FinishReason *string `json:"finish_reason,omitempty"`
Delta struct {
Role string `json:"role,omitempty"`
Content string `json:"content,omitempty"`
ReasoningContent string `json:"reasoning_content,omitempty"` // reasoning is sent in reasoning_content rather than in think tags
} `json:"delta"`
} `json:"choices"`
}
// CompletionResponse is the DeepSeek non-streaming response format, extended
// to carry reasoning_content.
type CompletionResponse struct {
ID string `json:"id"`
Object string `json:"object"`
Created int64 `json:"created"`
Model string `json:"model"`
Choices []struct {
Index int `json:"index"`
FinishReason string `json:"finish_reason"`
Message struct {
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoning_content,omitempty"` // reasoning is sent in reasoning_content
} `json:"message"`
} `json:"choices"`
Usage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
} `json:"usage"`
}
// WoCloudError is an error response from WoCloud. Code is a string; numeric
// codes are converted to text by handleWoError's lenient parsing path.
type WoCloudError struct {
Code string `json:"code"`
Message string `json:"message"`
}
// Request count and the mutex guarding it, used for monitoring. main's chat
// handler increments it and /health reports it; it is kept separately from
// the atomic requestCounter metric.
var (
requestCount uint64 = 0
countMutex sync.Mutex
)
// startMetricsReporter starts a background goroutine that logs the request
// metrics and the latency histogram once per interval. Nothing is logged
// while no request has been counted yet. The goroutine runs for the lifetime
// of the process.
//
// interval must be positive (time.NewTicker panics otherwise).
func startMetricsReporter(interval time.Duration) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()

		for range ticker.C {
			reqCount := atomic.LoadInt64(&requestCounter)
			// Only report once there has been traffic.
			if reqCount <= 0 {
				continue
			}
			successCount := atomic.LoadInt64(&successCounter)
			errCount := atomic.LoadInt64(&errorCounter)
			parseErrCount := atomic.LoadInt64(&parseErrorCounter)
			// avgResponseTime holds the accumulated milliseconds; divide by
			// the request count to get the average.
			totalMs := atomic.LoadInt64(&avgResponseTime)
			successRate := float64(successCount) / float64(reqCount) * 100

			logInfo("性能指标 - 请求总数: %d, 成功: %d (%.2f%%), 错误: %d, 解析错误: %d, 平均响应时间: %dms",
				reqCount, successCount, successRate, errCount, parseErrCount, totalMs/reqCount)

			// Latency histogram. Buckets are written with atomic.AddInt64 by
			// the request handlers, so they must be read atomically as well.
			var latencyReport strings.Builder
			latencyReport.WriteString("延迟分布 - ")
			for i := range latencyHistogram {
				count := atomic.LoadInt64(&latencyHistogram[i])
				if count <= 0 {
					continue
				}
				if i < len(latencyHistogram)-1 {
					fmt.Fprintf(&latencyReport, "%d-%dms: %d, ", i*100, (i+1)*100, count)
				} else {
					fmt.Fprintf(&latencyReport, ">900ms: %d, ", count)
				}
			}
			// Pass the report as an argument, never as the format string.
			logInfo("%s", strings.TrimSuffix(latencyReport.String(), ", "))
		}
	}()
}
// main parses the flags, registers the HTTP handlers, starts the proxy server
// and blocks until SIGINT/SIGTERM, after which the server is shut down
// gracefully (30 second limit).
//
// Endpoints: /v1/models, /v1/chat/completions, /health, /version and, in
// development mode only, /diagnostics.
func main() {
	// Captured once so /diagnostics can report the real start time.
	startedAt := time.Now()

	appConfig = parseFlags()
	initLogger(appConfig.LogLevel)
	logInfo("启动服务: TargetURL=%s, Address=%s, Port=%s, Version=%s, LogLevel=%s",
		TargetURL, appConfig.Address, appConfig.Port, Version, appConfig.LogLevel)

	// Allow more idle connections for higher concurrency. The assertion is
	// checked so a replaced DefaultTransport cannot panic at startup.
	if transport, ok := http.DefaultTransport.(*http.Transport); ok {
		transport.MaxIdleConnsPerHost = 100
		transport.MaxIdleConns = 100
		transport.IdleConnTimeout = 90 * time.Second
	}

	requestTimeout := time.Duration(appConfig.Timeout) * time.Second
	server := &http.Server{
		Addr:         appConfig.Address + ":" + appConfig.Port,
		ReadTimeout:  requestTimeout,
		WriteTimeout: requestTimeout,
		IdleTimeout:  120 * time.Second,
		Handler:      nil, // use the default ServeMux
	}

	http.HandleFunc("/v1/models", func(w http.ResponseWriter, r *http.Request) {
		handleModelsRequest(w, r)
	})

	http.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) {
		// Bound the whole request by the configured timeout.
		ctx, cancel := context.WithTimeout(r.Context(), requestTimeout)
		defer cancel()
		r = r.WithContext(ctx)

		// Keep a panicking handler from taking the process down.
		defer func() {
			if rec := recover(); rec != nil {
				logError("处理请求时发生panic: %v\n%s", rec, debug.Stack())
				http.Error(w, "Internal server error", http.StatusInternalServerError)
			}
		}()

		countMutex.Lock()
		requestCount++
		currentCount := requestCount
		countMutex.Unlock()
		logInfo("收到新请求 #%d", currentCount)

		atomic.AddInt64(&requestCounter, 1)
		handleChatCompletionRequest(w, r)
	})

	// Health check endpoint.
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		countMutex.Lock()
		count := requestCount
		countMutex.Unlock()
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(fmt.Sprintf(`{"status":"ok","version":"%s","requests":%d}`, Version, count)))
	})

	// Version endpoint.
	http.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(http.StatusOK)
		w.Write([]byte(fmt.Sprintf(`{"version":"%s"}`, Version)))
	})

	// Diagnostics endpoint (development mode only).
	http.HandleFunc("/diagnostics", func(w http.ResponseWriter, r *http.Request) {
		if !appConfig.DevMode {
			http.Error(w, "Diagnostics only available in development mode", http.StatusForbidden)
			return
		}
		reqCount := atomic.LoadInt64(&requestCounter)
		successCount := atomic.LoadInt64(&successCounter)
		errCount := atomic.LoadInt64(&errorCounter)
		parseErrCount := atomic.LoadInt64(&parseErrorCounter)

		successRate := float64(0)
		if reqCount > 0 {
			successRate = float64(successCount) / float64(reqCount) * 100
		}
		diagnosticInfo := map[string]interface{}{
			"version":         Version,
			"start_time":      startedAt.Format(time.RFC3339),
			"requests":        reqCount,
			"success":         successCount,
			"errors":          errCount,
			"parse_errors":    parseErrCount,
			"success_rate":    fmt.Sprintf("%.2f%%", successRate),
			"avg_response_ms": atomic.LoadInt64(&avgResponseTime) / max(reqCount, 1),
		}
		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(diagnosticInfo); err != nil {
			logWarn("Failed to write diagnostics response: %v", err)
		}
	})

	// Periodic metrics reporting is a development-mode feature.
	if appConfig.DevMode {
		startMetricsReporter(1 * time.Minute)
	}

	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	// Serve in the background so this goroutine can wait for the signal.
	go func() {
		logInfo("Starting proxy server on %s", server.Addr)
		if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			logError("Failed to start server: %v", err)
			os.Exit(1)
		}
	}()

	<-stop

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	logInfo("Server is shutting down...")
	if err := server.Shutdown(ctx); err != nil {
		logError("Server shutdown failed: %v", err)
	}
	logInfo("Server gracefully stopped")
}
// validateMessages checks that the message list is non-empty and that every
// message has both a role and a content value. It returns true and an empty
// string when valid, otherwise false and a message for the client.
func validateMessages(messages []APIMessage) (bool, string) {
	reqID := generateRequestID()
	logDebug("[reqID:%s] 验证消息格式", reqID)

	// len of a nil slice is 0, so this covers both nil and empty.
	if len(messages) == 0 {
		return false, "Messages array is required"
	}
	for i := range messages {
		m := &messages[i]
		if m.Role == "" || m.Content == nil {
			return false, "Invalid message format: each message must have role and content"
		}
	}
	return true, ""
}
// extractToken returns the bearer token from the request's Authorization
// header.
//
// The "Bearer" scheme is matched case-insensitively (HTTP auth scheme names
// are case-insensitive), and whitespace around the token is trimmed so a
// blank token is rejected rather than forwarded upstream.
//
// An error is returned when the header is missing, does not use the Bearer
// scheme, or carries an empty token.
func extractToken(r *http.Request) (string, error) {
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" {
		return "", fmt.Errorf("missing Authorization header")
	}

	const scheme = "Bearer "
	if len(authHeader) < len(scheme) || !strings.EqualFold(authHeader[:len(scheme)], scheme) {
		return "", fmt.Errorf("invalid Authorization header format, must start with 'Bearer '")
	}

	token := strings.TrimSpace(authHeader[len(scheme):])
	if token == "" {
		return "", fmt.Errorf("empty token in Authorization header")
	}
	return token, nil
}
// contentToString converts a message content value to text: strings are
// returned unchanged, nil becomes "", and any other value is JSON-encoded.
// If the JSON encoding fails a warning is logged and "" is returned.
func contentToString(content interface{}) string {
	if text, isString := content.(string); isString {
		return text
	}
	if content == nil {
		return ""
	}

	encoded, err := json.Marshal(content)
	if err != nil {
		logWarn("将内容转换为JSON失败: %v", err)
		return ""
	}
	return string(encoded)
}
// extractMessages splits an OpenAI/DeepSeek message list into the input for
// WoCloud: the trimmed text of the last "user" message, and one history entry
// for every "user" message that is immediately followed by an "assistant"
// message. Messages with other roles are not included.
func extractMessages(messages []APIMessage) (string, []WoCloudHistory) {
	reqID := generateRequestID()
	logDebug("[reqID:%s] 提取消息和历史记录", reqID)

	// Walk backwards to find the most recent user message.
	var userMessage string
	for i := len(messages) - 1; i >= 0; i-- {
		if messages[i].Role != "user" {
			continue
		}
		userMessage = strings.TrimSpace(contentToString(messages[i].Content))
		break
	}

	// Pair each user message with the assistant reply that directly follows.
	var history []WoCloudHistory
	for next := 1; next < len(messages); next++ {
		question, answer := messages[next-1], messages[next]
		if question.Role != "user" || answer.Role != "assistant" {
			continue
		}
		query := strings.TrimSpace(contentToString(question.Content))
		response := strings.TrimSpace(contentToString(answer.Content))
		entry := WoCloudHistory{
			Query:            query,
			RewriteQuery:     query,
			UploadFileUrl:    "",
			Response:         response,
			ReasoningContent: "", // reasoning cannot be recovered from standard messages
			State:            "finish",
			Key:              fmt.Sprintf("%d", time.Now().UnixNano()),
		}
		history = append(history, entry)
	}

	logDebug("[reqID:%s] 提取的用户消息长度: %d", reqID, len(userMessage))
	logDebug("[reqID:%s] 提取的历史记录数量: %d", reqID, len(history))
	return userMessage, history
}
// handleWoError reads the whole body of a WoCloud error response and turns it
// into a WoCloudError.
//
// For text/event-stream bodies every "data:" line is decoded and the first
// one with a code other than "" or "0" is returned; if none is found a
// generic HTTP_<status> error is returned. For other bodies the payload is
// decoded as JSON (retrying with a UTF-8 BOM stripped and then via a generic
// map); if all of that fails a generic HTTP_<status> error carrying the body
// text is returned.
//
// The error result is non-nil only when the body cannot be read. The body is
// not closed here.
func handleWoError(resp *http.Response) (*WoCloudError, error) {
	reqID := generateRequestID()
	logDebug("[reqID:%s] 处理WoCloud错误响应", reqID)
	contentType := resp.Header.Get("Content-Type")

	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("读取错误响应失败: %v", err)
	}
	bodyStr := string(bodyBytes)
	logDebug("[reqID:%s] 错误响应内容: %s", reqID, bodyStr)

	if !strings.Contains(contentType, "text/event-stream") {
		var errorData WoCloudError

		// 1. Plain decode.
		if plainErr := json.Unmarshal(bodyBytes, &errorData); plainErr == nil {
			return &errorData, nil
		}
		// 2. Decode again without a possible BOM.
		cleanBody := bytes.TrimPrefix(bodyBytes, []byte("\xef\xbb\xbf"))
		if cleanErr := json.Unmarshal(cleanBody, &errorData); cleanErr == nil {
			return &errorData, nil
		}
		// 3. Lenient decode through a generic map.
		var mapData map[string]interface{}
		if mapErr := json.Unmarshal(cleanBody, &mapData); mapErr != nil {
			// Nothing parseable: fall back to the HTTP status.
			return &WoCloudError{
				Code:    fmt.Sprintf("HTTP_%d", resp.StatusCode),
				Message: fmt.Sprintf("Error with status code: %d and content: %s", resp.StatusCode, bodyStr),
			}, nil
		}
		if code, found := mapData["code"]; found {
			switch c := code.(type) {
			case string:
				errorData.Code = c
			case float64:
				errorData.Code = fmt.Sprintf("%d", int(c))
			case int:
				errorData.Code = fmt.Sprintf("%d", c)
			}
		}
		if message, found := mapData["message"]; found {
			if text, isString := message.(string); isString {
				errorData.Message = text
			} else {
				errorData.Message = fmt.Sprintf("%v", message)
			}
		}
		return &errorData, nil
	}

	// Streamed body: look for the first data line that carries an error code.
	for _, line := range strings.Split(bodyStr, "\n") {
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		payload := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if payload == "[DONE]" {
			continue
		}
		var lineError WoCloudError
		if parseErr := json.Unmarshal([]byte(payload), &lineError); parseErr != nil {
			logDebug("[reqID:%s] 解析流式错误行失败: %v", reqID, parseErr)
			continue
		}
		if lineError.Code != "" && lineError.Code != "0" {
			return &lineError, nil
		}
	}

	// No specific error found in the stream.
	return &WoCloudError{
		Code:    fmt.Sprintf("HTTP_%d", resp.StatusCode),
		Message: fmt.Sprintf("Stream error with status code: %d", resp.StatusCode),
	}, nil
}
// handleModelsRequest answers /v1/models with a fixed list containing the
// single "DeepSeek-R1" model, encoded as JSON with status 200.
func handleModelsRequest(w http.ResponseWriter, r *http.Request) {
	logInfo("处理模型列表请求")

	header := w.Header()
	header.Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)

	model := map[string]interface{}{
		"id":           "DeepSeek-R1",
		"object":       "model",
		"created":      time.Now().Unix(),
		"owned_by":     "ChinaUnicom",
		"capabilities": []string{"chat", "completions"},
	}
	payload := map[string]interface{}{
		"object": "list",
		"data":   []map[string]interface{}{model},
	}
	json.NewEncoder(w).Encode(payload)
}
// createRoleChunk returns the JSON encoding of a stream chunk whose single
// choice carries only the "assistant" role in its delta.
func createRoleChunk(id string, created int64) []byte {
	// Alias for the anonymous element type of StreamChunk.Choices.
	type choice = struct {
		Index        int     `json:"index"`
		FinishReason *string `json:"finish_reason,omitempty"`
		Delta        struct {
			Role             string `json:"role,omitempty"`
			Content          string `json:"content,omitempty"`
			ReasoningContent string `json:"reasoning_content,omitempty"`
		} `json:"delta"`
	}

	chunk := StreamChunk{
		ID:      id,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   "DeepSeek-R1",
		Choices: make([]choice, 1), // one choice, index 0
	}
	chunk.Choices[0].Delta.Role = "assistant"

	// The marshal error is deliberately ignored, as in the other chunk builders.
	payload, _ := json.Marshal(chunk)
	return payload
}
// createReasoningChunk returns the JSON encoding of a stream chunk whose
// single choice carries reasoningContent in delta.reasoning_content.
func createReasoningChunk(id string, created int64, reasoningContent string) []byte {
	// Alias for the anonymous element type of StreamChunk.Choices.
	type choice = struct {
		Index        int     `json:"index"`
		FinishReason *string `json:"finish_reason,omitempty"`
		Delta        struct {
			Role             string `json:"role,omitempty"`
			Content          string `json:"content,omitempty"`
			ReasoningContent string `json:"reasoning_content,omitempty"`
		} `json:"delta"`
	}

	chunk := StreamChunk{
		ID:      id,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   "DeepSeek-R1",
		Choices: make([]choice, 1), // one choice, index 0
	}
	chunk.Choices[0].Delta.ReasoningContent = reasoningContent

	// The marshal error is deliberately ignored, as in the other chunk builders.
	payload, _ := json.Marshal(chunk)
	return payload
}
// createContentChunk returns the JSON encoding of a stream chunk whose single
// choice carries content in delta.content.
func createContentChunk(id string, created int64, content string) []byte {
	// Alias for the anonymous element type of StreamChunk.Choices.
	type choice = struct {
		Index        int     `json:"index"`
		FinishReason *string `json:"finish_reason,omitempty"`
		Delta        struct {
			Role             string `json:"role,omitempty"`
			Content          string `json:"content,omitempty"`
			ReasoningContent string `json:"reasoning_content,omitempty"`
		} `json:"delta"`
	}

	chunk := StreamChunk{
		ID:      id,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   "DeepSeek-R1",
		Choices: make([]choice, 1), // one choice, index 0
	}
	chunk.Choices[0].Delta.Content = content

	// The marshal error is deliberately ignored, as in the other chunk builders.
	payload, _ := json.Marshal(chunk)
	return payload
}
// createDoneChunk returns the JSON encoding of the final stream chunk: a
// single choice with an empty delta and finish_reason set to reason.
func createDoneChunk(id string, created int64, reason string) []byte {
	// Alias for the anonymous element type of StreamChunk.Choices.
	type choice = struct {
		Index        int     `json:"index"`
		FinishReason *string `json:"finish_reason,omitempty"`
		Delta        struct {
			Role             string `json:"role,omitempty"`
			Content          string `json:"content,omitempty"`
			ReasoningContent string `json:"reasoning_content,omitempty"`
		} `json:"delta"`
	}

	chunk := StreamChunk{
		ID:      id,
		Object:  "chat.completion.chunk",
		Created: created,
		Model:   "DeepSeek-R1",
		Choices: make([]choice, 1), // one choice, index 0
	}
	// reason is this call's own copy, so pointing at it is safe.
	chunk.Choices[0].FinishReason = &reason

	// The marshal error is deliberately ignored, as in the other chunk builders.
	payload, _ := json.Marshal(chunk)
	return payload
}
// createErrorChunk returns the JSON object
// {"error":{"message":<errorMsg>,"type":"api_error"}}.
// The id and created parameters are accepted but not used.
func createErrorChunk(id string, created int64, errorMsg string) []byte {
	// Field order matches the alphabetical key order a map would produce.
	type errorDetail struct {
		Message string `json:"message"`
		Type    string `json:"type"`
	}
	envelope := struct {
		Error errorDetail `json:"error"`
	}{
		Error: errorDetail{Message: errorMsg, Type: "api_error"},
	}

	payload, _ := json.Marshal(envelope)
	return payload
}
// handleChatCompletionRequest serves one chat-completion request: it extracts
// the bearer token, decodes and validates the request body, converts the
// messages to WoCloud's input/history form and forwards them through the
// streaming or non-streaming handler depending on the "stream" field.
//
// Requests rejected before forwarding get a 401 or 400 response and do not
// touch the latency metrics. Forwarded requests always update the latency
// histogram and the accumulated response time; the success counter is only
// incremented when forwarding returned no error.
func handleChatCompletionRequest(w http.ResponseWriter, r *http.Request) {
	reqID := generateRequestID()
	startTime := time.Now()
	logInfo("[reqID:%s] 处理聊天补全请求", reqID)

	token, err := extractToken(r)
	if err != nil {
		logError("[reqID:%s] 提取令牌失败: %v", reqID, err)
		http.Error(w, fmt.Sprintf("Authorization error: %v", err), http.StatusUnauthorized)
		return
	}

	var apiReq APIRequest
	if decodeErr := json.NewDecoder(r.Body).Decode(&apiReq); decodeErr != nil {
		logError("[reqID:%s] 解析请求失败: %v", reqID, decodeErr)
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	if valid, errMsg := validateMessages(apiReq.Messages); !valid {
		logError("[reqID:%s] 消息格式验证失败: %s", reqID, errMsg)
		http.Error(w, errMsg, http.StatusBadRequest)
		return
	}

	userMessage, history := extractMessages(apiReq.Messages)
	if userMessage == "" {
		logError("[reqID:%s] 未找到有效的用户消息", reqID)
		http.Error(w, "No valid user message found", http.StatusBadRequest)
		return
	}

	// Forward to the WoCloud API.
	var responseErr error
	if !apiReq.Stream {
		responseErr = handleNonStreamingRequest(w, r, userMessage, history, token, reqID)
	} else {
		responseErr = handleStreamingRequest(w, r, userMessage, history, token, reqID)
	}

	// Metrics: 100ms-wide histogram buckets, everything slower goes to the last one.
	elapsed := time.Since(startTime).Milliseconds()
	bucket := int(elapsed / 100)
	if bucket > 9 {
		bucket = 9
	}
	atomic.AddInt64(&latencyHistogram[bucket], 1)
	atomic.AddInt64(&avgResponseTime, elapsed)

	if responseErr != nil {
		logError("[reqID:%s] 请求处理失败: %v, 耗时: %dms", reqID, responseErr, elapsed)
		return
	}
	atomic.AddInt64(&successCounter, 1)
	logInfo("[reqID:%s] 请求处理成功,耗时: %dms", reqID, elapsed)
}
// doRequestWithRetry sends req with client, retrying with exponential backoff
// (100ms, 200ms, 400ms, ...) until a 200 response is received or maxRetries
// attempts have been made. A maxRetries below 1 is treated as 1.
//
// On success the caller owns the returned response and must close its body.
// Non-200 responses are closed here and never returned.
//
// Before each retry the request body is rewound through req.GetBody, because
// the previous attempt consumed it. A request that has a body but no GetBody
// is not retried. The backoff wait ends early when the request's context is
// cancelled.
//
// The returned error reports the number of attempts made and either the last
// transport error or the last non-200 status code.
func doRequestWithRetry(req *http.Request, client *http.Client, maxRetries int) (*http.Response, error) {
	attempts := maxRetries
	if attempts < 1 {
		// Previously a value <= 0 skipped the loop and then dereferenced a nil response.
		attempts = 1
	}
	reqID := generateRequestID()

	var (
		lastErr    error
		lastStatus int
		tried      int
	)
	for i := 0; i < attempts; i++ {
		// The first attempt consumed the body; rewind it before sending again.
		if i > 0 && req.Body != nil && req.Body != http.NoBody {
			if req.GetBody == nil {
				logWarn("[reqID:%s] request body cannot be rewound, giving up after %d attempt(s)", reqID, tried)
				break
			}
			body, bodyErr := req.GetBody()
			if bodyErr != nil {
				return nil, fmt.Errorf("rewinding request body for retry: %w", bodyErr)
			}
			req.Body = body
		}

		resp, err := client.Do(req)
		tried++
		if err == nil && resp.StatusCode == http.StatusOK {
			logDebug("[reqID:%s] HTTP请求成功", reqID)
			return resp, nil
		}

		lastErr = err
		cause := err
		if resp != nil {
			lastStatus = resp.StatusCode
			resp.Body.Close()
			if cause == nil {
				cause = fmt.Errorf("unexpected status code %d", resp.StatusCode)
			}
		}
		logWarn("[reqID:%s] HTTP请求失败尝试%d/%d: %v", reqID, i+1, attempts, cause)

		// No wait after the final attempt.
		if i < attempts-1 {
			backoffTime := time.Duration(100*(1<<i)) * time.Millisecond
			logDebug("[reqID:%s] 等待 %v 后重试", reqID, backoffTime)
			timer := time.NewTimer(backoffTime)
			select {
			case <-timer.C:
			case <-req.Context().Done():
				timer.Stop()
				return nil, fmt.Errorf("HTTP请求在重试等待期间被取消: %w", req.Context().Err())
			}
		}
	}

	if lastErr != nil {
		return nil, fmt.Errorf("在%d次尝试后HTTP请求仍然失败: %w", tried, lastErr)
	}
	return nil, fmt.Errorf("在%d次尝试后HTTP请求返回非200状态码: %d", tried, lastStatus)
}
// sanitizeJsonString removes ASCII control characters (code points below 32)
// from input, keeping newline, carriage return and tab. Invalid UTF-8 bytes
// come back as U+FFFD, which is how strings.Map handles them.
//
// When the cleaned length differs from the original, the lengths and a short
// sample of the text before and after cleaning are logged at debug level.
//
// The former second pass over the string ("fix unpaired quotes and escapes")
// copied every rune unchanged, so it has been removed; the result is the same.
func sanitizeJsonString(input string, reqID string) string {
	if input == "" {
		return input
	}

	cleanedStr := strings.Map(func(r rune) rune {
		if r < 32 && r != '\n' && r != '\r' && r != '\t' {
			return -1 // drop the character
		}
		return r
	}, input)

	originalLen := len(input)
	if originalLen != len(cleanedStr) {
		logDebug("[reqID:%s] JSON清理: 原始长度=%d, 清理后长度=%d",
			reqID, originalLen, len(cleanedStr))

		// Log up to 50 bytes of each version for comparison.
		sampleSize := 50
		if originalLen < sampleSize {
			sampleSize = originalLen
		}
		logDebug("[reqID:%s] 清理前样本: %s", reqID, input[:sampleSize])
		if len(cleanedStr) < sampleSize {
			sampleSize = len(cleanedStr)
		}
		if sampleSize > 0 {
			logDebug("[reqID:%s] 清理后样本: %s", reqID, cleanedStr[:sampleSize])
		}
	}
	return cleanedStr
}
// truncateString returns s unchanged when it is at most maxLen bytes long.
// Otherwise it returns a prefix of at most maxLen bytes followed by "...".
//
// The cut is moved back to the start of a UTF-8 sequence so that a multi-byte
// character is never split in half. A negative maxLen is treated as 0.
func truncateString(s string, maxLen int) string {
	if maxLen < 0 {
		maxLen = 0
	}
	if len(s) <= maxLen {
		return s
	}

	cut := maxLen
	// 10xxxxxx marks a UTF-8 continuation byte: step back to the lead byte.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + "..."
}
// extractAllStringValues walks a decoded JSON object and stores every string
// it finds in values, keyed by its path. Nested object keys are joined with
// "." and array elements are addressed as "path[i]". Inside arrays only
// string and object elements are visited; non-string scalars and nested
// arrays are skipped.
func extractAllStringValues(obj map[string]interface{}, values map[string]string, prefix string) {
	for key, raw := range obj {
		path := key
		if prefix != "" {
			path = prefix + "." + key
		}

		if text, isString := raw.(string); isString {
			values[path] = text
			continue
		}
		if nested, isObject := raw.(map[string]interface{}); isObject {
			extractAllStringValues(nested, values, path)
			continue
		}
		list, isArray := raw.([]interface{})
		if !isArray {
			continue
		}
		for idx, elem := range list {
			switch e := elem.(type) {
			case map[string]interface{}:
				extractAllStringValues(e, values, fmt.Sprintf("%s[%d]", path, idx))
			case string:
				values[fmt.Sprintf("%s[%d]", path, idx)] = e
			}
		}
	}
}
// findMostLikelyContent guesses which field of a streamed response carries
// the content. It decodes every "data:" line as a JSON object (retrying once
// with sanitizeJsonString on failure), collects all string values by path,
// picks the path that occurs in the most objects and returns the longest
// value found under that path.
//
// Ties between equally frequent paths used to be decided by map iteration
// order, which made the result random. They are now broken deterministically:
// first by the longer maximum value, then by the lexicographically smaller
// path.
//
// It returns "" when fewer than two lines are given or when no line decodes.
func findMostLikelyContent(lines []string, reqID string) string {
	if len(lines) < 2 {
		return ""
	}

	// Decode every usable data line.
	var jsonObjects []map[string]interface{}
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if jsonStr == "[DONE]" || jsonStr == "" {
			continue
		}
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil {
			// Try once more after stripping control characters.
			fixedJson := sanitizeJsonString(jsonStr, reqID)
			if err := json.Unmarshal([]byte(fixedJson), &obj); err != nil {
				continue
			}
		}
		jsonObjects = append(jsonObjects, obj)
	}
	if len(jsonObjects) == 0 {
		return ""
	}

	// Collect the string values of each object by path.
	allValues := make([]map[string]string, len(jsonObjects))
	for i, obj := range jsonObjects {
		allValues[i] = make(map[string]string)
		extractAllStringValues(obj, allValues[i], "")
	}

	// Per path: in how many objects it occurs and its longest value.
	type pathStat struct {
		frequency int
		longest   int
	}
	stats := make(map[string]pathStat)
	for _, values := range allValues {
		for path, value := range values {
			st := stats[path]
			st.frequency++
			if len(value) > st.longest {
				st.longest = len(value)
			}
			stats[path] = st
		}
	}

	// Choose the best path with a total order, so the map's iteration order
	// cannot influence the outcome.
	var (
		mostCommonPath string
		best           pathStat
	)
	for path, st := range stats {
		better := st.frequency > best.frequency ||
			(st.frequency == best.frequency && st.longest > best.longest) ||
			(st.frequency == best.frequency && st.longest == best.longest && path < mostCommonPath)
		if better {
			mostCommonPath = path
			best = st
		}
	}

	// Return the first longest value stored under the chosen path.
	var longestValue string
	maxLength := 0
	for _, values := range allValues {
		if value, ok := values[mostCommonPath]; ok && len(value) > maxLength {
			longestValue = value
			maxLength = len(value)
		}
	}

	logInfo("[reqID:%s] 找到最可能的内容路径: %s, 出现频率: %d/%d, 最大长度: %d",
		reqID, mostCommonPath, best.frequency, len(jsonObjects), maxLength)
	return longestValue
}
// responseContentPatterns are the patterns extractResponseContent tries, in
// order. They are compiled once at package initialisation rather than on
// every call, and they skip over backslash escapes so that an escaped quote
// (\") inside the value no longer ends the match early.
var responseContentPatterns = []*regexp.Regexp{
	regexp.MustCompile(`(?s)"response"\s*:\s*"((?:[^"\\]|\\.)*)"`),
	regexp.MustCompile(`"response":"((?:[^"\\\n]|\\.)*)"`),
	regexp.MustCompile(`(?s)response":"((?:[^"\\]|\\.)+)"`),
}

// extractResponseContent pulls the value of the first "response" field out of
// jsonStr with regular expressions, for payloads that do not decode as JSON.
//
// The value is returned exactly as it appears in the text: JSON escape
// sequences are not decoded. It returns "" when no non-empty value is found.
// reqID is accepted but not used.
func extractResponseContent(jsonStr string, reqID string) string {
	for _, re := range responseContentPatterns {
		matches := re.FindStringSubmatch(jsonStr)
		if len(matches) > 1 && matches[1] != "" {
			return matches[1]
		}
	}
	return ""
}
// responseBodyPatterns are the regular expressions used by the last-resort
// strategy of intelligentContentExtraction. They are compiled once and skip
// over backslash escapes so an escaped quote does not end the match early.
var responseBodyPatterns = []*regexp.Regexp{
	regexp.MustCompile(`(?s)"response"\s*:\s*"((?:[^"\\]|\\.)*)"`),
	regexp.MustCompile(`(?s)"response":"((?:[^"\\]|\\.)*)"`),
	regexp.MustCompile(`(?s)response":"((?:[^"\\]|\\.)+)"`),
}

// intelligentContentExtraction tries to recover the response text from a raw
// streamed body using three strategies, returning the first result:
//
//  1. findMostLikelyContent, accepted only when longer than 10 bytes;
//  2. the longest "response" string among the JSON "data:" lines;
//  3. regular expressions over the whole body, taking the longest match.
//
// It returns "" when nothing could be extracted.
//
// In strategy 3 the matched text is decoded as a JSON string, so every escape
// sequence is handled correctly. The earlier chain of string replacements
// turned an escaped backslash followed by "n" into a newline.
func intelligentContentExtraction(body []byte, reqID string) string {
	bodyStr := string(body)
	lines := strings.Split(bodyStr, "\n")

	// Strategy 1: most common path.
	if content := findMostLikelyContent(lines, reqID); len(content) > 10 {
		logInfo("[reqID:%s] 使用最可能内容路径方法提取到内容,长度=%d", reqID, len(content))
		return content
	}

	// Strategy 2: longest "response" value among the data lines.
	var longestResponse string
	for _, line := range lines {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if jsonStr == "[DONE]" || jsonStr == "" {
			continue
		}
		var obj map[string]interface{}
		if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil {
			continue
		}
		if respStr, ok := obj["response"].(string); ok && len(respStr) > len(longestResponse) {
			longestResponse = respStr
		}
	}
	if longestResponse != "" {
		logInfo("[reqID:%s] 使用最长响应策略提取到内容,长度=%d", reqID, len(longestResponse))
		return longestResponse
	}

	// unescape decodes the inside of a JSON string literal. Text that is not
	// a valid literal (for example because it contains raw control
	// characters) gets a single-pass replacement of the common escapes.
	fallback := strings.NewReplacer(`\\`, `\`, `\"`, `"`, `\n`, "\n")
	unescape := func(raw string) string {
		var decoded string
		if err := json.Unmarshal([]byte(`"`+raw+`"`), &decoded); err == nil {
			return decoded
		}
		return fallback.Replace(raw)
	}

	// Strategy 3: regular expressions over the whole body.
	for _, re := range responseBodyPatterns {
		var longestMatch string
		for _, match := range re.FindAllStringSubmatch(bodyStr, -1) {
			if len(match) < 2 {
				continue
			}
			// Compare decoded lengths on both sides.
			if decoded := unescape(match[1]); len(decoded) > len(longestMatch) {
				longestMatch = decoded
			}
		}
		if longestMatch != "" {
			logInfo("[reqID:%s] 使用正则表达式方法提取到内容,长度=%d", reqID, len(longestMatch))
			return longestMatch
		}
	}

	// Nothing usable found.
	return ""
}
// 将原始响应保存为分析文件
func saveResponseForAnalysis(body []byte, reqID string) string {
diagDir := "diagnostics"
os.MkdirAll(diagDir, 0755)
// 保存原始响应
rawPath := filepath.Join(diagDir, fmt.Sprintf("raw_response_%s.txt", reqID))
if err := os.WriteFile(rawPath, body, 0644); err != nil {
logError("[reqID:%s] 保存原始响应文件失败: %v", reqID, err)
return ""
}
// 创建分析文件
analysisPath := filepath.Join(diagDir, fmt.Sprintf("analysis_%s.txt", reqID))
f, err := os.Create(analysisPath)
if err != nil {
logError("[reqID:%s] 创建分析文件失败: %v", reqID, err)
return rawPath // 至少返回原始文件路径
}
defer f.Close()
// 写入分析头部
fmt.Fprintf(f, "WoCloud 响应分析 - 请求ID: %s\n", reqID)
fmt.Fprintf(f, "分析时间: %s\n", time.Now().Format(time.RFC3339))
fmt.Fprintf(f, "原始响应大小: %d 字节\n\n", len(body))
// 解析响应行
bodyStr := string(body)
lines := strings.Split(bodyStr, "\n")
// 统计信息
var stats struct {
TotalLines int
DataLines int
ValidJSON int
InvalidJSON int
ResponseLines int
MaxResponseLength int
}
stats.TotalLines = len(lines)
// 记录所有响应内容的长度
responseLengths := make([]int, 0)
// 分析每一行
for i, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
if !strings.HasPrefix(line, "data:") {
fmt.Fprintf(f, "行 %d: 非data前缀行: %s\n", i+1, truncateString(line, 100))
continue
}
stats.DataLines++
jsonStr := strings.TrimPrefix(line, "data:")
jsonStr = strings.TrimSpace(jsonStr)
if jsonStr == "[DONE]" {
fmt.Fprintf(f, "行 %d: [DONE]标记\n", i+1)
continue
}
if jsonStr == "" {
fmt.Fprintf(f, "行 %d: 空数据行\n", i+1)
continue
}
// 尝试解析JSON
var obj map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil {
stats.InvalidJSON++
fmt.Fprintf(f, "行 %d: 无效JSON: %s\n 错误: %v\n", i+1, truncateString(jsonStr, 100), err)
continue
}
stats.ValidJSON++
// 检查是否有response字段
if resp, ok := obj["response"]; ok {
if respStr, ok := resp.(string); ok {
stats.ResponseLines++
respLen := len(respStr)
responseLengths = append(responseLengths, respLen)
if respLen > stats.MaxResponseLength {
stats.MaxResponseLength = respLen
}
// 记录响应内容摘要
fmt.Fprintf(f, "行 %d: 包含response字段长度=%d, 内容: %s\n",
i+1, respLen, truncateString(respStr, 100))
} else {
fmt.Fprintf(f, "行 %d: response字段非字符串类型: %T\n", i+1, resp)
}
} else {
// 打印完整JSON对象
prettyJSON, _ := json.MarshalIndent(obj, "", " ")
maxJSON := truncateString(string(prettyJSON), 500)
fmt.Fprintf(f, "行 %d: 不包含response字段完整JSON: \n%s\n", i+1, maxJSON)
}
}
// 写入统计信息
fmt.Fprintf(f, "\n统计信息:\n")
fmt.Fprintf(f, "总行数: %d\n", stats.TotalLines)
fmt.Fprintf(f, "data:前缀行数: %d\n", stats.DataLines)
fmt.Fprintf(f, "有效JSON行数: %d\n", stats.ValidJSON)
fmt.Fprintf(f, "无效JSON行数: %d\n", stats.InvalidJSON)
fmt.Fprintf(f, "包含response字段的行数: %d\n", stats.ResponseLines)
fmt.Fprintf(f, "最大response长度: %d\n", stats.MaxResponseLength)
// 响应长度分布
if len(responseLengths) > 0 {
fmt.Fprintf(f, "\nresponse长度分布:\n")
// 排序长度
sort.Ints(responseLengths)
// 计算分位数
p25 := responseLengths[len(responseLengths)*1/4]
p50 := responseLengths[len(responseLengths)*2/4]
p75 := responseLengths[len(responseLengths)*3/4]
p100 := responseLengths[len(responseLengths)-1]
fmt.Fprintf(f, "最小值: %d\n", responseLengths[0])
fmt.Fprintf(f, "25%%分位数: %d\n", p25)
fmt.Fprintf(f, "中位数: %d\n", p50)
fmt.Fprintf(f, "75%%分位数: %d\n", p75)
fmt.Fprintf(f, "最大值: %d\n", p100)
// 记录所有响应长度
fmt.Fprintf(f, "\n所有response长度: %v\n", responseLengths)
}
// 尝试使用各种方法提取内容
fmt.Fprintf(f, "\n尝试使用各种方法提取内容:\n")
// 方法1: 最常见路径
content1 := findMostLikelyContent(lines, reqID)
if content1 != "" {
fmt.Fprintf(f, "1. 使用最常见路径方法提取到内容,长度=%d\n", len(content1))
fmt.Fprintf(f, " 前100个字符: %s\n", truncateString(content1, 100))
} else {
fmt.Fprintf(f, "1. 最常见路径方法未能提取到内容\n")
}
// 方法2: 正则表达式
patterns := []string{
`"response"\s*:\s*"((?:.|\n)*?)"`,
`"response":"([^"]*)"`,
`response":"([^"]+)"`,
}
for i, pattern := range patterns {
re := regexp.MustCompile(pattern)
matches := re.FindAllStringSubmatch(bodyStr, -1)
if len(matches) > 0 {
var longestMatch string
var maxMatchLength int = 0
for _, match := range matches {
if len(match) > 1 && len(match[1]) > maxMatchLength {
longestMatch = match[1]
maxMatchLength = len(match[1])
}
}
if longestMatch != "" {
fmt.Fprintf(f, "2.%d 使用正则表达式 '%s' 提取到内容,长度=%d\n",
i+1, truncateString(pattern, 30), len(longestMatch))
fmt.Fprintf(f, " 前100个字符: %s\n", truncateString(longestMatch, 100))
}
} else {
fmt.Fprintf(f, "2.%d 正则表达式 '%s' 未找到匹配\n", i+1, truncateString(pattern, 30))
}
}
// 方法3: 使用最长响应行策略
var longestResponse string
var maxLength int = 0
for _, line := range lines {
line = strings.TrimSpace(line)
if !strings.HasPrefix(line, "data:") {
continue
}
jsonStr := strings.TrimPrefix(line, "data:")
jsonStr = strings.TrimSpace(jsonStr)
if jsonStr == "[DONE]" || jsonStr == "" {
continue
}
var obj map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &obj); err != nil {
continue
}
// 提取response字段
if resp, ok := obj["response"]; ok {
if respStr, ok := resp.(string); ok {
if len(respStr) > maxLength {
longestResponse = respStr
maxLength = len(respStr)
}
}
}
}
if longestResponse != "" {
fmt.Fprintf(f, "3. 使用最长响应策略提取到内容,长度=%d\n", len(longestResponse))
fmt.Fprintf(f, " 前100个字符: %s\n", truncateString(longestResponse, 100))
} else {
fmt.Fprintf(f, "3. 最长响应策略未能提取到内容\n")
}
logInfo("[reqID:%s] 完整诊断分析已保存到: %s", reqID, analysisPath)
return analysisPath
}
// enhancedDiagnoseChatResponse runs the diagnostic pipeline on a raw
// streaming response body.
//
// It does nothing when the diagnostic level is "none" and development mode is
// off. Otherwise the body is always handed to saveResponseForAnalysis (its
// return value is intentionally unused). In development mode it additionally
// writes the text returned by intelligentContentExtraction to
// diagnostics/extracted_content_<reqID>.txt and logs a structural summary of
// the stream through analyzeStreamStructure.
//
// Diagnostics are best-effort: filesystem failures are logged and never
// propagated to the caller.
func enhancedDiagnoseChatResponse(body []byte, reqID string) {
	// Nothing to do unless diagnostics or development mode is enabled.
	if appConfig.DiagnosticLevel == "none" && !appConfig.DevMode {
		return
	}

	// Save the complete response and run the detailed analysis.
	saveResponseForAnalysis(body, reqID)

	// The deeper analysis below is reserved for development mode.
	if !appConfig.DevMode {
		return
	}

	// Extract and persist the most likely complete content.
	if content := intelligentContentExtraction(body, reqID); content != "" {
		const diagDir = "diagnostics"
		contentPath := filepath.Join(diagDir, fmt.Sprintf("extracted_content_%s.txt", reqID))

		// Create the directory explicitly and report success only when the
		// file was really written; previously a failed write was still logged
		// as saved.
		if err := os.MkdirAll(diagDir, 0755); err != nil {
			logError("[reqID:%s] 创建诊断目录失败: %v", reqID, err)
		} else if err := os.WriteFile(contentPath, []byte(content), 0644); err != nil {
			logError("[reqID:%s] 保存提取的完整内容失败: %v", reqID, err)
		} else {
			logInfo("[reqID:%s] 保存提取的完整内容到: %s", reqID, contentPath)
		}
	}

	// Log the structure of the streamed response.
	analyzeStreamStructure(body, reqID)
}
// analyzeStreamStructure logs a structural summary of a streamed response
// body: how many lines it has, how many start with "data:", how many of those
// carry decodable JSON, and how many are the "[DONE]" marker. It also logs
// the byte lengths of the first (at most ten) string "response" fields and
// whether the full sequence of lengths never decreases.
func analyzeStreamStructure(body []byte, reqID string) {
	rows := strings.Split(string(body), "\n")

	// Per-category line counters.
	var dataCount, jsonCount, doneCount int
	// Length of every string "response" field, in stream order.
	var lengths []int

	for _, row := range rows {
		row = strings.TrimSpace(row)
		if !strings.HasPrefix(row, "data:") {
			continue
		}
		dataCount++

		payload := strings.TrimSpace(strings.TrimPrefix(row, "data:"))
		switch payload {
		case "[DONE]":
			doneCount++
			continue
		case "":
			continue
		}

		var parsed map[string]interface{}
		if json.Unmarshal([]byte(payload), &parsed) != nil {
			continue
		}
		jsonCount++

		// Only string-typed response fields contribute a length.
		if text, isString := parsed["response"].(string); isString {
			lengths = append(lengths, len(text))
		}
	}

	// The stream counts as incremental when no length is smaller than the
	// one before it.
	incremental := true
	for idx := len(lengths) - 1; idx > 0 && incremental; idx-- {
		incremental = lengths[idx] >= lengths[idx-1]
	}

	logInfo("[reqID:%s] 流式响应结构分析: 总行数=%d, data行数=%d, JSON行数=%d, DONE行数=%d",
		reqID, len(rows), dataCount, jsonCount, doneCount)
	if len(lengths) == 0 {
		return
	}
	logInfo("[reqID:%s] 响应长度模式: %v, 增量模式=%v", reqID, lengths[:min(10, len(lengths))], incremental)
}
// parseStreamResponseAsNonStream collapses a fully buffered streaming
// response body into a single WoCloudResponse.
//
// Processing steps:
//  1. Every "data:" line is JSON-decoded (with one repair attempt through
//     sanitizeJsonString) and its non-empty string "response" field is stored
//     under the line index.
//  2. If no line produced a response, a list of regular expressions is run
//     over the raw body as a fallback.
//  3. The collected fragments are merged. When every fragment either starts
//     with its predecessor or is shorter than it, the stream is treated as
//     incremental and the last fragment wins. Otherwise the longest fragment
//     is used if it exceeds 50 bytes, else the de-duplicated concatenation of
//     all fragments is used when that is longer.
//  4. Results shorter than 10 bytes trigger one more attempt through
//     intelligentContentExtraction.
//
// When nothing can be extracted a fixed apology message is returned and
// parseErrorCounter is incremented. The returned error is always nil.
func parseStreamResponseAsNonStream(body []byte, reqID string) (*WoCloudResponse, error) {
	logInfo("[reqID:%s] 解析流式响应为非流式", reqID)
	parseStartTime := time.Now()

	// Persist the raw body for diagnostics when configured to do so.
	if appConfig.DiagnosticLevel != "none" || appConfig.SaveResponses {
		enhancedDiagnoseChatResponse(body, reqID)
	}

	bodyStr := string(body)
	lines := strings.Split(bodyStr, "\n")

	// Counters reported in the final log line.
	totalLines := 0
	validJsonLines := 0

	fullResponse := &WoCloudResponse{
		Code:             0,
		Response:         "",
		ReasoningContent: "",
		Finish:           1, // the merged result is always reported as finished
	}

	// Response fragments keyed by their position in the stream so they can be
	// replayed in order later.
	responseMap := make(map[int]string)
	maxIndex := 0

	// Step 1: collect and decode every data: line.
	for i, line := range lines {
		line = strings.TrimSpace(line)
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		totalLines++

		jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		// Neither the terminator nor an empty payload can carry a response.
		if jsonStr == "[DONE]" || jsonStr == "" {
			continue
		}

		var respObj map[string]interface{}
		if err := json.Unmarshal([]byte(jsonStr), &respObj); err != nil {
			// Try once more after repairing the JSON text.
			fixedJson := sanitizeJsonString(jsonStr, reqID)
			if err := json.Unmarshal([]byte(fixedJson), &respObj); err != nil {
				continue
			}
		}
		validJsonLines++

		if resp, ok := respObj["response"]; ok {
			if respStr, ok := resp.(string); ok && respStr != "" {
				responseMap[i] = respStr
				if i > maxIndex {
					maxIndex = i
				}
			}
		}
	}

	// Step 2: fall back to regular expressions over the raw body.
	if len(responseMap) == 0 {
		logWarn("[reqID:%s] 没有找到有效的响应行,尝试使用正则表达式", reqID)

		patterns := []string{
			`"response"\s*:\s*"((?:.|\n)*?)"`,
			`"response":"([^"]*)"`,
			`response":"([^"]+)"`,
		}
		for _, pattern := range patterns {
			re := regexp.MustCompile(pattern)
			matches := re.FindAllStringSubmatch(bodyStr, -1)
			if len(matches) == 0 {
				continue
			}

			for i, match := range matches {
				if len(match) > 1 && match[1] != "" {
					// Undo the most common JSON escapes by hand.
					content := match[1]
					content = strings.Replace(content, "\\n", "\n", -1)
					content = strings.Replace(content, "\\\"", "\"", -1)
					content = strings.Replace(content, "\\\\", "\\", -1)
					responseMap[i] = content
					if i > maxIndex {
						maxIndex = i
					}
				}
			}
			if len(responseMap) > 0 {
				break // the first pattern that yields content wins
			}
		}
	}

	// Step 3: decide how to merge what was collected.
	if len(responseMap) == 0 {
		// Nothing found at all: try the smart extractor.
		fullResponse.Response = intelligentContentExtraction(body, reqID)
		if fullResponse.Response == "" {
			fullResponse.Response = "抱歉,无法获取有效回复。请稍后再试。"
			logWarn("[reqID:%s] 无法提取响应内容,使用默认消息", reqID)
			atomic.AddInt64(&parseErrorCounter, 1)
		} else {
			logInfo("[reqID:%s] 使用智能提取获取响应内容,长度=%d", reqID, len(fullResponse.Response))
		}
	} else {
		// Keys in stream order, computed once and reused by every strategy.
		keys := make([]int, 0, len(responseMap))
		for k := range responseMap {
			keys = append(keys, k)
		}
		sort.Ints(keys)

		// Mode 1: incremental stream (each fragment extends the previous one).
		// Mode 2: every fragment is a message of its own.
		// Mode 1 is assumed until a fragment contradicts it: one that is at
		// least as long as its predecessor yet does not start with it.
		isIncrementalMode := true
		for i := 1; i < len(keys); i++ {
			prev := responseMap[keys[i-1]]
			curr := responseMap[keys[i]]
			if !strings.HasPrefix(curr, prev) && len(curr) >= len(prev) {
				isIncrementalMode = false
				break
			}
		}

		if isIncrementalMode {
			// Incremental mode: the last fragment is the most complete one.
			fullResponse.Response = responseMap[maxIndex]
			logInfo("[reqID:%s] 使用增量模式,选择最后一行作为完整响应,长度=%d", reqID, len(fullResponse.Response))
		} else {
			// Find the longest fragment. Walking the sorted keys (instead of
			// ranging over the map) makes ties resolve to the earliest
			// fragment rather than to a random one.
			var longestResponse string
			longestLength := 0
			for _, k := range keys {
				if resp := responseMap[k]; len(resp) > longestLength {
					longestResponse = resp
					longestLength = len(resp)
				}
			}

			if longestLength > 50 {
				// Long enough to be trusted as the whole answer.
				fullResponse.Response = longestResponse
				logInfo("[reqID:%s] 使用最长响应,长度=%d", reqID, longestLength)
			} else {
				// Otherwise concatenate the fragments in order, skipping
				// exact duplicates.
				var combinedResponse strings.Builder
				usedSubstrings := make(map[string]bool)
				for _, k := range keys {
					resp := responseMap[k]
					if !usedSubstrings[resp] {
						combinedResponse.WriteString(resp)
						usedSubstrings[resp] = true
					}
				}

				combinedContent := combinedResponse.String()
				if len(combinedContent) > longestLength {
					fullResponse.Response = combinedContent
					logInfo("[reqID:%s] 使用合并响应,长度=%d", reqID, len(combinedContent))
				} else {
					fullResponse.Response = longestResponse
					logInfo("[reqID:%s] 合并后仍不如最长响应,使用最长响应,长度=%d", reqID, longestLength)
				}
			}
		}

		// A very short result may be incomplete; give the smart extractor a
		// chance to find something longer.
		if len(fullResponse.Response) < 10 {
			logWarn("[reqID:%s] 最终响应内容过短(%d字符),可能不完整", reqID, len(fullResponse.Response))

			smartExtraction := intelligentContentExtraction(body, reqID)
			if smartExtraction != "" && len(smartExtraction) > len(fullResponse.Response) {
				fullResponse.Response = smartExtraction
				logInfo("[reqID:%s] 使用智能提取替换过短的响应,新长度=%d", reqID, len(smartExtraction))
			}

			// In development mode dump the collected fragments for later
			// inspection. Failures are reported instead of being logged as a
			// successful save.
			if appConfig.DevMode {
				debugDir := "debug"
				debugPath := filepath.Join(debugDir, fmt.Sprintf("short_response_%s.json", reqID))
				debug, err := json.MarshalIndent(responseMap, "", " ")
				if err == nil {
					err = os.MkdirAll(debugDir, 0755)
				}
				if err == nil {
					err = os.WriteFile(debugPath, debug, 0644)
				}
				if err != nil {
					logWarn("[reqID:%s] 保存短响应调试信息失败: %v", reqID, err)
				} else {
					logInfo("[reqID:%s] 已保存短响应调试信息到: %s", reqID, debugPath)
				}
			}
		}
	}

	parseElapsed := time.Since(parseStartTime).Milliseconds()
	logInfo("[reqID:%s] 流式解析完成: 总行数=%d, 有效JSON行数=%d, 最终响应长度=%d, 解析耗时: %dms",
		reqID, totalLines, validJsonLines, len(fullResponse.Response), parseElapsed)
	return fullResponse, nil
}
// handleStreamingRequest forwards a chat request to the WoCloud assistant
// endpoint and relays the upstream event stream to the client as
// "data: ..." server-sent events, including reasoning_content chunks.
//
// Parameters:
//   - w, r: the client connection; w must implement http.Flusher.
//   - userMessage, history: become the Input and History of the upstream request.
//   - token: sent upstream in the X-YP-Access-Token header.
//   - reqID: correlation ID used in every log line.
//
// For each upstream "data:" line the handler decodes a WoCloudResponse,
// compares its Response and ReasoningContent with the values of the previous
// line, and forwards only the new suffix (or the whole value when it does not
// extend the previous one). The stream always ends with one finish chunk
// followed by exactly one "data: [DONE]" event.
//
// A non-nil error is returned when the request cannot be built or sent, the
// upstream replies with a non-200 status or a non-zero code, the client goes
// away, or reading the upstream body fails.
func handleStreamingRequest(w http.ResponseWriter, r *http.Request, userMessage string, history []WoCloudHistory, token string, reqID string) error {
	logInfo("[reqID:%s] 处理流式请求", reqID)

	// Build the WoCloud request payload.
	woReq := WoCloudRequest{
		ModelId: 1,
		Input:   userMessage,
		History: history,
	}
	jsonData, err := json.Marshal(woReq)
	if err != nil {
		logError("[reqID:%s] 序列化请求失败: %v", reqID, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return err
	}

	// Bind the upstream request to the client's context so that a client
	// disconnect also aborts a read that is blocked on the upstream body.
	woURL := TargetURL + "/wohome/ai/assistant/query"
	httpReq, err := http.NewRequestWithContext(r.Context(), "POST", woURL, bytes.NewBuffer(jsonData))
	if err != nil {
		logError("[reqID:%s] 创建请求失败: %v", reqID, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return err
	}

	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "text/event-stream")
	httpReq.Header.Set("Origin", TargetURL)
	httpReq.Header.Set("Referer", TargetURL+"/h5/wocloud_ai/?modelType=1")
	httpReq.Header.Set("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1")
	httpReq.Header.Set("X-YP-Access-Token", token)
	httpReq.Header.Set("X-YP-Client-ID", ClientID)

	// A dedicated client per request keeps requests from sharing a connection
	// pool. Its transport is never reused, so release any idle connection when
	// the handler returns instead of waiting for IdleConnTimeout. The defer is
	// registered before the body's Close so that it runs after it.
	client := &http.Client{
		Timeout: time.Duration(appConfig.Timeout) * time.Second,
		Transport: &http.Transport{
			MaxIdleConnsPerHost: 100,
			IdleConnTimeout:     90 * time.Second,
		},
	}
	defer client.CloseIdleConnections()

	logDebug("[reqID:%s] 流式请求详情: URL=%s, 用户消息长度=%d, 历史记录数=%d",
		reqID, woURL, len(userMessage), len(history))

	// Send the request through the retry helper.
	resp, err := doRequestWithRetry(httpReq, client, appConfig.MaxRetries)
	if err != nil {
		logError("[reqID:%s] 发送请求失败: %v", reqID, err)
		http.Error(w, "Failed to connect to API", http.StatusBadGateway)
		return err
	}
	defer resp.Body.Close()

	// Relay upstream HTTP errors as a JSON error body with the same status.
	if resp.StatusCode != http.StatusOK {
		logError("[reqID:%s] API返回非200状态码: %d", reqID, resp.StatusCode)
		errorInfo, err := handleWoError(resp)
		if err != nil {
			logError("[reqID:%s] 解析错误响应失败: %v", reqID, err)
			http.Error(w, "Failed to parse error response", http.StatusBadGateway)
			return err
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(resp.StatusCode)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"error": errorInfo,
		})
		return fmt.Errorf("API返回错误: %s", errorInfo.Message)
	}

	// Verify streaming support before anything is written: once the first
	// byte is out, http.Error can no longer set the status code.
	flusher, ok := w.(http.Flusher)
	if !ok {
		logError("[reqID:%s] Streaming not supported", reqID)
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return fmt.Errorf("streaming not supported")
	}

	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// writeEvent emits a single server-sent event; flushing is left to the caller.
	writeEvent := func(payload string) {
		w.Write([]byte("data: " + payload + "\n\n"))
	}

	// Response ID and creation time shared by every chunk of this reply.
	respID := fmt.Sprintf("chatcmpl-%s", generateRandomID())
	createdTime := time.Now().Unix()

	// The first chunk announces the assistant role.
	writeEvent(string(createRoleChunk(respID, createdTime)))
	flusher.Flush()

	// ReadLine with a 16KB buffer; lines longer than the buffer arrive in
	// pieces and are reassembled in messageBuffer.
	reader := bufio.NewReaderSize(resp.Body, 16384)
	var messageBuffer bytes.Buffer

	// Per-request state: the values seen on the previous upstream line.
	accumulatedResponse := ""
	accumulatedReasoning := ""

	// Counters reported in the completion log lines.
	totalLines := 0
	validLines := 0

	for {
		// Stop as soon as the client has gone away or the request timed out.
		select {
		case <-r.Context().Done():
			logWarn("[reqID:%s] 请求超时或被客户端取消", reqID)
			return fmt.Errorf("请求超时或被取消")
		default:
		}

		byteData, isPrefix, err := reader.ReadLine()
		if err != nil {
			if err != io.EOF {
				logError("[reqID:%s] 读取响应出错: %v", reqID, err)
				return err
			}
			logDebug("[reqID:%s] 响应流结束", reqID)
			break
		}

		messageBuffer.Write(byteData)
		if isPrefix {
			// The line did not fit into the reader's buffer; keep reading.
			continue
		}

		line := messageBuffer.String()
		messageBuffer.Reset()
		totalLines++

		if !strings.HasPrefix(line, "data:") {
			continue
		}
		jsonStr := strings.TrimSpace(strings.TrimPrefix(line, "data:"))

		// Upstream end-of-stream marker. The finish chunk and the single
		// "[DONE]" event are emitted after the loop, so nothing is written
		// here; writing "[DONE]" at this point would put the terminator in
		// front of the finish chunk and send it twice.
		if jsonStr == "[DONE]" {
			logDebug("[reqID:%s] 收到[DONE]消息", reqID)
			break
		}

		var woResp WoCloudResponse
		if err := json.Unmarshal([]byte(jsonStr), &woResp); err != nil {
			logWarn("[reqID:%s] 解析JSON失败: %v, data: %s", reqID, err, jsonStr)
			// Try once more after repairing the JSON text.
			fixedJson := sanitizeJsonString(jsonStr, reqID)
			if err := json.Unmarshal([]byte(fixedJson), &woResp); err != nil {
				logWarn("[reqID:%s] 修复后仍解析失败,跳过此行", reqID)
				continue
			}
			logDebug("[reqID:%s] JSON修复成功", reqID)
		}
		validLines++

		// A non-zero code is an upstream error: report it in-band and stop.
		if woResp.Code != 0 {
			logError("[reqID:%s] API返回错误: code=%d, message=%v", reqID, woResp.Code, woResp.Message)
			errorChunk := createErrorChunk(respID, createdTime, fmt.Sprintf("API error: %v", woResp.Message))
			writeEvent(string(errorChunk))
			writeEvent("[DONE]")
			flusher.Flush()
			return fmt.Errorf("API返回错误码: %d", woResp.Code)
		}

		// Reasoning content: forward only what is new since the last line.
		if woResp.ReasoningContent != "" && woResp.ReasoningContent != accumulatedReasoning {
			newReasoning := woResp.ReasoningContent
			if strings.HasPrefix(woResp.ReasoningContent, accumulatedReasoning) {
				newReasoning = strings.TrimPrefix(woResp.ReasoningContent, accumulatedReasoning)
			}
			if newReasoning != "" {
				// The text goes through sanitizeJsonString before it is sent.
				cleanedReasoning := sanitizeJsonString(newReasoning, reqID)
				writeEvent(string(createReasoningChunk(respID, createdTime, cleanedReasoning)))
				flusher.Flush()
				accumulatedReasoning = woResp.ReasoningContent
				logDebug("[reqID:%s] 发送推理内容块,长度=%d", reqID, len(cleanedReasoning))
			}
		}

		// Response content: same delta logic as for the reasoning content.
		if woResp.Response != "" && woResp.Response != accumulatedResponse {
			newResponse := woResp.Response
			if strings.HasPrefix(woResp.Response, accumulatedResponse) {
				newResponse = strings.TrimPrefix(woResp.Response, accumulatedResponse)
			}
			if newResponse != "" {
				cleanedResponse := sanitizeJsonString(newResponse, reqID)
				writeEvent(string(createContentChunk(respID, createdTime, cleanedResponse)))
				flusher.Flush()
				accumulatedResponse = woResp.Response
				logDebug("[reqID:%s] 发送响应内容块,长度=%d", reqID, len(cleanedResponse))
			}
		}

		// finish=1 marks the final upstream message.
		if woResp.Finish == 1 {
			logDebug("[reqID:%s] 收到完成标志 finish=1", reqID)
			finishReason := "stop"
			writeEvent(string(createDoneChunk(respID, createdTime, finishReason)))
			writeEvent("[DONE]")
			flusher.Flush()
			logInfo("[reqID:%s] 流式请求完成, 总行数=%d, 有效行数=%d, 响应长度=%d",
				reqID, totalLines, validLines, len(accumulatedResponse))
			return nil
		}
	}

	// The upstream ended (EOF or "[DONE]") without finish=1: close the stream
	// for the client with a finish chunk and the terminator.
	logInfo("[reqID:%s] 发送结束信号,总行数=%d, 有效行数=%d", reqID, totalLines, validLines)
	finishReason := "stop"
	writeEvent(string(createDoneChunk(respID, createdTime, finishReason)))
	writeEvent("[DONE]")
	flusher.Flush()
	return nil
}
// handleNonStreamingRequest forwards a chat request to the WoCloud assistant
// endpoint, buffers the complete upstream event stream, and answers the
// client with one chat.completion JSON document.
//
// Parameters:
//   - w, r: the client connection.
//   - userMessage, history: become the Input and History of the upstream request.
//   - token: sent upstream in the X-YP-Access-Token header.
//   - reqID: correlation ID used in every log line.
//
// The upstream is still asked for text/event-stream; the buffered body is
// reduced to a single reply by parseStreamResponseAsNonStream. Reasoning
// content is never included in the non-streaming reply. The usage figures are
// byte lengths of the prompt and the reply, not real token counts.
//
// A non-nil error is returned when the request cannot be built or sent, the
// upstream replies with a non-200 status, or the body cannot be read, parsed
// or encoded.
func handleNonStreamingRequest(w http.ResponseWriter, r *http.Request, userMessage string, history []WoCloudHistory, token string, reqID string) error {
	logInfo("[reqID:%s] 处理非流式请求", reqID)

	// Build the WoCloud request payload.
	woReq := WoCloudRequest{
		ModelId: 1,
		Input:   userMessage,
		History: history,
	}
	jsonData, err := json.Marshal(woReq)
	if err != nil {
		logError("[reqID:%s] 序列化请求失败: %v", reqID, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return err
	}

	// Bind the upstream request to the client's context so that a client
	// disconnect aborts the upstream call instead of letting it run to the
	// client timeout.
	woURL := TargetURL + "/wohome/ai/assistant/query"
	httpReq, err := http.NewRequestWithContext(r.Context(), "POST", woURL, bytes.NewBuffer(jsonData))
	if err != nil {
		logError("[reqID:%s] 创建请求失败: %v", reqID, err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return err
	}

	// A streaming reply is requested on purpose; it is post-processed below.
	httpReq.Header.Set("Content-Type", "application/json")
	httpReq.Header.Set("Accept", "text/event-stream")
	httpReq.Header.Set("Origin", TargetURL)
	httpReq.Header.Set("Referer", TargetURL+"/h5/wocloud_ai/?modelType=1")
	httpReq.Header.Set("User-Agent", "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.6 Mobile/15E148 Safari/604.1")
	httpReq.Header.Set("X-YP-Access-Token", token)
	httpReq.Header.Set("X-YP-Client-ID", ClientID)

	// A dedicated client per request keeps requests from sharing a connection
	// pool. Its transport is never reused, so release any idle connection when
	// the handler returns instead of waiting for IdleConnTimeout. The defer is
	// registered before the body's Close so that it runs after it.
	client := &http.Client{
		Timeout: time.Duration(appConfig.Timeout) * time.Second,
		Transport: &http.Transport{
			MaxIdleConnsPerHost: 100,
			IdleConnTimeout:     90 * time.Second,
		},
	}
	defer client.CloseIdleConnections()

	logDebug("[reqID:%s] 非流式请求详情: URL=%s, 用户消息长度=%d, 历史记录数=%d",
		reqID, woURL, len(userMessage), len(history))

	// Send the request through the retry helper.
	resp, err := doRequestWithRetry(httpReq, client, appConfig.MaxRetries)
	if err != nil {
		logError("[reqID:%s] 发送请求失败: %v", reqID, err)
		http.Error(w, "Failed to connect to API", http.StatusBadGateway)
		return err
	}
	defer resp.Body.Close()

	// Relay upstream HTTP errors as a JSON error body with the same status.
	if resp.StatusCode != http.StatusOK {
		logError("[reqID:%s] API返回非200状态码: %d", reqID, resp.StatusCode)
		errorInfo, err := handleWoError(resp)
		if err != nil {
			logError("[reqID:%s] 解析错误响应失败: %v", reqID, err)
			http.Error(w, "Failed to parse error response", http.StatusBadGateway)
			return err
		}
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(resp.StatusCode)
		json.NewEncoder(w).Encode(map[string]interface{}{
			"error": errorInfo,
		})
		return fmt.Errorf("API返回错误: %s", errorInfo.Message)
	}

	// Read the whole body; startTime is also the base of the total duration
	// logged at the end.
	startTime := time.Now()
	logInfo("[reqID:%s] 开始读取响应体...", reqID)
	bodyBytes, err := io.ReadAll(bufio.NewReaderSize(resp.Body, 65536))
	if err != nil {
		logError("[reqID:%s] 读取响应体失败: %v", reqID, err)
		http.Error(w, "Failed to read API response", http.StatusInternalServerError)
		return err
	}

	bodyLen := len(bodyBytes)
	readDuration := time.Since(startTime)
	logInfo("[reqID:%s] 收到流式响应,总长度: %d 字节,读取耗时: %v", reqID, bodyLen, readDuration)

	// Reduce the buffered stream to its final content.
	parsedResponse, err := parseStreamResponseAsNonStream(bodyBytes, reqID)
	if err != nil {
		logError("[reqID:%s] 解析流式响应失败: %v", reqID, err)
		http.Error(w, "Failed to parse streaming response", http.StatusInternalServerError)
		return err
	}

	finalResponse := parsedResponse.Response
	// Non-streaming replies deliberately carry no reasoning content.
	finalReasoning := ""
	logInfo("[reqID:%s] 最终响应内容长度: %d", reqID, len(finalResponse))

	// An empty or very short reply may be incomplete, so give the smart
	// extractor a chance to find something longer. A short but non-empty
	// reply (for example "Yes.") is a legitimate answer and is kept when
	// nothing better is found; only a truly empty reply is replaced by the
	// default message.
	if len(finalResponse) < 10 {
		logWarn("[reqID:%s] 警告: 最终响应为空或过短,尝试使用智能提取", reqID)
		extracted := intelligentContentExtraction(bodyBytes, reqID)
		switch {
		case len(extracted) > 10:
			finalResponse = extracted
			logInfo("[reqID:%s] 成功通过智能提取获取响应内容,长度=%d", reqID, len(extracted))
		case finalResponse == "":
			finalResponse = "抱歉,无法获取有效回复。请稍后再试。"
			logWarn("[reqID:%s] 无法获取有效回复,使用默认消息", reqID)
			atomic.AddInt64(&parseErrorCounter, 1)
		default:
			logInfo("[reqID:%s] 智能提取未找到更长内容,保留原始响应,长度=%d", reqID, len(finalResponse))
		}
	}

	// The reply goes through sanitizeJsonString before it is embedded.
	finalResponse = sanitizeJsonString(finalResponse, reqID)

	// Build the DeepSeek-style completion document.
	deepSeekResp := CompletionResponse{
		ID:      fmt.Sprintf("chatcmpl-%s", generateRandomID()),
		Object:  "chat.completion",
		Created: time.Now().Unix(),
		Model:   "DeepSeek-R1",
		Choices: []struct {
			Index        int    `json:"index"`
			FinishReason string `json:"finish_reason"`
			Message      struct {
				Role             string `json:"role"`
				Content          string `json:"content"`
				ReasoningContent string `json:"reasoning_content,omitempty"`
			} `json:"message"`
		}{
			{
				Index:        0,
				FinishReason: "stop",
				Message: struct {
					Role             string `json:"role"`
					Content          string `json:"content"`
					ReasoningContent string `json:"reasoning_content,omitempty"`
				}{
					Role:             "assistant",
					Content:          finalResponse,
					ReasoningContent: finalReasoning, // always empty for non-streaming replies
				},
			},
		},
		Usage: struct {
			PromptTokens     int `json:"prompt_tokens"`
			CompletionTokens int `json:"completion_tokens"`
			TotalTokens      int `json:"total_tokens"`
		}{
			PromptTokens:     len(userMessage),
			CompletionTokens: len(finalResponse),
			TotalTokens:      len(userMessage) + len(finalResponse),
		},
	}

	// Dry-run the encoding so that an unencodable reply can be replaced by a
	// safe message before anything is written to the client.
	testData, err := json.Marshal(deepSeekResp)
	if err != nil {
		logError("[reqID:%s] 警告: 响应编码失败: %v", reqID, err)
		deepSeekResp.Choices[0].Message.Content = "抱歉,服务器返回的响应无法正确处理。请稍后再试。"
		deepSeekResp.Choices[0].Message.ReasoningContent = ""
		atomic.AddInt64(&parseErrorCounter, 1)
	} else {
		logDebug("[reqID:%s] 响应验证成功,大小: %d 字节", reqID, len(testData))
	}

	// Send the reply.
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(deepSeekResp); err != nil {
		logError("[reqID:%s] 编码响应失败: %v", reqID, err)
		http.Error(w, "Failed to encode response", http.StatusInternalServerError)
		return err
	}

	totalDuration := time.Since(startTime)
	logInfo("[reqID:%s] 非流式请求处理完成,总耗时: %v", reqID, totalDuration)
	return nil
}
// randomIDCounter is a process-wide sequence number mixed into the IDs
// produced by generateRandomID. It is only accessed through sync/atomic.
var randomIDCounter uint64

// generateRandomID returns an identifier of the form "<unix-nanos>-<seq>".
//
// The second component used to be derived from the same clock as the first
// one, so two calls within one clock tick produced identical IDs. It is now an
// atomically incremented counter, which makes every ID unique within the
// process while keeping the "digits-digits" shape.
func generateRandomID() string {
	seq := atomic.AddUint64(&randomIDCounter, 1)
	return fmt.Sprintf("%d-%d", time.Now().UnixNano(), seq)
}
// 生成请求ID用于跟踪
func generateRequestID() string {
return fmt.Sprintf("%x", time.Now().UnixNano())
}
// saveResponseToFile is a debugging helper that writes data to
// logs/response_<timestamp>_<id>.txt, where <id> is at most the first eight
// bytes of reqID. Failures are logged and otherwise ignored.
func saveResponseToFile(data []byte, reqID string) {
	// MkdirAll succeeds when the directory already exists, which removes the
	// check-then-create race and lets a real failure be reported.
	logDir := "logs"
	if err := os.MkdirAll(logDir, 0755); err != nil {
		logError("[reqID:%s] 创建日志目录失败: %v", reqID, err)
		return
	}

	// Slicing reqID[:8] unconditionally would panic for IDs shorter than
	// eight bytes, so truncate only when there is something to cut.
	shortID := reqID
	if len(shortID) > 8 {
		shortID = shortID[:8]
	}

	timestamp := time.Now().Format("20060102_150405")
	filename := filepath.Join(logDir, fmt.Sprintf("response_%s_%s.txt", timestamp, shortID))

	// Write the raw content.
	if err := os.WriteFile(filename, data, 0644); err != nil {
		logError("[reqID:%s] 保存响应到文件失败: %v", reqID, err)
		return
	}
	logDebug("[reqID:%s] 已保存响应到文件: %s", reqID, filename)
}
// 保存完整响应内容以便后续分析
func saveFullResponseForAnalysis(bodyBytes []byte, reqID string) {
analyzeDir := "analysis"
os.MkdirAll(analyzeDir, 0755)
// 创建文件名
filename := filepath.Join(analyzeDir, fmt.Sprintf("failed_response_%s.txt", reqID))
// 写入原始内容
if err := os.WriteFile(filename, bodyBytes, 0644); err != nil {
logError("[reqID:%s] 保存分析文件失败: %v", reqID, err)
return
}
logInfo("[reqID:%s] 已保存完整响应内容到文件: %s请检查分析", reqID, filename)
}
// min returns the smaller of two ints.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// max returns the larger of two int64 values.
func max(a, b int64) int64 {
	if b >= a {
		return b
	}
	return a
}
// isValidJSON reports whether str is a syntactically valid JSON document.
func isValidJSON(str string) bool {
	raw := []byte(str)
	return json.Valid(raw)
}