Skip to content

Commit

Permalink
Add missing docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
Cito committed Aug 11, 2024
1 parent fd9edfb commit bb50a3f
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
2 changes: 1 addition & 1 deletion src/graphql/execution/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,7 @@ async def await_completed_item(
index += 1

def add_task(self, awaitable: Awaitable[Any]) -> None:
"""Add task."""
"""Add the given task to the tasks set for later execution."""
tasks = self._tasks
task = ensure_future(awaitable)
tasks.add(task)
Expand Down
24 changes: 20 additions & 4 deletions src/graphql/execution/incremental_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
from typing import TypedDict
except ImportError: # Python < 3.8
from typing_extensions import TypedDict
try:
from typing import TypeGuard
except ImportError: # Python < 3.10
pass


if TYPE_CHECKING:
Expand Down Expand Up @@ -377,11 +373,13 @@ def __init__(self) -> None:
self._tasks: set[Awaitable] = set()

def has_next(self) -> bool:
"""Check whether there is a next incremental result."""
return bool(self._pending)

async def subscribe(
self,
) -> AsyncGenerator[SubsequentIncrementalExecutionResult, None]:
"""Subscribe to the incremental results."""
is_done = False
pending = self._pending

Expand Down Expand Up @@ -422,6 +420,7 @@ def prepare_new_deferred_fragment_record(
path: Path | None,
parent_context: IncrementalDataRecord | None,
) -> DeferredFragmentRecord:
"""Prepare a new deferred fragment record."""
deferred_fragment_record = DeferredFragmentRecord(label, path, parent_context)

context = parent_context or self._initial_result
Expand All @@ -435,6 +434,7 @@ def prepare_new_stream_items_record(
parent_context: IncrementalDataRecord | None,
async_iterator: AsyncIterator[Any] | None = None,
) -> StreamItemsRecord:
"""Prepare a new stream items record."""
stream_items_record = StreamItemsRecord(
label, path, parent_context, async_iterator
)
Expand All @@ -448,6 +448,7 @@ def complete_deferred_fragment_record(
deferred_fragment_record: DeferredFragmentRecord,
data: dict[str, Any] | None,
) -> None:
"""Complete the given deferred fragment record."""
deferred_fragment_record.data = data
deferred_fragment_record.is_completed = True
self._release(deferred_fragment_record)
Expand All @@ -457,21 +458,25 @@ def complete_stream_items_record(
stream_items_record: StreamItemsRecord,
items: list[str] | None,
) -> None:
"""Complete the given stream items record."""
stream_items_record.items = items
stream_items_record.is_completed = True
self._release(stream_items_record)

def set_is_completed_async_iterator(
self, stream_items_record: StreamItemsRecord
) -> None:
"""Mark async iterator for stream items as completed."""
stream_items_record.is_completed_async_iterator = True

def add_field_error(
self, incremental_data_record: IncrementalDataRecord, error: GraphQLError
) -> None:
"""Add a field error to the given incremental data record."""
incremental_data_record.errors.append(error)

def publish_initial(self) -> None:
"""Publish the initial result."""
for child in self._initial_result.children:
self._publish(child)

Expand All @@ -480,6 +485,7 @@ def filter(
null_path: Path,
erroring_incremental_data_record: IncrementalDataRecord | None,
) -> None:
"""Filter out the given erroring incremental data record."""
null_path_list = null_path.as_list()

children = (erroring_incremental_data_record or self._initial_result).children
Expand All @@ -504,23 +510,28 @@ def filter(
self._add_task(close_async_iterator)

def _trigger(self) -> None:
"""Trigger the resolve event."""
self._resolve.set()
self._resolve = Event()

def _introduce(self, item: IncrementalDataRecord) -> None:
"""Introduce a new IncrementalDataRecord."""
self._pending[item] = None

def _release(self, item: IncrementalDataRecord) -> None:
"""Release the given IncrementalDataRecord."""
if item in self._pending:
self._released[item] = None
self._trigger()

def _push(self, item: IncrementalDataRecord) -> None:
"""Push the given IncrementalDataRecord."""
self._released[item] = None
self._pending[item] = None
self._trigger()

def _delete(self, item: IncrementalDataRecord) -> None:
"""Delete the given IncrementalDataRecord."""
with suppress_key_error:
del self._released[item]
with suppress_key_error:
Expand All @@ -530,6 +541,7 @@ def _delete(self, item: IncrementalDataRecord) -> None:
def _get_incremental_result(
self, completed_records: Collection[IncrementalDataRecord]
) -> SubsequentIncrementalExecutionResult | None:
"""Get the incremental result with the completed records."""
incremental_results: list[IncrementalResult] = []
encountered_completed_async_iterator = False
append_result = incremental_results.append
Expand Down Expand Up @@ -572,6 +584,7 @@ def _get_incremental_result(
return None

def _publish(self, incremental_data_record: IncrementalDataRecord) -> None:
"""Publish the given incremental data record."""
if incremental_data_record.is_completed:
self._push(incremental_data_record)
else:
Expand All @@ -582,6 +595,7 @@ def _get_descendants(
children: dict[IncrementalDataRecord, None],
descendants: dict[IncrementalDataRecord, None] | None = None,
) -> dict[IncrementalDataRecord, None]:
"""Get the descendants of the given children."""
if descendants is None:
descendants = {}
for child in children:
Expand All @@ -592,9 +606,11 @@ def _get_descendants(
def _matches_path(
self, test_path: list[str | int], base_path: list[str | int]
) -> bool:
"""Get whether the given test path matches the base path."""
return all(item == test_path[i] for i, item in enumerate(base_path))

def _add_task(self, awaitable: Awaitable[Any]) -> None:
"""Add the given task to the tasks set for later execution."""
tasks = self._tasks
task = ensure_future(awaitable)
tasks.add(task)
Expand Down

0 comments on commit bb50a3f

Please sign in to comment.