mirror of
https://github.com/PaddlePaddle/FastDeploy.git
synced 2025-12-24 13:28:13 +08:00
* feat(fmq): add ZMQ-based FMQ implementation and benchmark tools * move FMQ_CONFIG_JSON to envs * fix top_p_candidates (#5400) Co-authored-by: freeliuzc <lzc842650834@gmail.com> * [RL] Support Rollout Routing Replay (#5321) * [RL] Support Rollout Routing Replay * add routing indices cache * fix config bug and moe forward bug * R3 Support GLM * support eb4.5 * fix merge bug * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * add routing replay ci * support glm topk * support orther top_k * fix ci bug * pre-commit * only support chatcmpl --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Yuanle Liu <yuanlehome@163.com> * [Bug fix] Fix the multi-input accuracy issue in the pooling model. (#5374) * fix multi-inputs * fix threshold * fix threshold * fix * [BugFix]remove _execute_empty_input (#5396) * Revert "[RL] Support Rollout Routing Replay (#5321)" (#5402) This reverts commit96d2d4877b. 
* [New][RL] Support Rollout Routing Replay (#5405) * [RL] Support Rollout Routing Replay * add routing indices cache * fix config bug and moe forward bug * R3 Support GLM * support eb4.5 * fix merge bug * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Apply suggestion from @Copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * add routing replay ci * support glm topk * support orther top_k * fix ci bug * pre-commit * only support chatcmpl * Revert "Revert "[RL] Support Rollout Routing Replay (#5321)" (#5402)" This reverts commitc45e064f3d. * Fix XPU and NPU bug --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Yuanle Liu <yuanlehome@163.com> * bf16 deepseek (#5379) * fix deepseek (#5410) * Update tests/inter_communicator/test_fmq_factory.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update benchmarks/benchmark_fmq.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update fastdeploy/inter_communicator/fmq.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: GoldPancake <56388518+Deleter-D@users.noreply.github.com> Co-authored-by: freeliuzc <lzc842650834@gmail.com> Co-authored-by: RAM <gstian5555@outlook.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Yuanle Liu <yuanlehome@163.com> Co-authored-by: lizexu123 <39205361+lizexu123@users.noreply.github.com> Co-authored-by: 周周周 <39978853+zhoutianzi666@users.noreply.github.com> Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> Co-authored-by: bukejiyu <52310069+bukejiyu@users.noreply.github.com>
93 lines
2.7 KiB
Python
93 lines
2.7 KiB
Python
"""
|
|
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import unittest
|
|
|
|
from fastdeploy.inter_communicator.fmq import FMQ, Message
|
|
|
|
# Test configuration: one queue endpoint and one topic endpoint, both using the
# "ipc" protocol with addresses under /dev/shm. The dict is serialized to JSON
# and exported through the FMQ_CONFIG_JSON environment variable at import time,
# before any test constructs an FMQ instance.
cfg = {
    "ipc_root": "/dev/shm",
    "io_threads": 1,
    "copy": False,
    "endpoints": {
        "test_queue": {
            "protocol": "ipc",
            "address": "/dev/shm/fmq_test_queue.ipc",
            "io_threads": 1,
            "copy": False,
        },
        "test_topic": {
            "protocol": "ipc",
            "address": "/dev/shm/fmq_test_topic.ipc",
            "io_threads": 1,
            "copy": False,
        },
    },
}
os.environ.update({"FMQ_CONFIG_JSON": json.dumps(cfg)})
|
|
|
|
|
|
class TestFMQ(unittest.TestCase):
    """Round-trip checks for FMQ queues and topics.

    Each test drives its coroutine to completion with ``asyncio.run`` and uses
    the "test_queue" / "test_topic" endpoint names.
    """

    def setUp(self):
        # Every test case gets its own FMQ instance.
        self.fmq = FMQ()

    def test_queue_send_receive(self):
        # A small bytes payload put by the producer must come back unchanged
        # from the consumer.
        async def scenario():
            sender = self.fmq.queue("test_queue", role="producer")
            receiver = self.fmq.queue("test_queue", role="consumer")

            payload = b"hello world"
            await sender.put(payload)
            delivered = await receiver.get(timeout=1000)

            self.assertIsNotNone(delivered)
            self.assertEqual(delivered.payload, payload)

        asyncio.run(scenario())

    def test_queue_large_shm_transfer(self):
        # Same round trip with a 2 MiB payload; additionally the received
        # message must carry a non-None ``descriptor``.
        async def scenario():
            sender = self.fmq.queue("test_queue", role="producer")
            receiver = self.fmq.queue("test_queue", role="consumer")

            # > 1MB. NOTE(review): presumably large enough to take FMQ's
            # descriptor-backed transfer path -- confirm the threshold in fmq.py.
            payload = b"x" * (2 * 1024 * 1024)
            await sender.put(payload)
            delivered = await receiver.get(timeout=1000)

            self.assertIsNotNone(delivered)
            self.assertEqual(delivered.payload, payload)
            self.assertIsNotNone(delivered.descriptor)

        asyncio.run(scenario())

    def test_topic_pub_sub(self):
        # Payloads observed by the subscriber callback, in arrival order.
        seen = []

        async def scenario():
            channel = self.fmq.topic("test_topic")

            async def on_message(msg: Message):
                seen.append(msg.payload)

            await channel.sub(on_message)
            # Give the subscriber a moment to connect before publishing.
            await asyncio.sleep(0.1)

            await channel.pub("hello")
            # Leave time for the published message to reach the callback.
            await asyncio.sleep(0.2)

            self.assertIn("hello", seen)

        asyncio.run(scenario())
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed as a script: let unittest discover and run the tests in this module.
    unittest.main()
|