#!/usr/bin/env python3 import base64 import hashlib import json import logging import os import re import shlex import shutil import sqlite3 import subprocess import sys import tempfile import time import urllib.parse import urllib.request from datetime import datetime, timedelta, timezone from pathlib import Path DB_NAME = Path("metadata.db") TZ = 1 TOOLS = ("jrnl", "sqlite3") for t in TOOLS: if not shutil.which(t): raise FileNotFoundError(f"There is no existing path for '{t}'") class Config: memo_token = os.getenv("MEMOS_TOKEN") memo_url = os.getenv("MEMOS_URL") openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY") owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode() owntracks_url = os.getenv("OWNTRACKS_URL") geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS", ",").split(",") @classmethod def validate(cls): if not cls.memo_token or not cls.memo_url: sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.") elif not cls.openweathermap_api_key: sys.exit("Missing openweather api key") elif ( # TODO need more complex checking not cls.owntracks_creds or not cls.owntracks_url or not cls.geo_user or not cls.geo_device ): sys.exit("Missing OwnTracks data") Config.validate() def make_logger(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(Path.home() / ".diary_requests.log", mode="a"), ], ) return logging.getLogger("urllib_logger") def get_diary_path_by_name(name: str): result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True) if result.stderr: sys.exit(f"Error retrieving diary name: {result.stderr}") matches = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", result.stdout.strip())) diary_path = matches.get(name) if not diary_path or not Path(diary_path).exists(): sys.exit(f"Diary '{name}' not found or path does not exist.") return Path(diary_path) def make_tz_unixtime(target_time: str): return int( ( datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) ).timestamp() ) def find_closest_entry(data, target_timestamp: int): return min( (entry for entry in data if "tst" in entry), key=lambda e: abs(target_timestamp - e["tst"]), default=None, ) def convert_diary_date(date_str): try: dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) return dt.strftime("%d %b %Y at %H:%M:%S:") except ValueError: return None def fetch_data(url, headers={}, data=None, rjson=True, log=True): logit = make_logger() method = "POST" if data else "GET" encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None req = urllib.request.Request(url, headers=headers, data=encoded_data, method=method) req.add_header("Content-Type", "application/x-www-form-urlencoded") try: with urllib.request.urlopen(req) as response: if response.status != 200: logit.error(response.read()) sys.exit(f"HTTP error {response.status}") try: response_data = response.read().decode("utf-8") except UnicodeDecodeError as e: logit.error(e) response_data = response.read() logit.info(response_data) if log else None if not rjson: return response_data return json.loads(response_data) except Exception as e: logit.error(str(e)) raise def delete_entity(url, headers): logit = make_logger() req = urllib.request.Request(url, headers=headers, method="DELETE") try: with urllib.request.urlopen(req) as response: if response.status != 200: logit.error(response.read()) sys.exit(f"HTTP delete error {response.status}") logit.info(response.read().decode("utf-8")) except Exception as e: logit.error(str(e)) raise def db_connection(diary_path: Path): conn = sqlite3.connect(diary_path / DB_NAME) conn.execute("PRAGMA foreign_keys = ON;") return conn def initialize_db(conn: sqlite3.Connection): with conn: conn.executescript( """ CREATE TABLE IF NOT EXISTS metadata ( id INTEGER PRIMARY KEY, unixtime INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS weather ( id INTEGER PRIMARY KEY, temp INTEGER NOT NULL, temp_like INTEGER NOT NULL, sunrise INTEGER NOT NULL, sunset INTEGER NOT NULL, icon TEXT NOT NULL DEFAULT 'none', metadata_id INTEGER NOT NULL, FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS location ( id INTEGER PRIMARY KEY, city TEXT NOT NULL, lon TEXT NOT NULL, lat TEXT NOT NULL, tz TEXT NOT NULL, metadata_id INTEGER NOT NULL, FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE ); """ ) def insert_metadata( conn: sqlite3.Connection, unixtime: int, ): cursor = conn.cursor() cursor.execute("INSERT INTO metadata(unixtime) VALUES(?)", [unixtime]) conn.commit() return cursor.lastrowid def remove_metadata(conn: sqlite3.Connection, metadata_id: int): cursor = conn.cursor() cursor.execute("DELETE FROM metadata WHERE id=?", (metadata_id,)) conn.commit() def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int): if isinstance(weather, list): weather = weather[0] cursor = conn.cursor() try: cursor.execute( """ INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id) VALUES(?, ?, ?, ?, ?, ?) """, [ weather["temp"], weather["feels_like"], weather["sunrise"], weather["sunset"], weather["weather"][0]["icon"], metadata_id, ], ) except Exception as e: remove_metadata(conn, metadata_id) conn.rollback() print(e) raise conn.commit() def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int): cursor = conn.cursor() try: # sometimes it can't get locality.... locality = location["locality"] if "locality" in location else "-" cursor.execute( """ INSERT INTO location(city, lon, lat, tz, metadata_id) VALUES(?, ?, ?, ?, ?) """, [ locality, location["lon"], location["lat"], location["tzname"], metadata_id, ], ) except Exception as e: remove_metadata(conn, metadata_id) conn.rollback() print(e) raise conn.commit() def fetch_geo(metadata_id: int, create_time_timestamp: int, conn: sqlite3.Connection): geo_url = f"{Config.owntracks_url}/api/0/locations" geo_headers = { "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}" } geo_response = fetch_data( geo_url, geo_headers, data={ "from": "1970-01-01", "limit": 20, "device": Config.geo_device, "user": Config.geo_user, }, ) closest_entry = find_closest_entry( geo_response.get("data", []), create_time_timestamp ) if closest_entry["tst"] + (48 * 60**2) < time.time(): # 2 days print("") if ( input("Do you really want to use such old geo data? (Y/N) ").strip().upper() != "Y" ): remove_metadata(conn, metadata_id) sys.exit("Operation canceled.") insert_location(closest_entry, conn, metadata_id) return closest_entry def fetch_weather( metadata_id: int, closest_entry: dict, unixtime: int, conn: sqlite3.Connection ): weather_response = fetch_data( f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={closest_entry['lat']}&lon={closest_entry['lon']}&dt={unixtime}&appid={Config.openweathermap_api_key}&units=metric", headers={}, ) insert_weather(weather_response["data"], conn, metadata_id) def doctor(): args_len = len(sys.argv) check_diary = False fix_diary = False if args_len < 3: sys.exit("Usage: script.py doctor ") if args_len >= 4 and sys.argv[3] == "check_diary": check_diary = True if args_len >= 4 and sys.argv[3] == "check_and_fix_diary": fix_diary = True check_diary = True diary_name = sys.argv[2] diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) conn = db_connection(diary_path) initialize_db(conn) cursor = conn.cursor() metadata = cursor.execute("SELECT * FROM metadata").fetchall() for m in metadata: weather = cursor.execute( "SELECT * FROM weather WHERE metadata_id = ?", (m[0],) ).fetchall() location = cursor.execute( "SELECT * FROM location WHERE metadata_id = ?", (m[0],) ).fetchall() if not weather: print(f"There is no weather info about {m[0]} - {m[1]}") if not location: print(f"There is no location info about {m[0]} - {m[1]}") if not weather and not location: # delete metadata entry if any of metadata type is not exists print("An empty metadata was deleted") remove_metadata(conn, m[0]) if check_diary: dt = datetime.fromtimestamp(m[1], tz=timezone(timedelta(hours=TZ))) diary_datetime = dt.strftime("%Y/%m/%d at %I:%M:%S %p") try: result = subprocess.run( ["jrnl", diary_name, "-on", diary_datetime], check=True, capture_output=True, text=True, ) if not result.stdout: print( f"There is some metadata that is not associated with a diary entity: {diary_datetime}." ) if not fix_diary: print( "You can automatically remove it by running the scripts with 'check_and_fix_diary' argument." ) else: remove_metadata(conn, m[0]) print("The problem was fixed.") except subprocess.CalledProcessError as e: print(e) raise def make_hash(file: Path): sha256_hash = hashlib.sha256() with open(file, "rb") as filed: sha256_hash.update(filed.read()) return sha256_hash.hexdigest() def make_backup(diary_name: str, diary_path: Path): try: os.remove(diary_path / f"{diary_name}.bak") except FileNotFoundError: ... # shutil.copy2(diary_path / diary_name, diary_path / f"{diary_name}.bak") # I can't make backup when the diary is not encrypted... shutil.copy2(diary_path / DB_NAME, diary_path / f"{DB_NAME}.bak") def export(): if len(sys.argv) < 4 or sys.argv[1] != "export": sys.exit("Usage: script.py export ") diary_name, tag = sys.argv[2], sys.argv[3] diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) try: conn = db_connection(diary_path) initialize_db(conn) headers = {"Cookie": f"memos.access-token={Config.memo_token}"} query_string = urllib.parse.urlencode( {"filter": f"creator=='users/1'&&tag_search==['{tag}']"} ) data = fetch_data(f"{Config.memo_url}api/v1/memos?{query_string}", headers) memos = data.get("memos", []) if not memos: sys.exit("No memos found.") if ( input(f"There are {len(memos)} memos. Export them all? (Y/N): ") .strip() .upper() != "Y" ): sys.exit("Export canceled.") for memo in memos: # attachments part # memo_info = fetch_data(f"{Config.memo_url}/api/v1/{memo['name']}", headers) # # check if there are resources # if "resources" in memo_info: # for resource in memo_info["resources"]: # memo_resources = fetch_data( # f"{Config.memo_url}/api/v1/resources:by-uid/{resource['uid']}" # ) create_time = memo["createTime"] content = shlex.quote(memo["content"].replace(f"#{tag}", "").strip()) metadata_id = insert_metadata(conn, make_tz_unixtime(create_time)) closest_entry = fetch_geo(metadata_id, make_tz_unixtime(create_time), conn) fetch_weather( metadata_id, closest_entry, make_tz_unixtime(create_time), conn ) try: subprocess.run( f'printf "%s %s" "{convert_diary_date(create_time)}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print(f"Error writing to journal: {e.stderr}") raise delete_entity(f"{Config.memo_url}/api/v1/{memo['name']}", headers) except Exception as e: print(f"An error occurred: {e}") raise def insert(): conn = None try: if len(sys.argv) < 3 or sys.argv[1] != "insert": sys.exit( "Usage: script.py insert [bulk|single (default)] 'content'" ) diary_name = sys.argv[2] insert_type = ( "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single" ) diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) conn = db_connection(diary_path) initialize_db(conn) # generating and converting current time datenow = datetime.now(timezone.utc) datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ") metadata_id = insert_metadata(conn, make_tz_unixtime(datenow_timestamp)) # fetching geo-data closest_entry = fetch_geo( metadata_id, make_tz_unixtime(datenow_timestamp), conn ) # fetching weather data fetch_weather( metadata_id, closest_entry, make_tz_unixtime(datenow_timestamp), conn ) if insert_type == "single": """ Inserting a string from the terminal """ content = shlex.quote(sys.argv[4]) if not content: print("There is no text") sys.exit(1) try: subprocess.run( f'printf "%s %s" "{convert_diary_date(datenow_timestamp)}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print(f"Error inserting single entry: {e.stderr}") raise elif insert_type == "bulk": """ Inserting entry from your editor """ fd, temp_file_path = tempfile.mkstemp() os.close(fd) hash = make_hash(Path(temp_file_path)) subprocess.run(["nvim", temp_file_path], text=True, check=True) with open(temp_file_path, "r") as file: content = shlex.quote(file.read()) if hash != make_hash(temp_file_path): try: subprocess.run( f'printf "%s %s" "{convert_diary_date(datenow_timestamp)}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print( f"Error during bulk import: {e.stderr}, file: {temp_file_path}" ) raise os.remove(temp_file_path) except Exception as e: print(f"An error occurred: {e}") raise finally: if conn: conn.close() print("Database connection closed.") if __name__ == "__main__": try: # I want to know if I'm connected... fetch_data(url="https://google.com", rjson=False, log=False) ... except Exception as e: print(f"Connection problem: {e}") raise else: if sys.argv[1] == "export": export() elif sys.argv[1] == "insert": insert() elif sys.argv[1] == "doctor": doctor() else: print("Unknown command") sys.exit(1)