
bug-fix: when using stream is False, continuous batching doesn't work #346

Merged
merged 1 commit into InternLM:main on Sep 7, 2023

Conversation

sleepwalker2017 (Contributor) commented Aug 31, 2023

Motivation

Bug fix: when using stream=False, continuous batching does not work.
See details in this issue: #308

Modification

In non-streaming mode, the main thread, which is responsible for receiving requests, gets stuck in the following code, because self.que.get() waits until the queue is non-empty.

But in non-stream mode, the queue stays empty until a request is finished.

So a newly arriving request won't be processed until the previous request is done, and the batching doesn't work.

    while True:
        while self.que.qsize() > 1:
            self.que.get()
        finish, tm_outputs = self.que.get()
        outputs = _tm_dict_to_torch_dict(tm_outputs)

I modified the code here to avoid the long wait; see the commits (a rough sketch of the idea is shown below).

I'm not sure whether there is a better way to fix this. If there is, please comment.
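
Roughly, the idea looks like this (an illustrative sketch only, not the exact code in the commit; poll_outputs is a made-up name): when stream_output is False and the queue is still empty, yield control back to the caller instead of blocking on que.get(), so the server can keep scheduling newly arrived requests.

import queue
from typing import Iterator, Optional, Tuple

def poll_outputs(que: queue.Queue,
                 stream_output: bool) -> Iterator[Optional[Tuple[bool, dict]]]:
    # illustrative helper, not part of lmdeploy
    while True:
        # keep only the newest snapshot (each entry holds all tokens so far)
        while que.qsize() > 1:
            que.get()
        if not stream_output and que.qsize() == 0:
            # nothing has finished yet: hand control back instead of blocking
            yield None
            continue
        finish, tm_outputs = que.get()
        yield finish, tm_outputs
        if finish:
            break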

I have a question: what is this line for? Won't it cause output tokens to be lost?

while self.que.qsize() > 1: self.que.get()

BC-breaking (Optional)

Does the modification introduce changes that break the backward-compatibility of the downstream repositories?
If so, please describe how it breaks the compatibility and how the downstream projects should modify their code to keep compatibility with this PR.

Use cases (Optional)

If this PR introduces a new feature, it is better to list some use cases here, and update the documentation.

Checklist

  1. Pre-commit or other linting tools are used to fix the potential lint issues.
  2. The modification is covered by complete unit tests. If not, please add more unit tests to ensure the correctness.
  3. If the modification has a dependency on downstream projects of a newer version, this PR should be tested with all supported versions of downstream projects.
  4. The documentation has been modified accordingly, like docstring or example tutorials.

sleepwalker2017 changed the title from "bug-fix: when using stream is False, continuous batching doesn't work #345" to "bug-fix: when using stream is False, continuous batching doesn't work" on Aug 31, 2023
lvhan028 requested review from AllentDan and grimoire on August 31, 2023 at 13:37
lvhan028 (Collaborator) commented Sep 1, 2023

@sleepwalker2017 Please fix the linting problems. You can install pre-commit on your host, which can help fix them:

pip install pre-commit
# Move to the root path of the lmdeploy repository and execute the following commands
pre-commit install

You can run pre-commit run --all-files. It will report and try to fix the linting errors.

lvhan028 added the Bug:P1 label on Sep 1, 2023
@@ -136,6 +136,8 @@ async def generate(
repetition_penalty=repetition_penalty,
ignore_eos=ignore_eos,
random_seed=seed if sequence_start else None):
if outputs is None:

Collaborator review comment:

generate_openai should be updated too.

@@ -329,6 +329,9 @@ def _broadcast_np(data, dtype, shape=(batch_size, )):
while self.que.qsize() > 1:
self.que.get()

if stream_output == False and self.que.qsize() == 0:
yield

Collaborator review comment:

I found this made GPU-Util quite low. And the benchmark went down to an unacceptable number.

sleepwalker2017 (Contributor, Author) replied:

I found this made GPU-Util quite low. And the benchmark went down to an unacceptable number.

I found that the first version had this problem in streaming mode:

if  self.que.qsize() == 0:
    yield

So I modified it, and I tested that the GPU util is 100% for both modes:

            if stream_output == False and self.que.qsize() == 0:
                yield

Anyway, I followed your advice and used stream inference to process non-stream requests.

AllentDan (Collaborator) commented:

Another method for this is to use stream=True inside the RESTful API even though the client passed stream=False, and then return the final result all at once.
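
A minimal sketch of that idea (fake_stream_engine and handle_request are made-up names standing in for the real lmdeploy calls, not the actual API): always drive the engine in streaming mode, and when the client asked for stream=False, collect the chunks and answer once at the end.

import asyncio
from typing import AsyncIterator, List

async def fake_stream_engine(prompt: str) -> AsyncIterator[str]:
    """Stand-in for the real streaming inference call; yields tokens one by one."""
    for tok in prompt.split():
        await asyncio.sleep(0)  # let other requests run between tokens
        yield tok + ' '

async def handle_request(prompt: str, stream: bool) -> AsyncIterator[str]:
    if stream:
        # streaming client: forward chunks as they arrive
        async for chunk in fake_stream_engine(prompt):
            yield chunk
    else:
        # non-streaming client: drain the stream, reply once at the end
        parts: List[str] = []
        async for chunk in fake_stream_engine(prompt):
            parts.append(chunk)
        yield ''.join(parts)

async def main() -> None:
    async for out in handle_request('continuous batching works now', stream=False):
        print(out)

asyncio.run(main())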

sleepwalker2017 (Contributor, Author) replied:

Another method for this is to use stream=True inside the RESTful API even though the client passed stream=False, and then return the final result all at once.

Hi, I think that's a better solution, since my fix keeps the main thread spinning in the while loop while doing little work.

I tried this method and it works well.

Does the openai generate function also need to be modified?

I'm not familiar with the openai generate function and don't know how to test it.

If it needs more changes, you can refer to my changes and send a new PR to replace this one.

Thank you.

AllentDan (Collaborator) commented:

Not a final solution. Maybe there are other methods. @grimoire Any comments?

sleepwalker2017 (Contributor, Author) replied:

Not a final solution. Maybe there are other methods. @grimoire Any comments?

The core problem is that the main thread waits on the output queue until some output data is generated.

So when non-stream requests come in, the thread can't process them.

I have a question: what is this line for?

while self.que.qsize() > 1: self.que.get()

Why do you throw away all the results until only one is left?

@AllentDan

grimoire (Collaborator) commented Sep 1, 2023:

The turbomind instance is designed to process the session in its own threads. I think it is the coroutine that blocks the processing of continuous batching. Awaiting the queue get might be helpful.

Why do you throw away all the results until only one is left?

Assume we have a queue:

que = [
  [1]
  [1, 2]
  [1, 2, 3]
  [1, 2, 3, 4]
]

Outputting the last one, [1, 2, 3, 4], and throwing the other entries away is enough for our task. We just need to perform the memory copy once.
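
A tiny standalone illustration of this drain-to-latest pattern (plain Python, not the actual turbomind code):

import queue

q = queue.Queue()
for snapshot in ([1], [1, 2], [1, 2, 3], [1, 2, 3, 4]):
    q.put(snapshot)  # each entry already contains all tokens produced so far

# drop every stale snapshot, keeping only the most recent one
while q.qsize() > 1:
    q.get()

# the expensive copy (here just a print) happens exactly once
latest = q.get()
print(latest)  # [1, 2, 3, 4]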

sleepwalker2017 (Contributor, Author) commented Sep 6, 2023:

The turbomind instance is designed to process the session in its own threads. I think it is the coroutine that blocks the processing of continuous batching. Awaiting the queue get might be helpful.

Why do you throw away all the results until only one is left?

Assume we have a queue:

que = [
  [1]
  [1, 2]
  [1, 2, 3]
  [1, 2, 3, 4]
]

Outputting the last one, [1, 2, 3, 4], and throwing the other entries away is enough for our task. We just need to perform the memory copy once.

I tried this, but it doesn't work: there is always only 1 thread appending tasks. Is there anything wrong with it?
I'm not familiar with the coroutine API in Python. I use Python 3.8, so the API may differ from 3.10.

[screenshot: attempted coroutine wrapper around que.get()]

grimoire (Collaborator) commented Sep 6, 2023:

Can we wrap the queue-get while loop inside the wrapper?
Or we could initialize a counter with the current queue size n, pop the top n-1 entries from the queue, and output the next one?

sleepwalker2017 (Contributor, Author) replied:

The problem comes from the situation where the queue is empty.

In non-stream mode, the queue stays empty for a long time until a sequence is finished. So this line waits for a long time and new requests can't be processed. That's the problem.

I think this line is the cause in non-stream mode:

            finish, tm_outputs = self.que.get()

grimoire (Collaborator) commented Sep 6, 2023:

I see. I guess the problem is that queue.Queue is not awaitable; there is another Queue in the asyncio module, but we cannot use it since it is not thread-safe.
Can we use threads instead of coroutines in the RESTful API? Or can we make the queue awaitable?
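
One possible way to make the thread-safe queue awaitable (a sketch only, assuming it is acceptable to block a worker thread; await_queue_get is a made-up helper, not part of lmdeploy): hand the blocking que.get() to the default thread-pool executor and await the returned future, so the event loop stays free for other requests.

import asyncio
import queue
import threading

async def await_queue_get(que: queue.Queue):
    # run the blocking get() in the default thread pool; the event loop
    # is free to serve other coroutines while we wait
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, que.get)

def producer(que: queue.Queue) -> None:
    que.put((True, [1, 2, 3, 4]))  # (finish, tokens)

async def main() -> None:
    que: queue.Queue = queue.Queue()
    threading.Timer(0.1, producer, args=(que,)).start()
    finish, tokens = await await_queue_get(que)
    print(finish, tokens)

asyncio.run(main())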

sleepwalker2017 (Contributor, Author) replied:

I see. I guess the problem is that queue.Queue is not awaitable; there is another Queue in the asyncio module, but we cannot use it since it is not thread-safe. Can we use threads instead of coroutines in the RESTful API? Or can we make the queue awaitable?

The code I posted above tries to wrap que.get() in an awaitable, but it doesn't work; I wonder why that is.

Using multiple threads is OK, but the api_server file may need a lot of modification, or it may need to be refactored.

grimoire (Collaborator) commented Sep 7, 2023:

An object is awaitable only when it has an __await__ attribute.
I think we can merge this PR as a quick fix and find a better solution in the future (threads or something better).
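
A tiny illustration of that rule (AwaitableValue is a made-up class, unrelated to the lmdeploy code): defining __await__ is what makes "await obj" work.

import asyncio

class AwaitableValue:
    """Awaitable because it defines __await__; awaiting it yields to the loop once."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        yield from asyncio.sleep(0).__await__()  # give the event loop one turn
        return self.value

async def main() -> None:
    print(await AwaitableValue(42))  # prints 42

asyncio.run(main())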

lvhan028 merged commit 57cf99b into InternLM:main on Sep 7, 2023
AllentDan mentioned this pull request on Sep 14, 2023