Source code for kaiju_files.converters

from typing import Type, List, TypedDict, cast
from uuid import UUID

from kaiju_db import SQLService, DatabaseService
from kaiju_tools.class_registry import AbstractClassRegistry
from kaiju_tools.exceptions import NotFound, ValidationError
from kaiju_tools.mapping import recursive_update
from kaiju_tools.services import Service, ContextableService
from kaiju_tools.functions import async_run_in_thread
from kaiju_tools.rpc import AbstractRPCCompatible

from kaiju_files.abc import AbstractFileConverter
from kaiju_files.etc import ErrorCodes
from kaiju_files.tables import converters as converters_table
from kaiju_files.files import FileService

# Public names exported by this module (``ErrorCodes`` is re-exported from ``kaiju_files.etc``).
__all__ = ['ErrorCodes', 'FileConverterService', 'converters', 'FileConvertersRegistry']


class FileConvertersRegistry(AbstractClassRegistry):
    """Class registry that holds the available file converter classes.

    Converter classes are expected to implement the `AbstractFileConverter`
    interface, which is declared here as the registry base class.
    """

    base_classes = (AbstractFileConverter,)
# Module-level registry instance; `FileConverterService.__init__` uses it as the
# default value of its `converters` argument.
converters = FileConvertersRegistry()
class FileConverterService(SQLService, AbstractRPCCompatible):
    """File converters storage and execution.

    You can use it for image and other data types conversion.

    First, if you need a specific converter class you should inherit it from
    the `AbstractFileConverter` interface and register it in the `converters`
    class registry.

    .. code-block:: python

        from kaiju_files.abc import AbstractFileConverter
        from kaiju_files.converters import converters

        class MyConverterClass(AbstractFileConverter):
            ...

        converters.register_class(MyConverterClass)

    From now on you can use it in your converter service. It's best to init
    this service within a service context manager, but you can init it directly
    by providing instances of a database service and a file service.

    Then you can save your converter settings and convert files using your
    converter.

    .. code-block:: python

        my_converter_settings = {...}

        async with FileConverterService(...) as fcs:
            row = {'cls': 'MyConverterClass', 'name': 'my converter', 'settings': my_converter_settings}
            row = await fcs.create(row)
            versions_info = await fcs.convert(row['id'], my_file_id)

    """

    service_name = 'FileConverter'
    # Default conversion time limit (sec), used when `max_processing_time` is not passed.
    MAX_PROCESSING_TIME = 300
    table = converters_table
    # NOTE(review): presumably the columns `SQLService` allows to be updated - confirm.
    update_columns = {'system', 'settings'}

    class _ConvertedFileInfo(TypedDict):
        # Declared shape of the value returned by `convert`.

        class _ConvertedVersionInfo(TypedDict):
            # One produced file version.
            # NOTE: `convert` additionally stores the uploaded file info under
            # a 'file' key, which is not declared here.
            name: str
            extension: str
            meta: dict

        file_id: UUID
        converter_id: UUID
        versions: List[_ConvertedVersionInfo]

    def __init__(
        self,
        app,
        database_service: DatabaseService,
        file_service: FileService = False,
        converters=converters,
        max_processing_time=MAX_PROCESSING_TIME,
        logger=None,
    ):
        """Initialize.

        :param app: application object passed to the parent service
        :param database_service: database service passed to the parent service
        :param file_service: file service instance or reference, resolved via `discover_service`
        :param converters: converters registry (uses default registry)
        :param max_processing_time: conversion time limit (sec), coerced to an integer >= 1
        :param logger: logger passed to the parent service
        """
        super().__init__(app=app, database_service=database_service, logger=logger)
        self.file_service = self.discover_service(file_service)
        self.converters = converters
        self.max_processing_time = max(1, int(max_processing_time))

    @property
    def routes(self) -> dict:
        """Return the parent routes extended with the 'classes' and 'call' methods."""
        return {**super().routes, 'classes': self.get_converter_class_specs, 'call': self.convert}

    async def get_converter_class_specs(self, cls: str = None):
        """Return a list of registered converter classes specification.

        :param cls: optional converter class key; when given, only this class is described
        :return: a single ``{'cls': ..., 'spec': ...}`` dict if `cls` is provided,
            otherwise a list of such dicts for every registered class
        :raises NotFound: if `cls` is provided but not present in the registry
        """
        if cls:
            converter_cls = self._get_converter_class(cls)
            return {'cls': cls, 'spec': converter_cls.spec()}
        return [{'cls': key, 'spec': converter_cls.spec()} for key, converter_cls in self.converters.items()]

    async def convert(self, id, file_id, settings: dict = None, metadata: dict = None, **__) -> _ConvertedFileInfo:
        """Convert a file.

        The converter is executed through `async_run_in_thread`; each file it
        produces is registered in the file service and uploaded from its local path.

        :param id: converter ID
        :param file_id: file to convert
        :param settings: additional converter settings
        :param metadata: additional metadata (merged over the metadata of the base file)
        :return: base file id, converter id and a list of created versions
        :raises RuntimeError: if the local file of the base file doesn't exist
        """
        base_file_info = await self.file_service.get(file_id, columns=['name', 'extension', 'meta', 'hash'])
        # The stored meta may be empty (None): fall back to an empty mapping
        # instead of failing on `.update()` / `**` unpacking.
        metadata = {**(base_file_info['meta'] or {}), **(metadata or {})}
        file_path = self.file_service._get_local_file_path(base_file_info['hash'])
        if not file_path.exists():
            raise RuntimeError('Local file referenced by this id (%s) doesn\'t exist.' % file_id)
        converter = await self.init_converter(id, settings)
        kws = {**metadata, 'return_exceptions': True, 'max_exec_time': converter.max_processing_time}
        result = await async_run_in_thread(converter.convert, (file_path,), kws)
        # With `return_exceptions=True` an error may come back as a value: re-raise it here.
        if isinstance(result, Exception):
            raise result
        data = []
        for f, meta in result:
            version_name = meta.get('version') or self.converters.class_key(converter)
            output_ext = meta.get('output_extension') or base_file_info['extension']
            # NOTE(review): the version suffix is appended to both a converter-provided
            # name and the base file name - confirm against the upstream source.
            name = meta.get('name') or base_file_info['name']
            name = f'{name}__{version_name}'
            version = {'name': name, 'extension': output_ext, 'meta': meta}
            file_info = await self.file_service.create(version)
            try:
                file_info = await self.file_service.upload_local_file(file_info['id'], f.name)
            except Exception:
                # Remove the local file of the failed version before propagating the error.
                await self.file_service.delete_local_file(f.name)
                raise
            else:
                version['file'] = file_info
                data.append(version)
        return {'file_id': file_id, 'converter_id': id, 'versions': data}  # noqa

    def prepare_insert_data(self, data: dict) -> dict:
        """Validate and normalize a new converter row.

        A converter object is created from the provided class key and settings,
        so invalid input fails here.

        :param data: row data with 'cls', 'name', 'settings' and optional 'system' keys
        :return: a row with the registry class key and the converter settings representation
        :raises NotFound: if the converter class is not registered
        :raises ValidationError: if a converter can't be created from the settings
        """
        converter = self._create_converter(data['cls'], data['settings'])
        return {
            'cls': self.converters.class_key(converter),
            'name': data['name'],
            'system': data.get('system', False),
            'settings': converter.settings.repr(),
        }

    def _get_converter_class(self, cls: str) -> Type[AbstractFileConverter]:
        """Return a registered converter class by its registry key.

        :param cls: class key in the converters registry
        :raises NotFound: if there's no such key in the registry
        """
        if cls not in self.converters:
            raise NotFound(
                'Converter class doesn\'t exist.',
                key=cls,
                available_classes=list(self.converters.keys()),
                code=ErrorCodes.NO_CONVERTER_CLASS_FOUND,
            )
        return cast(Type[AbstractFileConverter], self.converters[cls])

    async def init_converter(self, id, settings=None) -> AbstractFileConverter:
        """Return a converter instance ready to use.

        :param id: converter ID
        :param settings: additional settings to update default settings
        """
        row = await self.get(id, columns='*')
        if settings:
            settings = recursive_update(row['settings'], settings)
        else:
            settings = row['settings']
        converter = self._create_converter(row['cls'], settings)
        # Contextable converters are initialized before being handed to the caller.
        if isinstance(converter, ContextableService):
            await converter.init()
        return converter

    def _create_converter(self, cls: str, settings: dict) -> AbstractFileConverter:
        """Create a new converter object from a class name and settings dict.

        :param cls: class name as in converters registry (uses `class.__name__` by default)
        :param settings: converter settings passed to the converter constructor
        :raises NotFound: if the converter class is not registered
        :raises ValidationError: if the converter constructor rejects the arguments
        """
        converter_cls = self._get_converter_class(cls)
        try:
            kwargs = {
                'dir': self.file_service.temp_dir,
                'settings': settings,
                'max_processing_time': self.max_processing_time,
            }
            if issubclass(converter_cls, Service):
                # Service-based converters also receive the app and the logger.
                kwargs['app'] = self.app  # noqa
                kwargs['logger'] = self.logger  # noqa
            converter = converter_cls(**kwargs)
        except (ValueError, AttributeError, TypeError) as e:
            # Chain the original error explicitly so it's preserved as `__cause__`.
            raise ValidationError(
                'Invalid converter settings.',
                base_exc=e,
                converter_cls=converter_cls,
                code=ErrorCodes.INVALID_CONVERTER_SETTINGS,
            ) from e
        else:
            return converter