FastDeploy/tests/inter_communicator/test_fmq.py
SunLei 5fb93d84f5 [Feature] [Benchmark]: add ZMQ-based FMQ implementation and benchmark tools (#5418)
* feat(fmq): add ZMQ-based FMQ implementation and benchmark tools

* move FMQ_CONFIG_JSON to envs

* fix top_p_candidates (#5400)

Co-authored-by: freeliuzc <lzc842650834@gmail.com>

* [RL] Support Rollout Routing Replay (#5321)

* [RL] Support Rollout Routing Replay

* add routing indices cache

* fix config bug and moe forward bug

* R3 Support GLM

* support eb4.5

* fix merge bug

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* add routing replay ci

* support glm topk

* support orther top_k

* fix ci bug

* pre-commit

* only support chatcmpl

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yuanle Liu <yuanlehome@163.com>

* [Bug fix] Fix the multi-input accuracy issue in the pooling model. (#5374)

* fix multi-inputs

* fix threshold

* fix threshold

* fix

* [BugFix]remove _execute_empty_input (#5396)

* Revert "[RL] Support Rollout Routing Replay (#5321)" (#5402)

This reverts commit 96d2d4877b.

* [New][RL] Support Rollout Routing Replay (#5405)

* [RL] Support Rollout Routing Replay

* add routing indices cache

* fix config bug and moe forward bug

* R3 Support GLM

* support eb4.5

* fix merge bug

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Apply suggestion from @Copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* add routing replay ci

* support glm topk

* support orther top_k

* fix ci bug

* pre-commit

* only support chatcmpl

* Revert "Revert "[RL] Support Rollout Routing Replay (#5321)" (#5402)"

This reverts commit c45e064f3d.

* Fix XPU and NPU bug

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yuanle Liu <yuanlehome@163.com>

* bf16 deepseek (#5379)

* fix deepseek (#5410)

* Update tests/inter_communicator/test_fmq_factory.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update benchmarks/benchmark_fmq.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update fastdeploy/inter_communicator/fmq.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: GoldPancake <56388518+Deleter-D@users.noreply.github.com>
Co-authored-by: freeliuzc <lzc842650834@gmail.com>
Co-authored-by: RAM <gstian5555@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Yuanle Liu <yuanlehome@163.com>
Co-authored-by: lizexu123 <39205361+lizexu123@users.noreply.github.com>
Co-authored-by: 周周周 <39978853+zhoutianzi666@users.noreply.github.com>
Co-authored-by: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com>
Co-authored-by: bukejiyu <52310069+bukejiyu@users.noreply.github.com>
2025-12-08 22:04:49 +08:00

93 lines
2.7 KiB
Python

"""
# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import asyncio
import json
import os
import unittest

from fastdeploy.inter_communicator.fmq import FMQ, Message

# Prepare environment config for testing
cfg = {
    "ipc_root": "/dev/shm",
    "io_threads": 1,
    "copy": False,
    "endpoints": {
        "test_queue": {"protocol": "ipc", "address": "/dev/shm/fmq_test_queue.ipc", "io_threads": 1, "copy": False},
        "test_topic": {"protocol": "ipc", "address": "/dev/shm/fmq_test_topic.ipc", "io_threads": 1, "copy": False},
    },
}
os.environ["FMQ_CONFIG_JSON"] = json.dumps(cfg)


class TestFMQ(unittest.TestCase):
    def setUp(self):
        self.fmq = FMQ()

    def test_queue_send_receive(self):
        async def run_test():
            producer = self.fmq.queue("test_queue", role="producer")
            consumer = self.fmq.queue("test_queue", role="consumer")

            test_data = b"hello world"
            await producer.put(test_data)
            msg = await consumer.get(timeout=1000)

            self.assertIsNotNone(msg)
            self.assertEqual(msg.payload, test_data)

        asyncio.run(run_test())

    def test_queue_large_shm_transfer(self):
        async def run_test():
            producer = self.fmq.queue("test_queue", role="producer")
            consumer = self.fmq.queue("test_queue", role="consumer")

            large_data = b"x" * (2 * 1024 * 1024)  # > 1MB
            await producer.put(large_data)
            msg = await consumer.get(timeout=1000)

            self.assertIsNotNone(msg)
            self.assertEqual(msg.payload, large_data)
            # A large payload is expected to travel via a shared-memory descriptor
            self.assertIsNotNone(msg.descriptor)

        asyncio.run(run_test())

    def test_topic_pub_sub(self):
        received = []

        async def run_test():
            topic = self.fmq.topic("test_topic")

            async def callback(msg: Message):
                received.append(msg.payload)

            await topic.sub(callback)
            await asyncio.sleep(0.1)  # allow SUB to connect
            await topic.pub("hello")
            await asyncio.sleep(0.2)  # give the subscriber time to receive the message

            self.assertIn("hello", received)

        asyncio.run(run_test())


if __name__ == "__main__":
    unittest.main()
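test_topic_pub_sub waits with fixed asyncio.sleep calls, which can make it slow when delivery is fast and flaky when a CI machine is loaded. One possible alternative, shown here as a sketch under stated assumptions, is to have the subscriber callback set an asyncio.Event and await it with asyncio.wait_for, so the wait ends as soon as the message arrives and fails with a timeout otherwise. InMemoryTopic and wait_for_message are hypothetical names: InMemoryTopic only mimics the sub/pub call shape used in the test, delivers raw payloads instead of Message objects, and is not the FMQ API.

```python
import asyncio
from typing import Awaitable, Callable, List, Set

Callback = Callable[[object], Awaitable[None]]


class InMemoryTopic:
    """Hypothetical stand-in for a topic; NOT the FMQ API."""

    def __init__(self) -> None:
        self._subscribers: List[Callback] = []
        self._pending: Set[asyncio.Task] = set()

    async def sub(self, callback: Callback) -> None:
        self._subscribers.append(callback)

    async def pub(self, payload: object) -> None:
        # Deliver asynchronously on the event loop, as a real transport would.
        for cb in self._subscribers:
            task = asyncio.get_running_loop().create_task(cb(payload))
            # Keep a strong reference so the task is not garbage-collected early.
            self._pending.add(task)
            task.add_done_callback(self._pending.discard)


async def wait_for_message(topic: InMemoryTopic, payload: object, timeout: float = 1.0) -> List[object]:
    received: List[object] = []
    arrived = asyncio.Event()

    async def callback(msg: object) -> None:
        received.append(msg)
        arrived.set()  # wake the waiting test as soon as a message lands

    await topic.sub(callback)
    await topic.pub(payload)
    # Raises asyncio.TimeoutError instead of passing or failing after a fixed sleep.
    await asyncio.wait_for(arrived.wait(), timeout=timeout)
    return received


received = asyncio.run(wait_for_message(InMemoryTopic(), "hello"))
print(received)
```

If FMQ topics are backed by ZeroMQ PUB/SUB sockets, as the test's "allow SUB to connect" comment suggests, messages published before the subscription has propagated are dropped (ZeroMQ's "slow joiner" behavior), so a short wait or retry before pub may still be needed; the event only replaces the sleep after publishing.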