Skip to content

omnipy.components.remote.tasks

CLASS DESCRIPTION
GithubRepoContext
FUNCTION DESCRIPTION
async_get_github_repo_urls
async_load_urls_into_new_dataset
get_auto_from_api_endpoint
get_bytes_from_api_endpoint
get_github_repo_urls
get_json_from_api_endpoint
get_str_from_api_endpoint
load_urls_into_new_dataset

GithubRepoContext dataclass

METHOD DESCRIPTION
__init__
ATTRIBUTE DESCRIPTION
branch

TYPE: str

owner

TYPE: str

path

TYPE: str | Path

repo

TYPE: str

Source code in src/omnipy/components/remote/tasks.py
@dataclass
class GithubRepoContext:
    """Bundle the coordinates of a location inside a GitHub repository.

    The four fields mirror the ``owner``, ``repo``, ``branch`` and ``path``
    arguments of the GitHub URL tasks in this module, which build an instance
    and hand it to the private URL helper functions.
    """
    # Account (user or organisation) owning the repository -- presumably the
    # first path segment of the GitHub URL; confirm against the URL helpers.
    owner: str
    # Repository name.
    repo: str
    # Branch name.
    branch: str
    # Path inside the repository. NOTE(review): appears to be a directory
    # when a file suffix is supplied to the tasks and a single file
    # otherwise -- confirm against the URL helpers.
    path: str | Path

branch instance-attribute

branch: str

owner instance-attribute

owner: str

path instance-attribute

path: str | Path

repo instance-attribute

repo: str

__init__

__init__(owner: str, repo: str, branch: str, path: str | Path) -> None

async_get_github_repo_urls async

async_get_github_repo_urls(
    owner: str, repo: str, branch: str, path: str | Path, file_suffix: str | None = None
) -> HttpUrlDataset
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate()
async def async_get_github_repo_urls(
    owner: str,
    repo: str,
    branch: str,
    path: str | Path,
    file_suffix: str | None = None,
) -> HttpUrlDataset:
    """Return URLs for content in a GitHub repository (async variant).

    The arguments are packed into a ``GithubRepoContext``. When
    ``file_suffix`` is falsy (``None`` or an empty string) the result of
    ``_get_url_for_single_file`` is returned; otherwise the result of
    awaiting ``_async_get_urls_for_files_in_dir_with_suffix`` with that
    suffix is returned.
    """
    context = GithubRepoContext(owner=owner, repo=repo, branch=branch, path=path)

    # Guard clause: without a suffix there is nothing to list, so the
    # synchronous single-file helper is used directly.
    if not file_suffix:
        return _get_url_for_single_file(context)

    return await _async_get_urls_for_files_in_dir_with_suffix(context, file_suffix)

async_load_urls_into_new_dataset async

async_load_urls_into_new_dataset(
    urls: HttpUrlDataset,
    dataset_cls: type[_JsonDatasetT] = JsonDataset,
    as_mime_type: str | None = None,
) -> _JsonDatasetT
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate()
async def async_load_urls_into_new_dataset(
    urls: HttpUrlDataset,
    dataset_cls: type[_JsonDatasetT] = JsonDataset,
    as_mime_type: str | None = None,
) -> _JsonDatasetT:
    """Await ``dataset_cls.load`` for the given URLs and return its result.

    ``as_mime_type`` is forwarded unchanged as a keyword argument to
    ``dataset_cls.load``. ``dataset_cls`` defaults to ``JsonDataset``.
    """
    loaded_dataset = await dataset_cls.load(urls, as_mime_type=as_mime_type)
    return loaded_dataset

get_auto_from_api_endpoint async

get_auto_from_api_endpoint(
    url: HttpUrlModel,
    client_session: ClientSession | None = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
    as_mime_type: str | None = None,
) -> AutoResponseContentModel
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate(iterate_over_data_files=True, output_dataset_cls=AutoResponseContentDataset)
async def get_auto_from_api_endpoint(
    url: HttpUrlModel,
    client_session: 'ClientSession | None' = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
    as_mime_type: str | None = None,
) -> AutoResponseContentModel:
    """Fetch ``url`` and decode the response body according to its content type.

    The content type is ``as_mime_type`` when that argument is truthy;
    otherwise it is taken from the response (which must carry a content-type
    header). The body is then read as:

    - ``'application/json'``: parsed with ``response.json(content_type=None)``
    - ``'text/plain'``: ``response.text()``
    - anything else (including ``'application/octet-stream'``):
      ``response.read()``

    Returns an ``AutoResponseContentModel`` wrapping a
    ``ResponseContentPydModel`` built from the content-type header value and
    the content of the last response processed.
    """
    from .lazy_import import ClientSession, CONTENT_TYPE

    session_iter = _ensure_retry_session(
        client_session,
        retry_http_statuses,
        retry_attempts,
        retry_backoff_strategy,
    )
    async for session in session_iter:
        async for reply in _call_get(url, cast(ClientSession, session)):
            if as_mime_type:
                # An explicit MIME type overrides whatever the server reported.
                content_type = as_mime_type
                content_type_header = as_mime_type
            else:
                assert CONTENT_TYPE in reply.headers
                content_type_header = reply.headers[CONTENT_TYPE]
                content_type = reply.content_type

            if content_type == 'application/json':
                content = await reply.json(content_type=None)
            elif content_type == 'text/plain':
                content = await reply.text()
            else:
                # 'application/octet-stream' and every unrecognised type
                # fall back to the raw bytes.
                content = await reply.read()

    pyd_model = ResponseContentPydModel(content_type=content_type_header, response=content)
    return AutoResponseContentModel(pyd_model)

get_bytes_from_api_endpoint async

get_bytes_from_api_endpoint(
    url: HttpUrlModel,
    client_session: ClientSession | None = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> BytesModel
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate(iterate_over_data_files=True, output_dataset_cls=BytesDataset)
async def get_bytes_from_api_endpoint(
    url: HttpUrlModel,
    client_session: 'ClientSession | None' = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> BytesModel:
    """Fetch ``url`` and return the raw response body as a ``BytesModel``.

    Sessions come from ``_ensure_retry_session`` (given ``client_session``
    and the retry settings) and responses from ``_call_get``. The body of
    the last response read is the one returned.
    """
    from .lazy_import import ClientSession

    session_iter = _ensure_retry_session(
        client_session,
        retry_http_statuses,
        retry_attempts,
        retry_backoff_strategy,
    )
    async for session in session_iter:
        async for reply in _call_get(url, cast(ClientSession, session)):
            raw_body = await reply.read()
            result = BytesModel(raw_body)
    return result

get_github_repo_urls

get_github_repo_urls(
    owner: str, repo: str, branch: str, path: str | Path, file_suffix: str | None = None
) -> HttpUrlDataset
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate()
def get_github_repo_urls(
    owner: str,
    repo: str,
    branch: str,
    path: str | Path,
    file_suffix: str | None = None,
) -> HttpUrlDataset:
    """Return URLs for content in a GitHub repository (synchronous variant).

    The arguments are packed into a ``GithubRepoContext``. When
    ``file_suffix`` is falsy (``None`` or an empty string) the result of
    ``_get_url_for_single_file`` is returned; otherwise the result of
    ``_get_urls_for_files_in_dir_with_suffix`` for that suffix is returned.
    """
    context = GithubRepoContext(owner=owner, repo=repo, branch=branch, path=path)

    # Guard clause: no suffix means the single-file helper is used.
    if not file_suffix:
        return _get_url_for_single_file(context)

    return _get_urls_for_files_in_dir_with_suffix(context, file_suffix)

get_json_from_api_endpoint async

get_json_from_api_endpoint(
    url: HttpUrlModel,
    client_session: ClientSession | None = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> JsonModel
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate(iterate_over_data_files=True, output_dataset_cls=JsonDataset)
async def get_json_from_api_endpoint(
    url: HttpUrlModel,
    client_session: 'ClientSession | None' = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> JsonModel:
    """Fetch ``url`` and return the response body parsed as a ``JsonModel``.

    Sessions come from ``_ensure_retry_session`` (given ``client_session``
    and the retry settings) and responses from ``_call_get``. The body is
    parsed with ``response.json(content_type=None)``; the last response read
    is the one returned.
    """
    from .lazy_import import ClientSession

    session_iter = _ensure_retry_session(
        client_session,
        retry_http_statuses,
        retry_attempts,
        retry_backoff_strategy,
    )
    async for session in session_iter:
        async for reply in _call_get(url, cast(ClientSession, session)):
            parsed_body = await reply.json(content_type=None)
            result = JsonModel(parsed_body)
    return result

get_str_from_api_endpoint async

get_str_from_api_endpoint(
    url: HttpUrlModel,
    client_session: ClientSession | None = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> StrModel
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate(iterate_over_data_files=True, output_dataset_cls=StrDataset)
async def get_str_from_api_endpoint(
    url: HttpUrlModel,
    client_session: 'ClientSession | None' = None,
    retry_http_statuses: tuple[int, ...] = DEFAULT_RETRY_STATUSES,
    retry_attempts: int = DEFAULT_RETRIES,
    retry_backoff_strategy: BackoffStrategy.Literals = DEFAULT_BACKOFF_STRATEGY,
) -> StrModel:
    """Fetch ``url`` and return the response body text as a ``StrModel``.

    Sessions come from ``_ensure_retry_session`` (given ``client_session``
    and the retry settings) and responses from ``_call_get``. The body is
    read with ``response.text()``; the last response read is the one
    returned.
    """
    from .lazy_import import ClientSession

    session_iter = _ensure_retry_session(
        client_session,
        retry_http_statuses,
        retry_attempts,
        retry_backoff_strategy,
    )
    async for session in session_iter:
        async for reply in _call_get(url, cast(ClientSession, session)):
            body_text = await reply.text()
            result = StrModel(body_text)
    return result

load_urls_into_new_dataset

load_urls_into_new_dataset(
    urls: HttpUrlDataset,
    dataset_cls: type[_JsonDatasetT] = JsonDataset,
    as_mime_type: str | None = None,
) -> _JsonDatasetT
Source code in src/omnipy/components/remote/tasks.py
@TaskTemplate()
def load_urls_into_new_dataset(
    urls: HttpUrlDataset,
    dataset_cls: type[_JsonDatasetT] = JsonDataset,
    as_mime_type: str | None = None,
) -> _JsonDatasetT:
    """Call ``dataset_cls.load`` for the given URLs and return its result.

    ``as_mime_type`` is forwarded unchanged as a keyword argument to
    ``dataset_cls.load``. ``dataset_cls`` defaults to ``JsonDataset``.
    """
    loaded_dataset = dataset_cls.load(urls, as_mime_type=as_mime_type)
    return loaded_dataset