Skip to content

omnipy.data.serializer

CLASS DESCRIPTION
Serializer
SerializerRegistry
TarFileSerializer

Serializer

Bases: ABC, Generic[_DatasetT]

METHOD DESCRIPTION
deserialize
get_dataset_cls_for_new
get_output_file_suffix
is_dataset_directly_supported
serialize
Source code in src/omnipy/data/serializer.py
class Serializer(ABC, Generic[_DatasetT]):
    """Abstract interface for converting datasets to and from a serialized form.

    Every operation is an abstract class method, so concrete serializers are
    used through their class object rather than through instances.
    """

    @classmethod
    @abstractmethod
    def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
        """Report whether this serializer directly supports `dataset`."""
        ...

    @classmethod
    @abstractmethod
    def get_dataset_cls_for_new(cls) -> type[IsDataset]:
        """Return the dataset class to instantiate for a new dataset."""
        ...

    @classmethod
    @abstractmethod
    def get_output_file_suffix(cls) -> str:
        """Return the file suffix associated with this serializer's output."""
        ...

    @classmethod
    @abstractmethod
    def serialize(cls, dataset: _DatasetT) -> bytes | memoryview:
        """Convert `dataset` into its serialized binary form."""
        ...

    @classmethod
    @abstractmethod
    def deserialize(cls, serialized: bytes, any_file_suffix=False) -> _DatasetT:
        """Rebuild a dataset from `serialized`.

        NOTE(review): `any_file_suffix` presumably relaxes file suffix checks
        in implementations -- confirm against the concrete serializers.
        """
        ...

deserialize abstractmethod classmethod

deserialize(serialized: bytes, any_file_suffix=False) -> _DatasetT
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def deserialize(cls, serialized: bytes, any_file_suffix=False) -> _DatasetT:
    """Rebuild a dataset from `serialized`; implemented by subclasses.

    NOTE(review): `any_file_suffix` presumably relaxes file suffix checks in
    implementations -- confirm against the concrete serializers.
    """
    ...

get_dataset_cls_for_new abstractmethod classmethod

get_dataset_cls_for_new() -> type[IsDataset]
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def get_dataset_cls_for_new(cls) -> type[IsDataset]:
    """Return the dataset class to instantiate for a new dataset; implemented by subclasses."""
    ...

get_output_file_suffix abstractmethod classmethod

get_output_file_suffix() -> str
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def get_output_file_suffix(cls) -> str:
    """Return the file suffix associated with this serializer's output; implemented by subclasses."""
    ...

is_dataset_directly_supported abstractmethod classmethod

is_dataset_directly_supported(dataset: IsDataset) -> bool
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
    """Report whether this serializer directly supports `dataset`; implemented by subclasses."""
    ...

serialize abstractmethod classmethod

serialize(dataset: _DatasetT) -> bytes | memoryview
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def serialize(cls, dataset: _DatasetT) -> bytes | memoryview:
    """Convert `dataset` into its serialized binary form; implemented by subclasses."""
    ...

SerializerRegistry

METHOD DESCRIPTION
__init__
auto_detect
auto_detect_tar_file_serializer
detect_tar_file_serializers_from_dataset_cls
detect_tar_file_serializers_from_file_suffix
load_from_tar_file_path_based_on_dataset_cls
load_from_tar_file_path_based_on_file_suffix
register
ATTRIBUTE DESCRIPTION
serializers

TYPE: tuple[Type[IsSerializer], ...]

tar_file_serializers

TYPE: tuple[Type[IsTarFileSerializer], ...]

Source code in src/omnipy/data/serializer.py
class SerializerRegistry:
    """Ordered registry of serializer classes, with helpers to pick one and load data.

    Serializers are kept in registration order, and that order is the order in
    which they are tried by the detection methods.
    """
    def __init__(self) -> None:
        """Create an empty registry."""
        self._serializer_classes: list[Type[IsSerializer]] = []

    def register(self, serializer_cls: Type[IsSerializer]) -> None:
        """Append `serializer_cls` to the registry (no de-duplication is done)."""
        self._serializer_classes.append(serializer_cls)

    @property
    def serializers(self) -> tuple[Type[IsSerializer], ...]:
        """All registered serializer classes, in registration order."""
        return tuple(self._serializer_classes)

    @property
    def tar_file_serializers(self) -> tuple[Type[IsTarFileSerializer], ...]:
        """The registered serializer classes that subclass `TarFileSerializer`."""
        return tuple(cls for cls in self._serializer_classes if issubclass(cls, TarFileSerializer))

    def auto_detect(self, dataset: IsDataset) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
        """Find a serializer for `dataset` among all registered serializers.

        Returns:
            `(converted_dataset, serializer)` for the first combination that
            works, or `(None, None)` if none does.
        """
        return self._autodetect_serializer(dataset, self.serializers)

    def auto_detect_tar_file_serializer(
            self, dataset: IsDataset) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
        """Same as `auto_detect`, restricted to the tar file serializers."""
        return self._autodetect_serializer(dataset, self.tar_file_serializers)

    @classmethod
    def _autodetect_serializer(
        cls,
        dataset: IsDataset,
        serializers: tuple[Type[IsSerializer], ...],
    ) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
        """Run the conversion trials, with two model config flags set to False if a runtime exists.

        NOTE(review): `hold_and_reset_prev_attrib_value` presumably restores the
        previous attribute values on exit -- confirm against its definition.
        """
        # Imported locally, as in the original code (presumably to avoid an import cycle).
        from omnipy.hub.runtime import runtime
        if not runtime:
            return cls._test_all_serializer_combos(dataset, serializers)

        with hold_and_reset_prev_attrib_value(
                runtime.config.data.model,
                'interactive',
        ):
            with hold_and_reset_prev_attrib_value(
                    runtime.config.data.model,
                    'dynamically_convert_elements_to_models',
            ):
                runtime.config.data.model.interactive = False
                runtime.config.data.model.dynamically_convert_elements_to_models = False

                return cls._test_all_serializer_combos(dataset, serializers)

    @classmethod
    def _test_all_serializer_combos(
        cls,
        dataset: IsDataset,
        serializers: tuple[Type[IsSerializer], ...],
    ) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
        """Try each conversion strategy against each serializer and return the first success.

        Strategies are tried in order, and each one is tried against every
        serializer before moving on to the next strategy. A trial fails when it
        raises TypeError, ValueError, ValidationError or AssertionError.

        Returns:
            `(new_dataset, serializer)` for the first successful trial, or
            `(None, None)` if every trial fails.
        """
        def _to_data_from_json(dataset: IsDataset, serializer: IsSerializer):
            new_dataset_cls = serializer.get_dataset_cls_for_new()
            new_dataset = new_dataset_cls()
            new_dataset.from_json(dataset.to_data())
            return new_dataset

        def _to_data_from_data(dataset: IsDataset, serializer: IsSerializer):
            new_dataset_cls = serializer.get_dataset_cls_for_new()
            new_dataset = new_dataset_cls()
            new_dataset.from_data(dataset.to_data())
            return new_dataset

        def _to_data_from_data_if_direct(dataset: IsDataset, serializer: IsSerializer):
            # Raised explicitly instead of via `assert`, so the check is not
            # stripped when Python runs with -O. The loop below catches it.
            if not serializer.is_dataset_directly_supported(dataset):
                raise AssertionError('Dataset is not directly supported by serializer')
            return _to_data_from_data(dataset, serializer)

        for func in (_to_data_from_data_if_direct, _to_data_from_json, _to_data_from_data):
            for serializer in serializers:
                try:
                    new_dataset = func(dataset, serializer)
                    return new_dataset, serializer
                except (TypeError, ValueError, ValidationError, AssertionError):
                    pass

        return None, None

    def detect_tar_file_serializers_from_dataset_cls(
            self, dataset: IsDataset) -> tuple[Type[IsTarFileSerializer], ...]:
        """Return the tar file serializers that directly support `dataset`.

        If there are none, fall back to the tar file serializers whose output
        file suffix is 'bytes'. The result may be empty.
        """
        serializers = tuple(
            serializer_cls for serializer_cls in self.tar_file_serializers
            if serializer_cls.is_dataset_directly_supported(dataset))
        if len(serializers) == 0:
            serializers = tuple(serializer_cls for serializer_cls in self.tar_file_serializers
                                if serializer_cls.get_output_file_suffix() == 'bytes')
        return serializers

    def detect_tar_file_serializers_from_file_suffix(
            self, file_suffix: str) -> tuple[Type[IsTarFileSerializer], ...]:
        """Return the tar file serializers whose output file suffix equals `file_suffix`."""
        return tuple(serializer_cls for serializer_cls in self.tar_file_serializers
                     if serializer_cls.get_output_file_suffix() == file_suffix)

    def load_from_tar_file_path_based_on_file_suffix(
        self,
        log_obj: CanLog,
        tar_file_path: str,
        to_dataset: IsDataset,
    ) -> IsDataset | None:
        """Load a gzipped tar archive, choosing the serializer from the member file suffix.

        Args:
            log_obj: Object whose `log` attribute is used for messages; `print`
                is used if it has none.
            tar_file_path: Path to the gzipped tar archive.
            to_dataset: Dataset to fill with the loaded contents.

        Returns:
            `to_dataset` when the contents could be transferred into it, the
            deserialized dataset when that transfer failed, or None when no
            single serializer could be determined.
        """
        log: Callable = getattr(log_obj, 'log', print)

        with tarfile.open(tar_file_path, 'r:gz') as tarfile_obj:
            # The text after the last '.' is the suffix; a name without any
            # '.' yields the whole name.
            file_suffixes = {fn.split('.')[-1] for fn in tarfile_obj.getnames()}
        if len(file_suffixes) != 1:
            log(f'Tar archive contains files with different or '
                f'no file suffixes: {file_suffixes}. Serializer '
                f'cannot be uniquely determined. Aborting '
                f'restore.')
            return None

        file_suffix = file_suffixes.pop()
        serializers = self.detect_tar_file_serializers_from_file_suffix(file_suffix)
        if len(serializers) == 0:
            log(f'No serializer for file suffix "{file_suffix}" can be '
                f'determined. Aborting restore.')
            return None

        log(f'Reading dataset from a gzipped tarpack at'
            f' "{os.path.abspath(tar_file_path)}"')

        # The first matching serializer in registration order is used.
        serializer = serializers[0]
        with open(tar_file_path, 'rb') as tarfile_binary:
            auto_dataset = serializer.deserialize(tarfile_binary.read())

        if to_dataset.get_type() is auto_dataset.get_type():
            cast(HasData, to_dataset).data = cast(HasData, auto_dataset).data
            return to_dataset

        try:
            if to_dataset.get_type().inner_type == str:
                to_dataset.from_data(auto_dataset.to_json())
            else:
                to_dataset.from_json(auto_dataset.to_data())
            return to_dataset
        except Exception:
            # Deliberate best effort: if the conversion fails, hand back the
            # dataset exactly as the serializer produced it.
            return auto_dataset

    def load_from_tar_file_path_based_on_dataset_cls(
        self,
        log_obj: CanLog,
        tar_file_path: str,
        to_dataset: IsDataset,
        any_file_suffix: bool = False,
    ) -> IsDataset | None:
        """Load a gzipped tar archive, choosing the serializer from `to_dataset`.

        Args:
            log_obj: Object whose `log` attribute is used for messages; `print`
                is used if it has none.
            tar_file_path: Path to the gzipped tar archive.
            to_dataset: Dataset used to select the candidate serializers.
            any_file_suffix: Passed on to the serializer's `deserialize`.

        Returns:
            The dataset produced by the first candidate serializer, or None
            when no serializer could be determined.
        """
        log: Callable = getattr(log_obj, 'log', print)

        serializers = self.detect_tar_file_serializers_from_dataset_cls(to_dataset)
        if len(serializers) == 0:
            log(f'No serializer for Dataset with type "{type(to_dataset)}" can be '
                f'determined.')
            return None

        # Only the first candidate is used; errors from it propagate to the caller.
        serializer = serializers[0]
        log(f'Reading dataset from a gzipped tarpack at'
            f' "{os.path.abspath(tar_file_path)}" with serializer type: '
            f'"{serializer.__name__}"')

        with open(tar_file_path, 'rb') as tarfile_binary:
            out_dataset = serializer.deserialize(
                tarfile_binary.read(),
                any_file_suffix=any_file_suffix,
            )

        return out_dataset

serializers property

serializers: tuple[Type[IsSerializer], ...]

tar_file_serializers property

tar_file_serializers: tuple[Type[IsTarFileSerializer], ...]

__init__

__init__() -> None
Source code in src/omnipy/data/serializer.py
def __init__(self) -> None:
    """Create an empty registry with no serializer classes."""
    self._serializer_classes: list[Type[IsSerializer]] = list()

auto_detect

auto_detect(dataset: IsDataset) -> tuple[IsDataset, IsSerializer] | tuple[None, None]
Source code in src/omnipy/data/serializer.py
def auto_detect(self, dataset: IsDataset) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
    """Run serializer auto-detection for `dataset` over all registered serializers.

    Returns whatever `_autodetect_serializer` returns for `self.serializers`.
    """
    candidates = self.serializers
    return self._autodetect_serializer(dataset, candidates)

auto_detect_tar_file_serializer

auto_detect_tar_file_serializer(
    dataset: IsDataset,
) -> tuple[IsDataset, IsSerializer] | tuple[None, None]
Source code in src/omnipy/data/serializer.py
def auto_detect_tar_file_serializer(
        self, dataset: IsDataset) -> tuple[IsDataset, IsSerializer] | tuple[None, None]:
    """Run serializer auto-detection for `dataset` over the tar file serializers only.

    Returns whatever `_autodetect_serializer` returns for `self.tar_file_serializers`.
    """
    candidates = self.tar_file_serializers
    return self._autodetect_serializer(dataset, candidates)

detect_tar_file_serializers_from_dataset_cls

detect_tar_file_serializers_from_dataset_cls(
    dataset: IsDataset,
) -> tuple[Type[IsTarFileSerializer], ...]
Source code in src/omnipy/data/serializer.py
def detect_tar_file_serializers_from_dataset_cls(
        self, dataset: IsDataset) -> tuple[Type[IsTarFileSerializer], ...]:
    """Return the tar file serializers that directly support `dataset`.

    If there are none, fall back to the tar file serializers whose output file
    suffix is 'bytes'. The result may be empty.
    """
    directly_supporting = tuple(
        candidate for candidate in self.tar_file_serializers
        if candidate.is_dataset_directly_supported(dataset))
    if directly_supporting:
        return directly_supporting

    return tuple(
        candidate for candidate in self.tar_file_serializers
        if candidate.get_output_file_suffix() == 'bytes')

detect_tar_file_serializers_from_file_suffix

detect_tar_file_serializers_from_file_suffix(
    file_suffix: str,
) -> tuple[Type[IsTarFileSerializer], ...]
Source code in src/omnipy/data/serializer.py
def detect_tar_file_serializers_from_file_suffix(
        self, file_suffix: str) -> tuple[Type[IsTarFileSerializer], ...]:
    """Return the tar file serializers whose output file suffix equals `file_suffix`."""
    matching = [
        candidate for candidate in self.tar_file_serializers
        if candidate.get_output_file_suffix() == file_suffix
    ]
    return tuple(matching)

load_from_tar_file_path_based_on_dataset_cls

load_from_tar_file_path_based_on_dataset_cls(
    log_obj: CanLog, tar_file_path: str, to_dataset: IsDataset, any_file_suffix: bool = False
) -> IsDataset | None
Source code in src/omnipy/data/serializer.py
def load_from_tar_file_path_based_on_dataset_cls(
    self,
    log_obj: CanLog,
    tar_file_path: str,
    to_dataset: IsDataset,
    any_file_suffix: bool = False,
) -> IsDataset | None:
    """Load a gzipped tar archive, choosing the serializer from `to_dataset`.

    Args:
        log_obj: Object whose `log` attribute is used for messages; `print` is
            used if it has none.
        tar_file_path: Path to the gzipped tar archive.
        to_dataset: Dataset used to select the candidate serializers.
        any_file_suffix: Passed on to the serializer's `deserialize`.

    Returns:
        The dataset produced by the first candidate serializer, or None when
        no serializer could be determined.
    """
    log: Callable = getattr(log_obj, 'log', print)

    candidates = self.detect_tar_file_serializers_from_dataset_cls(to_dataset)
    if not candidates:
        log(f'No serializer for Dataset with type "{type(to_dataset)}" can be '
            f'determined.')
        return None

    # Only the first candidate is used; errors from it propagate to the caller.
    chosen = candidates[0]
    log(f'Reading dataset from a gzipped tarpack at'
        f' "{os.path.abspath(tar_file_path)}" with serializer type: '
        f'"{chosen.__name__}"')

    with open(tar_file_path, 'rb') as tarfile_binary:
        serialized = tarfile_binary.read()

    return chosen.deserialize(serialized, any_file_suffix=any_file_suffix)

load_from_tar_file_path_based_on_file_suffix

load_from_tar_file_path_based_on_file_suffix(
    log_obj: CanLog, tar_file_path: str, to_dataset: IsDataset
) -> IsDataset | None
Source code in src/omnipy/data/serializer.py
def load_from_tar_file_path_based_on_file_suffix(
    self,
    log_obj: CanLog,
    tar_file_path: str,
    to_dataset: IsDataset,
) -> IsDataset | None:
    """Load a gzipped tar archive, choosing the serializer from the member file suffix.

    Args:
        log_obj: Object whose `log` attribute is used for messages; `print` is
            used if it has none.
        tar_file_path: Path to the gzipped tar archive.
        to_dataset: Dataset to fill with the loaded contents.

    Returns:
        `to_dataset` when the contents could be transferred into it, the
        deserialized dataset when that transfer failed, or None when no single
        serializer could be determined.
    """
    log: Callable = getattr(log_obj, 'log', print)

    with tarfile.open(tar_file_path, 'r:gz') as tarfile_obj:
        # The text after the last '.' is the suffix; a name without any '.'
        # yields the whole name.
        file_suffixes = {fn.split('.')[-1] for fn in tarfile_obj.getnames()}
    if len(file_suffixes) != 1:
        log(f'Tar archive contains files with different or '
            f'no file suffixes: {file_suffixes}. Serializer '
            f'cannot be uniquely determined. Aborting '
            f'restore.')
        return None

    file_suffix = file_suffixes.pop()
    serializers = self.detect_tar_file_serializers_from_file_suffix(file_suffix)
    if len(serializers) == 0:
        # Fixed: the two message fragments were previously joined without a
        # space, producing "can bedetermined".
        log(f'No serializer for file suffix "{file_suffix}" can be '
            f'determined. Aborting restore.')
        return None

    log(f'Reading dataset from a gzipped tarpack at'
        f' "{os.path.abspath(tar_file_path)}"')

    # The first matching serializer is used.
    serializer = serializers[0]
    with open(tar_file_path, 'rb') as tarfile_binary:
        auto_dataset = serializer.deserialize(tarfile_binary.read())

    if to_dataset.get_type() is auto_dataset.get_type():
        cast(HasData, to_dataset).data = cast(HasData, auto_dataset).data
        return to_dataset

    try:
        if to_dataset.get_type().inner_type == str:
            to_dataset.from_data(auto_dataset.to_json())
        else:
            to_dataset.from_json(auto_dataset.to_data())
        return to_dataset
    except Exception:
        # Deliberate best effort: if the conversion fails, hand back the
        # dataset exactly as the serializer produced it.
        return auto_dataset

register

register(serializer_cls: Type[IsSerializer]) -> None
Source code in src/omnipy/data/serializer.py
def register(self, serializer_cls: Type[IsSerializer]) -> None:
    """Append `serializer_cls` to the registry's list (no de-duplication is done)."""
    registered = self._serializer_classes
    registered.append(serializer_cls)

TarFileSerializer

Bases: Serializer[_DatasetT], Generic[_DatasetT]

METHOD DESCRIPTION
create_dataset_from_tarfile
create_tarfile_from_dataset
deserialize
get_dataset_cls_for_new
get_output_file_suffix
is_dataset_directly_supported
serialize
Source code in src/omnipy/data/serializer.py
class TarFileSerializer(Serializer[_DatasetT], Generic[_DatasetT]):
    """Serializer base class providing gzipped tar archive packing and unpacking helpers.

    Each dataset item becomes one archive member named
    `<item name>.<output file suffix>`.
    """
    @classmethod
    def create_tarfile_from_dataset(cls,
                                    dataset: _DatasetT,
                                    data_encode_func: Callable[..., bytes | memoryview]) -> bytes:
        """Pack every item of `dataset` into an in-memory gzipped tar archive.

        Args:
            dataset: Mapping-like dataset; its `items()` are iterated.
            data_encode_func: Encodes one item's data into bytes-like content.

        Returns:
            The bytes of the complete gzipped tar archive.
        """
        bytes_io = BytesIO()
        with tarfile.open(fileobj=bytes_io, mode='w:gz') as tarfile_stream:
            for data_file, data in dataset.items():  # type: ignore[attr-defined]
                data_bytestream = BytesIO(data_encode_func(data))
                tarinfo = TarInfo(name=f'{data_file}.{cls.get_output_file_suffix()}')
                # The size must be set explicitly; addfile() reads exactly this many bytes.
                tarinfo.size = len(data_bytestream.getbuffer())
                tarfile_stream.addfile(tarinfo, data_bytestream)
        return bytes_io.getvalue()

    @classmethod
    def create_dataset_from_tarfile(cls,
                                    dataset: _DatasetT,
                                    tarfile_bytes: bytes,
                                    data_decode_func: Callable[[IO[bytes]], Any],
                                    dictify_object_func: Callable[[str, Any], dict | str],
                                    import_method: str = 'from_data',
                                    any_file_suffix: bool = False) -> None:
        """Unpack a gzipped tar archive into `dataset`, one import call per member.

        Args:
            dataset: Dataset to import into, via the method named by `import_method`.
            tarfile_bytes: The bytes of a gzipped tar archive.
            data_decode_func: Decodes one member's file object into an object.
            dictify_object_func: Called with `(name, decoded_object)`; its result
                is passed to the import method.
            import_method: Name of the `dataset` method to call per member.
            any_file_suffix: If False, every member name must end with
                `.<output file suffix>`.

        Raises:
            AssertionError: If a member is not a regular file, or has the wrong
                suffix while `any_file_suffix` is False.
        """
        with tarfile.open(fileobj=BytesIO(tarfile_bytes), mode='r:gz') as tarfile_stream:
            for filename in tarfile_stream.getnames():
                data_file = tarfile_stream.extractfile(filename)
                # Raised explicitly (not via `assert`) so the checks survive -O.
                if data_file is None:
                    raise AssertionError(f'Tar member "{filename}" is not a regular file')
                if not any_file_suffix:
                    expected_suffix = f'.{cls.get_output_file_suffix()}'
                    if not filename.endswith(expected_suffix):
                        raise AssertionError(
                            f'Tar member "{filename}" does not end with "{expected_suffix}"')
                # Strip the suffix from the base name only, so dots in directory
                # names are ignored and a name without a suffix is kept whole.
                base_name = os.path.basename(filename)
                stem, dot, _ = base_name.rpartition('.')
                data_file_name = stem if dot else base_name
                getattr(dataset, import_method)(
                    dictify_object_func(data_file_name, data_decode_func(data_file)))

create_dataset_from_tarfile classmethod

create_dataset_from_tarfile(
    dataset: _DatasetT,
    tarfile_bytes: bytes,
    data_decode_func: Callable[[IO[bytes]], Any],
    dictify_object_func: Callable[[str, Any], dict | str],
    import_method: str = "from_data",
    any_file_suffix: bool = False,
) -> None
Source code in src/omnipy/data/serializer.py
@classmethod
def create_dataset_from_tarfile(cls,
                                dataset: _DatasetT,
                                tarfile_bytes: bytes,
                                data_decode_func: Callable[[IO[bytes]], Any],
                                dictify_object_func: Callable[[str, Any], dict | str],
                                import_method: str = 'from_data',
                                any_file_suffix: bool = False) -> None:
    """Unpack a gzipped tar archive into `dataset`, one import call per member.

    Args:
        dataset: Dataset to import into, via the method named by `import_method`.
        tarfile_bytes: The bytes of a gzipped tar archive.
        data_decode_func: Decodes one member's file object into an object.
        dictify_object_func: Called with `(name, decoded_object)`; its result is
            passed to the import method.
        import_method: Name of the `dataset` method to call per member.
        any_file_suffix: If False, every member name must end with
            `.<output file suffix>`.

    Raises:
        AssertionError: If a member is not a regular file, or has the wrong
            suffix while `any_file_suffix` is False.
    """
    with tarfile.open(fileobj=BytesIO(tarfile_bytes), mode='r:gz') as tarfile_stream:
        for filename in tarfile_stream.getnames():
            data_file = tarfile_stream.extractfile(filename)
            # Raised explicitly (not via `assert`) so the checks survive -O.
            if data_file is None:
                raise AssertionError(f'Tar member "{filename}" is not a regular file')
            if not any_file_suffix:
                expected_suffix = f'.{cls.get_output_file_suffix()}'
                if not filename.endswith(expected_suffix):
                    raise AssertionError(
                        f'Tar member "{filename}" does not end with "{expected_suffix}"')
            # Strip the suffix from the base name only, so dots in directory
            # names are ignored and a name without a suffix is kept whole.
            base_name = os.path.basename(filename)
            stem, dot, _ = base_name.rpartition('.')
            data_file_name = stem if dot else base_name
            getattr(dataset, import_method)(
                dictify_object_func(data_file_name, data_decode_func(data_file)))

create_tarfile_from_dataset classmethod

create_tarfile_from_dataset(
    dataset: _DatasetT, data_encode_func: Callable[..., bytes | memoryview]
) -> bytes
Source code in src/omnipy/data/serializer.py
@classmethod
def create_tarfile_from_dataset(cls,
                                dataset: _DatasetT,
                                data_encode_func: Callable[..., bytes | memoryview]) -> bytes:
    """Pack every item of `dataset` into an in-memory gzipped tar archive.

    Each item becomes one member named `<item name>.<output file suffix>`,
    with content produced by `data_encode_func`.

    Returns:
        The bytes of the complete gzipped tar archive.
    """
    archive_buffer = BytesIO()
    with tarfile.open(fileobj=archive_buffer, mode='w:gz') as archive:
        for member_name, member_data in dataset.items():  # type: ignore[attr-defined]
            encoded_stream = BytesIO(data_encode_func(member_data))
            member_info = TarInfo(name=f'{member_name}.{cls.get_output_file_suffix()}')
            # The size must be set explicitly; addfile() reads exactly this many bytes.
            member_info.size = encoded_stream.getbuffer().nbytes
            archive.addfile(member_info, encoded_stream)
    return archive_buffer.getvalue()

deserialize abstractmethod classmethod

deserialize(serialized: bytes, any_file_suffix=False) -> _DatasetT
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def deserialize(cls, serialized: bytes, any_file_suffix=False) -> _DatasetT:
    """Rebuild a dataset from `serialized`; implemented by subclasses.

    NOTE(review): `any_file_suffix` presumably relaxes file suffix checks in
    implementations -- confirm against the concrete serializers.
    """
    ...

get_dataset_cls_for_new abstractmethod classmethod

get_dataset_cls_for_new() -> type[IsDataset]
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def get_dataset_cls_for_new(cls) -> type[IsDataset]:
    """Return the dataset class to instantiate for a new dataset; implemented by subclasses."""
    ...

get_output_file_suffix abstractmethod classmethod

get_output_file_suffix() -> str
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def get_output_file_suffix(cls) -> str:
    """Return the file suffix associated with this serializer's output; implemented by subclasses."""
    ...

is_dataset_directly_supported abstractmethod classmethod

is_dataset_directly_supported(dataset: IsDataset) -> bool
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
    """Report whether this serializer directly supports `dataset`; implemented by subclasses."""
    ...

serialize abstractmethod classmethod

serialize(dataset: _DatasetT) -> bytes | memoryview
Source code in src/omnipy/data/serializer.py
@classmethod
@abstractmethod
def serialize(cls, dataset: _DatasetT) -> bytes | memoryview:
    """Convert `dataset` into its serialized binary form; implemented by subclasses."""
    ...