from datetime import datetime import itertools import os from os.path import join from typing import Tuple, Dict, Optional, NamedTuple, Iterator, Iterable, List from geopy.geocoders import Nominatim # type: ignore import magic # type: ignore import PIL.Image # type: ignore from PIL.ExifTags import TAGS, GPSTAGS # type: ignore from kython import json_load import logging def get_logger(): return logging.getLogger('photo-provider') geolocator = Nominatim() # TODO does it cache?? mime = magic.Magic(mime=True) # TODO hmm, instead geo could be a dynamic property... although a bit wasteful PATHS = [ "***REMOVED***", "***REMOVED***", "***REMOVED***", ] # TODO could use other pathes I suppose? # TODO or maybe just use symlinks # TODO however then won't be accessible from dropbox # PATH = "***REMOVED***/***REMOVED***" # PATH = "***REMOVED***/***REMOVED***" CACHE_PATH = "***REMOVED***" LatLon = Tuple[float, float] # TODO PIL.ExifTags.TAGS DATETIME = "DateTimeOriginal" LAT = "GPSLatitude" LAT_REF = "GPSLatitudeRef" LON = "GPSLongitude" LON_REF = "GPSLongitudeRef" GPSINFO = "GPSInfo" # TODO kython?? def get_exif_data(image): """Returns a dictionary from the exif data of an PIL Image item. Also converts the GPS Tags""" exif_data = {} info = image._getexif() if info: for tag, value in info.items(): decoded = TAGS.get(tag, tag) if decoded == GPSINFO: gps_data = {} for t in value: sub_decoded = GPSTAGS.get(t, t) gps_data[sub_decoded] = value[t] exif_data[decoded] = gps_data else: exif_data[decoded] = value return exif_data def to_degree(value): """Helper function to convert the GPS coordinates stored in the EXIF to degress in float format""" d0 = value[0][0] d1 = value[0][1] d = float(d0) / float(d1) m0 = value[1][0] m1 = value[1][1] m = float(m0) / float(m1) s0 = value[2][0] s1 = value[2][1] s = float(s0) / float(s1) return d + (m / 60.0) + (s / 3600.0) def convert(cstr, ref: str): val = to_degree(cstr) if ref == 'S' or ref == 'W': val = -val return val class Photo(NamedTuple): path: str dt: Optional[datetime] geo: Optional[LatLon] # TODO can we always extract date? I guess not... @property def tags(self) -> List[str]: # TODO return [] def _try_photo(photo: str, mtype: str, dgeo: Optional[LatLon]) -> Optional[Photo]: logger = get_logger() geo: Optional[LatLon] dt: Optional[datetime] = None geo = dgeo if any(x in mtype for x in {'image/png', 'image/x-ms-bmp', 'video'}): logger.info(f"Skipping geo extraction for {photo} due to mime {mtype}") else: edata: Dict try: with PIL.Image.open(photo) as fo: edata = get_exif_data(fo) except Exception as e: logger.warning(f"Couln't get exif for {photo}") # TODO meh logger.exception(e) else: dtimes = edata.get('DateTimeOriginal', None) if dtimes is not None: try: dtimes = dtimes.replace(' 24', ' 00') # jeez maybe log it? if dtimes == "0000:00:00 00:00:00": logger.info(f"Bad exif timestamp {dtimes} for {photo}") else: dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S') # # TODO timezone is local, should take into account... except Exception as e: logger.error(f"Error while trying to extract date for {photo}") logger.exception(e) meta = edata.get(GPSINFO, {}) if LAT in meta and LON in meta: lat = convert(meta[LAT], meta[LAT_REF]) lon = convert(meta[LON], meta[LON_REF]) geo = (lat, lon) return Photo(photo, dt, geo) # plink = f"file://{photo}" # plink = "https://upload.wikimedia.org/wikipedia/commons/thumb/1/19/Ichthyornis_Clean.png/800px-Ichthyornis_Clean.png" # yield (geo, src.color, plink) # if geo information is missing from photo, you can specify it manually in geo.json file def iter_photos() -> Iterator[Photo]: logger = get_logger() geos: List[LatLon] = [] # stack of geos so we could use the most specific one # TODO could have this for all meta? e.g. time for d, _, files in itertools.chain.from_iterable((os.walk(pp) for pp in PATHS)): logger.info(f"Processing {d}") geof = join(d, 'geo.json') cgeo = None if os.path.isfile(geof): j: Dict with open(geof, 'r') as fo: j = json_load(fo) if 'name' in j: g = geolocator.geocode(j['name']) geo = (g.latitude, g.longitude) else: geo = j['lat'], j['lon'] geos.append(geo) for f in sorted(files): photo = join(d, f) mtype = mime.from_file(photo) IGNORED = { 'application', 'audio', 'text', 'inode', } if any(i in mtype for i in IGNORED): logger.info(f"Ignoring {photo} due to mime {mtype}") continue try: dgeo = None if len(geos) == 0 else geos[-1] p = _try_photo(photo, mtype, dgeo) if p is not None: yield p except Exception as e: raise RuntimeError(f'Error while processing {photo}') from e if cgeo is not None: geos.pop() def get_photos(cached: bool=False) -> Iterable[Photo]: import dill # type: ignore if cached: with open(CACHE_PATH, 'rb') as fo: preph = dill.load(fo) return [Photo(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods... else: return list(iter_photos()) def update_cache(): import dill # type: ignore photos = get_photos(cached=False) with open(CACHE_PATH, 'wb') as fo: dill.dump(photos, fo)