diff --git a/test/ce/server/core/__init__.py b/test/ce/server/core/__init__.py
index aeb855632..083bc19e3 100644
--- a/test/ce/server/core/__init__.py
+++ b/test/ce/server/core/__init__.py
@@ -10,9 +10,9 @@ from .logger import Logger
 base_logger = Logger(loggername="FDSentry", save_level="channel", log_path="./fd_logs").get_logger()
 base_logger.setLevel("INFO")
 from .request_template import TEMPLATES
-from .utils import build_request_payload, send_request
+from .utils import build_request_payload, get_stream_chunks, send_request
 
-__all__ = ["build_request_payload", "send_request", "TEMPLATES"]
+__all__ = ["build_request_payload", "send_request", "TEMPLATES", "get_stream_chunks"]
 
 # 检查环境变量是否存在
 URL = os.environ.get("URL")
diff --git a/test/ce/server/core/utils.py b/test/ce/server/core/utils.py
index 6aeeb1fb9..45ef2b780 100644
--- a/test/ce/server/core/utils.py
+++ b/test/ce/server/core/utils.py
@@ -3,6 +3,8 @@
 # @author DDDivano
 # encoding=utf-8 vi:ts=4:sw=4:expandtab:ft=python
 
+import json
+
 import requests
 
 from core import TEMPLATES, base_logger
@@ -55,3 +57,28 @@ def send_request(url, payload, timeout=600, stream=False):
     except requests.exceptions.RequestException as e:
         base_logger.error(f"❌ 请求失败:{e}")
         return None
+
+
+def get_stream_chunks(response):
+    """Parse a streaming response and return the chunks as a list of dicts."""
+    chunks = []
+
+    if response.status_code == 200:
+        for line in response.iter_lines(decode_unicode=True):
+            if line:
+                if line.startswith("data: "):
+                    line = line[len("data: ") :]
+
+                if line.strip() == "[DONE]":
+                    break
+
+                try:
+                    chunk = json.loads(line)
+                    chunks.append(chunk)
+                except Exception as e:
+                    base_logger.error(f"Failed to parse chunk: {e}, line content: {line}")
+    else:
+        base_logger.error(f"Request failed, status code: {response.status_code}")
+        base_logger.error(f"Response body: {response.text}")
+
+    return chunks
diff --git a/test/ce/server/test_seed_usage.py b/test/ce/server/test_seed_usage.py
new file mode 100644
index 000000000..49f8ef056
--- /dev/null
+++ b/test/ce/server/test_seed_usage.py
@@ -0,0 +1,74 @@
+#!/bin/env python3
+# -*- coding: utf-8 -*-
+# @author ZhangYulongg
+# encoding=utf-8 vi:ts=4:sw=4:expandtab:ft=python
+
+import json
+
+from core import TEMPLATE, URL, build_request_payload, get_stream_chunks, send_request
+
+
+def test_seed_stream():
+    """Test the seed parameter in the payload."""
+    data = {
+        "messages": [
+            {"role": "system", "content": "You are a helpful assistant."},
+            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
+        ],
+        "seed": 26,
+        "max_tokens": 50,
+        "stream": True,
+        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
+    }
+
+    payload = build_request_payload(TEMPLATE, data)
+    response_1 = send_request(url=URL, payload=payload, stream=True)
+    # print(response_1.text)
+    chunks_1 = get_stream_chunks(response_1)
+    # print(chunks_1)
+    # for idx, chunk in enumerate(chunks_1):
+    #     print(f"\nchunk[{idx}]:\n{json.dumps(chunk, indent=2, ensure_ascii=False)}")
+    result_1 = "".join([x["choices"][0]["delta"]["content"] for x in chunks_1[:-1]])
+    logprobs_1 = [json.dumps(x["choices"][0]["logprobs"]["content"][0], ensure_ascii=False) for x in chunks_1[1:-1]]
+    # print(result_1)
+    # print(logprobs_1, type(logprobs_1[0]))
+
+    response_2 = send_request(url=URL, payload=payload, stream=True)
+    chunks_2 = get_stream_chunks(response_2)
+    result_2 = "".join([x["choices"][0]["delta"]["content"] for x in chunks_2[:-1]])
+    logprobs_2 = [json.dumps(x["choices"][0]["logprobs"]["content"][0], ensure_ascii=False) for x in chunks_2[1:-1]]
+    # print(result_2)
+
+    assert result_1 == result_2, "top_p=0, fixed seed: results differ between the two requests"
+    for idx, (l1, l2) in enumerate(zip(logprobs_1, logprobs_2)):
+        assert l1 == l2, f"top_p=0, fixed seed: logprobs[{idx}] differ between the two requests"
+
+
+def test_usage_stream():
+    """Test the max_tokens parameter in the payload."""
+    data = {
+        "messages": [
+            {"role": "system", "content": "You are a helpful assistant."},
+            {"role": "user", "content": "牛顿的三大运动定律是什么?"},
+        ],
+        "max_tokens": 50,
+        "stream": True,
+        "stream_options": {"include_usage": True, "continuous_usage_stats": True},
+        "metadata": {"min_tokens": 10},
+    }
+
+    payload = build_request_payload(TEMPLATE, data)
+    response = send_request(url=URL, payload=payload, stream=True)
+    chunks = get_stream_chunks(response)
+    # for idx, chunk in enumerate(chunks):
+    #     print(f"\nchunk[{idx}]:\n{json.dumps(chunk, indent=2, ensure_ascii=False)}")
+
+    usage = chunks[-1]["usage"]
+    total_tokens = usage["completion_tokens"] + usage["prompt_tokens"]
+    assert data["max_tokens"] >= usage["completion_tokens"], "completion_tokens exceeds max_tokens"
+    assert data["metadata"]["min_tokens"] <= usage["completion_tokens"], "completion_tokens is below min_tokens"
+    assert usage["total_tokens"] == total_tokens, "total_tokens != prompt_tokens + completion_tokens"
+
+
+if __name__ == "__main__":
+    test_seed_stream()