From 8abe66526dddf7307f2b68e28517d5531801b8a4 Mon Sep 17 00:00:00 2001 From: Dima Gerasimov Date: Wed, 25 Nov 2020 03:32:58 +0000 Subject: [PATCH] my.photos: minor fixes/configcleanup + speedup --- my/core/common.py | 1 + my/photos/{__init__.py => main.py} | 82 ++++++++++++++++++++---------- my/photos/utils.py | 23 +++------ 3 files changed, 63 insertions(+), 43 deletions(-) rename my/photos/{__init__.py => main.py} (68%) diff --git a/my/core/common.py b/my/core/common.py index 7f9e4e5..79e06da 100644 --- a/my/core/common.py +++ b/my/core/common.py @@ -214,6 +214,7 @@ if TYPE_CHECKING: mcachew: McachewType +# TODO set default cache dir here instead? # todo ugh. I think it needs doublewrap, otherwise @mcachew without args doesn't work def mcachew(*args, **kwargs): # type: ignore[no-redef] """ diff --git a/my/photos/__init__.py b/my/photos/main.py similarity index 68% rename from my/photos/__init__.py rename to my/photos/main.py index d444658..c2ea37e 100644 --- a/my/photos/__init__.py +++ b/my/photos/main.py @@ -1,9 +1,13 @@ """ Photos and videos on your filesystem, their GPS and timestamps """ +REQUIRES = [ + 'geopy', + 'magic', +] +# NOTE: also uses fdfind to search photos -# pip install geopy magic - +from concurrent.futures import ProcessPoolExecutor as Pool from datetime import datetime import json from pathlib import Path @@ -11,14 +15,14 @@ from typing import Tuple, Dict, Optional, NamedTuple, Iterator, Iterable, List from geopy.geocoders import Nominatim # type: ignore -from ..common import LazyLogger, mcachew, fastermime -from ..error import Res +from ..core.common import LazyLogger, mcachew, fastermime +from ..core.error import Res, sort_res_by +from ..core.cachew import cache_dir from my.config import photos as config -log = LazyLogger(__name__) - +logger = LazyLogger(__name__) # TODO ignore hidden dirs? @@ -32,10 +36,6 @@ class Photo(NamedTuple): dt: Optional[datetime] geo: Optional[LatLon] - @property - def tags(self) -> List[str]: # TODO - return [] - @property def _basename(self) -> str: # TODO 'canonical' or something? only makes sense for organized ones @@ -51,6 +51,7 @@ class Photo(NamedTuple): @property def url(self) -> str: + # TODO belongs to private overlay.. return f'{config.base_url}{self._basename}' @@ -58,14 +59,23 @@ from .utils import get_exif_from_file, ExifTags, Exif, dt_from_path, convert_ref Result = Res[Photo] +def _make_photo_aux(*args, **kwargs) -> List[Result]: + # for the process pool.. + return list(_make_photo(*args, **kwargs)) + def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]: exif: Exif if any(x in mtype for x in {'image/png', 'image/x-ms-bmp', 'video'}): # TODO don't remember why.. - log.debug(f"skipping exif extraction for {photo} due to mime {mtype}") + logger.debug(f"skipping exif extraction for {photo} due to mime {mtype}") exif = {} else: - exif = get_exif_from_file(photo) + try: + exif = get_exif_from_file(photo) + except Exception as e: + # TODO reuse echain from promnesia + yield e + exif = {} def _get_geo() -> Optional[LatLon]: meta = exif.get(ExifTags.GPSINFO, {}) @@ -82,7 +92,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Ite if edt is not None: dtimes = edt.replace(' 24', ' 00') # jeez maybe log it? if dtimes == "0000:00:00 00:00:00": - log.warning(f"Bad exif timestamp {dtimes} for {photo}") + logger.warning(f"Bad exif timestamp {dtimes} for {photo}") else: dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S') # TODO timezone is local, should take into account... @@ -90,7 +100,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Ite if 'Instagram/VID_' in str(photo): # TODO bit random... - log.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo) + logger.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo) return None edt = dt_from_path(photo) # ok, last try.. @@ -100,7 +110,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Ite if edt is not None and edt > datetime.now(): # TODO also yield? - log.error('datetime for %s is too far in future: %s', photo, edt) + logger.error('datetime for %s is too far in future: %s', photo, edt) return None return edt @@ -111,11 +121,12 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Ite yield Photo(str(photo), dt=dt, geo=geo) -# TODO exclude -def _candidates() -> Iterable[str]: +def _candidates() -> Iterable[Res[str]]: # TODO that could be a bit slow if there are to many extra files? from subprocess import Popen, PIPE # TODO could extract this to common? + # TODO would be nice to reuse get_files (or even let it use find) + # that way would be easier to exclude with Popen([ 'fdfind', '--follow', @@ -131,21 +142,22 @@ def _candidates() -> Iterable[str]: if tp in {'inode', 'text', 'application', 'audio'}: continue if tp not in {'image', 'video'}: - # TODO yield error? - log.warning('%s: unexpected mime %s', path, tp) + msg = f'{path}: unexpected mime {tp}' + logger.warning(msg) + yield RuntimeError(msg) # not sure if necessary # TODO return mime too? so we don't have to call it again in _photos? yield path def photos() -> Iterator[Result]: - candidates = tuple(sorted(_candidates())) + candidates = tuple(sort_res_by(_candidates(), key=lambda i: i)) return _photos(candidates) # if geo information is missing from photo, you can specify it manually in geo.json file # TODO is there something more standard? -@mcachew(cache_path=config.cache_path) -def _photos(candidates: Iterable[str]) -> Iterator[Result]: +@mcachew(cache_path=cache_dir()) +def _photos(candidates: Iterable[Res[str]]) -> Iterator[Result]: geolocator = Nominatim() # TODO does it cache?? from functools import lru_cache @@ -168,23 +180,39 @@ def _photos(candidates: Iterable[str]) -> Iterator[Result]: lon = j['lon'] return LatLon(lat=lat, lon=lon) + pool = Pool() + futures = [] - for path in map(Path, candidates): + for p in candidates: + if isinstance(p, Exception): + yield p + continue + path = Path(p) + # TODO rely on get_files if config.ignored(path): - log.info('ignoring %s due to config', path) + logger.info('ignoring %s due to config', path) continue + logger.debug('processing %s', path) parent_geo = get_geo(path.parent) mime = fastermime(str(path)) - yield from _make_photo(path, mime, parent_geo=parent_geo) + + futures.append(pool.submit(_make_photo_aux, path, mime, parent_geo=parent_geo)) + + for f in futures: + yield from f.result() -def print_all(): +def print_all() -> None: for p in photos(): if isinstance(p, Exception): print('ERROR!', p) else: - print(f"{p.dt} {p.path} {p.tags}") + print(f"{str(p.dt):25} {p.path} {p.geo}") # todo cachew -- improve AttributeError: type object 'tuple' has no attribute '__annotations__' -- improve errors? # todo cachew -- invalidate if function code changed? + +from ..core import Stats, stat +def stats() -> Stats: + return stat(photos) diff --git a/my/photos/utils.py b/my/photos/utils.py index 20f3669..15d7659 100644 --- a/my/photos/utils.py +++ b/my/photos/utils.py @@ -23,10 +23,10 @@ class ExifTags: def get_exif_from_file(path: Path) -> Exif: # TODO exception handler? with PIL.Image.open(str(path)) as fo: - return get_exif_data(fo) + return _get_exif_data(fo) -def get_exif_data(image): +def _get_exif_data(image) -> Exif: """Returns a dictionary from the exif data of an PIL Image item. Also converts the GPS Tags""" exif_data = {} info = image._getexif() @@ -46,36 +46,27 @@ def get_exif_data(image): return exif_data -def to_degree(value): +def to_degree(value) -> float: """Helper function to convert the GPS coordinates stored in the EXIF to degress in float format""" - d0 = value[0][0] - d1 = value[0][1] - d = float(d0) / float(d1) - m0 = value[1][0] - m1 = value[1][1] - m = float(m0) / float(m1) - - s0 = value[2][0] - s1 = value[2][1] - s = float(s0) / float(s1) - + (d, m, s) = value return d + (m / 60.0) + (s / 3600.0) -def convert_ref(cstr, ref: str): +def convert_ref(cstr, ref: str) -> float: val = to_degree(cstr) if ref == 'S' or ref == 'W': val = -val return val - import re from datetime import datetime from typing import Optional # TODO surely there is a library that does it?? +# TODO this belogs to a private overlay or something +# basically have a function that patches up dates after the files were yielded.. _DT_REGEX = re.compile(r'\D(\d{8})\D*(\d{6})\D') def dt_from_path(p: Path) -> Optional[datetime]: name = p.stem