[Internal] Update Jobs list_runs function to support paginated responses #890

Merged 5 commits on Feb 24, 2025

1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
@@ -10,6 +10,7 @@

### Internal Changes
* Update Jobs ListJobs API to support paginated responses ([#896](https://github.com/databricks/databricks-sdk-py/pull/896))
* Update Jobs ListRuns API to support paginated responses ([#890](https://github.com/databricks/databricks-sdk-py/pull/890))
* Introduce automated tagging ([#888](https://github.com/databricks/databricks-sdk-py/pull/888))
* Update Jobs GetJob API to support paginated responses ([#869](https://github.com/databricks/databricks-sdk-py/pull/869)).

81 changes: 80 additions & 1 deletion databricks/sdk/mixins/jobs.py
@@ -1,7 +1,7 @@
from typing import Iterator, Optional

from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import BaseJob, Job
from databricks.sdk.service.jobs import BaseJob, BaseRun, Job, RunType


class JobsExt(jobs.JobsAPI):
@@ -59,6 +59,85 @@ def list(self,
delattr(job, 'has_more')
yield job

def list_runs(self,
*,
active_only: Optional[bool] = None,
completed_only: Optional[bool] = None,
expand_tasks: Optional[bool] = None,
job_id: Optional[int] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
page_token: Optional[str] = None,
run_type: Optional[RunType] = None,
start_time_from: Optional[int] = None,
start_time_to: Optional[int] = None) -> Iterator[BaseRun]:
"""List job runs.

List runs in descending order by start time. If a run has multiple pages of tasks, job_clusters, job_parameters, or repair history,
this method paginates through all pages and aggregates the results.

:param active_only: bool (optional)
If active_only is `true`, only active runs are included in the results; otherwise, lists both active
and completed runs. An active run is a run in the `QUEUED`, `PENDING`, `RUNNING`, or `TERMINATING` state.
This field cannot be `true` when completed_only is `true`.
:param completed_only: bool (optional)
If completed_only is `true`, only completed runs are included in the results; otherwise, lists both
active and completed runs. This field cannot be `true` when active_only is `true`.
:param expand_tasks: bool (optional)
Whether to include task and cluster details in the response. Note that in API 2.2, only the first
100 elements will be shown. Use :method:jobs/getrun to paginate through all tasks and clusters.
:param job_id: int (optional)
The job for which to list runs. If omitted, the Jobs service lists runs from all jobs.
:param limit: int (optional)
The number of runs to return. This value must be greater than 0 and less than 25. The default value
is 20. If a request specifies a limit of 0, the service instead uses the maximum limit.
:param offset: int (optional)
The offset of the first run to return, relative to the most recent run. Deprecated since June 2023.
Use `page_token` to iterate through the pages instead.
:param page_token: str (optional)
Use `next_page_token` or `prev_page_token` returned from the previous request to list the next or
previous page of runs respectively.
:param run_type: :class:`RunType` (optional)
The type of runs to return. For a description of run types, see :method:jobs/getRun.
:param start_time_from: int (optional)
Show runs that started _at or after_ this value. The value must be a UTC timestamp in milliseconds.
Can be combined with _start_time_to_ to filter by a time range.
:param start_time_to: int (optional)
Show runs that started _at or before_ this value. The value must be a UTC timestamp in milliseconds.
Can be combined with _start_time_from_ to filter by a time range.

:returns: Iterator over :class:`BaseRun`
"""
# fetch runs with limited elements in top level arrays
runs_list = super().list_runs(active_only=active_only,
completed_only=completed_only,
expand_tasks=expand_tasks,
job_id=job_id,
limit=limit,
offset=offset,
page_token=page_token,
run_type=run_type,
start_time_from=start_time_from,
start_time_to=start_time_to)

if not expand_tasks:
yield from runs_list
return

# fully fetch all top level arrays for each run in the list
for run in runs_list:
if run.has_more:
run_from_get_call = self.get_run(run.run_id)
run.tasks = run_from_get_call.tasks
run.job_clusters = run_from_get_call.job_clusters
run.job_parameters = run_from_get_call.job_parameters
run.repair_history = run_from_get_call.repair_history
# Remove the has_more field from each run in the list.
# In Jobs API 2.2 this field supports pagination: it indicates whether the run has more than 100 tasks or job_clusters.
# Since this method hides pagination details from the caller, the field serves no useful purpose here.
if hasattr(run, 'has_more'):
delattr(run, 'has_more')
yield run

def get_run(self,
run_id: int,
*,
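
For reference, a minimal usage sketch of the paginating list_runs exposed through the workspace client; the job ID and the printed fields below are illustrative and not part of this PR:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()  # credentials resolved from the environment or a Databricks config profile

# With expand_tasks=True, each yielded run has its tasks, job_clusters,
# job_parameters and repair_history aggregated across pages, so the caller
# never needs to inspect has_more or issue follow-up jobs/runs/get calls.
for run in w.jobs.list_runs(job_id=123, expand_tasks=True):
    print(run.run_id, [task.task_key for task in (run.tasks or [])])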
205 changes: 201 additions & 4 deletions tests/test_jobs_mixin.py
@@ -5,10 +5,14 @@
from databricks.sdk import WorkspaceClient


def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
return re.compile(
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
)
def make_getrun_path_pattern(run_id: int, page_token: Optional[str] = None) -> Pattern[str]:
if page_token:
return re.compile(
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
)
else:
return re.compile(
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?run_id={run_id}")}')


def make_getjob_path_pattern(job_id: int, page_token: Optional[str] = None) -> Pattern[str]:
@@ -27,6 +31,12 @@ def make_listjobs_path_pattern(page_token: str) -> Pattern[str]:
)


def make_listruns_path_pattern(page_token: str) -> Pattern[str]:
return re.compile(
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/list")}\?(?:expand_tasks=(?:true|false)&)?page_token={re.escape(page_token)}'
)


def test_get_run_with_no_pagination(config, requests_mock):
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
@@ -617,3 +627,190 @@ def test_list_jobs_with_many_tasks(config, requests_mock):
# check that job_id 300 was never used in jobs/get call
history = requests_mock.request_history
assert all('300' not in request.qs.get("job_id", ['']) for request in history)


def test_list_runs_without_task_expansion(config, requests_mock):
listruns_page1 = {
"runs": [{
"run_id": 100,
"run_name": "run100",
}, {
"run_id":
200,
"run_name":
"run200",
"job_parameters": [{
"name": "param1",
"default": "default1"
}, {
"name": "param2",
"default": "default2"
}]
}, {
"run_id": 300,
"run_name": "run300",
}],
"next_page_token":
"tokenToSecondPage"
}
listruns_page2 = {
"runs": [{
"run_id": 400,
"run_name": "run400",
"repair_history": [{
"id": "repair400_1",
}, {
"id": "repair400_2",
}]
}]
}

requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(listruns_page2))
w = WorkspaceClient(config=config)

runs_list = list(w.jobs.list_runs(expand_tasks=False, page_token="initialToken"))
runs_dict = [run.as_dict() for run in runs_list]

assert runs_dict == [{
"run_id": 100,
"run_name": "run100",
}, {
"run_id":
200,
"run_name":
"run200",
"job_parameters": [{
"name": "param1",
"default": "default1"
}, {
"name": "param2",
"default": "default2"
}]
}, {
"run_id": 300,
"run_name": "run300",
}, {
"run_id": 400,
"run_name": "run400",
"repair_history": [{
"id": "repair400_1",
}, {
"id": "repair400_2",
}]
}]

# only two requests should be made, both of which are jobs/runs/list requests
assert requests_mock.call_count == 2


def test_list_runs(config, requests_mock):
listruns_page1 = {
"runs": [{
"run_id": 100,
"tasks": [{
"task_key": "taskkey101"
}, {
"task_key": "taskkey102"
}],
"has_more": True
}, {
"run_id": 200,
"tasks": [{
"task_key": "taskkey201"
}]
}, {
"run_id": 300,
"tasks": [{
"task_key": "taskkey301"
}]
}],
"next_page_token":
"tokenToSecondPage"
}
listruns_page2 = {
"runs": [{
"run_id": 400,
"tasks": [{
"task_key": "taskkey401"
}, {
"task_key": "taskkey402"
}],
"has_more": True
}]
}

getrun_100_page1 = {
"run_id": 100,
"tasks": [{
"task_key": "taskkey101"
}, {
"task_key": "taskkey102"
}],
"next_page_token": "tokenToSecondPage_100"
}
getrun_100_page2 = {"run_id": 100, "tasks": [{"task_key": "taskkey103"}]}
getrun_400_page1 = {
"run_id": 400,
"tasks": [{
"task_key": "taskkey401"
}, {
"task_key": "taskkey403"
}],
"next_page_token": "tokenToSecondPage_400"
}
getrun_400_page2 = {"run_id": 400, "tasks": [{"task_key": "taskkey402"}, {"task_key": "taskkey404"}]}

requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(listruns_page2))

requests_mock.get(make_getrun_path_pattern(100), text=json.dumps(getrun_100_page1))
requests_mock.get(make_getrun_path_pattern(100, "tokenToSecondPage_100"),
text=json.dumps(getrun_100_page2))

requests_mock.get(make_getrun_path_pattern(400), text=json.dumps(getrun_400_page1))
requests_mock.get(make_getrun_path_pattern(400, "tokenToSecondPage_400"),
text=json.dumps(getrun_400_page2))
w = WorkspaceClient(config=config)

runs_list = list(w.jobs.list_runs(expand_tasks=True, page_token="initialToken"))
runs_dict = [run.as_dict() for run in runs_list]

assert runs_dict == [{
"run_id":
100,
"tasks": [{
"task_key": "taskkey101",
}, {
"task_key": "taskkey102",
}, {
"task_key": "taskkey103",
}],
}, {
"run_id": 200,
"tasks": [{
"task_key": "taskkey201",
}],
}, {
"run_id": 300,
"tasks": [{
"task_key": "taskkey301",
}],
}, {
"run_id":
400,
"tasks": [{
"task_key": "taskkey401",
}, {
"task_key": "taskkey403",
}, {
"task_key": "taskkey402",
}, {
"task_key": "taskkey404",
}],
}]

# check that run IDs 200 and 300 were never used in a runs/get call
history = requests_mock.request_history
assert all('300' not in request.qs.get("run_id", ['']) for request in history)
assert all('200' not in request.qs.get("run_id", ['']) for request in history)
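
The requests_mock fixture used throughout these tests is the standard fixture from the requests-mock pytest plugin, and config is supplied by the test suite's shared fixtures. A rough sketch of what such a config fixture could look like, assuming a fake localhost host matching the mocked URL patterns above (the values are assumptions, not taken from this PR):

# hypothetical conftest.py sketch; the repository's real fixture may differ
import pytest

from databricks.sdk.core import Config

@pytest.fixture
def config():
    # Point the SDK at the fake host that the mocked URL patterns expect.
    return Config(host="http://localhost", token="dummy", auth_type="pat")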