#!/usr/bin/env python3
"""Export tagged Memos notes into a jrnl diary.

For every exported memo the script also looks up the closest OwnTracks
location and the OpenWeather conditions at that place and time, stores both
in a SQLite database (``metadata.db``) next to the diary file, and finally
deletes the memo from the Memos server.

Usage:
    script.py export <diary_name> <tag>

Required environment variables:
    MEMOS_TOKEN, MEMOS_URL, OPENWEATHER_APIKEY,
    OWNTRACKS_CREDS, OWNTRACKS_URL, OWNTRACKS_PARAMS ("<user>,<device>")
"""
import base64
import json
import os
import re
import shlex  # kept for compatibility; the shell pipeline that needed it is gone
import sqlite3
import subprocess
import sys
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from enum import IntEnum
from pathlib import Path

# File name of the SQLite database, created in the diary's directory.
DB_NAME = Path("metadata.db")
# Offset in hours added to the memo's UTC creation time to build the
# timestamp that is written into the diary entry.
TZ = 1

# Timestamp format used by the memo ``createTime`` field.
_MEMO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"

# Every environment variable the export reads.
_REQUIRED_ENV = (
    "MEMOS_TOKEN",
    "MEMOS_URL",
    "OPENWEATHER_APIKEY",
    "OWNTRACKS_CREDS",
    "OWNTRACKS_URL",
    "OWNTRACKS_PARAMS",
)


class MetadataType(IntEnum):
    """Discriminator stored in ``metadata.type``."""

    WEATHER = 1
    LOCATION = 2


def get_diary_path_by_name(name: str):
    """Return the file path of the jrnl diary called *name*.

    Parses the output of ``jrnl --list`` (lines of the form
    ``* <name> -> <path>``). Exits the process when jrnl writes anything to
    stderr, when the diary is unknown, or when its path does not exist.
    """
    result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True)
    if result.stderr:
        sys.exit(f"Get diary name error: {result.stderr}")

    pattern = r"\*\s+(\w+)\s+->\s+(.+)"
    diaries = dict(re.findall(pattern, result.stdout.strip()))
    diary_path = diaries.get(name)
    if not diary_path or not Path(diary_path).exists():
        sys.exit(f"Diary '{name}' not found or path does not exist.")
    return diary_path


def find_closest_entry(data, target_time):
    """Find the entry whose ``tst`` is closest to *target_time*.

    Args:
        data: iterable of dicts; entries without a ``"tst"`` key are ignored.
        target_time: UTC timestamp string, e.g. ``"2024-01-02T03:04:05Z"``.

    Returns:
        ``(entry, target_timestamp)`` where *entry* is the closest dict (or
        ``None`` when no entry has ``"tst"``) and *target_timestamp* is
        *target_time* as integer Unix epoch seconds.

    Raises:
        ValueError: if *target_time* does not match the expected format.
    """
    # The trailing "Z" means UTC. Attach the zone explicitly: calling
    # .timestamp() on a naive datetime would interpret it in the machine's
    # local timezone and make the result depend on where the script runs.
    target_dt = datetime.strptime(target_time, _MEMO_TIME_FORMAT).replace(
        tzinfo=timezone.utc
    )
    target_timestamp = int(target_dt.timestamp())

    closest = min(
        (entry for entry in data if "tst" in entry),
        key=lambda entry: abs(target_timestamp - entry["tst"]),
        default=None,
    )
    return closest, target_timestamp


def convert_diary_date(date_str):
    """Convert a UTC memo timestamp into the diary's entry-date prefix.

    The result is shifted by ``TZ`` hours and formatted like
    ``"02 Jan 2024 at 04:04:05:"``. Returns ``None`` when *date_str* does not
    match the expected ``%Y-%m-%dT%H:%M:%SZ`` format.
    """
    try:
        dt = datetime.strptime(date_str, _MEMO_TIME_FORMAT) + timedelta(hours=TZ)
    except ValueError:
        return None
    return dt.strftime("%d %b %Y at %H:%M:%S:")


def fetch_data(url, headers, data=None):
    """Request *url* and return the decoded JSON response.

    Sends a form-encoded POST when *data* is a non-empty mapping, otherwise a
    GET. Exits the process when the response status is not 200.
    """
    data = data or {}
    method = "POST" if data else "GET"
    encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None

    req = urllib.request.Request(
        url=url, headers=headers, data=encoded_data, method=method
    )
    req.add_header("Content-Type", "application/x-www-form-urlencoded")

    with urllib.request.urlopen(req) as response:
        if response.status != 200:
            sys.exit(f"Error: HTTP {response.status}")
        return json.loads(response.read().decode("utf-8"))


def delete_entity(url, headers):
    """Send a DELETE request to *url*.

    Any 2xx status counts as success (a DELETE commonly answers
    204 No Content); any other status exits the process.
    """
    req = urllib.request.Request(url=url, headers=headers, method="DELETE")
    with urllib.request.urlopen(req) as response:
        if not 200 <= response.status < 300:
            sys.exit(f"Error while deleting entity: HTTP {response.status}")


def db_connection(diary_path: Path):
    """Open ``DB_NAME`` inside *diary_path* with foreign keys enabled."""
    conn = sqlite3.connect(diary_path / DB_NAME)
    # SQLite only enforces ON DELETE CASCADE when this pragma is on.
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn


def initialize_db(conn: sqlite3.Connection):
    """Create the ``metadata``, ``weather`` and ``location`` tables if missing."""
    with conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY,
                unixtime INTEGER NOT NULL,
                type INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS weather (
                id INTEGER PRIMARY KEY,
                temp INTEGER NOT NULL,
                temp_like INTEGER NOT NULL,
                sunrise INTEGER NOT NULL,
                sunset INTEGER NOT NULL,
                icon TEXT NOT NULL DEFAULT 'none',
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            CREATE TABLE IF NOT EXISTS location (
                id INTEGER PRIMARY KEY,
                city TEXT NOT NULL,
                lon TEXT NOT NULL,
                lat TEXT NOT NULL,
                tz TEXT NOT NULL,
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            """
        )


def insert_metadata(
    conn: sqlite3.Connection, unixtime: int, metadata_type: MetadataType
):
    """Insert a ``metadata`` row, commit, and return the new row id.

    Raises:
        sqlite3.DatabaseError: re-raised after the transaction is rolled back.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(
            "INSERT INTO metadata(unixtime, type) VALUES(?, ?)",
            [unixtime, int(metadata_type)],
        )
        conn.commit()
        return cursor.lastrowid
    except sqlite3.DatabaseError as e:
        print(f"Error inserting metadata: {e}")
        conn.rollback()
        raise


def insert_weather(weather: list, conn: sqlite3.Connection, metadata_id: int):
    """Insert the first element of *weather* as a ``weather`` row and commit.

    *weather* is a sequence of dicts; only ``weather[0]`` is stored. It must
    provide ``temp``, ``feels_like``, ``sunrise``, ``sunset`` and
    ``weather[0]["icon"]``.
    """
    current = weather[0]
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id)
        VALUES(?, ?, ?, ?, ?, ?)
        """,
        [
            current["temp"],
            current["feels_like"],
            current["sunrise"],
            current["sunset"],
            current["weather"][0]["icon"],
            metadata_id,
        ],
    )
    conn.commit()


def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int):
    """Insert *location* as a ``location`` row and commit.

    *location* must provide ``locality``, ``lon``, ``lat`` and ``tzname``.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        INSERT INTO location(city, lon, lat, tz, metadata_id)
        VALUES(?, ?, ?, ?, ?)
        """,
        [
            location["locality"],
            location["lon"],
            location["lat"],
            location["tzname"],
            metadata_id,
        ],
    )
    conn.commit()


def _require_env():
    """Return all required environment variables, exiting if any is unset."""
    values = {name: os.getenv(name) for name in _REQUIRED_ENV}
    missing = [name for name, value in values.items() if not value]
    if missing:
        sys.exit(f"Missing environment variables: {', '.join(missing)}")
    return values


def _add_diary_entry(diary_date, content):
    """Pipe ``"<diary_date> <content>"`` into ``jrnl`` and return the result."""
    # Feeding stdin directly delivers the same text the former
    # `printf ... | jrnl` pipeline did, without handing memo text to a shell.
    return subprocess.run(
        ["jrnl"],
        input=f"{diary_date} {content}",
        capture_output=True,
        text=True,
    )


def export():
    """Export every memo carrying the given tag into the given diary.

    Reads ``sys.argv`` (``export <diary_name> <tag>``) and the environment
    variables listed in the module docstring. For each memo that jrnl does
    not already report for its creation time, the memo text (minus the tag)
    is added to the diary, location and weather metadata are stored, and the
    memo is deleted. Exits with status 1 on any error.
    """
    env = _require_env()

    if len(sys.argv) < 4 or sys.argv[1] != "export":
        sys.exit("Usage: script.py export <diary_name> <tag>")
    diary_name = sys.argv[2]
    tag = sys.argv[3]

    owntracks_params = env["OWNTRACKS_PARAMS"].split(",")
    if len(owntracks_params) != 2:
        sys.exit("OWNTRACKS_PARAMS must have the form '<user>,<device>'.")
    geo_user, geo_device = owntracks_params

    # Normalise base URLs so they work with or without a trailing slash.
    memo_url = env["MEMOS_URL"].rstrip("/")
    owntracks_url = env["OWNTRACKS_URL"].rstrip("/")
    openweathermap_api_key = env["OPENWEATHER_APIKEY"]

    headers = {"Cookie": f"memos.access-token={env['MEMOS_TOKEN']}"}
    geo_url = f"{owntracks_url}/api/0/locations"
    geo_credentials = base64.b64encode(env["OWNTRACKS_CREDS"].encode()).decode()
    geo_headers = {"Authorization": f"Basic {geo_credentials}"}

    diary_full_path = Path(get_diary_path_by_name(diary_name))
    diary_path = diary_full_path.parent

    conn = None
    try:
        conn = db_connection(diary_path)
        initialize_db(conn)

        query_string = urllib.parse.urlencode(
            {"filter": f"creator=='users/1'&&tag_search==['{tag}']"}
        )
        data = fetch_data(f"{memo_url}/api/v1/memos?{query_string}", headers)
        memos = data.get("memos", [])
        if not memos:
            sys.exit("No memos found.")

        answer = input(f"There are {len(memos)} memos. Export them all? (Y/N): ")
        if answer.strip().upper() != "Y":
            sys.exit("Export canceled.")

        for memo in memos:
            create_time = memo["createTime"]

            # Skip memos for which jrnl already has an entry at that time.
            result = subprocess.run(
                ["jrnl", "-on", create_time, "--format", "json"],
                capture_output=True,
                text=True,
            )
            if result.stdout.strip():
                print(f"Skipping existing memo: {memo['name']}")
                continue

            diary_date = convert_diary_date(create_time)
            if diary_date is None:
                # Fail before writing a bogus "None ..." entry into the diary.
                raise ValueError(
                    f"Unrecognized createTime {create_time!r} for {memo['name']}"
                )

            content = memo["content"].replace(f"#{tag}", "").strip()
            result = _add_diary_entry(diary_date, content)
            if result.stderr:
                print(f"There are some errors: {result.stderr}")
                sys.exit(1)

            geo_response = fetch_data(
                geo_url,
                geo_headers,
                data={
                    "from": "1970-01-01",
                    "limit": 20,
                    "device": geo_device,
                    "user": geo_user,
                },
            )
            closest_entry, create_time_unix = find_closest_entry(
                geo_response.get("data", []), create_time
            )
            print(f"Closest geo entry: {closest_entry}")
            if closest_entry is None:
                # Stop before inserting a metadata row with no location row.
                raise RuntimeError(
                    f"No location data available for {memo['name']}"
                )

            metadata_id = insert_metadata(
                conn, create_time_unix, MetadataType.LOCATION
            )
            insert_location(closest_entry, conn, metadata_id)

            weather_query = urllib.parse.urlencode(
                {
                    "lat": closest_entry["lat"],
                    "lon": closest_entry["lon"],
                    "dt": create_time_unix,
                    "appid": openweathermap_api_key,
                    "units": "metric",
                }
            )
            weather_response = fetch_data(
                "https://api.openweathermap.org/data/3.0/onecall/timemachine"
                f"?{weather_query}",
                headers={},
            )
            print(f"Weather: {create_time_unix} - {weather_response}")

            metadata_id = insert_metadata(
                conn, create_time_unix, MetadataType.WEATHER
            )
            insert_weather(weather_response["data"], conn, metadata_id)

            delete_entity(f"{memo_url}/api/v1/{memo['name']}", headers)
    except Exception as e:
        # Top-level boundary: report the failure and signal it to the caller
        # through a non-zero exit status instead of exiting "successfully".
        print(f"An error occurred: {e}", file=sys.stderr)
        sys.exit(1)
    finally:
        if conn:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    export()