#!/usr/bin/env python3
"""Move notes into a jrnl diary and record location/weather metadata.

Two commands are dispatched from ``sys.argv``:

* ``export <diary_name> <tag>`` - fetch memos from the Memos API, write each
  one to jrnl, store location/weather metadata, then delete the memo.
* ``insert <diary_name> [bulk|single] 'content'`` - write new text to the
  named diary and store location/weather metadata for the current time.

Metadata is kept in an SQLite file named ``metadata.db`` located next to the
diary file reported by ``jrnl --list``.
"""
import base64
import json
import os
import re
import shlex
import sqlite3
import subprocess
import sys
import tempfile
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from pathlib import Path

DB_NAME = Path("metadata.db")
# Hour offset added to UTC timestamps when rendering a diary date.
TZ = 1


class MetadataType(IntEnum):
    """Value stored in ``metadata.type`` to tell the child tables apart."""

    WEATHER = 1
    LOCATION = 2


class Config:
    """Settings read from environment variables when the class is defined."""

    memo_token = os.getenv("MEMOS_TOKEN")
    memo_url = os.getenv("MEMOS_URL")
    openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY")
    # Kept as bytes because it is fed straight to base64.b64encode().
    owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode()
    owntracks_url = os.getenv("OWNTRACKS_URL")
    # OWNTRACKS_PARAMS is "user,device". partition() tolerates a value with no
    # comma (device becomes "") instead of failing to unpack at import time.
    _owntracks_params = os.getenv("OWNTRACKS_PARAMS", ",").partition(",")
    geo_user = _owntracks_params[0]
    geo_device = _owntracks_params[2]

    @classmethod
    def validate(cls):
        """Exit the process when the Memos token or URL is not configured."""
        if not cls.memo_token or not cls.memo_url:
            sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.")


def get_diary_path_by_name(name: str) -> Path:
    """Return the file path jrnl reports for the diary called *name*.

    Parses the ``* name -> path`` lines printed by ``jrnl --list``. Exits the
    process when jrnl writes to stderr, when the name is not listed, or when
    the listed path does not exist.
    """
    result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True)
    if result.stderr:
        sys.exit(f"Error retrieving diary name: {result.stderr}")

    matches = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", result.stdout.strip()))
    diary_path = matches.get(name)

    if not diary_path or not Path(diary_path).exists():
        sys.exit(f"Diary '{name}' not found or path does not exist.")

    return Path(diary_path)


def make_tz_unixtime(target_time: str) -> int:
    """Convert a ``YYYY-MM-DDTHH:MM:SSZ`` UTC string to Unix epoch seconds.

    The parsed value is marked as UTC before conversion. A naive datetime
    would be interpreted in the machine's local zone by ``timestamp()``, so
    shifting it by a fixed ``TZ`` only gave the right epoch when the local
    UTC offset happened to equal ``TZ``.

    Raises:
        ValueError: if *target_time* does not match the expected format.
    """
    parsed = datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


def find_closest_entry(data, target_timestamp: int):
    """Return the entry whose ``tst`` is nearest to *target_timestamp*.

    Entries without a ``tst`` key are ignored. Returns ``None`` when no entry
    qualifies.
    """
    return min(
        (entry for entry in data if "tst" in entry),
        key=lambda e: abs(target_timestamp - e["tst"]),
        default=None,
    )


def convert_diary_date(date_str):
    """Render a UTC ``...Z`` timestamp as a diary date shifted by ``TZ`` hours.

    Returns a string such as ``"01 Jan 2024 at 13:00:00:"``, or ``None`` when
    *date_str* does not match ``%Y-%m-%dT%H:%M:%SZ``.
    """
    try:
        dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ)
        return dt.strftime("%d %b %Y at %H:%M:%S:")
    except ValueError:
        return None


def fetch_data(url, headers, data=None):
    """Request *url* and return the decoded JSON body.

    Sends a form-encoded POST when *data* is truthy, otherwise a GET. Exits
    the process when the response status is not 200.
    """
    method = "POST" if data else "GET"
    encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None

    req = urllib.request.Request(
        url, headers=headers, data=encoded_data, method=method
    )
    req.add_header("Content-Type", "application/x-www-form-urlencoded")

    with urllib.request.urlopen(req) as response:
        if response.status != 200:
            sys.exit(f"HTTP error {response.status}")
        return json.loads(response.read().decode("utf-8"))


def delete_entity(url, headers):
    """Send a DELETE request to *url*; exit when the status is not 200."""
    req = urllib.request.Request(url, headers=headers, method="DELETE")
    with urllib.request.urlopen(req) as response:
        if response.status != 200:
            sys.exit(f"HTTP delete error {response.status}")


def db_connection(diary_path: Path) -> sqlite3.Connection:
    """Open ``metadata.db`` inside *diary_path* with foreign keys enforced."""
    conn = sqlite3.connect(diary_path / DB_NAME)
    # SQLite leaves foreign-key enforcement off unless asked per connection;
    # the ON DELETE CASCADE clauses in the schema depend on it.
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn


def initialize_db(conn: sqlite3.Connection):
    """Create the ``metadata``, ``weather`` and ``location`` tables if absent."""
    with conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY,
                unixtime INTEGER NOT NULL,
                type INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS weather (
                id INTEGER PRIMARY KEY,
                temp INTEGER NOT NULL,
                temp_like INTEGER NOT NULL,
                sunrise INTEGER NOT NULL,
                sunset INTEGER NOT NULL,
                icon TEXT NOT NULL DEFAULT 'none',
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            CREATE TABLE IF NOT EXISTS location (
                id INTEGER PRIMARY KEY,
                city TEXT NOT NULL,
                lon TEXT NOT NULL,
                lat TEXT NOT NULL,
                tz TEXT NOT NULL,
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            """
        )


def insert_metadata(
    conn: sqlite3.Connection, unixtime: int, metadata_type: MetadataType
):
    """Insert and commit a ``metadata`` row; return the new row id."""
    cursor = conn.cursor()
    cursor.execute(
        "INSERT INTO metadata(unixtime, type) VALUES(?, ?)", [unixtime, metadata_type]
    )
    conn.commit()
    return cursor.lastrowid


def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int):
    """Insert and commit a ``weather`` row linked to *metadata_id*.

    *weather* may be a dict or a list, in which case its first element is
    used. The keys read are ``temp``, ``feels_like``, ``sunrise``, ``sunset``
    and ``weather[0]["icon"]``.
    """
    if isinstance(weather, list):
        weather = weather[0]
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id)
        VALUES(?, ?, ?, ?, ?, ?)
        """,
        [
            weather["temp"],
            weather["feels_like"],
            weather["sunrise"],
            weather["sunset"],
            weather["weather"][0]["icon"],
            metadata_id,
        ],
    )
    conn.commit()


def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int):
    """Insert and commit a ``location`` row linked to *metadata_id*.

    The keys read from *location* are ``locality``, ``lon``, ``lat`` and
    ``tzname``.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO location(city, lon, lat, tz, metadata_id)
        VALUES(?, ?, ?, ?, ?)
        """,
        [
            location["locality"],
            location["lon"],
            location["lat"],
            location["tzname"],
            metadata_id,
        ],
    )
    conn.commit()


def fetch_geo(create_time_timestamp: int, conn: sqlite3.Connection):
    """Fetch locations, store the one nearest to the timestamp, and return it.

    Raises:
        LookupError: if the response contains no entry with a ``tst`` field.
            Nothing is written to the database in that case.
    """
    geo_url = f"{Config.owntracks_url}/api/0/locations"
    geo_headers = {
        "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}"
    }
    geo_response = fetch_data(
        geo_url,
        geo_headers,
        data={
            "from": "1970-01-01",
            "limit": 20,
            "device": Config.geo_device,
            "user": Config.geo_user,
        },
    )
    closest_entry = find_closest_entry(
        geo_response.get("data", []), create_time_timestamp
    )
    # Check before writing: insert_metadata() commits on its own, so failing
    # afterwards would leave a metadata row with no location attached.
    if closest_entry is None:
        raise LookupError("No location entries with a 'tst' field were returned.")

    metadata_id = insert_metadata(conn, create_time_timestamp, MetadataType.LOCATION)
    insert_location(closest_entry, conn, metadata_id)
    return closest_entry


def fetch_weather(closest_entry: dict, unixtime: int, conn: sqlite3.Connection):
    """Fetch weather for the entry's coordinates at *unixtime* and store it."""
    query = urllib.parse.urlencode(
        {
            "lat": closest_entry["lat"],
            "lon": closest_entry["lon"],
            "dt": unixtime,
            "appid": Config.openweathermap_api_key,
            "units": "metric",
        }
    )
    weather_response = fetch_data(
        f"https://api.openweathermap.org/data/3.0/onecall/timemachine?{query}",
        headers={},
    )
    metadata_id = insert_metadata(conn, unixtime, MetadataType.WEATHER)
    insert_weather(weather_response["data"], conn, metadata_id)


def _memo_api_url(path: str) -> str:
    """Join ``MEMOS_URL`` and an API path with exactly one slash between them."""
    base = (Config.memo_url or "").rstrip("/")
    return f"{base}/api/v1/{path}"


def _record_metadata_now(conn: sqlite3.Connection):
    """Store location and weather metadata for the current moment."""
    now_unixtime = int(datetime.now(timezone.utc).timestamp())
    closest_entry = fetch_geo(now_unixtime, conn)
    fetch_weather(closest_entry, now_unixtime, conn)


def _single_insert_content(argv) -> str:
    """Return the entry text for a single insert, or ``""`` when none is given.

    Accepts both ``insert <diary> single 'content'`` and the shorter
    ``insert <diary> 'content'`` form, where ``single`` is implied.
    """
    if len(argv) > 4:
        return argv[4]
    if len(argv) == 4 and argv[3] != "single":
        return argv[3]
    return ""


def export():
    """Export tagged memos into jrnl, store metadata, and delete each memo.

    Reads the diary name and tag from ``sys.argv[2]`` and ``sys.argv[3]`` and
    asks for confirmation on stdin before exporting. A memo that cannot be
    written to the journal is reported and skipped. Any other exception ends
    the run with a printed message.
    """
    if len(sys.argv) < 4 or sys.argv[1] != "export":
        sys.exit("Usage: script.py export <diary_name> <tag>")

    diary_name, tag = sys.argv[2], sys.argv[3]
    diary_path = get_diary_path_by_name(diary_name).parent

    conn = None
    try:
        conn = db_connection(diary_path)
        initialize_db(conn)

        headers = {"Cookie": f"memos.access-token={Config.memo_token}"}
        query_string = urllib.parse.urlencode(
            {"filter": f"creator=='users/1'&&tag_search==['{tag}']"}
        )
        data = fetch_data(_memo_api_url(f"memos?{query_string}"), headers)

        memos = data.get("memos", [])
        if not memos:
            sys.exit("No memos found.")

        if (
            input(f"There are {len(memos)} memos. Export them all? (Y/N): ")
            .strip()
            .upper()
            != "Y"
        ):
            sys.exit("Export canceled.")

        for memo in memos:
            create_time = memo["createTime"]
            diary_date = convert_diary_date(create_time)
            if diary_date is None:
                # Without this check the literal text "None" would be written
                # as the entry's date.
                print(f"Skipping memo with unparsable createTime: {create_time!r}")
                continue

            content = memo["content"].replace(f"#{tag}", "").strip()
            try:
                # The text goes to jrnl on stdin, as the earlier printf
                # pipeline did, but without passing memo text through a shell.
                # NOTE(review): no diary name is passed here, so jrnl picks
                # the journal itself - confirm this is intended.
                subprocess.run(
                    ["jrnl"],
                    input=f"{diary_date} {content}",
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Error writing to journal: {e.stderr}")
                continue

            unixtime = make_tz_unixtime(create_time)
            closest_entry = fetch_geo(unixtime, conn)
            fetch_weather(closest_entry, unixtime, conn)
            delete_entity(_memo_api_url(memo["name"]), headers)

    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if conn:
            conn.close()


def insert():
    """Write a new entry (or an editor-composed import) to a diary.

    Reads the diary name from ``sys.argv[2]``. In ``single`` mode (the
    default) the entry text comes from the command line. In ``bulk`` mode
    ``nvim`` is opened on a temporary file whose contents are then fed to
    ``jrnl <diary> --import``. Location and weather metadata for the current
    time are stored after a successful write. Exceptions are printed and
    re-raised; the database connection is always closed.
    """
    conn = None
    try:
        if len(sys.argv) < 3 or sys.argv[1] != "insert":
            sys.exit(
                "Usage: script.py insert <diary_name> "
                "[bulk|single (default)] 'content'"
            )

        if len(sys.argv) == 5 and sys.argv[3] != "single":
            sys.exit("Invalid usage for bulk insert.")

        diary_name = sys.argv[2]
        insert_type = (
            "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single"
        )
        diary_path = get_diary_path_by_name(diary_name).parent

        conn = db_connection(diary_path)
        initialize_db(conn)

        if insert_type == "single":
            content = _single_insert_content(sys.argv)
            if not content:
                print("There is no text")
                sys.exit(1)
            try:
                subprocess.run(
                    ["jrnl", diary_name, content],
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Error inserting single entry: {e.stderr}")
                raise

            _record_metadata_now(conn)

        elif insert_type == "bulk":
            fd, temp_file_path = tempfile.mkstemp()
            os.close(fd)

            try:
                subprocess.run(["nvim", temp_file_path], text=True, check=True)

                try:
                    # Hand the file to jrnl as stdin instead of building a
                    # `cat ... | jrnl ...` shell string from unquoted values.
                    with open(temp_file_path, "rb") as tmp_file:
                        subprocess.run(
                            ["jrnl", diary_name, "--import"],
                            stdin=tmp_file,
                            capture_output=True,
                            text=True,
                            check=True,
                        )
                except subprocess.CalledProcessError as e:
                    print(f"Error during bulk import: {e.stderr}")
                    raise

                _record_metadata_now(conn)
            finally:
                os.remove(temp_file_path)

    except Exception as e:
        print(f"An error occurred: {e}")
        raise

    finally:
        if conn:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    # Validated here rather than at import time so the module can be imported
    # without the Memos variables set; script runs still check them first.
    Config.validate()
    if "export" in sys.argv:
        export()
    elif "insert" in sys.argv:
        insert()
    else:
        print("Unknown command")
        sys.exit(1)