chatgpt-dingtalk/pkg/llm/stream.go
二丫讲梵 f7326b6797 Add streaming output capability for interactive cards (#315)
* Switch AI interaction to go-openai

* add stream

* feat(stream): optimize the streaming response mechanism to enable real-time card updates

- Replace the fixed 1.5-second update with a real-time update strategy based on a 300ms minimum interval (see the sketch after this list)
- Add a content buffering mechanism to avoid overly frequent API calls
- Improve handling of stream interruptions so already-received content is not lost
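
A minimal sketch of that throttle-and-buffer strategy follows. The 300ms minimum interval and the buffer-then-flush behavior are taken from the commit message; pushUpdates, updateCard, and contentCh are hypothetical names, not identifiers from this repo:

package main

import (
	"strings"
	"time"
)

// pushUpdates throttles card updates to at most one per minInterval while
// buffering every streamed chunk, so no content is dropped between updates.
func pushUpdates(contentCh <-chan string, updateCard func(string)) {
	const minInterval = 300 * time.Millisecond
	var (
		full       strings.Builder // buffer of everything received so far
		lastUpdate time.Time       // time of the last card update
	)
	for delta := range contentCh {
		full.WriteString(delta) // buffer instead of calling the card API per delta
		if time.Since(lastUpdate) >= minInterval {
			updateCard(full.String())
			lastUpdate = time.Now()
		}
	}
	// Final flush so content received after the last update is not lost.
	updateCard(full.String())
}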

🔧 chore(llm): tune the HTTP client configuration

- Add connection pool settings (MaxIdleConns: 100, MaxIdleConnsPerHost: 10)
- Set the idle connection timeout to 90 seconds
- Add a commented-out option to disable HTTP/2, as a workaround for streaming errors (see the transport sketch after this list)
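
A sketch of that transport configuration, using only standard net/http fields; where exactly the repo wires this client in is not shown here, so newHTTPClient is an illustrative name:

package main

import (
	"net/http"
	"time"
)

// newHTTPClient builds a pooled HTTP client matching the settings listed in
// the commit message.
func newHTTPClient() *http.Client {
	transport := &http.Transport{
		MaxIdleConns:        100,              // total idle connections kept in the pool
		MaxIdleConnsPerHost: 10,               // idle connections kept per upstream host
		IdleConnTimeout:     90 * time.Second, // close idle connections after 90s
		// To work around HTTP/2 streaming errors, HTTP/2 can be disabled by
		// leaving TLSNextProto non-nil but empty (kept commented out here,
		// mirroring the commit's note):
		// TLSNextProto: map[string]func(string, *tls.Conn) http.RoundTripper{},
	}
	return &http.Client{Transport: transport}
}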

📝 docs(stream): update the streaming update strategy documentation

- Document the real-time streaming update mechanism and buffering strategy in detail
- Add a troubleshooting guide for HTTP/2 streaming errors
- Update the configuration parameter descriptions and recommended ranges

🐛 fix(stream): fix content loss when streaming is interrupted

- Ensure already-received content is not lost when streaming reception is interrupted
- Improve the error handling logic to distinguish between the with-content and no-content cases

* modify ai
2025-12-11 18:22:35 +08:00

154 lines
3.1 KiB
Go

package llm

import (
	"errors"
	"io"

	"github.com/eryajf/chatgpt-dingtalk/public"
	"github.com/pandodao/tokenizer-go"
	openai "github.com/sashabaranov/go-openai"
)
// ChatWithContextStream runs a streaming conversation and returns a channel
// on which the streamed content is delivered.
func (c *Client) ChatWithContextStream(question string) (<-chan string, error) {
	if tokenizer.MustCalToken(question) > c.maxQuestionLen {
		return nil, ErrOverMaxQuestionLength
	}
	// Build the message list.
	messages := c.buildMessages(question)
	model := public.Config.Model
	userId := c.userId
	if public.Config.AzureOn {
		userId = ""
	}
	req := openai.ChatCompletionRequest{
		Model:       model,
		Messages:    messages,
		MaxTokens:   c.maxAnswerLen,
		Temperature: 0.6,
		User:        userId,
		Stream:      true,
	}
	contentCh := make(chan string, 10)
	go func() {
		defer close(contentCh)
		stream, err := c.client.CreateChatCompletionStream(c.ctx, req)
		if err != nil {
			contentCh <- err.Error()
			return
		}
		defer stream.Close()
		fullAnswer := ""
		for {
			response, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				break
			}
			if err != nil {
				// On interruption, only surface the error when nothing has
				// been received yet; otherwise keep the partial answer that
				// was already delivered to the channel.
				if fullAnswer == "" {
					contentCh <- err.Error()
				}
				return
			}
			if len(response.Choices) > 0 {
				delta := response.Choices[0].Delta.Content
				if delta != "" {
					fullAnswer += delta
					contentCh <- delta
				}
			}
		}
		// Save the conversation context.
		c.ChatContext.old = append(c.ChatContext.old,
			conversation{Role: c.ChatContext.humanRole, Prompt: question},
			conversation{Role: c.ChatContext.aiRole, Prompt: fullAnswer},
		)
		c.ChatContext.seqTimes++
	}()
	return contentCh, nil
}
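
// Example (hypothetical caller, not part of this file): drain the returned
// channel until it is closed; each value is an incremental chunk of the answer.
//
//	ch, err := client.ChatWithContextStream("hello")
//	if err != nil {
//		return err
//	}
//	for delta := range ch {
//		fmt.Print(delta)
//	}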
// buildMessages assembles the message list from the conversation history
// plus the current question.
func (c *Client) buildMessages(question string) []openai.ChatCompletionMessage {
	var messages []openai.ChatCompletionMessage
	// Append the conversation history.
	for _, v := range c.ChatContext.old {
		role := "assistant"
		if v.Role == c.ChatContext.humanRole {
			role = "user"
		}
		messages = append(messages, openai.ChatCompletionMessage{
			Role:    role,
			Content: v.Prompt,
		})
	}
	// Append the current question.
	messages = append(messages, openai.ChatCompletionMessage{
		Role:    "user",
		Content: question,
	})
	return messages
}
// SingleQaStream is the streaming variant of single-turn Q&A.
func SingleQaStream(question, userId string) (<-chan string, func(), error) {
	client := NewClient(userId)
	contentCh := make(chan string, 10)
	done := make(chan struct{})
	go func() {
		defer close(contentCh)
		defer close(done)
		stream, err := client.ChatWithContextStream(question)
		if err != nil {
			contentCh <- err.Error()
			client.Close()
			return
		}
		// Forward the streamed content, then release the client.
		for content := range stream {
			contentCh <- content
		}
		client.Close()
	}()
	// cleanup blocks until the forwarding goroutine has finished.
	cleanup := func() {
		<-done
	}
	return contentCh, cleanup, nil
}
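
// Example (hypothetical caller, not part of this file): the returned cleanup
// func blocks until the forwarding goroutine has exited, so call it after
// draining the channel.
//
//	ch, cleanup, _ := SingleQaStream("hello", "user-1")
//	for delta := range ch {
//		fmt.Print(delta)
//	}
//	cleanup()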
// ContextQaStream is the streaming variant of contextual (multi-turn) Q&A.
func ContextQaStream(question, userId string) (*Client, <-chan string, error) {
	client := NewClient(userId)
	// Restore the stored session context for this user, if any.
	if public.UserService.GetUserSessionContext(userId) != "" {
		_ = client.ChatContext.LoadConversation(userId)
	}
	stream, err := client.ChatWithContextStream(question)
	if err != nil {
		client.Close()
		return nil, nil, err
	}
	return client, stream, nil
}
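
For illustration, a hypothetical caller of ContextQaStream could look like the sketch below; it assumes the same package, an imported fmt, and a placeholder user ID:

func demoContextQaStream() error {
	client, ch, err := ContextQaStream("hello again", "user-1")
	if err != nil {
		return err
	}
	for delta := range ch {
		fmt.Print(delta) // print each chunk as it arrives
	}
	client.Close() // release the client once the stream is drained
	return nil
}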