import uuid
from pathlib import Path
import os
import shutil
import tempfile
import hashlib
from datetime import datetime, timedelta
from tempfile import TemporaryDirectory
from typing import *
import aiofiles
import sqlalchemy as sa
from kaiju_tools.services import ContextableService
from kaiju_tools.rpc import AbstractRPCCompatible
from kaiju_tools.functions import async_run_in_thread
from kaiju_db.services import SQLService, DatabaseService
from kaiju_files.tables import files
__all__ = ['FileService']
[docs]class FileService(SQLService, ContextableService, AbstractRPCCompatible):
"""File management service which handlers uploads and downloads.
It's expected to be initialized within a service context manager, but it's possible to do
initialization manually by directly providing an instance of the database service.
File service consists of two main parts: file records table and actual files stored in a local
directory.
To upload a file first you have to create an empty file record with all metadata about the file
and then link it to a local file or upload data referencing the record by its id.
.. code-block:: python
async with FileService(app, database_service):
data = await file_service.create({'name': 'test', 'extension': 'txt', 'meta': {'tag': 'file'}})
data = await file_service.upload_local_file(data['id'], file_path)
Files are stored locally under their hash UUID and symlinked using their specified names, thus
same files with different names can coexist in a filesystem and may be served statically via nginx
or other server.
"""
service_name = 'files'
table = files
URI_PREFIX = '/files/'
DELETE_UNLINKED_INTERVAL_DAYS = 1
insert_columns = ('name', 'extension', 'meta')
update_columns = ('name', 'extension', 'hash', 'meta')
[docs] def __init__(
self,
app,
database_service: DatabaseService,
dir='.',
uri_prefix: str = URI_PREFIX,
logger=None,
):
"""Initialize.
:param app:
:param database_service:
:param dir: local file storage path
:param uri_prefix: optional prefix for returned URIs
:param logger:
"""
super().__init__(app=app, database_service=database_service, logger=logger)
self._dir = Path(dir).resolve()
self._dir.mkdir(exist_ok=True, parents=True)
self._uri_prefix = Path(uri_prefix)
self._temp_dir = None
self.virtual_columns = {'uri': f"'{self._uri_prefix}' || '/' || hash || '/' || name || '.' || extension"}
@property
def routes(self) -> dict:
routes = {**super().routes, 'delete_unlinked': self.delete_unlinked_files}
return routes
@property
def permissions(self) -> dict:
return {self.DEFAULT_PERMISSION: self.PermissionKeys.GLOBAL_USER_PERMISSION}
[docs] async def init(self):
self._temp_dir = TemporaryDirectory(prefix='FileService')
self._temp_dir.__enter__()
[docs] async def close(self):
if not self.closed:
self._temp_dir.__exit__(None, None, None)
self._temp_dir = None
@property
def closed(self) -> bool:
return self._temp_dir is None
@property
def temp_dir(self) -> Path:
return Path(self._temp_dir.name)
[docs] async def upload_local_file(self, id: uuid.UUID, path: Union[str, Path, tempfile.NamedTemporaryFile], move=True):
"""Upload local file.
Use this method to 'upload' a local file. This operation will move the file into a file service
directory and link it to a file record.
:param id: file record id
:param path: local file path
:param move: move file instead of copying it
:return: file and URI
"""
data = await self.get(id, columns=['name', 'extension', 'meta'])
_hash = hashlib.md5()
temp_file = self.get_temp_file_path()
temp_file.parent.mkdir(exist_ok=True, parents=True)
if isinstance(path, str):
pass
elif isinstance(path, Path):
path = str(path)
else:
path = path.name
async with aiofiles.open(path, 'rb') as _f:
size = 1024**2
chunk = await _f.read(size)
while chunk:
_hash.update(chunk)
chunk = await _f.read(size)
_hash = uuid.UUID(_hash.hexdigest())
id, uri = await self._upload(
file_id=id,
path=path,
hash=_hash,
name=data['name'],
extension=data['extension'],
meta=data['meta'],
_move=move,
)
return {'id': id, 'uri': str(uri)}
[docs] async def upload_content(self, id: uuid.UUID, content, _move=True):
data = await self.get(id, columns=['name', 'extension', 'meta'])
hash = hashlib.md5()
temp_file = self.get_temp_file_path()
temp_file.parent.mkdir(exist_ok=True, parents=True)
async with aiofiles.open(temp_file, 'wb') as _f:
async for chunk in content.iter_chunked(1024**2):
hash.update(chunk)
await _f.write(chunk)
hash = uuid.UUID(hash.hexdigest())
id, uri = await self._upload(
file_id=id,
path=temp_file,
hash=hash,
name=data['name'],
extension=data['extension'],
meta=data['meta'],
_move=_move,
)
return {'id': id, 'uri': str(uri), 'hash': hash, 'tmp_file': temp_file}
[docs] async def delete_unlinked_files(self, days=DELETE_UNLINKED_INTERVAL_DAYS):
"""Remove all old file record which have no hash (i.e. an actual file) linked to them."""
t = datetime.now() - timedelta(days=days)
sql = self.table.delete().where(sa.and_(self.table.c.hash == None, self.table.c.timestamp < t)) # noqa alchemy
await super()._wrap_delete(self._db.execute(sql))
[docs] async def delete_local_file(self, name: Union[Path, str, tempfile.TemporaryFile]):
if isinstance(name, str):
pass
elif isinstance(name, Path):
name = str(name)
else:
name = name.name
path = Path(name)
if path.exists():
await async_run_in_thread(os.unlink, args=(name,))
[docs] async def get_local_file_path(self, id: uuid.UUID):
file_info = await self.get(id=id, columns=['hash'])
path = self._get_local_file_path(file_info['hash'])
return path
def _get_local_file_path(self, hash: Optional[uuid.UUID]) -> Optional[Path]:
if hash:
return self._dir / str(hash) / str(hash)
def _get_local_file_name(
self, name: Optional[str], hash: Optional[uuid.UUID], extension: Optional[str]
) -> Optional[str]:
if extension:
return f'{name}.{extension}'
elif name:
return f'{name}'
elif hash:
return str(hash)
def _get_file_uri(self, name: Optional[str], hash: Optional[uuid.UUID], extension: Optional[str]) -> Optional[Path]:
name = self._get_local_file_name(name=name, hash=hash, extension=extension)
if name:
return self._uri_prefix / str(hash) / name
def _get_local_symlink_path(
self, name: Optional[str], extension: Optional[str], hash: Optional[uuid.UUID]
) -> Optional[Path]:
name = self._get_local_file_name(name=name, extension=extension, hash=hash)
if name:
return self._dir / str(hash) / name
[docs] def get_temp_file_path(self) -> Path:
temp_file_name = str(uuid.uuid4())
return self.temp_dir / temp_file_name
[docs] async def get_temp_dir(self, *args, **kws) -> tempfile.TemporaryDirectory:
kws = {**kws, 'dir': self._temp_dir}
_dir = await async_run_in_thread(tempfile.TemporaryDirectory, args, kws)
return _dir
[docs] async def get_temp_file(self, *args, **kws) -> tempfile.NamedTemporaryFile:
kws = {**kws, 'dir': self._temp_dir}
temp_file = await async_run_in_thread(tempfile.NamedTemporaryFile, args, kws)
return temp_file
[docs] def get_temp_file_sync(self, *args, **kws) -> tempfile.NamedTemporaryFile:
temp_file = tempfile.NamedTemporaryFile(*args, dir=self._temp_dir, **kws)
return temp_file
async def _delete_local_files(self, hash: Optional[uuid.UUID]):
if hash:
d = self._get_local_file_path(hash).parent
if d.exists():
await async_run_in_thread(shutil.rmtree, args=(str(d),))
async def _upload(
self,
file_id: uuid.UUID,
path: Union[Path, str],
hash: uuid.UUID,
name: str,
extension: str,
meta: dict,
_move=True,
) -> (uuid.UUID, Path):
file_path = self._get_local_file_path(hash)
file_path.parent.mkdir(exist_ok=True, parents=True)
# uri = self._get_file_uri(name=name, hash=hash, extension=extension)
if not file_path.exists():
if _move:
await async_run_in_thread(shutil.move, args=(str(path), str(file_path)))
else:
await async_run_in_thread(shutil.copy, args=(str(path), str(file_path)))
else:
await async_run_in_thread(os.unlink, args=(str(path),))
link = self._get_local_symlink_path(name=name, hash=hash, extension=extension)
if not link.exists():
# sql = self.table.select().with_only_columns([
# self.table.c.id
# ]).where(
# sa.and_(
# self.table.c.extension == extension,
# self.table.c.name == name,
# self.table.c.hash == hash
# )
# ).limit(1)
# data = await super()._wrap_get(self._db.fetchrow(sql))
# if data:
# await self.delete(file_id, columns=None)
# file_id = data['id']
# return file_id, uri
# else:
await async_run_in_thread(os.symlink, args=(str(file_path), str(link)))
file_size = file_path.stat().st_size
meta['file_size'] = file_size
await self.update(id=file_id, data={'hash': hash, 'meta': meta}, columns=None)
uri = self._get_file_uri(name=name, hash=hash, extension=extension)
return file_id, uri
@staticmethod
def _create_file(name: str, extension: str = None, meta: dict = None):
return {'name': name, 'extension': extension, 'meta': {} if meta is None else meta}
def _insert_uri(self, data):
data['uri'] = self._get_file_uri(name=data['name'], hash=data['hash'], extension=data['extension'])