Source code for aymara_ai.core.summaries

import asyncio
import time
from typing import Coroutine, List, Union

from aymara_ai.core.protocols import AymaraAIProtocol
from aymara_ai.generated.aymara_api_client import models
from aymara_ai.generated.aymara_api_client.api.score_runs import (
    create_score_run_suite_summary,
    delete_score_run_suite_summary,
    get_score_run_suite_summary,
    list_score_run_suite_summaries,
)
from aymara_ai.generated.aymara_api_client.models.score_run_suite_summary_in_schema import (
    ScoreRunSuiteSummaryInSchema,
)
from aymara_ai.types import (
    ScoreRunResponse,
    ScoreRunSuiteSummaryResponse,
    Status,
)
from aymara_ai.utils.constants import (
    DEFAULT_SUMMARY_MAX_WAIT_TIME_SECS,
    POLLING_INTERVAL,
)


[docs] class SummaryMixin(AymaraAIProtocol):
[docs] def create_summary( self, score_runs: Union[List[ScoreRunResponse], List[str]] ) -> ScoreRunSuiteSummaryResponse: """ Create summaries for a list of score runs and wait for completion synchronously. :param score_runs: List of score runs or their UUIDs for which to create summaries. :type score_run_uuids: Union[List[ScoreRunResponse], List[str]] :return: Summary response. :rtype: ScoreRunSuiteSummaryResponse """ return self._create_summary(score_runs, is_async=False)
[docs] async def create_summary_async( self, score_runs: Union[List[ScoreRunResponse], List[str]] ) -> ScoreRunSuiteSummaryResponse: """ Create summaries for a list of score runs and wait for completion asynchronously. :param score_runs: List of score runs or their UUIDs for which to create summaries. :type score_run_uuids: Union[List[ScoreRunResponse], List[str]] :return: Summary response. :rtype: ScoreRunsSummaryResponse """ return await self._create_summary(score_runs, is_async=True)
def _create_summary( self, score_runs: Union[List[ScoreRunResponse], List[str]], is_async: bool ) -> Union[ ScoreRunSuiteSummaryResponse, Coroutine[ScoreRunSuiteSummaryResponse, None, None], ]: if len(score_runs) == 0: raise ValueError("At least one score run must be provided") score_run_uuids = self._score_runs_to_score_run_uuids(score_runs) if is_async: return self._create_summary_async_impl(score_run_uuids) else: return self._create_summary_sync_impl(score_run_uuids) def _create_summary_sync_impl( self, score_run_uuids: List[str] ) -> ScoreRunSuiteSummaryResponse: start_time = time.time() response = create_score_run_suite_summary.sync_detailed( client=self.client, body=ScoreRunSuiteSummaryInSchema( score_run_uuids=score_run_uuids, ), ) if response.status_code == 422: raise ValueError(f"{response.parsed.detail}") summary_response = response.parsed summary_uuid = summary_response.score_run_suite_summary_uuid remaining_summaries = summary_response.remaining_summaries if remaining_summaries is not None: summary_plural = "summary" if remaining_summaries == 1 else "summaries" self.logger.warning( f"You have {remaining_summaries} {summary_plural} remaining. To upgrade, visit https://aymara.ai/upgrade." ) with self.logger.progress_bar( "Summary", summary_uuid, Status.from_api_status(summary_response.status), ): while True: response = get_score_run_suite_summary.sync_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") if response.status_code == 422: raise ValueError(f"{response.parsed.detail}") summary_response = response.parsed self.logger.update_progress_bar( summary_uuid, Status.from_api_status(summary_response.status), ) if summary_response.status == models.ScoreRunSuiteSummaryStatus.FAILED: return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response, "Internal server error. Please try again.", ) elapsed_time = time.time() - start_time if elapsed_time >= DEFAULT_SUMMARY_MAX_WAIT_TIME_SECS: summary_response.status = models.ScoreRunSuiteSummaryStatus.FAILED self.logger.update_progress_bar(summary_uuid, Status.FAILED) return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response, failure_reason="Summary creation timed out.", ) if ( summary_response.status == models.ScoreRunSuiteSummaryStatus.FINISHED ): return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response ) time.sleep(POLLING_INTERVAL) async def _create_summary_async_impl( self, score_run_uuids: List[str] ) -> ScoreRunSuiteSummaryResponse: start_time = time.time() response = await create_score_run_suite_summary.asyncio_detailed( client=self.client, body=ScoreRunSuiteSummaryInSchema( score_run_uuids=score_run_uuids, ), ) if response.status_code == 422: raise ValueError(f"{response.parsed.detail}") summary_response = response.parsed summary_uuid = summary_response.score_run_suite_summary_uuid remaining_summaries = summary_response.remaining_summaries if remaining_summaries is not None: summary_plural = "summary" if remaining_summaries == 1 else "summaries" self.logger.warning( f"You have {remaining_summaries} {summary_plural} remaining. To upgrade, visit https://aymara.ai/upgrade." ) with self.logger.progress_bar( "Summary", summary_uuid, Status.from_api_status(summary_response.status), ): while True: response = await get_score_run_suite_summary.asyncio_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") if response.status_code == 422: raise ValueError(f"{response.parsed.detail}") summary_response = response.parsed self.logger.update_progress_bar( summary_uuid, Status.from_api_status(summary_response.status), ) elapsed_time = time.time() - start_time if elapsed_time >= DEFAULT_SUMMARY_MAX_WAIT_TIME_SECS: summary_response.status = models.ScoreRunSuiteSummaryStatus.FAILED self.logger.update_progress_bar(summary_uuid, Status.FAILED) return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response, failure_reason="Summary creation timed out.", ) if summary_response.status == models.ScoreRunSuiteSummaryStatus.FAILED: return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response, "Internal server error. Please try again.", ) if ( summary_response.status == models.ScoreRunSuiteSummaryStatus.FINISHED ): return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response ) await asyncio.sleep(POLLING_INTERVAL) def _score_runs_to_score_run_uuids(self, score_runs): if isinstance(score_runs[0], ScoreRunResponse): return [score_run.score_run_uuid for score_run in score_runs] else: return score_runs # Get Summary Methods
[docs] def get_summary(self, summary_uuid: str) -> ScoreRunSuiteSummaryResponse: """ Get the current status of an summary synchronously. :param summary_uuid: UUID of the summary. :type summary_uuid: str :return: Summary response. :rtype: ScoreRunSuiteSummaryResponse """ return self._get_summary(summary_uuid, is_async=False)
[docs] async def get_summary_async( self, summary_uuid: str ) -> ScoreRunSuiteSummaryResponse: """ Get the current status of an summary asynchronously. :param summary_uuid: UUID of the summary. :type summary_uuid: str :return: Summary response. :rtype: ScoreRunSuiteSummaryResponse """ return await self._get_summary(summary_uuid, is_async=True)
def _get_summary( self, summary_uuid: str, is_async: bool ) -> Union[ ScoreRunSuiteSummaryResponse, Coroutine[ScoreRunSuiteSummaryResponse, None, None], ]: if is_async: return self._get_summary_async_impl(summary_uuid) else: return self._get_summary_sync_impl(summary_uuid) def _get_summary_sync_impl(self, summary_uuid: str) -> ScoreRunSuiteSummaryResponse: response = get_score_run_suite_summary.sync_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") summary_response = response.parsed return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response ) async def _get_summary_async_impl( self, summary_uuid: str ) -> ScoreRunSuiteSummaryResponse: response = await get_score_run_suite_summary.asyncio_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") summary_response = response.parsed return ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary_response ) # List Summaries Methods
[docs] def list_summaries(self) -> List[ScoreRunSuiteSummaryResponse]: """ List all summaries synchronously. """ return self._list_summaries_sync_impl()
[docs] async def list_summaries_async(self) -> List[ScoreRunSuiteSummaryResponse]: """ List all summaries asynchronously. """ return await self._list_summaries_async_impl()
def _list_summaries_sync_impl(self) -> List[ScoreRunSuiteSummaryResponse]: all_summaries = [] offset = 0 while True: response = list_score_run_suite_summaries.sync_detailed( client=self.client, offset=offset ) paged_response = response.parsed all_summaries.extend(paged_response.items) if len(all_summaries) >= paged_response.count: break offset += len(paged_response.items) return [ ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary ) for summary in all_summaries ] async def _list_summaries_async_impl(self) -> List[ScoreRunSuiteSummaryResponse]: all_summaries = [] offset = 0 while True: response = await list_score_run_suite_summaries.asyncio_detailed( client=self.client, offset=offset ) paged_response = response.parsed all_summaries.extend(paged_response.items) if len(all_summaries) >= paged_response.count: break offset += len(paged_response.items) return [ ScoreRunSuiteSummaryResponse.from_summary_out_schema_and_failure_reason( summary ) for summary in all_summaries ]
[docs] def delete_summary(self, summary_uuid: str) -> None: """ Delete a summary synchronously. :param summary_uuid: UUID of the summary. :type summary_uuid: str """ response = delete_score_run_suite_summary.sync_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") if response.status_code == 422: raise ValueError(f"{response.parsed.detail}")
[docs] async def delete_summary_async(self, summary_uuid: str) -> None: """ Delete a summary asynchronously. :param summary_uuid: UUID of the summary. :type summary_uuid: str """ response = await delete_score_run_suite_summary.asyncio_detailed( client=self.client, summary_uuid=summary_uuid ) if response.status_code == 404: raise ValueError(f"Summary with UUID {summary_uuid} not found") if response.status_code == 422: raise ValueError(f"{response.parsed.detail}")