Skip to content

omnipy.util.helpers

FUNCTION DESCRIPTION
all_equals
all_type_variants
as_dictable
called_from_omnipy_tests
create_merged_dict
ensure_non_str_byte_iterable
ensure_plain_type
evaluate_any_forward_refs_if_possible
extract_newline
first_key_in_mapping
first_value_in_mapping
format_classname_with_params
generate_job_slug
generic_aware_issubclass_ignore_args
get_calling_module_name
get_datetime_format
get_default_if_typevar
get_event_loop_and_check_if_loop_is_running
get_first_item
get_job_slug_base
get_parametrized_type
has_items
is_iterable
is_literal_type
is_non_str_byte_iterable
is_optional
is_package_editable

Check if package is installed in editable mode using metadata.

is_strict_subclass
is_type_specialization
is_union
is_unreserved_identifier
max_newline_stripped_width
max_or_none
merge_dataclasses
min_or_none
recursive_module_import
recursive_module_import_new
remove_forward_ref_notation
remove_none_vals
repr_max_len
resolve
sorted_dict_hash
split_all_content_to_lines
split_to_union_variants
strip_and_split_newline
strip_newline
strip_newlines
takes_input_params_from
transfer_generic_args_to_cls
ATTRIBUTE DESCRIPTION
Dictable

TYPE: TypeAlias

NumberT

Dictable module-attribute

Dictable: TypeAlias = Mapping[_KeyT, Any] | Iterable[tuple[_KeyT, Any]]

NumberT module-attribute

NumberT = TypeVar('NumberT', float, int)

all_equals

all_equals(first, second) -> bool
Source code in src/omnipy/util/helpers.py
def all_equals(first, second) -> bool:
    equals = first == second
    if is_iterable(equals):
        if hasattr(equals, 'all') and callable(getattr(equals, 'all')):
            # Works for both pandas and numpy
            return equals.all(None)  # pyright: ignore [reportAttributeAccessIssue]
        else:
            return all(equals)
    else:
        return equals

all_type_variants

all_type_variants(
    in_type: type | GenericAlias | UnionType | _UnionGenericAlias,
) -> tuple[type | GenericAlias, ...]
Source code in src/omnipy/util/helpers.py
def all_type_variants(
    in_type: type | GenericAlias | UnionType | _UnionGenericAlias
) -> tuple[type | GenericAlias, ...]:
    if is_union(in_type):
        return get_args(in_type)
    else:
        return (cast(type | GenericAlias, in_type),)

as_dictable

as_dictable(obj: object) -> Dictable | None
Source code in src/omnipy/util/helpers.py
def as_dictable(obj: object) -> Dictable | None:
    def _is_iterable_of_tuple_pairs(obj_inner: object) -> bool:
        return isinstance(obj_inner, Iterable) and \
            all(isinstance(el, tuple) and len(el) == 2 for el in obj_inner)

    if isinstance(obj, Mapping) or _is_iterable_of_tuple_pairs(obj):
        return cast(Dictable, obj)
    else:
        return None

called_from_omnipy_tests

called_from_omnipy_tests() -> bool
Source code in src/omnipy/util/helpers.py
def called_from_omnipy_tests() -> bool:
    stack = inspect.stack()
    for index in range(len(stack)):
        frame = stack[index][0]
        module = inspect.getmodule(frame)
        if module is not None \
                and module.__name__.startswith('tests') \
                and module.__file__ is not None \
                and 'omnipy/tests' in module.__file__:
            return True
    return False

create_merged_dict

create_merged_dict(dictable_1: Dictable[_KeyT], dictable_2: Dictable[_KeyT]) -> dict[_KeyT, Any]
Source code in src/omnipy/util/helpers.py
def create_merged_dict(dictable_1: Dictable[_KeyT],
                       dictable_2: Dictable[_KeyT]) -> dict[_KeyT, Any]:
    merged_dict = cast(
        dict[_KeyT, Any],
        dictable_1 if isinstance(dictable_1, dict) else dict(dictable_1),
    )
    dict_2 = cast(
        dict[_KeyT, Any],
        dictable_2 if isinstance(dictable_2, dict) else dict(dictable_2),
    )
    merged_dict |= dict_2
    return merged_dict

ensure_non_str_byte_iterable

ensure_non_str_byte_iterable(value)
Source code in src/omnipy/util/helpers.py
def ensure_non_str_byte_iterable(value):
    return value if is_non_str_byte_iterable(value) else (value,)

ensure_plain_type

ensure_plain_type(in_type: ForwardRef) -> ForwardRef
ensure_plain_type(in_type: TypeVar) -> TypeVar
ensure_plain_type(in_type: type | GenericAlias | UnionType) -> type
ensure_plain_type(in_type: _SpecialForm) -> _SpecialForm
ensure_plain_type(in_type: _LiteralGenericAlias | _UnionGenericAlias | _AnnotatedAlias) -> type
ensure_plain_type(in_type: TypeForm) -> ForwardRef | TypeVar | type | _SpecialForm | NoneType
Source code in src/omnipy/util/helpers.py
def ensure_plain_type(in_type: TypeForm) -> ForwardRef | TypeVar | type | _SpecialForm | NoneType:

    if in_type == NoneType:
        return None

    origin = get_origin(in_type)
    if origin == Union:
        return UnionType  # pyright: ignore [reportReturnType]
    else:
        return cast(type | ForwardRef | TypeVar, origin if get_args(in_type) else in_type)

evaluate_any_forward_refs_if_possible

evaluate_any_forward_refs_if_possible(
    in_type: TypeForm, calling_module: str | None = None, **localns
) -> TypeForm
Source code in src/omnipy/util/helpers.py
def evaluate_any_forward_refs_if_possible(in_type: TypeForm,
                                          calling_module: str | None = None,
                                          **localns) -> TypeForm:
    if not calling_module:
        calling_module = get_calling_module_name() if 'ForwardRef' in str(in_type) else None

    if isinstance(in_type, ForwardRef):
        if calling_module and calling_module in sys.modules:
            globalns = sys.modules[calling_module].__dict__.copy()
        else:
            globalns = {}
        try:
            return cast(
                type | GenericAlias,
                in_type._evaluate(
                    globalns, localns if localns else locals(), recursive_guard=frozenset()))
        except NameError:
            pass
    else:
        origin = get_origin(in_type)
        args = get_args(in_type)
        if origin and args:
            new_args = tuple(
                evaluate_any_forward_refs_if_possible(arg, calling_module, **localns)
                for arg in args)
            if origin == UnionType:
                return functools.reduce(operator.or_, new_args)
            else:
                return origin[new_args]
    return in_type

extract_newline

extract_newline(line: str) -> str
Source code in src/omnipy/util/helpers.py
def extract_newline(line: str) -> str:
    return strip_and_split_newline(line)[1]

first_key_in_mapping

first_key_in_mapping(mapping: Mapping[_KeyT, Any]) -> _KeyT
Source code in src/omnipy/util/helpers.py
def first_key_in_mapping(mapping: Mapping[_KeyT, Any],) -> _KeyT:
    for _key in mapping.keys():
        return _key
    raise KeyError('Mapping is empty, no first key.')

first_value_in_mapping

first_value_in_mapping(mapping: Mapping[Any, _ValueT]) -> _ValueT
Source code in src/omnipy/util/helpers.py
def first_value_in_mapping(mapping: Mapping[Any, _ValueT],) -> _ValueT:
    for _val in mapping.values():
        return _val
    raise ValueError('Mapping is empty, no first value.')

format_classname_with_params

format_classname_with_params(cls_name: str, params_str: str) -> str
Source code in src/omnipy/util/helpers.py
def format_classname_with_params(cls_name: str, params_str: str) -> str:
    return f'{cls_name}[{params_str}]'

generate_job_slug

generate_job_slug(job_cls_name: str, job_name: str)
Source code in src/omnipy/util/helpers.py
def generate_job_slug(job_cls_name: str, job_name: str):
    import coolname

    return f'{get_job_slug_base(job_cls_name, job_name)}-{coolname.generate_slug(3)}'

generic_aware_issubclass_ignore_args

generic_aware_issubclass_ignore_args(cls, cls_or_tuple)
Source code in src/omnipy/util/helpers.py
def generic_aware_issubclass_ignore_args(cls, cls_or_tuple):
    try:
        return issubclass(cls, cls_or_tuple)
    except TypeError:
        return issubclass(get_origin(cls), cls_or_tuple)

get_calling_module_name

get_calling_module_name() -> str | None
Source code in src/omnipy/util/helpers.py
def get_calling_module_name() -> str | None:
    stack = inspect.stack()
    start_frame_index = 2
    while len(stack) > start_frame_index:
        grandparent_frame = stack[start_frame_index][0]
        module = inspect.getmodule(grandparent_frame)
        if module is not None:
            return module.__name__
        start_frame_index += 1

get_datetime_format

get_datetime_format(locale: LocaleType | None = None) -> str
Source code in src/omnipy/util/helpers.py
def get_datetime_format(locale: LocaleType | None = None) -> str:
    pkg_locale.setlocale(pkg_locale.LC_ALL, locale)

    if hasattr(pkg_locale, 'nl_langinfo'):  # noqa
        datetime_format = pkg_locale.nl_langinfo(pkg_locale.D_T_FMT)
    else:
        datetime_format = '%a %b %e %X %Y'
    return datetime_format

get_default_if_typevar

get_default_if_typevar(typ_: type[_ObjT] | TypeForm | TypeVar) -> type[_ObjT] | TypeForm
Source code in src/omnipy/util/helpers.py
def get_default_if_typevar(typ_: type[_ObjT] | TypeForm | TypeVar) -> type[_ObjT] | TypeForm:
    if isinstance(typ_, TypeVar):
        if hasattr(typ_, '__default__'):
            return typ_.__default__
        else:
            raise TypeError(f'The TypeVar "{typ_.__name__}" needs to specify a default value. '
                            f'This requires Python 3.13, but is supported in earlier versions '
                            f'of Python by importing TypeVar from the library '
                            f'"typing-extensions".')
    return typ_

get_event_loop_and_check_if_loop_is_running

get_event_loop_and_check_if_loop_is_running() -> tuple[asyncio.AbstractEventLoop | None, bool]
Source code in src/omnipy/util/helpers.py
def get_event_loop_and_check_if_loop_is_running() -> tuple[asyncio.AbstractEventLoop | None, bool]:
    loop_is_running: bool
    loop: asyncio.AbstractEventLoop | None = None

    try:
        loop = asyncio.get_event_loop()
        loop_is_running = loop.is_running()
    except RuntimeError:
        loop_is_running = False

    return loop, loop_is_running

get_first_item

get_first_item(iterable: Iterable[object]) -> object
Source code in src/omnipy/util/helpers.py
def get_first_item(iterable: Iterable[object]) -> object:
    assert has_items(iterable)
    for item in iterable:
        return item

get_job_slug_base

get_job_slug_base(job_cls_name: str, job_name: str)
Source code in src/omnipy/util/helpers.py
def get_job_slug_base(job_cls_name: str, job_name: str):
    import inflection

    job_cls_name_slug = inflection.underscore(job_cls_name).replace('_', '-')
    job_name_slug = inflection.underscore(job_name).replace('_', '-')
    return f'{job_cls_name_slug}-{job_name_slug}'

get_parametrized_type

get_parametrized_type(obj: object)
Source code in src/omnipy/util/helpers.py
def get_parametrized_type(obj: object):
    return getattr(obj, '__orig_class__', type(obj))

has_items

has_items(obj: object) -> bool
Source code in src/omnipy/util/helpers.py
def has_items(obj: object) -> bool:
    return hasattr(obj, '__len__') \
        and obj.__len__() > 0  # pyright: ignore [reportAttributeAccessIssue]

is_iterable

is_iterable(obj: Iterable[_ObjT] | _ObjT) -> TypeGuard[Iterable[_ObjT]]
Source code in src/omnipy/util/helpers.py
def is_iterable(obj: Iterable[_ObjT] | _ObjT) -> TypeGuard[Iterable[_ObjT]]:
    try:
        iter(obj)  # type: ignore[arg-type]
        return True
    except TypeError:
        return False

is_literal_type

is_literal_type(value: Any) -> bool
Source code in src/omnipy/util/helpers.py
def is_literal_type(value: Any) -> bool:
    return get_origin(value) is Literal

is_non_str_byte_iterable

is_non_str_byte_iterable(obj: Iterable[_ObjT] | _ObjT | str | bytes) -> TypeGuard[Iterable[_ObjT]]
Source code in src/omnipy/util/helpers.py
def is_non_str_byte_iterable(
        obj: Iterable[_ObjT] | _ObjT | str | bytes) -> TypeGuard[Iterable[_ObjT]]:
    return is_iterable(obj) and not type(obj) in (str, bytes)

is_optional

is_optional(cls_or_type: type | UnionType | None | object) -> bool
Source code in src/omnipy/util/helpers.py
@cached(LRUCache(128))
def is_optional(cls_or_type: type | UnionType | None | object) -> bool:
    return is_union(cls_or_type) and any(pyd.is_none_type(arg) for arg in get_args(cls_or_type))

is_package_editable cached

is_package_editable(package_name)

Check if package is installed in editable mode using metadata.

Source code in src/omnipy/util/helpers.py
@functools.cache
def is_package_editable(package_name):
    """Check if package is installed in editable mode using metadata."""
    try:
        dist = distribution(package_name)
        # Get the location where package is installed
        if dist.read_text('direct_url.json'):
            import json
            direct_url = json.loads(dist.read_text('direct_url.json'))  # pyright: ignore
            return direct_url.get('dir_info', {}).get('editable', False)
        return False
    except Exception:
        return False

is_strict_subclass cached

is_strict_subclass(
    __cls: type, __class_or_tuple: type | UnionType | tuple[type | UnionType | tuple[Any, ...], ...]
) -> bool
Source code in src/omnipy/util/helpers.py
@functools.cache
def is_strict_subclass(
        __cls: type,
        __class_or_tuple: type | UnionType | tuple[type | UnionType | tuple[Any, ...], ...]
) -> bool:
    if issubclass(__cls, __class_or_tuple):
        if isinstance(__class_or_tuple, Iterable):
            return __cls not in __class_or_tuple
        else:
            return __cls != __class_or_tuple
    return False

is_type_specialization

is_type_specialization(value: Any) -> bool
Source code in src/omnipy/util/helpers.py
def is_type_specialization(value: Any) -> bool:
    return get_origin(value) is type

is_union

is_union(cls_or_type: type | UnionType | None | object) -> bool
Source code in src/omnipy/util/helpers.py
@cached(LRUCache(128))
def is_union(cls_or_type: type | UnionType | None | object) -> bool:
    union_types = [Union, UnionType]
    return cls_or_type in union_types or get_origin(cls_or_type) in union_types

is_unreserved_identifier

is_unreserved_identifier(identifier: str) -> bool
Source code in src/omnipy/util/helpers.py
def is_unreserved_identifier(identifier: str) -> bool:
    return identifier.isidentifier() and not iskeyword(identifier) and not issoftkeyword(identifier)

max_newline_stripped_width

max_newline_stripped_width(lines: list[str]) -> int
Source code in src/omnipy/util/helpers.py
def max_newline_stripped_width(lines: list[str]) -> int:
    return max((len(strip_newline(line)) for line in lines), default=0)

max_or_none

max_or_none(*args: NumberT | None) -> NumberT | None
Source code in src/omnipy/util/helpers.py
def max_or_none(*args: NumberT | None) -> NumberT | None:
    filtered_args = [arg for arg in args if arg is not None]
    return max(filtered_args) if filtered_args else None

merge_dataclasses

merge_dataclasses(dataclass: _DataclassT, other_dict: dict[str, Any]) -> _DataclassT
Source code in src/omnipy/util/helpers.py
def merge_dataclasses(dataclass: _DataclassT, other_dict: dict[str, Any]) -> _DataclassT:
    return dataclass.__class__(**(asdict(dataclass) | other_dict))

min_or_none

min_or_none(*args: NumberT | None) -> NumberT | None
Source code in src/omnipy/util/helpers.py
def min_or_none(*args: NumberT | None) -> NumberT | None:
    filtered_args = [arg for arg in args if arg is not None]
    return min(filtered_args) if filtered_args else None

recursive_module_import

recursive_module_import(
    module: ModuleType, imported_modules: list[ModuleType] = []
) -> dict[str, object]
Source code in src/omnipy/util/helpers.py
def recursive_module_import(module: ModuleType,
                            imported_modules: list[ModuleType] = []) -> dict[str, object]:
    module_vars = vars(module)
    imported_modules.append(module)

    for obj in module_vars.values():
        if isclass(obj):
            for base_cls in obj.__bases__:
                base_cls_module = getmodule(base_cls)
                if base_cls_module and _is_internal_module(base_cls_module, imported_modules):
                    module_vars = create_merged_dict(
                        recursive_module_import(base_cls_module, imported_modules),
                        module_vars,
                    )

    return module_vars

recursive_module_import_new

recursive_module_import_new(
    root_path: list[str], imported_modules: dict[str, ModuleType], excluded_set: set[str]
)
Source code in src/omnipy/util/helpers.py
def recursive_module_import_new(root_path: list[str],
                                imported_modules: dict[str, ModuleType],
                                excluded_set: set[str]):

    import pkgutil

    module_finder: FileFinder
    module_name: str
    _is_pkg: bool

    cur_excluded_prefix = ''

    for module_finder, module_name, _is_pkg in pkgutil.walk_packages(root_path):  # type: ignore
        # print(f'{module_name}: {_is_pkg}')
        if cur_excluded_prefix and module_name.startswith(cur_excluded_prefix):
            continue
        else:
            cur_excluded_prefix = ''

        if module_name in excluded_set:
            cur_excluded_prefix = f'{module_name}.'
            continue

        module_spec: ModuleSpec | None = module_finder.find_spec(module_name)
        if module_spec:
            loader: Loader | None = module_spec.loader
            if loader:
                imported_modules[module_name] = loader.load_module(module_name)

remove_forward_ref_notation

remove_forward_ref_notation(type_str: str)
Source code in src/omnipy/util/helpers.py
def remove_forward_ref_notation(type_str: str):
    return type_str.replace("ForwardRef('", '').replace("')", '')

remove_none_vals

remove_none_vals(**kwargs: object) -> dict[object, object]
Source code in src/omnipy/util/helpers.py
def remove_none_vals(**kwargs: object) -> dict[object, object]:
    return {key: obj for key, obj in kwargs.items() if obj is not None}

repr_max_len

repr_max_len(data: object, max_len: int = 200)
Source code in src/omnipy/util/helpers.py
def repr_max_len(data: object, max_len: int = 200):
    repr_str = repr(data)
    return f'{repr_str[:max_len]}...' if len(repr_str) > max_len else repr_str

resolve async

resolve(obj)
Source code in src/omnipy/util/helpers.py
async def resolve(obj):
    return await obj if inspect.isawaitable(obj) else obj

sorted_dict_hash

sorted_dict_hash(d: dict) -> tuple
Source code in src/omnipy/util/helpers.py
def sorted_dict_hash(d: dict) -> tuple:
    return tuple((k, d[k]) for k in sorted(d.keys()))

split_all_content_to_lines

split_all_content_to_lines(content: str) -> list[str]
Source code in src/omnipy/util/helpers.py
def split_all_content_to_lines(content: str) -> list[str]:
    all_content_lines_stripped = content.splitlines(keepends=False)
    all_content_lines = content.splitlines(keepends=True)

    if len(all_content_lines) == 0 \
            or all_content_lines[-1] != all_content_lines_stripped[-1]:
        # If no content or the last line ends with a newline, we add an
        # empty line. This is needed as the last newline is excluded
        # from self.content (in line with typical repr output).
        all_content_lines.append('')

    return all_content_lines

split_to_union_variants

split_to_union_variants(type_: TypeForm) -> tuple[TypeForm]
Source code in src/omnipy/util/helpers.py
def split_to_union_variants(type_: TypeForm) -> tuple[TypeForm]:
    return get_args(type_) if is_union(type_) else (type_,)

strip_and_split_newline

strip_and_split_newline(line: str) -> tuple[str, str]
Source code in src/omnipy/util/helpers.py
def strip_and_split_newline(line: str) -> tuple[str, str]:
    line_stripped = strip_newline(line)
    newline = line[len(line_stripped):]
    return line_stripped, newline

strip_newline

strip_newline(line: str) -> str
Source code in src/omnipy/util/helpers.py
def strip_newline(line: str) -> str:
    line_stripped = line.splitlines()
    assert len(line_stripped) <= 1, \
        f'Expected a single line, but got {len(line_stripped)} lines: {line_stripped}'
    if len(line_stripped) == 0:
        return ''
    else:
        return line_stripped[0]

strip_newlines

strip_newlines(lines: Iterable[str]) -> list[str]
Source code in src/omnipy/util/helpers.py
def strip_newlines(lines: Iterable[str]) -> list[str]:
    return [strip_newline(line) for line in lines]

takes_input_params_from

takes_input_params_from(
    func: Callable[Concatenate[Any, _P], Any],
) -> Callable[[Callable[Concatenate[Any, _P], _RetT]], Callable[Concatenate[Any, _P], _RetT]]
Source code in src/omnipy/util/helpers.py
def takes_input_params_from(
    func: Callable[Concatenate[Any, _P], Any]
) -> Callable[[Callable[Concatenate[Any, _P], _RetT]], Callable[Concatenate[Any, _P], _RetT]]:
    return lambda _: _

transfer_generic_args_to_cls

transfer_generic_args_to_cls(to_cls, from_generic_type)
Source code in src/omnipy/util/helpers.py
def transfer_generic_args_to_cls(to_cls, from_generic_type):
    try:
        return to_cls[get_args(from_generic_type)]
    except (TypeError, AttributeError):
        return to_cls