Files
FastDeploy/fastdeploy/model_executor/models/model_base.py
Yuanle Liu b455fd39f3
Some checks failed
CE Compile Job / ce_job_pre_check (push) Has been cancelled
CE Compile Job / print_ce_job_pre_check_outputs (push) Has been cancelled
CE Compile Job / FD-Clone-Linux (push) Has been cancelled
CE Compile Job / Show Code Archive Output (push) Has been cancelled
CE Compile Job / BUILD_SM8090 (push) Has been cancelled
CE Compile Job / BUILD_SM8689 (push) Has been cancelled
CE Compile Job / CE_UPLOAD (push) Has been cancelled
Deploy GitHub Pages / deploy (push) Has been cancelled
register_model_class compatible with plugins (#4236)
2025-09-24 11:17:12 +08:00

394 lines
14 KiB
Python

"""
# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
import importlib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from functools import lru_cache
from typing import Dict, List, Optional, Tuple, Type, Union
import numpy as np
import paddle
from paddle import nn
from paddleformers.transformers import PretrainedModel
from fastdeploy.config import (
ModelConfig,
iter_architecture_defaults,
try_match_architecture_defaults,
)
from fastdeploy.model_executor.models.interfaces_base import (
determine_model_category,
get_default_pooling_type,
is_multimodal_model,
is_pooling_model,
is_text_generation_model,
)
class ModelCategory(Enum):
    """Capability categories a registered model architecture can be tagged with."""

    TEXT_GENERATION = "text_generation"  # autoregressive text generation models
    MULTIMODAL = "multimodal"  # models taking more than text as input — TODO confirm exact semantics
    EMBEDDING = "embedding"  # pooling/embedding models
@dataclass(frozen=True)
class ModelInfo:
    """Immutable metadata describing one registered model architecture."""

    architecture: str
    category: ModelCategory
    is_text_generation: bool
    is_multimodal: bool
    is_pooling: bool
    module_path: str
    default_pooling_type: str

    @staticmethod
    def from_model_cls(model_cls: Type[nn.Layer], module_path: str = "") -> "ModelInfo":
        """Derive a ``ModelInfo`` from *model_cls* using the interface helpers.

        Args:
            model_cls: The model class to inspect.
            module_path: Module path recorded alongside the metadata (default "").
        """
        arch_name = model_cls.__name__
        return ModelInfo(
            architecture=arch_name,
            category=determine_model_category(arch_name),
            is_text_generation=is_text_generation_model(model_cls),
            is_multimodal=is_multimodal_model(arch_name),
            is_pooling=is_pooling_model(model_cls),
            module_path=module_path,
            default_pooling_type=get_default_pooling_type(model_cls),
        )
class BaseRegisteredModel(ABC):
    """Base class for registered models"""

    @abstractmethod
    def load_model_cls(self) -> Type[nn.Layer]:
        """Import (if needed) and return the concrete model class."""
        raise NotImplementedError

    @abstractmethod
    def inspect_model_cls(self) -> ModelInfo:
        """Return :class:`ModelInfo` metadata describing the model class."""
        raise NotImplementedError
@dataclass(frozen=True)
class LazyRegisteredModel(BaseRegisteredModel):
    """A registration that defers importing the model module.

    The model class is imported only when :meth:`load_model_cls` is called,
    keeping registry construction cheap.
    """

    module_name: str  # module path relative to ``module_path``
    module_path: str  # package prefix containing the module
    class_name: str  # name of the model class inside the module

    def load_model_cls(self) -> Type[nn.Layer]:
        """Import ``{module_path}.{module_name}`` and return ``class_name`` from it.

        Raises:
            ImportError: if the module or the attribute cannot be resolved.
                The original exception is chained so the root cause is visible.
        """
        full_module = f"{self.module_path}.{self.module_name}"
        try:
            module = importlib.import_module(full_module)
            return getattr(module, self.class_name)
        except (ImportError, AttributeError) as e:
            # Chain the underlying error (``from e``) instead of discarding it,
            # so failed lazy imports remain debuggable.
            raise ImportError(f"Failed to load {self.class_name}: {e}") from e

    def inspect_model_cls(self) -> ModelInfo:
        """Load the model class and build its :class:`ModelInfo` metadata."""
        model_cls = self.load_model_cls()
        return ModelInfo.from_model_cls(model_cls, self.module_name)
@lru_cache(maxsize=128)
def _try_inspect_model_cls(
    model_arch: str,
    model: BaseRegisteredModel,
) -> Optional[ModelInfo]:
    """Best-effort inspection of a registered model.

    Args:
        model_arch: Architecture name, used only for the error message.
        model: The registration to inspect (hashable, so memoizable).

    Returns:
        The model's :class:`ModelInfo`, or ``None`` if inspection fails.
    """
    try:
        return model.inspect_model_cls()
    except Exception as e:
        # Bug fix: the original passed logging-style '%s' args to print(),
        # which printed the literal format string and the arch separately.
        print(f"Error in inspecting model architecture '{model_arch}': {e}")
        return None
class ModelRegistry:
    """Central registry mapping architecture names to model classes.

    Two registration styles coexist:

    * class-level dicts (``_arch_to_model_cls`` / ``_arch_to_pretrained_model_cls``)
      populated by the ``register_*`` classmethods, and
    * per-instance lazy registrations built from the ``_enhanced_models``
      metadata recorded by decorator-style registration.
    """

    _arch_to_model_cls = {}
    _arch_to_pretrained_model_cls = {}
    _enhanced_models: Dict[str, Dict] = {}

    def __init__(self):
        self.models: Dict[str, BaseRegisteredModel] = {}
        self.pretrained_models: Dict[str, Type[PretrainedModel]] = {}
        self._registered_models: Dict[str, BaseRegisteredModel] = {}
        # Per-instance memoization for load/inspect results. Plain dicts are
        # used instead of ``lru_cache`` on instance methods: a global LRU cache
        # keyed on ``self`` would keep every registry instance alive for the
        # cache's lifetime (flake8/ruff B019). ``None`` results (failures) are
        # cached too, matching the previous ``lru_cache`` behavior.
        self._load_cache: Dict[str, Optional[Type[nn.Layer]]] = {}
        self._inspect_cache: Dict[str, Optional[ModelInfo]] = {}
        self._register_enhanced_models()

    def _register_enhanced_models(self):
        """Materialize lazy registrations from the class-level metadata dict."""
        for arch, model_info in self._enhanced_models.items():
            model = LazyRegisteredModel(
                module_name=model_info["module_name"],
                module_path=model_info["module_path"],
                class_name=model_info["class_name"],
            )
            self.models[arch] = model
            self._registered_models[arch] = model

    def _try_load_model_cls(self, architecture: str) -> Optional[Type[nn.Layer]]:
        """Load (and memoize) the model class for *architecture*, or None on failure."""
        if architecture in self._load_cache:
            return self._load_cache[architecture]
        if architecture not in self.models:
            return None
        try:
            model_cls = self.models[architecture].load_model_cls()
        except Exception as e:
            print(f"Failed to load model {architecture}: {e}")
            model_cls = None
        self._load_cache[architecture] = model_cls
        return model_cls

    def _try_inspect_model_cls(self, model_arch: str) -> Optional[ModelInfo]:
        """Inspect (and memoize) metadata for *model_arch*, or None on failure."""
        if model_arch in self._inspect_cache:
            return self._inspect_cache[model_arch]
        if model_arch not in self.models:
            return None
        try:
            model_info = self.models[model_arch].inspect_model_cls()
        except Exception as e:
            print(f"Failed to inspect model {model_arch}: {e}")
            model_info = None
        self._inspect_cache[model_arch] = model_info
        return model_info

    def _normalize_arch(self, architecture: str, model_config: ModelConfig) -> str:
        """Map an architecture name to a registered base name when possible.

        If *architecture* is not registered directly, try the architecture
        defaults table: match its suffix, then substitute each known suffix
        until a registered name is found. Falls back to the input unchanged.
        """
        if architecture in self.models:
            return architecture
        match = try_match_architecture_defaults(
            architecture,
            runner_type=getattr(model_config, "runner_type", None),
            convert_type=getattr(model_config, "convert_type", None),
        )
        if match:
            suffix, _ = match
            for repl_suffix, _ in iter_architecture_defaults():
                base_arch = architecture.replace(suffix, repl_suffix)
                if base_arch in self.models:
                    return base_arch
        return architecture

    def _raise_for_unsupported(self, architectures: list[str]):
        """Raise a ValueError distinguishing 'known but failed' from 'unknown'."""
        all_supported_archs = self.get_supported_archs()
        if any(arch in all_supported_archs for arch in architectures):
            raise ValueError(
                f"Model architectures {architectures} failed to be inspected. "
                "Please check the logs for more details."
            )
        raise ValueError(
            f"Model architectures {architectures} are not supported for now. "
            f"Supported architectures: {all_supported_archs}"
        )

    def inspect_model_cls(
        self, architectures: Union[str, List[str]], model_config: ModelConfig = None
    ) -> Tuple[ModelInfo, str]:
        """Return (ModelInfo, original arch name) for the first inspectable arch.

        Raises:
            ValueError: if no architecture is given or none can be inspected.
        """
        if isinstance(architectures, str):
            architectures = [architectures]
        if not architectures:
            raise ValueError("No model architectures are specified")
        for arch in architectures:
            normalized_arch = self._normalize_arch(arch, model_config)
            model_info = self._try_inspect_model_cls(normalized_arch)
            if model_info is not None:
                return (model_info, arch)
        return self._raise_for_unsupported(architectures)

    @classmethod
    def register_model_class(
        cls,
        model_class=None,
        *,
        architecture: str = None,
        module_name: str = None,
        module_path: str = "fastdeploy.model_executor.models",
        category: Union[ModelCategory, List[ModelCategory]] = ModelCategory.TEXT_GENERATION,
        primary_use: ModelCategory = None,
    ):
        """
        Enhanced model class registration supporting both traditional and decorator-style registration.

        Can be used as:
        1. Traditional decorator: @ModelRegistry.register_model_class
        2. Enhanced decorator with metadata: @ModelRegistry.register_model_class(architecture="...", module_path="...")

        Args:
            model_class: The model class (when used as simple decorator)
            architecture (str): Unique identifier for the model architecture
            module_name (str): Relative path to the module containing the model
            module_path (str): Absolute path to the module containing the model
            category: Model category or list of categories
            primary_use: Primary category for multi-category models
        """

        def _register(model_cls):
            # Traditional registration for ModelForCasualLM subclasses
            if issubclass(model_cls, ModelForCasualLM) and model_cls is not ModelForCasualLM:
                cls._arch_to_model_cls[model_cls.name()] = model_cls
            # Enhanced decorator-style registration (needs both metadata fields)
            if architecture and module_name:
                categories = category if isinstance(category, list) else [category]
                # Register main entry
                arch_key = architecture
                cls._enhanced_models[arch_key] = {
                    "class_name": model_cls.__name__,
                    "module_name": module_name,
                    "module_path": module_path,
                    "category": primary_use or categories[0],
                    "class": model_cls,
                }
                # Register category-specific entries for multi-category models
                if len(categories) > 1:
                    for cat in categories:
                        key = f"{arch_key}_{cat.value}"
                        cls._enhanced_models[key] = {
                            "class_name": model_cls.__name__,
                            "module_name": module_name,
                            "module_path": module_path,
                            "category": cat,
                            "primary_use": primary_use or categories[0],
                            "class": model_cls,
                        }
            return model_cls

        # Support both @register_model_class and @register_model_class(...)
        if model_class is not None:
            return _register(model_class)
        else:
            return _register

    @classmethod
    def register_pretrained_model(cls, pretrained_model):
        """register pretrained model class"""
        if (
            issubclass(pretrained_model, PretrainedModel)
            and pretrained_model is not PretrainedModel
            and hasattr(pretrained_model, "arch_name")
        ):
            cls._arch_to_pretrained_model_cls[pretrained_model.arch_name()] = pretrained_model
        return pretrained_model

    @classmethod
    def get_class(cls, name):
        """get model class"""
        if name not in cls._arch_to_model_cls:
            raise ValueError(f"Model '{name}' is not registered!")
        return cls._arch_to_model_cls[name]

    @classmethod
    def get_pretrain_cls(cls, architectures: str):
        """get_pretrain_cls"""
        # NOTE: raises KeyError for unknown architectures (kept for callers
        # that may rely on that exception type).
        return cls._arch_to_pretrained_model_cls[architectures]

    @classmethod
    def get_supported_archs(cls):
        """Return all architecture names known to either registration style."""
        traditional_archs = list(cls._arch_to_model_cls.keys())
        enhanced_archs = list(cls._enhanced_models.keys())
        return traditional_archs + enhanced_archs

    def resolve_model_cls(self, architectures: Union[str, List[str]]) -> Tuple[Type[nn.Layer], str]:
        """Resolve model class"""
        if isinstance(architectures, str):
            architectures = [architectures]
        for arch in architectures:
            model_cls = self._try_load_model_cls(arch)
            if model_cls is not None:
                return model_cls, arch
        raise ValueError(f"Cannot find supported model: {architectures}")

    def is_multimodal_model(self, architectures: Union[str, List[str]], model_config: ModelConfig = None) -> bool:
        """Check if it's a multimodal model"""
        if isinstance(architectures, str):
            architectures = [architectures]
        for arch in architectures:
            model_info = self._try_inspect_model_cls(arch)
            if model_info is not None:
                return model_info.is_multimodal
        return False

    def is_text_generation_model(self, architectures: Union[str, List[str]], model_config: ModelConfig = None) -> bool:
        """Check if it's a text generation model"""
        if isinstance(architectures, str):
            architectures = [architectures]
        for arch in architectures:
            model_info = self._try_inspect_model_cls(arch)
            if model_info is not None:
                return model_info.is_text_generation
        return False

    def is_pooling_model(self, architectures: Union[str, List[str]], model_config: ModelConfig = None) -> bool:
        """Check if it's a pooling model"""
        if isinstance(architectures, str):
            architectures = [architectures]
        for arch in architectures:
            model_info = self._try_inspect_model_cls(arch)
            if model_info is not None:
                return model_info.is_pooling
        return False
class ModelForCasualLM(nn.Layer, ABC):
    """
    Base class for causal language models.

    NOTE: the historical "Casual" spelling is kept because subclasses and the
    registry reference the class by this name.
    """

    def __init__(self, configs):
        """
        Args:
            configs (dict): Configurations including parameters such as max_dec_len, min_dec_len, decode_strategy,
            vocab_size, use_topp_sampling, etc. Stored as ``self.fd_config``.
        """
        # Zero-argument super() is equivalent and idiomatic in Python 3.
        super().__init__()
        self.fd_config = configs

    @abstractmethod
    def set_state_dict(self, state_dict: Dict[str, Union[np.ndarray, paddle.Tensor]]):
        """
        Load model parameters from a given state dictionary.

        Args:
            state_dict (dict[str, np.ndarray | paddle.Tensor]):
                A dictionary containing model parameters, where keys are parameter names
                and values are NumPy arrays or PaddlePaddle tensors.
        """
        raise NotImplementedError

    @abstractmethod
    def forward(
        self,
        input_ids=None,
        pos_emb=None,
        **model_kwargs,
    ):
        """
        Defines the forward pass of the model for generating text.

        Args:
            input_ids (Tensor, optional): The input token ids to the model.
            pos_emb (Tensor, optional): Position embeddings for the model.
            **model_kwargs: Additional keyword arguments for the model.

        Returns:
            Tensor or list of Tensors: Generated tokens or decoded outputs.
        """
        raise NotImplementedError

    @abstractmethod
    def compute_logits(self, hidden_state, **logits_processor_kwargs):
        """Project hidden states to vocabulary logits (implemented by subclasses)."""
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def name(cls):
        """Return the architecture name this model class registers under."""
        raise NotImplementedError