extract code into utils, cleanup

This commit is contained in:
Dima Gerasimov 2020-03-12 23:55:53 +00:00
parent ce8685618a
commit c37bc6e60e
3 changed files with 143 additions and 170 deletions

View file

@ -1,42 +1,26 @@
"""
Module for accessing photos and videos on the filesystem
"""
# pip install geopy magic
from datetime import datetime
import itertools
import os
from os.path import join, basename
import json
import re
from pathlib import Path
from typing import Tuple, Dict, Optional, NamedTuple, Iterator, Iterable, List
from geopy.geocoders import Nominatim # type: ignore
import magic # type: ignore
import PIL.Image # type: ignore
from PIL.ExifTags import TAGS, GPSTAGS # type: ignore
from ..common import LazyLogger, mcachew
from mycfg import photos as config
logger = LazyLogger('my.photos')
log = logger
from mycfg import photos as config
# Timestamp embedded in a file name: 8 date digits (YYYYMMDD), optional
# non-digit separators, then 6 time digits (HHMMSS).
# Lookarounds (instead of consuming a literal non-digit on each side) ensure
# the groups are not part of a longer digit run while still allowing the
# timestamp to sit at the very start or very end of the name.
_DT_REGEX = re.compile(r'(?<!\d)(\d{8})\D*(\d{6})(?!\d)')


def dt_from_path(p: str) -> Optional[datetime]:
    """Extract a naive datetime from a timestamp embedded in the file name of *p*.

    Only the base name of the path is inspected; directory components are
    ignored.

    Returns:
        The parsed datetime, or None when the name contains no
        ``YYYYMMDD<sep>HHMMSS`` pattern.

    Raises:
        ValueError: if the matched digits do not form a valid date/time
            (propagated from ``datetime.strptime``).
    """
    name = basename(p)
    mm = _DT_REGEX.search(name)
    if mm is None:
        return None
    dates = mm.group(1) + mm.group(2)
    return datetime.strptime(dates, "%Y%m%d%H%M%S")
# TODO ignore hidden dirs?
class LatLon(NamedTuple):
@ -45,55 +29,6 @@ class LatLon(NamedTuple):
# TODO PIL.ExifTags.TAGS
# Names of the EXIF tags this module cares about, i.e. the strings obtained by
# decoding numeric tag ids through PIL's TAGS / GPSTAGS tables (see get_exif_data).
DATETIME = "DateTimeOriginal"
# Keys inside the nested GPS dictionary built by get_exif_data.
LAT = "GPSLatitude"
LAT_REF = "GPSLatitudeRef"  # hemisphere letter; convert() negates the value for 'S'
LON = "GPSLongitude"
LON_REF = "GPSLongitudeRef"  # hemisphere letter; convert() negates the value for 'W'
# Top-level key whose value get_exif_data expands into the nested GPS dictionary.
GPSINFO = "GPSInfo"
# TODO kython??
def get_exif_data(image):
    """Build a dict of EXIF data for a PIL image, keyed by decoded tag name.

    Numeric tag ids are translated through ``TAGS`` (falling back to the raw
    id when unknown). The entry stored under ``GPSINFO`` is itself re-keyed
    through ``GPSTAGS`` so that GPS sub-tags are addressable by name as well.
    An image without EXIF information yields an empty dict.
    """
    raw = image._getexif()
    if not raw:
        return {}

    result = {}
    for tag_id, payload in raw.items():
        tag_name = TAGS.get(tag_id, tag_id)
        if tag_name != GPSINFO:
            result[tag_name] = payload
            continue
        # GPS block: decode the nested tag ids too.
        result[tag_name] = {GPSTAGS.get(key, key): payload[key] for key in payload}
    return result
def to_degree(value):
    """Convert an EXIF GPS coordinate to decimal degrees (float).

    *value* is indexed as three ``(numerator, denominator)`` pairs holding,
    in order, degrees, minutes and seconds.
    """
    degrees, minutes, seconds = (
        float(value[idx][0]) / float(value[idx][1]) for idx in range(3)
    )
    return degrees + (minutes / 60.0) + (seconds / 3600.0)
def convert(cstr, ref: str):
    """Return the coordinate *cstr* in signed decimal degrees.

    *cstr* is passed to ``to_degree``; the result is negated when the
    hemisphere reference *ref* is 'S' or 'W'.
    """
    degrees = to_degree(cstr)
    return -degrees if ref in ('S', 'W') else degrees
class Photo(NamedTuple):
path: str
@ -122,61 +57,60 @@ class Photo(NamedTuple):
return PHOTOS_URL + self._basename
def _try_photo(photo: str, mtype: str, dgeo: Optional[LatLon]) -> Photo:
geo: Optional[LatLon]
from .utils import get_exif_from_file, ExifTags, Exif, dt_from_path, convert_ref
dt: Optional[datetime] = None
geo = dgeo
def _try_photo(photo: Path, mtype: str, *, parent_geo: Optional[LatLon]) -> Photo:
exif: Exif
if any(x in mtype for x in {'image/png', 'image/x-ms-bmp', 'video'}):
log.debug(f"skipping geo extraction for {photo} due to mime {mtype}")
# TODO don't remember why..
log.debug(f"skipping exif extraction for {photo} due to mime {mtype}")
exif = {}
else:
edata: Dict
try:
with PIL.Image.open(photo) as fo:
edata = get_exif_data(fo)
except Exception as e:
logger.warning(f"Couln't get exif for {photo}") # TODO meh
logger.exception(e)
else:
dtimes = edata.get('DateTimeOriginal', None)
if dtimes is not None:
try:
dtimes = dtimes.replace(' 24', ' 00') # jeez maybe log it?
if dtimes == "0000:00:00 00:00:00":
logger.info(f"Bad exif timestamp {dtimes} for {photo}")
else:
dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S')
# # TODO timezone is local, should take into account...
except Exception as e:
logger.error(f"Error while trying to extract date from EXIF {photo}")
logger.exception(e)
exif = get_exif_from_file(photo)
meta = edata.get(GPSINFO, {})
if LAT in meta and LON in meta:
lat = convert(meta[LAT], meta[LAT_REF])
lon = convert(meta[LON], meta[LON_REF])
geo = (lat, lon)
if dt is None:
if 'Instagram/VID_' in photo:
logger.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo)
else:
try:
edt = dt_from_path(photo) # ok, last try..
except Exception as e:
# TODO result type?
logger.error(f"Error while trying to extract date from name {photo}")
logger.exception(e)
def _get_geo() -> Optional[LatLon]:
meta = exif.get(ExifTags.GPSINFO, {})
if ExifTags.LAT in meta and ExifTags.LON in meta:
return LatLon(
lat=convert_ref(meta[ExifTags.LAT], meta[ExifTags.LAT_REF]),
lon=convert_ref(meta[ExifTags.LON], meta[ExifTags.LON_REF]),
)
return parent_geo
# TODO should the returned datetime be tz-aware or unaware (naive)?
def _get_dt() -> Optional[datetime]:
edt = exif.get(ExifTags.DATETIME, None)
if edt is not None:
dtimes = edt.replace(' 24', ' 00') # jeez maybe log it?
if dtimes == "0000:00:00 00:00:00":
log.warning(f"Bad exif timestamp {dtimes} for {photo}")
else:
if edt is not None and edt > datetime.now():
logger.error('datetime for %s is too far in future: %s', photo, edt)
else:
dt = edt
dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S')
# TODO timezone is local, should take into account...
return dt
if 'Instagram/VID_' in str(photo):
# TODO bit random...
log.warning('ignoring timestamp extraction for %s, they are stupid for Instagram videos', photo)
return None
return Photo(photo, dt, geo)
# plink = f"file://{photo}"
# plink = "https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/Ichthyornis_Clean.png/800px-Ichthyornis_Clean.png"
# yield (geo, src.color, plink)
# TODO FIXME result type here??
edt = dt_from_path(photo) # ok, last try..
if edt is None:
return None
if edt is not None and edt > datetime.now():
# TODO also yield?
logger.error('datetime for %s is too far in future: %s', photo, edt)
return None
return edt
geo = _get_geo()
dt = _get_dt()
return Photo(str(photo), dt=dt, geo=geo)
import mimetypes # TODO do I need init()?
@ -222,7 +156,8 @@ def photos() -> Iterator[Photo]:
# if geo information is missing from photo, you can specify it manually in geo.json file
# @mcachew(logger=logger)
# TODO is there something more standard?
# @mcachew(cache_path=config.cache_path)
def _photos(candidates: Iterable[str]) -> Iterator[Photo]:
geolocator = Nominatim() # TODO does it cache??
@ -252,29 +187,15 @@ def _photos(candidates: Iterable[str]) -> Iterator[Photo]:
log.info('ignoring %s due to config', path)
continue
geo = get_geo(path.parent)
parent_geo = get_geo(path.parent)
mime = fastermime(str(path))
p = _try_photo(str(path), mime, geo)
p = _try_photo(path, mime, parent_geo=parent_geo)
yield p
def get_photos(cached: bool=False) -> List[Photo]:
    """Return all photos as a list.

    With ``cached=True`` the list is read back from the dill dump at
    ``CACHE_PATH``; otherwise it is produced by exhausting ``iter_photos()``.
    """
    # TODO get rid of it, use cachew..
    import dill # type: ignore

    if not cached:
        return list(iter_photos())

    with open(CACHE_PATH, 'rb') as cache_file:
        stored = dill.load(cache_file)
    # Rebuild each item as a fresh Photo: the deserialised objects otherwise
    # come back without their methods.
    return [Photo(**item._asdict()) for item in stored]
# TODO python3 -m photos update_cache
def update_cache():
    """Recompute the full photo list and dump it with dill to ``CACHE_PATH``."""
    import dill # type: ignore

    # Collect everything first so the cache file is only opened (and
    # truncated) once the new contents are available.
    fresh = get_photos(cached=False)
    with open(CACHE_PATH, 'wb') as cache_file:
        dill.dump(fresh, cache_file)
def print_all():
    """Print the timestamp, path and tags of every photo yielded by ``photos()``."""
    for photo in photos():
        print(f"{photo.dt} {photo.path} {photo.tags}")
# TODO cachew -- improve AttributeError: type object 'tuple' has no attribute '__annotations__' -- improve errors?
# TODO cachew -- invalidate if function code changed?

View file

@ -1,32 +0,0 @@
import logging
# TODO eh?
logging.basicConfig(level=logging.INFO)
from kython.klogging import setup_logzero
from photos import get_photos, iter_photos, get_logger
import sys
def main():
    """Command-line entry point.

    Without arguments, prints every photo from ``iter_photos()``. With the
    single supported command ``update_cache``, rebuilds the cache and then
    reads it back. Any other command raises RuntimeError.
    """
    setup_logzero(get_logger(), level=logging.DEBUG)

    args = sys.argv[1:]
    if not args:
        for photo in iter_photos():
            print(f"{photo.dt} {photo.path} {photo.tags}")
            # TODO need datetime!
        return

    cmd = args[0]
    if cmd != "update_cache":
        raise RuntimeError(f"Unknown command {cmd}")

    from photos import update_cache, get_photos
    update_cache()
    # Read the freshly written cache back once.
    get_photos(cached=True)
if __name__ == '__main__':
main()

84
my/photos/utils.py Normal file
View file

@ -0,0 +1,84 @@
from typing import Dict
import PIL.Image # type: ignore
from PIL.ExifTags import TAGS, GPSTAGS # type: ignore
Exif = Dict
class ExifTags:
    """Namespace for the decoded EXIF tag names used when reading photo metadata."""

    # Top-level tags.
    GPSINFO = 'GPSInfo'
    DATETIME = 'DateTimeOriginal'

    # Tags nested inside the GPSInfo entry.
    LAT = 'GPSLatitude'
    LON = 'GPSLongitude'
    LAT_REF = 'GPSLatitudeRef'
    LON_REF = 'GPSLongitudeRef'
# TODO there must be something more standard for this...
def get_exif_from_file(path: str) -> Exif:
    """Open the image at *path* with PIL and return its decoded EXIF data.

    The image is closed again before returning. Errors raised while opening
    or decoding the file propagate to the caller.
    """
    # TODO exception handler?
    with PIL.Image.open(path) as image:
        exif = get_exif_data(image)
    return exif
def get_exif_data(image):
    """Return the EXIF data of a PIL image as a dict keyed by decoded tag name.

    Numeric tag ids are decoded through ``TAGS`` (unknown ids are kept as-is).
    The value stored under ``ExifTags.GPSINFO`` is additionally re-keyed
    through ``GPSTAGS`` so GPS sub-tags can be looked up by name.

    Returns:
        A (possibly empty) dict. An empty dict is returned both when the image
        carries no EXIF information and when the image object does not offer
        a ``_getexif`` method at all.
    """
    # Not every PIL image class defines `_getexif`; treat those images as
    # having no EXIF data instead of failing with AttributeError.
    getexif = getattr(image, '_getexif', None)
    info = getexif() if getexif is not None else None
    if not info:
        return {}

    exif_data = {}
    for tag, value in info.items():
        decoded = TAGS.get(tag, tag)
        if decoded == ExifTags.GPSINFO:
            # Nested GPS block: decode its tag ids as well.
            exif_data[decoded] = {GPSTAGS.get(t, t): value[t] for t in value}
        else:
            exif_data[decoded] = value
    return exif_data
def to_degree(value) -> float:
    """Convert an EXIF GPS coordinate to decimal degrees.

    Args:
        value: a sequence whose first three items are degrees, minutes and
            seconds. Each item may be either a ``(numerator, denominator)``
            pair or a single number-like object accepted by ``float()``
            (e.g. a float or a rational).

    Returns:
        ``degrees + minutes / 60 + seconds / 3600`` as a float.

    Raises:
        ZeroDivisionError: if a ``(numerator, denominator)`` pair has a zero
            denominator.
    """
    def _as_float(component) -> float:
        # Pairs are subscriptable; plain numbers / rationals are not and
        # raise TypeError on indexing, in which case they convert directly.
        try:
            numerator, denominator = component[0], component[1]
        except TypeError:
            return float(component)
        return float(numerator) / float(denominator)

    d = _as_float(value[0])
    m = _as_float(value[1])
    s = _as_float(value[2])
    return d + (m / 60.0) + (s / 3600.0)
def convert_ref(cstr, ref: str):
    """Convert the EXIF coordinate *cstr* to signed decimal degrees.

    The magnitude comes from ``to_degree``; a hemisphere reference *ref* of
    'S' or 'W' flips the sign.
    """
    magnitude = to_degree(cstr)
    if ref == 'S' or ref == 'W':
        return -magnitude
    return magnitude
import re
from datetime import datetime
from pathlib import Path
from typing import Optional
# TODO surely there is a library that does it??
# Timestamp embedded in a file name: 8 date digits (YYYYMMDD), optional
# non-digit separators, then 6 time digits (HHMMSS).
# Lookarounds are used instead of consuming a literal non-digit on each side:
# the name is matched without its extension, so the timestamp is very often
# the last thing in the string (e.g. the stem of IMG_20200312_235553.jpg) and
# may also be the first.
_DT_REGEX = re.compile(r'(?<!\d)(\d{8})\D*(\d{6})(?!\d)')


def dt_from_path(p: Path) -> Optional[datetime]:
    """Extract a naive datetime from a timestamp embedded in the file name of *p*.

    Only the stem (final path component without its last suffix) is inspected.

    Returns:
        The parsed datetime, or None when the stem contains no
        ``YYYYMMDD<sep>HHMMSS`` pattern.

    Raises:
        ValueError: if the matched digits do not form a valid date/time
            (propagated from ``datetime.strptime``).
    """
    name = p.stem
    mm = _DT_REGEX.search(name)
    if mm is None:
        return None
    dates = mm.group(1) + mm.group(2)
    return datetime.strptime(dates, "%Y%m%d%H%M%S")