my.photos: minor fixes/config cleanup + speedup
parent f8db8c7b98
commit 8abe66526d

3 changed files with 63 additions and 43 deletions
@@ -214,6 +214,7 @@ if TYPE_CHECKING:
     mcachew: McachewType
 
+# TODO set default cache dir here instead?
 # todo ugh. I think it needs doublewrap, otherwise @mcachew without args doesn't work
 def mcachew(*args, **kwargs): # type: ignore[no-redef]
     """
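
Note: the "doublewrap" mentioned in the comment refers to a decorator that can be used both bare (@mcachew) and with arguments (@mcachew(cache_path=...)). The real mcachew wraps cachew; the sketch below only illustrates the double-wrapping pattern, with a no-op body:

    import functools

    def doublewrap(dec):
        @functools.wraps(dec)
        def new_dec(*args, **kwargs):
            if len(args) == 1 and not kwargs and callable(args[0]):
                # used bare: @mcachew
                return dec(args[0])
            # used with arguments: @mcachew(cache_path=...)
            return lambda f: dec(f, *args, **kwargs)
        return new_dec

    @doublewrap
    def mcachew(func, cache_path=None):
        # the real decorator would wrap func with cachew caching; this sketch just returns it
        return func
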
@@ -1,9 +1,13 @@
 """
 Photos and videos on your filesystem, their GPS and timestamps
 """
+REQUIRES = [
+    'geopy',
+    'magic',
+]
+# NOTE: also uses fdfind to search photos
 
-# pip install geopy magic
-
+from concurrent.futures import ProcessPoolExecutor as Pool
 from datetime import datetime
 import json
 from pathlib import Path
@@ -11,14 +15,14 @@ from typing import Tuple, Dict, Optional, NamedTuple, Iterator, Iterable, List
 
 from geopy.geocoders import Nominatim # type: ignore
 
-from ..common import LazyLogger, mcachew, fastermime
-from ..error import Res
+from ..core.common import LazyLogger, mcachew, fastermime
+from ..core.error import Res, sort_res_by
+from ..core.cachew import cache_dir
 
 from my.config import photos as config
 
 
-log = LazyLogger(__name__)
+logger = LazyLogger(__name__)
 
 
 # TODO ignore hidden dirs?
@@ -32,10 +36,6 @@ class Photo(NamedTuple):
     dt: Optional[datetime]
     geo: Optional[LatLon]
 
-    @property
-    def tags(self) -> List[str]: # TODO
-        return []
-
     @property
     def _basename(self) -> str:
         # TODO 'canonical' or something? only makes sense for organized ones
@@ -51,6 +51,7 @@ class Photo(NamedTuple):
 
     @property
     def url(self) -> str:
+        # TODO belongs to private overlay..
        return f'{config.base_url}{self._basename}'
 
 
@@ -58,14 +59,23 @@ from .utils import get_exif_from_file, ExifTags, Exif, dt_from_path, convert_ref
 
 Result = Res[Photo]
 
+def _make_photo_aux(*args, **kwargs) -> List[Result]:
+    # for the process pool..
+    return list(_make_photo(*args, **kwargs))
+
 def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]:
     exif: Exif
     if any(x in mtype for x in {'image/png', 'image/x-ms-bmp', 'video'}):
         # TODO don't remember why..
-        log.debug(f"skipping exif extraction for {photo} due to mime {mtype}")
+        logger.debug(f"skipping exif extraction for {photo} due to mime {mtype}")
         exif = {}
     else:
-        exif = get_exif_from_file(photo)
+        try:
+            exif = get_exif_from_file(photo)
+        except Exception as e:
+            # TODO reuse echain from promnesia
+            yield e
+            exif = {}
 
     def _get_geo() -> Optional[LatLon]:
         meta = exif.get(ExifTags.GPSINFO, {})
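
Note: instead of letting a bad file abort the whole scan, exif failures are now yielded into the result stream. Res[T] in my.core.error is essentially Union[T, Exception], so consumers filter with isinstance. A minimal self-contained sketch of that pattern (parse_ints is a made-up example, not part of the module):

    from typing import Iterator, Union

    def parse_ints(raw: list) -> Iterator[Union[int, Exception]]:
        for r in raw:
            try:
                yield int(r)   # successful value
            except Exception as e:
                yield e        # the error travels alongside the good values

    results = list(parse_ints(['1', 'oops', '3']))
    values = [r for r in results if not isinstance(r, Exception)]
    print(values)  # [1, 3]
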
@@ -82,7 +92,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]:
         if edt is not None:
             dtimes = edt.replace(' 24', ' 00') # jeez maybe log it?
             if dtimes == "0000:00:00 00:00:00":
-                log.warning(f"Bad exif timestamp {dtimes} for {photo}")
+                logger.warning(f"Bad exif timestamp {dtimes} for {photo}")
             else:
                 dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S')
                 # TODO timezone is local, should take into account...
@@ -90,7 +100,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]:
 
         if 'Instagram/VID_' in str(photo):
             # TODO bit random...
-            log.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo)
+            logger.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo)
             return None
 
         edt = dt_from_path(photo) # ok, last try..
@@ -100,7 +110,7 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]:
 
         if edt is not None and edt > datetime.now():
             # TODO also yield?
-            log.error('datetime for %s is too far in future: %s', photo, edt)
+            logger.error('datetime for %s is too far in future: %s', photo, edt)
             return None
 
         return edt
@@ -111,11 +121,12 @@ def _make_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Iterator[Result]:
     yield Photo(str(photo), dt=dt, geo=geo)
 
 
-# TODO exclude
-def _candidates() -> Iterable[str]:
+def _candidates() -> Iterable[Res[str]]:
     # TODO that could be a bit slow if there are to many extra files?
     from subprocess import Popen, PIPE
     # TODO could extract this to common?
+    # TODO would be nice to reuse get_files (or even let it use find)
+    # that way would be easier to exclude
     with Popen([
         'fdfind',
         '--follow',
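
Note: _candidates shells out to fdfind (the name fd installs under on Debian/Ubuntu) and streams its output. A rough standalone sketch of that Popen streaming pattern; the flags below are just an example invocation, the real code passes more options and then mime-filters the paths:

    from subprocess import Popen, PIPE

    def list_files(root: str):
        with Popen(['fdfind', '--follow', '--type', 'file', '.', root], stdout=PIPE) as p:
            assert p.stdout is not None  # stdout=PIPE guarantees it; keeps type checkers happy
            for line in p.stdout:
                yield line.decode('utf8').rstrip('\n')

    # for path in list_files('/path/to/photos'):
    #     print(path)
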
@@ -131,21 +142,22 @@ def _candidates() -> Iterable[str]:
             if tp in {'inode', 'text', 'application', 'audio'}:
                 continue
             if tp not in {'image', 'video'}:
-                # TODO yield error?
-                log.warning('%s: unexpected mime %s', path, tp)
+                msg = f'{path}: unexpected mime {tp}'
+                logger.warning(msg)
+                yield RuntimeError(msg) # not sure if necessary
             # TODO return mime too? so we don't have to call it again in _photos?
             yield path
 
 
 def photos() -> Iterator[Result]:
-    candidates = tuple(sorted(_candidates()))
+    candidates = tuple(sort_res_by(_candidates(), key=lambda i: i))
     return _photos(candidates)
 
 
 # if geo information is missing from photo, you can specify it manually in geo.json file
 # TODO is there something more standard?
-@mcachew(cache_path=config.cache_path)
-def _photos(candidates: Iterable[str]) -> Iterator[Result]:
+@mcachew(cache_path=cache_dir())
+def _photos(candidates: Iterable[Res[str]]) -> Iterator[Result]:
     geolocator = Nominatim() # TODO does it cache??
 
     from functools import lru_cache
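
Note: plain sorted() breaks once _candidates can yield exceptions mixed in with the paths, hence the switch to sort_res_by. The helper below is only a rough approximation of the idea (sort the successful values, keep the errors); the real my.core.error.sort_res_by may behave differently, e.g. in where it places the errors:

    from typing import Any, Callable, Iterable, Iterator, TypeVar, Union

    T = TypeVar('T')

    def sort_ok_values(items: Iterable[Union[T, Exception]],
                       key: Callable[[T], Any]) -> Iterator[Union[T, Exception]]:
        oks: list = []
        errs: list = []
        for i in items:
            (errs if isinstance(i, Exception) else oks).append(i)
        yield from sorted(oks, key=key)
        yield from errs  # errors are preserved, just not sorted
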
@@ -168,23 +180,39 @@ def _photos(candidates: Iterable[str]) -> Iterator[Result]:
         lon = j['lon']
         return LatLon(lat=lat, lon=lon)
 
-    for path in map(Path, candidates):
+    pool = Pool()
+    futures = []
+
+    for p in candidates:
+        if isinstance(p, Exception):
+            yield p
+            continue
+        path = Path(p)
+        # TODO rely on get_files
         if config.ignored(path):
-            log.info('ignoring %s due to config', path)
+            logger.info('ignoring %s due to config', path)
             continue
 
+        logger.debug('processing %s', path)
         parent_geo = get_geo(path.parent)
         mime = fastermime(str(path))
-        yield from _make_photo(path, mime, parent_geo=parent_geo)
+        futures.append(pool.submit(_make_photo_aux, path, mime, parent_geo=parent_geo))
+
+    for f in futures:
+        yield from f.result()
 
 
-def print_all():
+def print_all() -> None:
     for p in photos():
         if isinstance(p, Exception):
             print('ERROR!', p)
         else:
-            print(f"{p.dt} {p.path} {p.tags}")
+            print(f"{str(p.dt):25} {p.path} {p.geo}")
 
 # todo cachew -- improve AttributeError: type object 'tuple' has no attribute '__annotations__' -- improve errors?
 # todo cachew -- invalidate if function code changed?
 
+from ..core import Stats, stat
+def stats() -> Stats:
+    return stat(photos)
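
Note: the speedup comes from fanning exif extraction out to a ProcessPoolExecutor: futures are submitted in order and drained afterwards, so output order is preserved. _make_photo_aux materialises the generator into a list because generator objects can't be pickled across process boundaries. A minimal standalone sketch of the same submit/collect pattern (the worker here is made up):

    from concurrent.futures import ProcessPoolExecutor

    def _square_aux(x: int) -> list:
        # stand-in for _make_photo_aux: return a list, not a generator
        return [x * x]

    def squares(xs):
        pool = ProcessPoolExecutor()
        futures = [pool.submit(_square_aux, x) for x in xs]
        for f in futures:
            yield from f.result()  # blocks until that future is done; submission order is kept

    if __name__ == '__main__':
        print(list(squares(range(5))))  # [0, 1, 4, 9, 16]
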
@@ -23,10 +23,10 @@ class ExifTags:
 def get_exif_from_file(path: Path) -> Exif:
     # TODO exception handler?
     with PIL.Image.open(str(path)) as fo:
-        return get_exif_data(fo)
+        return _get_exif_data(fo)
 
 
-def get_exif_data(image):
+def _get_exif_data(image) -> Exif:
     """Returns a dictionary from the exif data of an PIL Image item. Also converts the GPS Tags"""
     exif_data = {}
     info = image._getexif()
@@ -46,36 +46,27 @@ def get_exif_data(image):
     return exif_data
 
 
-def to_degree(value):
+def to_degree(value) -> float:
     """Helper function to convert the GPS coordinates
     stored in the EXIF to degress in float format"""
-    d0 = value[0][0]
-    d1 = value[0][1]
-    d = float(d0) / float(d1)
-    m0 = value[1][0]
-    m1 = value[1][1]
-    m = float(m0) / float(m1)
-
-    s0 = value[2][0]
-    s1 = value[2][1]
-    s = float(s0) / float(s1)
-
+    (d, m, s) = value
     return d + (m / 60.0) + (s / 3600.0)
 
 
-def convert_ref(cstr, ref: str):
+def convert_ref(cstr, ref: str) -> float:
     val = to_degree(cstr)
     if ref == 'S' or ref == 'W':
         val = -val
     return val
 
 
 
 import re
 from datetime import datetime
 from typing import Optional
 
 # TODO surely there is a library that does it??
+# TODO this belogs to a private overlay or something
+# basically have a function that patches up dates after the files were yielded..
 _DT_REGEX = re.compile(r'\D(\d{8})\D*(\d{6})\D')
 def dt_from_path(p: Path) -> Optional[datetime]:
     name = p.stem
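
Note: to_degree now unpacks the EXIF GPS value directly into (degrees, minutes, seconds); recent Pillow versions expose these as rational numbers that already behave like floats, which is presumably why the manual numerator/denominator handling could go. A worked example of the conversion with made-up coordinates:

    def to_degree(value) -> float:
        (d, m, s) = value
        return d + (m / 60.0) + (s / 3600.0)

    def convert_ref(cstr, ref: str) -> float:
        val = to_degree(cstr)
        if ref == 'S' or ref == 'W':
            val = -val
        return val

    print(convert_ref((51, 30, 26.0), 'N'))  # 51.5072... (north -> positive)
    print(convert_ref((0, 7, 39.0), 'W'))    # -0.1275    (west  -> negative)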