From 0a68e3000d35935cdefdd3ae0f5f5aaa5f0f8938 Mon Sep 17 00:00:00 2001
From: Dima Gerasimov
Date: Sun, 26 Aug 2018 14:07:26 +0100
Subject: [PATCH] providing geo and timestamp, caching, CI

---
 ci.sh              |  10 +++
 photos/__init__.py | 249 +++++++++++++++++++++++++++++++++++++++++++++
 photos/__main__.py |  24 ++++++
 requirements.txt   |   6 ++
 update_cache       |   7 ++
 5 files changed, 296 insertions(+)
 create mode 100755 ci.sh
 create mode 100644 requirements.txt
 create mode 100755 update_cache

diff --git a/ci.sh b/ci.sh
new file mode 100755
index 0000000..0daaac3
--- /dev/null
+++ b/ci.sh
@@ -0,0 +1,10 @@
+#!/bin/bash
+
+cd "$(dirname "$0")" || exit
+
+. ~/bash_ci
+
+ci_run mypy photos
+ci_run pylint -E photos
+
+ci_report_errors
diff --git a/photos/__init__.py b/photos/__init__.py
index e69de29..3d90b32 100644
--- a/photos/__init__.py
+++ b/photos/__init__.py
@@ -0,0 +1,249 @@
+from datetime import datetime
+import itertools
+import os
+from os.path import join
+from typing import Tuple, Dict, Optional, NamedTuple, Iterator, Iterable, List
+
+from geopy.geocoders import Nominatim # type: ignore
+
+import magic # type: ignore
+
+import PIL.Image # type: ignore
+from PIL.ExifTags import TAGS, GPSTAGS # type: ignore
+
+from kython import json_load
+
+import logging
+def get_logger():
+    return logging.getLogger('photo-provider')
+
+
+geolocator = Nominatim(user_agent='photo-provider') # TODO does it cache??
+mime = magic.Magic(mime=True)
+
+# TODO hmm, instead geo could be a dynamic property... although a bit wasteful
+
+PATHS = [
+    "***REMOVED***",
+    "***REMOVED***",
+    "***REMOVED***",
+]
+# TODO could use other paths I suppose?
+# TODO or maybe just use symlinks
+# TODO however then won't be accessible from dropbox
+
+# PATH = "***REMOVED***/***REMOVED***"
+# PATH = "***REMOVED***/***REMOVED***"
+
+CACHE_PATH = "***REMOVED***"
+
+LatLon = Tuple[float, float]
+
+# TODO PIL.ExifTags.TAGS
+
+DATETIME = "DateTimeOriginal"
+LAT = "GPSLatitude"
+LAT_REF = "GPSLatitudeRef"
+LON = "GPSLongitude"
+LON_REF = "GPSLongitudeRef"
+GPSINFO = "GPSInfo"
+
+# TODO kython??
+def get_exif_data(image) -> Dict:
+    """Return the EXIF data of a PIL image as a dict keyed by tag name.
+
+    Numeric tag ids are replaced by their names from TAGS (ids without a
+    known name are kept as they are); the GPSInfo entry is decoded the same
+    way using GPSTAGS. Returns an empty dict if there is no EXIF data.
+    """
+    exif_data: Dict = {}
+    info = image._getexif() # private PIL API
+    if not info:
+        return exif_data
+
+    for tag, value in info.items():
+        decoded = TAGS.get(tag, tag)
+        if decoded == GPSINFO:
+            exif_data[decoded] = {GPSTAGS.get(t, t): value[t] for t in value}
+        else:
+            exif_data[decoded] = value
+    return exif_data
+
+def _ratio(value) -> float:
+    """Convert one EXIF rational to float.
+
+    Accepts a (numerator, denominator) pair, or any value float() understands
+    (presumably what newer Pillow versions hand out -- TODO confirm).
+    """
+    if isinstance(value, (tuple, list)):
+        num, den = value
+        return float(num) / float(den)
+    return float(value)
+
+def to_degree(value):
+    """Helper function to convert the GPS coordinates
+    stored in the EXIF to degrees in float format.
+
+    value holds three rationals: degrees, minutes and seconds.
+    """
+    d, m, s = (_ratio(value[i]) for i in range(3))
+    return d + (m / 60.0) + (s / 3600.0)
+
+def convert(cstr, ref: str):
+    """Convert an EXIF coordinate to signed decimal degrees.
+
+    ref is the hemisphere letter; 'S' and 'W' give a negative result.
+    """
+    val = to_degree(cstr)
+    if ref in ('S', 'W'):
+        val = -val
+    return val
+
+
+class Photo(NamedTuple):
+    path: str
+    dt: Optional[datetime] # None if no usable timestamp was found
+    geo: Optional[LatLon] # (latitude, longitude)
+    # TODO can we always extract date? I guess not...
+
+    @property
+    def tags(self) -> List[str]: # TODO
+        return []
+
+def _try_photo(photo: str, mtype: str, dgeo: Optional[LatLon]) -> Optional[Photo]:
+    """Build a Photo for the file at path photo.
+
+    mtype is the mime type of the file; for png, bmp and video no EXIF
+    extraction is attempted. dgeo is the location used when the file itself
+    has no usable GPS data. Timestamp and location are best effort: failures
+    are logged and the field keeps its default.
+    """
+    logger = get_logger()
+
+    dt: Optional[datetime] = None
+    geo: Optional[LatLon] = dgeo
+    if any(x in mtype for x in {'image/png', 'image/x-ms-bmp', 'video'}):
+        logger.info(f"Skipping geo extraction for {photo} due to mime {mtype}")
+        return Photo(photo, dt, geo)
+
+    edata: Dict
+    try:
+        with PIL.Image.open(photo) as fo:
+            edata = get_exif_data(fo)
+    except Exception as e:
+        logger.warning(f"Couln't get exif for {photo}") # TODO meh
+        logger.exception(e)
+        return Photo(photo, dt, geo)
+
+    dtimes = edata.get(DATETIME, None)
+    if dtimes is not None:
+        try:
+            dtimes = dtimes.replace(' 24', ' 00') # jeez maybe log it?
+            if dtimes == "0000:00:00 00:00:00":
+                logger.info(f"Bad exif timestamp {dtimes} for {photo}")
+            else:
+                dt = datetime.strptime(dtimes, '%Y:%m:%d %H:%M:%S')
+            # # TODO timezone is local, should take into account...
+        except Exception as e:
+            logger.error(f"Error while trying to extract date for {photo}")
+            logger.exception(e)
+
+    meta = edata.get(GPSINFO, {})
+    if LAT in meta and LON in meta:
+        # malformed GPS data is handled like a malformed timestamp: log and move on
+        try:
+            lat = convert(meta[LAT], meta[LAT_REF])
+            lon = convert(meta[LON], meta[LON_REF])
+        except Exception as e:
+            logger.error(f"Error while trying to extract geo for {photo}")
+            logger.exception(e)
+        else:
+            geo = (lat, lon)
+
+    return Photo(photo, dt, geo)
+
+
+# if geo information is missing from photo, you can specify it manually in geo.json file
+def iter_photos() -> Iterator[Photo]:
+    """Walk PATHS and yield a Photo for every file whose mime type is not ignored.
+
+    A directory may hold a geo.json with either a 'name' key (resolved through
+    the geocoder) or 'lat' and 'lon' keys. That location is the fallback for
+    photos in the directory and below it, until a subdirectory brings its own
+    geo.json.
+
+    Raises RuntimeError, chained to the cause, if a file cannot be processed.
+    """
+    logger = get_logger()
+
+    IGNORED = {
+        'application',
+        'audio',
+        'text',
+        'inode',
+    }
+
+    # fallback geo per directory; os.walk goes top-down, so a parent is recorded
+    # before its children are visited and each child sees the most specific one
+    dir_geos: Dict[str, LatLon] = {}
+    # TODO could have this for all meta? e.g. time
+    for d, _, files in itertools.chain.from_iterable((os.walk(pp) for pp in PATHS)):
+        logger.info(f"Processing {d}")
+
+        key = os.path.normpath(d)
+        dgeo: Optional[LatLon] = dir_geos.get(os.path.dirname(key))
+
+        geof = join(d, 'geo.json')
+        if os.path.isfile(geof):
+            j: Dict
+            with open(geof, 'r') as fo:
+                j = json_load(fo)
+            if 'name' in j:
+                g = geolocator.geocode(j['name'])
+                if g is None: # nothing found; keep the inherited location
+                    logger.warning(f"Couldn't geocode {j['name']} from {geof}")
+                else:
+                    dgeo = (g.latitude, g.longitude)
+            else:
+                dgeo = j['lat'], j['lon']
+
+        if dgeo is not None:
+            dir_geos[key] = dgeo
+
+        for f in sorted(files):
+            photo = join(d, f)
+            mtype = mime.from_file(photo)
+
+            if any(i in mtype for i in IGNORED):
+                logger.info(f"Ignoring {photo} due to mime {mtype}")
+                continue
+
+            try:
+                p = _try_photo(photo, mtype, dgeo)
+            except Exception as e:
+                raise RuntimeError(f'Error while processing {photo}') from e
+            if p is not None:
+                yield p
+
+def get_photos(cached: bool=False) -> Iterable[Photo]:
+    """Return all photos as a list.
+
+    With cached=True the list is loaded from CACHE_PATH (as written by
+    update_cache) instead of being collected through iter_photos.
+    """
+    if not cached:
+        return list(iter_photos())
+
+    # imported here so that dill is needed for caching only
+    import dill # type: ignore
+    # NOTE: unpickling can run arbitrary code, CACHE_PATH must hold trusted data only
+    with open(CACHE_PATH, 'rb') as fo:
+        preph = dill.load(fo)
+    return [Photo(**p._asdict()) for p in preph] # meh. but otherwise it's not serialising methods...
+
+def update_cache():
+    """Collect all photos and dump them to CACHE_PATH with dill."""
+    import dill # type: ignore
+    photos = get_photos(cached=False)
+    with open(CACHE_PATH, 'wb') as fo:
+        dill.dump(photos, fo)
diff --git a/photos/__main__.py b/photos/__main__.py
index e69de29..04b4053 100644
--- a/photos/__main__.py
+++ b/photos/__main__.py
@@ -0,0 +1,24 @@
+import logging
+logging.basicConfig(level=logging.INFO)
+
+from kython.logging import setup_logzero
+
+from photos import get_photos, iter_photos, get_logger
+
+setup_logzero(get_logger(), level=logging.DEBUG)
+
+import sys
+
+if len(sys.argv) > 1:
+    cmd = sys.argv[1]
+    if cmd == "update_cache":
+        from photos import update_cache, get_photos
+        update_cache()
+        get_photos(cached=True)
+    else:
+        raise RuntimeError(f"Unknown command {cmd}")
+else:
+    for p in iter_photos():
+        pass
+        # TODO need datetime!
+        # print(p)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..dc87a9e
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,6 @@
+geopy
+python-magic
+Pillow
+
+# optional if you want caching
+dill
diff --git a/update_cache b/update_cache
new file mode 100755
index 0000000..6daf62c
--- /dev/null
+++ b/update_cache
@@ -0,0 +1,7 @@
+#!/bin/bash
+set -eu
+
+cd "$(dirname "$0")"
+
+python3 -m photos update_cache
+