From 7adaf5ac844b0c13df06d4deb7ff37c2aeede39c Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Thu, 19 Sep 2024 17:30:45 +0800 Subject: [PATCH 1/9] log error before throw exception --- src/turbomind/utils/cuda_utils.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/turbomind/utils/cuda_utils.h b/src/turbomind/utils/cuda_utils.h index 2148fcc164..0335b5b245 100644 --- a/src/turbomind/utils/cuda_utils.h +++ b/src/turbomind/utils/cuda_utils.h @@ -120,6 +120,7 @@ template void check(T result, char const* const func, const char* const file, int const line) { if (result) { + TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -138,6 +139,7 @@ inline void syncAndCheck(const char* const file, int const line) cudaDeviceSynchronize(); cudaError_t result = cudaGetLastError(); if (result) { + TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -149,6 +151,7 @@ inline void syncAndCheck(const char* const file, int const line) cudaDeviceSynchronize(); cudaError_t result = cudaGetLastError(); if (result) { + TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -200,6 +203,7 @@ void check_abs_mean_val(const T* result, const int size); [[noreturn]] inline void throwRuntimeError(const char* const file, int const line, std::string const& info = "") { + TM_LOG_ERROR("[TM][ERROR] %s Assertion fail: %s:%d", info, file, line); throw std::runtime_error(std::string("[TM][ERROR] ") + info + " Assertion fail: " + file + ":" + std::to_string(line) + " \n"); } From 8c1ad0217596262256cd0d405507f8619ae9e860 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Sun, 22 Sep 2024 18:23:57 +0800 Subject: [PATCH 2/9] add try-exception in forward thread --- lmdeploy/messages.py | 1 + lmdeploy/serve/async_engine.py | 2 +- lmdeploy/turbomind/turbomind.py | 28 +++++++++++++++++++++++++--- 3 files changed, 27 insertions(+), 4 deletions(-) diff --git a/lmdeploy/messages.py b/lmdeploy/messages.py index 742a48008b..5b84fd0f2b 100644 --- a/lmdeploy/messages.py +++ b/lmdeploy/messages.py @@ -301,6 +301,7 @@ class ResponseType(enum.Enum): SESSION_NOT_EXIST = enum.auto() HANDLER_NOT_EXIST = enum.auto() INPUT_LENGTH_ERROR = enum.auto() + INTERNAL_ENGINE_ERROR = enum.auto() @dataclass diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index 4cd9965f9c..214eb3cbdd 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -503,7 +503,7 @@ async def generate( gen_config.temperature = 1.0 gen_config.repetition_penalty = 1.0 # set random if it is not set and sequence_start is True - if gen_config.random_seed is None and sequence_start: + elif gen_config.random_seed is None and sequence_start: gen_config.random_seed = random.getrandbits(64) if gen_config.n > 1: logger.ERROR(f"n({gen_config.n}) > 1 hasn't been supported yet. 
" diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index 8439c4a816..632f9dfca1 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -358,7 +358,12 @@ def _forward_thread(self, inputs): self.gpu_count) def _func(): - output = self.model_inst.forward(inputs, instance_comm) + try: + output = self.model_inst.forward(inputs, instance_comm) + except Exception as e: + logger.error(f'Exception happened: {e}') + self.que.put((-1, None)) + return self.que.put((True, output)) self.executor = ThreadPoolExecutor(1) @@ -372,7 +377,12 @@ def _async_forward_thread(self, inputs, que: LifoQueue): self.gpu_count) def _func(): - output = self.model_inst.forward(inputs, instance_comm) + try: + output = self.model_inst.forward(inputs, instance_comm) + except Exception as e: + logger.error(f'Exception happened: {e}') + self.que.put((-1, None)) + return que.put((True, output)) self.executor = ThreadPoolExecutor(1) @@ -653,6 +663,10 @@ async def async_stream_infer(self, await asyncio.sleep(0.002) finish, tm_outputs = que.get() + if finish < 0: + yield EngineOutput() + self.executor.shutdown() + break outputs = _tm_dict_to_torch_dict(tm_outputs) @@ -766,6 +780,10 @@ def stream_infer(self, self.que.get() finish, tm_outputs = self.que.get() + if finish < 0: + yield EngineOutput() + self.executor.shutdown() + break outputs = _tm_dict_to_torch_dict(tm_outputs) @@ -892,7 +910,9 @@ def _broadcast_np(data, dtype, shape=(batch_size, )): # start forward thread self._forward_thread(tm_inputs) - _, tm_outputs = self.que.get() + res, tm_outputs = self.que.get() + if res < 0: + return None outputs = _tm_dict_to_torch_dict(tm_outputs) logits = outputs['logits'] @@ -942,6 +962,8 @@ def get_ppl(self, input_ids: Union[List[int], List[List[int]]]): steps, sequence_start=(i == 0), sequence_end=(i == n_max_iter - 1)) + if _logits is None: + return None _logits = _logits.to(device=device) logits.append(_logits) From 23b724ee19e6f2ed65d7aa189f557100ab432057 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Mon, 23 Sep 2024 19:47:48 +0800 Subject: [PATCH 3/9] update --- lmdeploy/serve/async_engine.py | 33 ++++++++++++++++++++++----------- lmdeploy/turbomind/turbomind.py | 8 ++++++-- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index 214eb3cbdd..53a37db77c 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -13,7 +13,7 @@ from lmdeploy.logger import RequestLogger from lmdeploy.messages import (GenerationConfig, PytorchEngineConfig, Response, - TurbomindEngineConfig) + ResponseType, TurbomindEngineConfig) from lmdeploy.model import MODELS, ChatTemplateConfig, best_match_model from lmdeploy.serve.utils import LogitsMixin, _get_event_loop from lmdeploy.tokenizer import DetokenizeState @@ -46,7 +46,7 @@ class GenOut: history_token_len: int input_token_len: int generate_token_len: int - finish_reason: Optional[Literal['stop', 'length']] = None + finish_reason: Optional[Literal['stop', 'length', 'error']] = None token_ids: List[int] = None logprobs: List[Dict[int, float]] = None @@ -566,6 +566,8 @@ async def generate( sequence_end=sequence_end, step=self.id2step[str(session_id)]): # decode res + if outputs.status > ResponseType.FINISH: + break res, tokens = input_ids + outputs.token_ids, outputs.num_token # noqa if len(res) <= state.ids_offset: continue @@ -587,15 +589,24 @@ async def generate( yield GenOut(response, self.id2step[str(session_id)], len(input_ids), tokens, 
finish_reason, res, logprobs) - - finish_reason = 'length' \ - if tokens >= gen_config.max_new_tokens else 'stop' - # utf-8 char at the end means it's a potential unfinished - # byte sequence - if not response.endswith('�'): - response = '' # avaid returning the last response twice - yield GenOut(response, self.id2step[str(session_id)], - len(input_ids), tokens, finish_reason) + if outputs.status > ResponseType.FINISH: + yield GenOut( + response='internal error happened', + history_token_len=self.id2step[str(session_id)], + input_token_len=len(input_ids), + generate_token_len=0, + finish_reason='error', + token_ids=[]) + else: + finish_reason = 'length' \ + if tokens >= gen_config.max_new_tokens else 'stop' + # utf-8 char at the end means it's a potential unfinished + # byte sequence + if not response.endswith('�'): + # avaid returning the last response twice + response = '' + yield GenOut(response, self.id2step[str(session_id)], + len(input_ids), tokens, finish_reason) # update step self.id2step[str(session_id)] += len(input_ids) + tokens if sequence_end: diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index 632f9dfca1..07661d9b81 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -664,7 +664,9 @@ async def async_stream_infer(self, finish, tm_outputs = que.get() if finish < 0: - yield EngineOutput() + yield EngineOutput(status=ResponseType.INTERNAL_ENGINE_ERROR, + token_ids=[], + num_token=0) self.executor.shutdown() break @@ -781,7 +783,9 @@ def stream_infer(self, finish, tm_outputs = self.que.get() if finish < 0: - yield EngineOutput() + yield EngineOutput(status=ResponseType.INTERNAL_ENGINE_ERROR, + token_ids=[], + num_token=0) self.executor.shutdown() break From 468446a14f74a826cd46cbdc7d23bf44b0efd494 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 11:24:20 +0800 Subject: [PATCH 4/9] update --- lmdeploy/messages.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lmdeploy/messages.py b/lmdeploy/messages.py index 5b84fd0f2b..54441ebe8b 100644 --- a/lmdeploy/messages.py +++ b/lmdeploy/messages.py @@ -303,6 +303,11 @@ class ResponseType(enum.Enum): INPUT_LENGTH_ERROR = enum.auto() INTERNAL_ENGINE_ERROR = enum.auto() + def __lt__(self, other): + if self.__class__ is other.__class__: + return self.value < other.value + return NotImplemented + @dataclass class Response: From e66731a5b3485efd89998ed6157224d2d1b8df9f Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 14:14:45 +0800 Subject: [PATCH 5/9] update --- lmdeploy/serve/async_engine.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index 53a37db77c..329b9aee70 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -589,15 +589,7 @@ async def generate( yield GenOut(response, self.id2step[str(session_id)], len(input_ids), tokens, finish_reason, res, logprobs) - if outputs.status > ResponseType.FINISH: - yield GenOut( - response='internal error happened', - history_token_len=self.id2step[str(session_id)], - input_token_len=len(input_ids), - generate_token_len=0, - finish_reason='error', - token_ids=[]) - else: + if outputs.status <= ResponseType.FINISH: finish_reason = 'length' \ if tokens >= gen_config.max_new_tokens else 'stop' # utf-8 char at the end means it's a potential unfinished @@ -607,6 +599,14 @@ async def generate( response = '' yield GenOut(response, self.id2step[str(session_id)], len(input_ids), 
tokens, finish_reason) + else: + yield GenOut( + response='internal error happened', + history_token_len=self.id2step[str(session_id)], + input_token_len=len(input_ids), + generate_token_len=0, + finish_reason='error', + token_ids=[]) # update step self.id2step[str(session_id)] += len(input_ids) + tokens if sequence_end: From 81530eeba89b24a6345a9fb9259aefd4e71fac96 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 14:15:31 +0800 Subject: [PATCH 6/9] miss messages --- lmdeploy/messages.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/lmdeploy/messages.py b/lmdeploy/messages.py index 54441ebe8b..473d307bb6 100644 --- a/lmdeploy/messages.py +++ b/lmdeploy/messages.py @@ -308,6 +308,31 @@ def __lt__(self, other): return self.value < other.value return NotImplemented + def __le__(self, other): + if self.__class__ is other.__class__: + return self.value <= other.value + return NotImplemented + + def __eq__(self, other): + if self.__class__ is other.__class__: + return self.value == other.value + return NotImplemented + + def __ne__(self, other): + if self.__class__ is other.__class__: + return self.value != other.value + return NotImplemented + + def __gt__(self, other): + if self.__class__ is other.__class__: + return self.value > other.value + return NotImplemented + + def __ge__(self, other): + if self.__class__ is other.__class__: + return self.value >= other.value + return NotImplemented + @dataclass class Response: From fc806871361b859ac9e5d5cd6ae37901c921de1d Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 16:23:21 +0800 Subject: [PATCH 7/9] fix --- lmdeploy/turbomind/turbomind.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index 07661d9b81..d768fc313a 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -381,7 +381,7 @@ def _func(): output = self.model_inst.forward(inputs, instance_comm) except Exception as e: logger.error(f'Exception happened: {e}') - self.que.put((-1, None)) + que.put((-1, None)) return que.put((True, output)) From 900cf4aa3d6a7eaad119b396913f4e6a93a85577 Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 21:03:38 +0800 Subject: [PATCH 8/9] update --- lmdeploy/messages.py | 30 ------------------------------ lmdeploy/serve/async_engine.py | 11 +++++++++-- 2 files changed, 9 insertions(+), 32 deletions(-) diff --git a/lmdeploy/messages.py b/lmdeploy/messages.py index 473d307bb6..5b84fd0f2b 100644 --- a/lmdeploy/messages.py +++ b/lmdeploy/messages.py @@ -303,36 +303,6 @@ class ResponseType(enum.Enum): INPUT_LENGTH_ERROR = enum.auto() INTERNAL_ENGINE_ERROR = enum.auto() - def __lt__(self, other): - if self.__class__ is other.__class__: - return self.value < other.value - return NotImplemented - - def __le__(self, other): - if self.__class__ is other.__class__: - return self.value <= other.value - return NotImplemented - - def __eq__(self, other): - if self.__class__ is other.__class__: - return self.value == other.value - return NotImplemented - - def __ne__(self, other): - if self.__class__ is other.__class__: - return self.value != other.value - return NotImplemented - - def __gt__(self, other): - if self.__class__ is other.__class__: - return self.value > other.value - return NotImplemented - - def __ge__(self, other): - if self.__class__ is other.__class__: - return self.value >= other.value - return NotImplemented - @dataclass class Response: diff --git 
a/lmdeploy/serve/async_engine.py b/lmdeploy/serve/async_engine.py index 329b9aee70..ff13b79083 100644 --- a/lmdeploy/serve/async_engine.py +++ b/lmdeploy/serve/async_engine.py @@ -551,6 +551,12 @@ async def generate( if sequence_end is True and sequence_start is False: await self.end_session(session_id) else: + + def is_error(status): + return status not in [ + ResponseType.SUCCESS, ResponseType.FINISH + ] + generator = await self.get_generator(False, session_id) async with self.safe_run(session_id): state = DetokenizeState(len(input_ids)) @@ -566,7 +572,8 @@ async def generate( sequence_end=sequence_end, step=self.id2step[str(session_id)]): # decode res - if outputs.status > ResponseType.FINISH: + if is_error(outputs.status): + tokens = 0 break res, tokens = input_ids + outputs.token_ids, outputs.num_token # noqa if len(res) <= state.ids_offset: @@ -589,7 +596,7 @@ async def generate( yield GenOut(response, self.id2step[str(session_id)], len(input_ids), tokens, finish_reason, res, logprobs) - if outputs.status <= ResponseType.FINISH: + if not is_error(outputs.status): finish_reason = 'length' \ if tokens >= gen_config.max_new_tokens else 'stop' # utf-8 char at the end means it's a potential unfinished From 9cb9be5823247fca16fcca58cbffb20b65ced27a Mon Sep 17 00:00:00 2001 From: lvhan028 Date: Tue, 24 Sep 2024 21:11:45 +0800 Subject: [PATCH 9/9] update --- lmdeploy/turbomind/turbomind.py | 4 ++-- src/turbomind/utils/cuda_utils.h | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/lmdeploy/turbomind/turbomind.py b/lmdeploy/turbomind/turbomind.py index d768fc313a..00b419ded1 100644 --- a/lmdeploy/turbomind/turbomind.py +++ b/lmdeploy/turbomind/turbomind.py @@ -361,7 +361,7 @@ def _func(): try: output = self.model_inst.forward(inputs, instance_comm) except Exception as e: - logger.error(f'Exception happened: {e}') + logger.error(f'unhandled exception: {e}') self.que.put((-1, None)) return self.que.put((True, output)) @@ -380,7 +380,7 @@ def _func(): try: output = self.model_inst.forward(inputs, instance_comm) except Exception as e: - logger.error(f'Exception happened: {e}') + logger.error(f'unhandled exception: {e}') que.put((-1, None)) return que.put((True, output)) diff --git a/src/turbomind/utils/cuda_utils.h b/src/turbomind/utils/cuda_utils.h index 0335b5b245..2148fcc164 100644 --- a/src/turbomind/utils/cuda_utils.h +++ b/src/turbomind/utils/cuda_utils.h @@ -120,7 +120,6 @@ template void check(T result, char const* const func, const char* const file, int const line) { if (result) { - TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -139,7 +138,6 @@ inline void syncAndCheck(const char* const file, int const line) cudaDeviceSynchronize(); cudaError_t result = cudaGetLastError(); if (result) { - TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -151,7 +149,6 @@ inline void syncAndCheck(const char* const file, int const line) cudaDeviceSynchronize(); cudaError_t result = cudaGetLastError(); if (result) { - TM_LOG_ERROR("[TM][ERROR] CUDA runtime error: %s, %s:%d", _cudaGetErrorEnum(result), file, line); throw std::runtime_error(std::string("[TM][ERROR] CUDA runtime 
error: ") + (_cudaGetErrorEnum(result)) + " " + file + ":" + std::to_string(line) + " \n"); } @@ -203,7 +200,6 @@ void check_abs_mean_val(const T* result, const int size); [[noreturn]] inline void throwRuntimeError(const char* const file, int const line, std::string const& info = "") { - TM_LOG_ERROR("[TM][ERROR] %s Assertion fail: %s:%d", info, file, line); throw std::runtime_error(std::string("[TM][ERROR] ") + info + " Assertion fail: " + file + ":" + std::to_string(line) + " \n"); }