Skip to content

omnipy.components.raw.serializers

CLASS DESCRIPTION
RawBytesDatasetToTarFileSerializer
RawStrDatasetToTarFileSerializer

RawBytesDatasetToTarFileSerializer

Bases: TarFileSerializer[StrictBytesDataset]

METHOD DESCRIPTION
deserialize
get_dataset_cls_for_new
get_output_file_suffix
is_dataset_directly_supported
serialize
Source code in src/omnipy/components/raw/serializers.py
class RawBytesDatasetToTarFileSerializer(TarFileSerializer[StrictBytesDataset]):
    """Tar-file serializer for datasets whose items are raw ``bytes``.

    Items are handed to the tar-file machinery of the base class without any
    transformation, both when serializing and when deserializing. The
    serializer reports ``'bytes'`` as its output file suffix.
    """

    @classmethod
    def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
        """Return True when the first type variant of `dataset` is exactly ``bytes``."""
        variants = all_dataset_type_variants(dataset)
        if len(variants) == 0:
            # No type variants reported: nothing to match against.
            return False
        return variants[0] is bytes

    @classmethod
    def get_dataset_cls_for_new(cls) -> Type[IsDataset]:
        """Return the dataset class associated with this serializer."""
        dataset_cls = StrictBytesDataset
        return dataset_cls

    @classmethod
    def get_output_file_suffix(cls) -> str:
        """Return the file suffix, ``'bytes'``, reported by this serializer."""
        suffix = 'bytes'
        return suffix

    @classmethod
    def serialize(cls, dataset: StrictBytesDataset) -> bytes | memoryview:
        """Serialize `dataset` through `create_tarfile_from_dataset`.

        The encode callback is the identity: every item is passed on as-is.
        """
        def _passthrough(content: bytes) -> bytes:
            return content

        archive = cls.create_tarfile_from_dataset(dataset, data_encode_func=_passthrough)
        return archive

    @classmethod
    def deserialize(cls, serialized: bytes, any_file_suffix=False) -> StrictBytesDataset:
        """Populate a new dataset from `serialized` and return it.

        `any_file_suffix` is forwarded unchanged to
        `create_dataset_from_tarfile`.
        """
        def _read_all(file_stream: IO[bytes]) -> bytes:
            # The full content of the stream is used verbatim as the item.
            return file_stream.read()

        def _single_entry_dict(data_file: str, obj_val: Any) -> dict:
            # One mapping entry per archive member: name -> decoded content.
            return {data_file: obj_val}

        # NOTE(review): the container is Dataset[Model[bytes]] although the
        # return annotation is StrictBytesDataset -- confirm this is intended.
        loaded = Dataset[Model[bytes]]()
        cls.create_dataset_from_tarfile(
            loaded,
            serialized,
            data_decode_func=_read_all,
            dictify_object_func=_single_entry_dict,
            import_method='from_data',
            any_file_suffix=any_file_suffix,
        )  # noqa
        return loaded

deserialize classmethod

deserialize(serialized: bytes, any_file_suffix=False) -> StrictBytesDataset
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def deserialize(cls, serialized: bytes, any_file_suffix=False) -> StrictBytesDataset:
    """Populate a new dataset from `serialized` and return it.

    `any_file_suffix` is forwarded unchanged to
    `create_dataset_from_tarfile`.
    """
    def _read_all(file_stream: IO[bytes]) -> bytes:
        # The full content of the stream is used verbatim as the item.
        return file_stream.read()

    def _single_entry_dict(data_file: str, obj_val: Any) -> dict:
        # One mapping entry per archive member: name -> decoded content.
        return {data_file: obj_val}

    # NOTE(review): the container is Dataset[Model[bytes]] although the
    # return annotation is StrictBytesDataset -- confirm this is intended.
    loaded = Dataset[Model[bytes]]()
    cls.create_dataset_from_tarfile(
        loaded,
        serialized,
        data_decode_func=_read_all,
        dictify_object_func=_single_entry_dict,
        import_method='from_data',
        any_file_suffix=any_file_suffix,
    )  # noqa
    return loaded

get_dataset_cls_for_new classmethod

get_dataset_cls_for_new() -> Type[IsDataset]
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def get_dataset_cls_for_new(cls) -> Type[IsDataset]:
    """Return the dataset class associated with this serializer."""
    dataset_cls = StrictBytesDataset
    return dataset_cls

get_output_file_suffix classmethod

get_output_file_suffix() -> str
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def get_output_file_suffix(cls) -> str:
    """Return the file suffix, ``'bytes'``, reported by this serializer."""
    suffix = 'bytes'
    return suffix

is_dataset_directly_supported classmethod

is_dataset_directly_supported(dataset: IsDataset) -> bool
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
    """Return True when the first type variant of `dataset` is exactly ``bytes``."""
    variants = all_dataset_type_variants(dataset)
    if len(variants) == 0:
        # No type variants reported: nothing to match against.
        return False
    return variants[0] is bytes

serialize classmethod

serialize(dataset: StrictBytesDataset) -> bytes | memoryview
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def serialize(cls, dataset: StrictBytesDataset) -> bytes | memoryview:
    """Serialize `dataset` through `create_tarfile_from_dataset`.

    The encode callback is the identity: every item is passed on as-is.
    """
    def _passthrough(content: bytes) -> bytes:
        return content

    archive = cls.create_tarfile_from_dataset(dataset, data_encode_func=_passthrough)
    return archive

RawStrDatasetToTarFileSerializer

Bases: TarFileSerializer[StrictStrDataset]

METHOD DESCRIPTION
deserialize
get_dataset_cls_for_new
get_output_file_suffix
is_dataset_directly_supported
serialize
Source code in src/omnipy/components/raw/serializers.py
class RawStrDatasetToTarFileSerializer(TarFileSerializer[StrictStrDataset]):
    """Tar-file serializer for datasets whose items are ``str`` values.

    Items are encoded to UTF-8 bytes when serializing and decoded from UTF-8
    when deserializing. The serializer reports ``'txt'`` as its output file
    suffix.
    """

    @classmethod
    def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
        """Return True when the first type variant of `dataset` is exactly ``str``."""
        variants = all_dataset_type_variants(dataset)
        if len(variants) == 0:
            # No type variants reported: nothing to match against.
            return False
        return variants[0] is str

    @classmethod
    def get_dataset_cls_for_new(cls) -> Type[IsDataset]:
        """Return the dataset class associated with this serializer."""
        dataset_cls = StrictStrDataset
        return dataset_cls

    @classmethod
    def get_output_file_suffix(cls) -> str:
        """Return the file suffix, ``'txt'``, reported by this serializer."""
        suffix = 'txt'
        return suffix

    @classmethod
    def serialize(cls, dataset: StrictStrDataset) -> bytes | memoryview:
        """Serialize `dataset` through `create_tarfile_from_dataset`.

        Every item is encoded to UTF-8 bytes by the encode callback.
        """
        def _to_utf8(content: str) -> bytes:
            return content.encode('utf8')

        archive = cls.create_tarfile_from_dataset(dataset, data_encode_func=_to_utf8)
        return archive

    @classmethod
    def deserialize(cls, serialized: bytes, any_file_suffix=False) -> StrictStrDataset:
        """Populate a new `StrictStrDataset` from `serialized` and return it.

        `any_file_suffix` is forwarded unchanged to
        `create_dataset_from_tarfile`.
        """
        def _read_utf8(file_stream: IO[bytes]) -> str:
            # Read the whole stream, then decode it as UTF-8 text.
            raw = file_stream.read()
            return raw.decode('utf8')

        def _single_entry_dict(data_file: str, obj_val: Any) -> dict:
            # One mapping entry per archive member: name -> decoded content.
            return {data_file: obj_val}

        loaded = StrictStrDataset()
        cls.create_dataset_from_tarfile(
            loaded,
            serialized,
            data_decode_func=_read_utf8,
            dictify_object_func=_single_entry_dict,
            import_method='from_data',
            any_file_suffix=any_file_suffix,
        )  # noqa
        return loaded

deserialize classmethod

deserialize(serialized: bytes, any_file_suffix=False) -> StrictStrDataset
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def deserialize(cls, serialized: bytes, any_file_suffix=False) -> StrictStrDataset:
    """Populate a new `StrictStrDataset` from `serialized` and return it.

    `any_file_suffix` is forwarded unchanged to
    `create_dataset_from_tarfile`.
    """
    def _read_utf8(file_stream: IO[bytes]) -> str:
        # Read the whole stream, then decode it as UTF-8 text.
        raw = file_stream.read()
        return raw.decode('utf8')

    def _single_entry_dict(data_file: str, obj_val: Any) -> dict:
        # One mapping entry per archive member: name -> decoded content.
        return {data_file: obj_val}

    loaded = StrictStrDataset()
    cls.create_dataset_from_tarfile(
        loaded,
        serialized,
        data_decode_func=_read_utf8,
        dictify_object_func=_single_entry_dict,
        import_method='from_data',
        any_file_suffix=any_file_suffix,
    )  # noqa
    return loaded

get_dataset_cls_for_new classmethod

get_dataset_cls_for_new() -> Type[IsDataset]
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def get_dataset_cls_for_new(cls) -> Type[IsDataset]:
    """Return the dataset class associated with this serializer."""
    dataset_cls = StrictStrDataset
    return dataset_cls

get_output_file_suffix classmethod

get_output_file_suffix() -> str
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def get_output_file_suffix(cls) -> str:
    """Return the file suffix, ``'txt'``, reported by this serializer."""
    suffix = 'txt'
    return suffix

is_dataset_directly_supported classmethod

is_dataset_directly_supported(dataset: IsDataset) -> bool
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def is_dataset_directly_supported(cls, dataset: IsDataset) -> bool:
    """Return True when the first type variant of `dataset` is exactly ``str``."""
    variants = all_dataset_type_variants(dataset)
    if len(variants) == 0:
        # No type variants reported: nothing to match against.
        return False
    return variants[0] is str

serialize classmethod

serialize(dataset: StrictStrDataset) -> bytes | memoryview
Source code in src/omnipy/components/raw/serializers.py
@classmethod
def serialize(cls, dataset: StrictStrDataset) -> bytes | memoryview:
    """Serialize `dataset` through `create_tarfile_from_dataset`.

    Every item is encoded to UTF-8 bytes by the encode callback.
    """
    def _to_utf8(content: str) -> bytes:
        return content.encode('utf8')

    archive = cls.create_tarfile_from_dataset(dataset, data_encode_func=_to_utf8)
    return archive