import abc
import re
import string
import tempfile
from pathlib import Path
from typing import *
import kaiju_tools.jsonschema as schema
from kaiju_tools.serialization import Serializable
from kaiju_tools.services import Service
from kaiju_files.etc import ErrorCodes
# Names exported by a star-import of this module.
# NOTE(review): AbstractFileOperation is defined in this module but not listed
# here - presumably it is meant to be an internal base class; confirm.
__all__ = (
'AbstractFileTransportInterface',
'AbstractFileLoader',
'AbstractFileConverter',
'FileOperationConfigurationError',
)
class AbstractFileTransportInterface(abc.ABC):
    """Connector to an external file storage.

    Every method below is abstract, so a subclass can only be instantiated
    once it overrides all of them.
    """

    @abc.abstractmethod
    async def has_new_files(self) -> bool:
        """Return True if new downloadable files found in shared folders."""

    @abc.abstractmethod
    async def list(self) -> AsyncGenerator[Path, None]:
        """List all files in shared folders."""

    @abc.abstractmethod
    async def download(self, uri: Path) -> tempfile.NamedTemporaryFile:
        """Download a file to a local temp dir and return the location.

        :param uri: path of the shared file to download
        """

    @abc.abstractmethod
    async def delete(self, uri: Path):
        """Remove a downloaded file from a shared directory.

        :param uri: path of the shared file to remove
        """

    @abc.abstractmethod
    async def mark_failed(self, uri: Path, reason: ErrorCodes, message: str = ''):
        """Mark a shared file as failed and (optionally) move it to another shared location.

        :param uri: path of the shared file that could not be processed
        :param reason: error code describing the failure
        :param message: optional free-form explanation, empty by default
        """
class FileOperationConfigurationError(ValueError):
    """An error due to invalid file converter settings.

    Raised when a file operation is configured with a filename or directory
    mask that is not a valid regular expression, or with an extension set of
    an unsupported type. Being a :class:`ValueError` subclass, it is also
    caught by handlers written for ``ValueError``.
    """
class AbstractFileOperation(abc.ABC):
    """Base class for file operations driven by a :class:`Settings` object.

    The settings decide which input files the operation accepts (see
    :meth:`match`) and carry extra metadata and the output extension.
    """

    class Settings(Serializable, abc.ABC):
        """Settings object for a file operation class.

        Inner class because it's not to be used apart from its parent converter.
        """

        # Characters stripped from both ends of every configured extension.
        _strip_set = string.punctuation + string.whitespace
        __slots__ = ('filename_mask', 'directory_mask', 'ext', 'meta', 'output_extension')

        def __init__(
            self,
            filename_mask: str = None,
            directory_mask: str = None,
            ext: List[str] = None,
            meta: dict = None,
            output_extension: str = None,
        ):
            """Initialize.

            :param filename_mask: regular expression mask for input filenames,
                named groups are allowed and will be used for extracting
                file metadata if needed, None (default) for no specific mask
            :param directory_mask: same as filename mask but for parent dir
            :param ext: list of allowed file extensions; each one is stripped of
                leading / trailing punctuation and whitespace and lower-cased,
                None (default) for no specific extensions
            :param meta: specific metadata added to converted files
            :param output_extension: output file extension
            :raises FileOperationConfigurationError: if a mask is not a valid
                regular expression or `ext` is neither None nor an iterable
            """
            try:
                self.filename_mask = re.compile(filename_mask) if filename_mask else None
            except re.error as err:
                raise FileOperationConfigurationError(
                    'Invalid filename mask "%s". Must be None or a valid regular expression.' % filename_mask
                ) from err

            try:
                self.directory_mask = re.compile(directory_mask) if directory_mask else None
            except re.error as err:
                raise FileOperationConfigurationError(
                    'Invalid directory mask "%s". Must be None or a valid regular expression.' % directory_mask
                ) from err

            if ext is None:
                # Documented default: no restriction on the file extension.
                self.ext = None
            elif isinstance(ext, Iterable):
                if isinstance(ext, str):
                    # A bare string is one extension, not an iterable of characters.
                    ext = (ext,)
                normalized = (str(e).strip(self._strip_set).lower() for e in ext)
                # Entries that are empty after stripping are dropped; an empty
                # result means "no restriction", same as None.
                self.ext = frozenset(e for e in normalized if e) or None
            else:
                raise FileOperationConfigurationError('Invalid extension set "%s". Must be an iterable object' % ext)

            self.meta = dict(meta) if meta else {}
            self.output_extension = output_extension

        def match(self, uri: Path) -> Optional[dict]:
            """Match a file path against the configured extensions and masks.

            :param uri: path of the file to check
            :returns: a dict with the original path under ``'uri'``, the
                lower-cased extension under ``'extension'`` (only when an
                extension set is configured) and all named groups of the
                filename / directory masks; None if any configured check fails
            """
            groups = {}

            if self.ext:
                ext = uri.suffix.lower().lstrip('.')
                if ext not in self.ext:
                    return None
                groups['extension'] = ext

            if self.filename_mask:
                found = self.filename_mask.fullmatch(str(uri.stem))
                if not found:
                    return None
                groups.update(found.groupdict())

            if self.directory_mask:
                # The parent directory must be tested with the directory mask
                # (the filename mask may not even be set).
                found = self.directory_mask.fullmatch(str(uri.parent))
                if not found:
                    return None
                groups.update(found.groupdict())

            groups['uri'] = str(uri)
            return groups

        def repr(self) -> dict:
            """Return the settings as a plain dict, with masks as pattern strings."""
            return {
                'filename_mask': self.filename_mask.pattern if self.filename_mask else None,
                'directory_mask': self.directory_mask.pattern if self.directory_mask else None,
                'ext': list(self.ext) if self.ext else None,
                'meta': self.meta,
                'output_extension': self.output_extension,
            }

        @classmethod
        def spec(cls) -> schema.Object:
            """Return a schema object describing the accepted settings."""
            return schema.Object(
                filename_mask=schema.String(
                    title='Regex mask for matching a file name.',
                    description='All groups will be written to "meta" dict of a file.'
                    ' If no match, the file counts as rejected.'
                    ' If mask is "null", then all filenames are accepted.',
                    format='regex',
                    nullable=True,
                    minLength=1,
                ),
                directory_mask=schema.String(
                    title='Regex mask for matching a file directory name.',
                    description='All groups will be written to "meta" dict of a file.'
                    ' If no match, the file counts as rejected.'
                    ' If mask is "null", then all filenames are accepted.',
                    format='regex',
                    nullable=True,
                    minLength=1,
                ),
                ext=schema.Array(
                    title='A list of allowed file extensions.',
                    description='If it is "null", then all extensions are accepted.',
                    items=schema.String(minLength=1),
                    uniqueItems=True,
                    nullable=True,
                    minItems=1,
                ),
                meta=schema.Object(title='Optional file metadata, will be written to a file.', nullable=True),
                output_extension=schema.String(
                    title='An output file extension.',
                    description='If it is "null", then an original extension will be used.',
                    nullable=True,
                    minLength=1,
                ),
                title='File operation class settings.',
                additionalProperties=False,
            )

    def __init__(self, settings: Union[dict, List[dict]] = None):
        """Initialize.

        :param settings: converter specific settings passed as keyword
            arguments to :class:`Settings`; None (default) uses the defaults
        """
        # ``**None`` is a TypeError, so the default must fall back to no kwargs.
        self.settings = self.Settings(**(settings or {}))

    @classmethod
    def spec(cls):
        """Return a schema object wrapping the spec of the settings class."""
        return schema.Object(settings=cls.Settings.spec())

    def match(self, uri: Path) -> Optional[dict]:
        """Match a file path using the operation settings.

        :param uri: path of the file to check
        :returns: the group dict produced by the settings object, or None if
            the file does not match
        """
        return self.settings.match(uri)
class AbstractFileLoader(AbstractFileOperation, Service, abc.ABC):
    """File uploading and metadata interface."""

    class Settings(AbstractFileOperation.Settings):
        """File loader settings."""

    def __init__(self, app, *args, logger=None, **kws):
        """Initialize.

        :param app: application object passed to the service base class
        :param args: positional arguments for the file operation base class
        :param logger: optional logger passed to the service base class
        :param kws: keyword arguments for the file operation base class
        """
        # Both bases are initialized explicitly because they take different arguments.
        AbstractFileOperation.__init__(self, *args, **kws)
        Service.__init__(self, app=app, logger=logger)

    @abc.abstractmethod
    async def upload(self, data: tempfile.NamedTemporaryFile, **metadata):
        """Upload a file from a local temp dir and set its metadata.

        :param data: local temporary file to upload
        :param metadata: metadata to store with the uploaded file
        """
class AbstractFileConverter(AbstractFileOperation, abc.ABC):
    """File converter/normalizer which is supposed to be run in a thread / process."""

    class Settings(AbstractFileOperation.Settings):
        """File converter settings."""

    READ_MODE = 'rb'
    WRITE_MODE = 'wb'
    MAX_PROCESSING_TIME = 300

    def __init__(
        self,
        dir: str = '.',
        read_mode=READ_MODE,
        write_mode=WRITE_MODE,
        max_processing_time=MAX_PROCESSING_TIME,
        settings: Union[dict, List[dict]] = None,
    ):
        """Initialize.

        :param dir: path to a temp local data storage
        :param read_mode: read mode for opening original files
        :param write_mode: write mode for writing new (converted) files
        :param max_processing_time: maximum file processing time in seconds,
            values below 1 are raised to 1
        :param settings: converter specific settings
        """
        super().__init__(settings=settings)
        self._dir = Path(dir).resolve()
        self.max_processing_time = max(1, int(max_processing_time))
        self._read_mode = read_mode
        self._write_mode = write_mode
        # Temp files created by `_create_file` during the current `convert` call.
        self._files = []

    def convert(
        self, file: Union[Path, str, tempfile.NamedTemporaryFile], return_exceptions=False, **metadata
    ) -> List[Tuple[tempfile.NamedTemporaryFile, dict]]:
        """Call a converter and return created files and their metadata.

        :param file: source file given as a path or as an object with a `name` attribute
        :param return_exceptions: return a raised exception instead of propagating it
        :param metadata: metadata merged into the metadata of every output file
        :returns: list of (closed output file, metadata) pairs; when
            `return_exceptions` is true and the conversion fails, the exception
            instance is returned instead
        """
        files = []
        try:
            source = str(file) if isinstance(file, (Path, str)) else file.name
            with open(source, mode=self._read_mode) as input_buffer:
                for t_file, _metadata in self._convert(input_buffer, **metadata):
                    if not t_file.closed:
                        t_file.close()
                    # Later updates win: call metadata < settings metadata < per-file metadata.
                    version_metadata = {}
                    version_metadata.update(metadata)
                    version_metadata.update(self.settings.meta)
                    version_metadata.update(_metadata)
                    files.append((t_file, version_metadata))
        except Exception as err:
            # No output is handed back on failure, so drop the partial results
            # instead of leaving orphaned temp files on disk.
            self._release_files(remove=True)
            if not return_exceptions:
                raise
            return err
        finally:
            self._release_files()
        return files

    def _release_files(self, remove=False):
        """Close all temp files created in the current run and forget them.

        :param remove: also delete the files from disk
        """
        for t_file in self._files:
            if not t_file.closed:
                t_file.close()
            if remove:
                try:
                    Path(t_file.name).unlink()
                except OSError:
                    # Best-effort cleanup: the file may already be gone.
                    pass
        self._files = []

    @abc.abstractmethod
    def _convert(self, input_buffer, **metadata) -> Generator[Tuple[tempfile.NamedTemporaryFile, dict], None, None]:
        """Convert files.

        You must define your custom file processing method here.
        This method should yield your new files along with their meta-information.

        :param input_buffer: opened source file buffer passed automatically
        :param metadata: additional meta-information
        """
        output_file = self._create_file()
        # `output_file` is already open in the write mode: write the converted
        # content to it here, then yield it together with its metadata.
        yield output_file, metadata

    def _create_file(self, ext=None, dir=None):
        """Create a new named temp file and register it for cleanup.

        :param ext: file extension without the leading dot, None for no suffix
        :param dir: target directory, None for the converter's own directory
        :returns: the temp file opened in the converter's write mode; it is not
            deleted automatically when closed
        """
        suffix = f'.{ext}' if ext else ext
        target_dir = self._dir if dir is None else str(dir)
        t_file = tempfile.NamedTemporaryFile(
            dir=target_dir, mode=self._write_mode, delete=False, prefix=f'{self.__class__.__name__}_', suffix=suffix
        )
        self._files.append(t_file)
        return t_file