diff --git a/lmdeploy/messages.py b/lmdeploy/messages.py
index 742a48008b..5b84fd0f2b 100644
--- a/lmdeploy/messages.py
+++ b/lmdeploy/messages.py
@@ -301,6 +301,7 @@ class ResponseType(enum.Enum):
     SESSION_NOT_EXIST = enum.auto()
     HANDLER_NOT_EXIST = enum.auto()
     INPUT_LENGTH_ERROR = enum.auto()
+    INTERNAL_ENGINE_ERROR = enum.auto()
 
 
 @dataclass
diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py
index 4cd9965f9c..ff13b79083 100644
--- a/lmdeploy/serve/async_engine.py
+++ b/lmdeploy/serve/async_engine.py
@@ -13,7 +13,7 @@
 
 from lmdeploy.logger import RequestLogger
 from lmdeploy.messages import (GenerationConfig, PytorchEngineConfig, Response,
-                               TurbomindEngineConfig)
+                               ResponseType, TurbomindEngineConfig)
 from lmdeploy.model import MODELS, ChatTemplateConfig, best_match_model
 from lmdeploy.serve.utils import LogitsMixin, _get_event_loop
 from lmdeploy.tokenizer import DetokenizeState
@@ -46,7 +46,7 @@ class GenOut:
     history_token_len: int
     input_token_len: int
     generate_token_len: int
-    finish_reason: Optional[Literal['stop', 'length']] = None
+    finish_reason: Optional[Literal['stop', 'length', 'error']] = None
     token_ids: List[int] = None
     logprobs: List[Dict[int, float]] = None
 
@@ -503,7 +503,7 @@ async def generate(
             gen_config.temperature = 1.0
             gen_config.repetition_penalty = 1.0
         # set random if it is not set and sequence_start is True
-        if gen_config.random_seed is None and sequence_start:
+        elif gen_config.random_seed is None and sequence_start:
             gen_config.random_seed = random.getrandbits(64)
         if gen_config.n > 1:
             logger.ERROR(f"n({gen_config.n}) > 1 hasn't been supported yet. "
@@ -551,6 +551,12 @@ async def generate(
             if sequence_end is True and sequence_start is False:
                 await self.end_session(session_id)
         else:
+
+            def is_error(status):
+                return status not in [
+                    ResponseType.SUCCESS, ResponseType.FINISH
+                ]
+
             generator = await self.get_generator(False, session_id)
             async with self.safe_run(session_id):
                 state = DetokenizeState(len(input_ids))
@@ -566,6 +572,9 @@ async def generate(
                         sequence_end=sequence_end,
                         step=self.id2step[str(session_id)]):
                     # decode res
+                    if is_error(outputs.status):
+                        tokens = 0
+                        break
                     res, tokens = input_ids + outputs.token_ids, outputs.num_token  # noqa
                     if len(res) <= state.ids_offset:
                         continue
@@ -587,15 +596,24 @@ async def generate(
                     yield GenOut(response, self.id2step[str(session_id)],
                                  len(input_ids), tokens, finish_reason, res,
                                  logprobs)
-
-                finish_reason = 'length' \
-                    if tokens >= gen_config.max_new_tokens else 'stop'
-                # utf-8 char at the end means it's a potential unfinished
-                # byte sequence
-                if not response.endswith('�'):
-                    response = ''  # avaid returning the last response twice
-                yield GenOut(response, self.id2step[str(session_id)],
-                             len(input_ids), tokens, finish_reason)
+                if not is_error(outputs.status):
+                    finish_reason = 'length' \
+                        if tokens >= gen_config.max_new_tokens else 'stop'
+                    # utf-8 char at the end means it's a potential unfinished
+                    # byte sequence
+                    if not response.endswith('�'):
+                        # avaid returning the last response twice
+                        response = ''
+                    yield GenOut(response, self.id2step[str(session_id)],
+                                 len(input_ids), tokens, finish_reason)
+                else:
+                    yield GenOut(
+                        response='internal error happened',
+                        history_token_len=self.id2step[str(session_id)],
+                        input_token_len=len(input_ids),
+                        generate_token_len=0,
+                        finish_reason='error',
+                        token_ids=[])
                 # update step
                 self.id2step[str(session_id)] += len(input_ids) + tokens
                 if sequence_end:
diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py
index 8439c4a816..00b419ded1 100644
--- a/lmdeploy/turbomind/turbomind.py
+++ b/lmdeploy/turbomind/turbomind.py
@@ -358,7 +358,12 @@ def _forward_thread(self, inputs):
             self.gpu_count)
 
         def _func():
-            output = self.model_inst.forward(inputs, instance_comm)
+            try:
+                output = self.model_inst.forward(inputs, instance_comm)
+            except Exception as e:
+                logger.error(f'unhandled exception: {e}')
+                self.que.put((-1, None))
+                return
             self.que.put((True, output))
 
         self.executor = ThreadPoolExecutor(1)
@@ -372,7 +377,12 @@ def _async_forward_thread(self, inputs, que: LifoQueue):
             self.gpu_count)
 
         def _func():
-            output = self.model_inst.forward(inputs, instance_comm)
+            try:
+                output = self.model_inst.forward(inputs, instance_comm)
+            except Exception as e:
+                logger.error(f'unhandled exception: {e}')
+                que.put((-1, None))
+                return
             que.put((True, output))
 
         self.executor = ThreadPoolExecutor(1)
@@ -653,6 +663,12 @@ async def async_stream_infer(self,
                 await asyncio.sleep(0.002)
 
             finish, tm_outputs = que.get()
+            if finish < 0:
+                yield EngineOutput(status=ResponseType.INTERNAL_ENGINE_ERROR,
+                                   token_ids=[],
+                                   num_token=0)
+                self.executor.shutdown()
+                break
 
             outputs = _tm_dict_to_torch_dict(tm_outputs)
 
@@ -766,6 +782,12 @@ def stream_infer(self,
                 self.que.get()
 
             finish, tm_outputs = self.que.get()
+            if finish < 0:
+                yield EngineOutput(status=ResponseType.INTERNAL_ENGINE_ERROR,
+                                   token_ids=[],
+                                   num_token=0)
+                self.executor.shutdown()
+                break
 
             outputs = _tm_dict_to_torch_dict(tm_outputs)
 
@@ -892,7 +914,9 @@ def _broadcast_np(data, dtype, shape=(batch_size, )):
         # start forward thread
         self._forward_thread(tm_inputs)
 
-        _, tm_outputs = self.que.get()
+        res, tm_outputs = self.que.get()
+        if res < 0:
+            return None
 
         outputs = _tm_dict_to_torch_dict(tm_outputs)
         logits = outputs['logits']
@@ -942,6 +966,8 @@ def get_ppl(self, input_ids: Union[List[int], List[List[int]]]):
                                   steps,
                                   sequence_start=(i == 0),
                                   sequence_end=(i == n_max_iter - 1))
+            if _logits is None:
+                return None
             _logits = _logits.to(device=device)
             logits.append(_logits)
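
The patch surfaces TurboMind engine exceptions to callers instead of leaving the stream hanging: `_forward_thread`/`_async_forward_thread` catch the exception and enqueue `(-1, None)`, the infer loops then yield `EngineOutput(status=ResponseType.INTERNAL_ENGINE_ERROR, ...)`, and `AsyncEngine.generate` terminates the stream with a `GenOut` whose `finish_reason` is `'error'`. Below is a minimal consumer sketch, not part of the diff: it assumes an already-constructed `AsyncEngine` instance, and the `collect` helper and its arguments are illustrative only.

# Sketch: consuming AsyncEngine.generate() with the new error path.
# `engine` is assumed to be a loaded lmdeploy AsyncEngine instance.
import asyncio


async def collect(engine, prompt: str, session_id: int = 1) -> str:
    pieces = []
    async for out in engine.generate(prompt, session_id):
        if out.finish_reason == 'error':
            # New error path: the engine hit an internal error, produced no
            # tokens, and the stream ends here instead of hanging.
            raise RuntimeError(out.response)  # 'internal error happened'
        pieces.append(out.response)
    return ''.join(pieces)


# asyncio.run(collect(engine, 'hello'))  # requires a loaded model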