From d1aecddaad52f2d2499a27e8ab54983eeaa83270 Mon Sep 17 00:00:00 2001 From: fz0x1 Date: Fri, 17 Jan 2025 13:30:12 +0100 Subject: [PATCH] 20250117.1737117012 --- global/.zshrc | 13 +- global/scripts/bin/diary.py | 362 +++++++++++++++++++++++++++++++ global/scripts/bin/memos2jrnl.py | 295 ------------------------- 3 files changed, 373 insertions(+), 297 deletions(-) create mode 100755 global/scripts/bin/diary.py delete mode 100755 global/scripts/bin/memos2jrnl.py diff --git a/global/.zshrc b/global/.zshrc index f6a359a..d142449 100644 --- a/global/.zshrc +++ b/global/.zshrc @@ -47,6 +47,13 @@ ex () fi } +DIARY="default" + +dis () +{ + "${HOME:?}/scripts/bin/diary.py" insert "${DIARY:?}" single "$*" +} + # aliases ## just alias j="just" @@ -116,8 +123,10 @@ alias resmacs="killall emacs &> /dev/null && emacs --daemon &> /dev/null" alias em="emacsclient -c" alias relmacs="doom sync" ## diary -alias di="diary.sh" -alias m2jrnl="memos2jrnl.py export default diaryf" +# alias di="diary.sh" +alias m2jrnl="diary.py export ${DIARY:?} diaryf" +alias di="diary.py insert" +alias dib="diary.py insert ${DIARY:?} bulk" # bindkeys ## autosuggest diff --git a/global/scripts/bin/diary.py b/global/scripts/bin/diary.py new file mode 100755 index 0000000..30fa068 --- /dev/null +++ b/global/scripts/bin/diary.py @@ -0,0 +1,362 @@ +#!/usr/bin/env python3 + +import base64 +import json +import os +import re +import shlex +import sqlite3 +import subprocess +import sys +import tempfile +import urllib.parse +import urllib.request +from datetime import datetime, timedelta, timezone +from enum import IntEnum +from pathlib import Path + +DB_NAME = Path("metadata.db") +TZ = 1 + + +class MetadataType(IntEnum): + WEATHER = 1 + LOCATION = 2 + + +class Config: + memo_token = os.getenv("MEMOS_TOKEN") + memo_url = os.getenv("MEMOS_URL") + openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY") + owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode() + owntracks_url = os.getenv("OWNTRACKS_URL") + geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS", ",").split(",") + + @classmethod + def validate(cls): + if not cls.memo_token or not cls.memo_url: + sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.") + + +Config.validate() + + +def get_diary_path_by_name(name: str): + result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True) + if result.stderr: + sys.exit(f"Error retrieving diary name: {result.stderr}") + + matches = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", result.stdout.strip())) + diary_path = matches.get(name) + + if not diary_path or not Path(diary_path).exists(): + sys.exit(f"Diary '{name}' not found or path does not exist.") + return Path(diary_path) + + +def make_tz_unixtime(target_time: str): + return int( + ( + datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) + ).timestamp() + ) + + +def find_closest_entry(data, target_timestamp: int): + return min( + (entry for entry in data if "tst" in entry), + key=lambda e: abs(target_timestamp - e["tst"]), + default=None, + ) + + +def convert_diary_date(date_str): + try: + dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) + return dt.strftime("%d %b %Y at %H:%M:%S:") + except ValueError: + return None + + +def fetch_data(url, headers, data=None): + method = "POST" if data else "GET" + encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None + + req = urllib.request.Request(url, headers=headers, data=encoded_data, method=method) + req.add_header("Content-Type", "application/x-www-form-urlencoded") + + with urllib.request.urlopen(req) as response: + if response.status != 200: + sys.exit(f"HTTP error {response.status}") + return json.loads(response.read().decode("utf-8")) + + +def delete_entity(url, headers): + req = urllib.request.Request(url, headers=headers, method="DELETE") + with urllib.request.urlopen(req) as response: + if response.status != 200: + sys.exit(f"HTTP delete error {response.status}") + + +def db_connection(diary_path: Path): + conn = sqlite3.connect(diary_path / DB_NAME) + conn.execute("PRAGMA foreign_keys = ON;") + return conn + + +def initialize_db(conn: sqlite3.Connection): + with conn: + conn.executescript( + """ + CREATE TABLE IF NOT EXISTS metadata ( + id INTEGER PRIMARY KEY, + unixtime INTEGER NOT NULL, + type INTEGER NOT NULL + ); + CREATE TABLE IF NOT EXISTS weather ( + id INTEGER PRIMARY KEY, + temp INTEGER NOT NULL, + temp_like INTEGER NOT NULL, + sunrise INTEGER NOT NULL, + sunset INTEGER NOT NULL, + icon TEXT NOT NULL DEFAULT 'none', + metadata_id INTEGER NOT NULL, + FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE + ); + CREATE TABLE IF NOT EXISTS location ( + id INTEGER PRIMARY KEY, + city TEXT NOT NULL, + lon TEXT NOT NULL, + lat TEXT NOT NULL, + tz TEXT NOT NULL, + metadata_id INTEGER NOT NULL, + FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE + ); + """ + ) + + +def insert_metadata( + conn: sqlite3.Connection, unixtime: int, metadata_type: MetadataType +): + cursor = conn.cursor() + cursor.execute( + "INSERT INTO metadata(unixtime, type) VALUES(?, ?)", [unixtime, metadata_type] + ) + conn.commit() + return cursor.lastrowid + + +def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int): + if isinstance(weather, list): + weather = weather[0] + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id) + VALUES(?, ?, ?, ?, ?, ?) + """, + [ + weather["temp"], + weather["feels_like"], + weather["sunrise"], + weather["sunset"], + weather["weather"][0]["icon"], + metadata_id, + ], + ) + conn.commit() + + +def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int): + cursor = conn.cursor() + cursor.execute( + """ + INSERT INTO location(city, lon, lat, tz, metadata_id) + VALUES(?, ?, ?, ?, ?) + """, + [ + location["locality"], + location["lon"], + location["lat"], + location["tzname"], + metadata_id, + ], + ) + conn.commit() + + +def fetch_geo(create_time_timestamp: int, conn: sqlite3.Connection): + geo_url = f"{Config.owntracks_url}/api/0/locations" + geo_headers = { + "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}" + } + geo_response = fetch_data( + geo_url, + geo_headers, + data={ + "from": "1970-01-01", + "limit": 20, + "device": Config.geo_device, + "user": Config.geo_user, + }, + ) + closest_entry = find_closest_entry( + geo_response.get("data", []), create_time_timestamp + ) + metadata_id = insert_metadata(conn, create_time_timestamp, MetadataType.LOCATION) + insert_location(closest_entry, conn, metadata_id) + return closest_entry + + +def fetch_weather(closest_entry: dict, unixtime: int, conn: sqlite3.Connection): + weather_response = fetch_data( + f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={closest_entry['lat']}&lon={closest_entry['lon']}&dt={unixtime}&appid={Config.openweathermap_api_key}&units=metric", + headers={}, + ) + metadata_id = insert_metadata(conn, unixtime, MetadataType.WEATHER) + insert_weather(weather_response["data"], conn, metadata_id) + + +def export(): + if len(sys.argv) < 4 or sys.argv[1] != "export": + sys.exit("Usage: script.py export ") + + diary_name, tag = sys.argv[2], sys.argv[3] + diary_path = get_diary_path_by_name(diary_name).parent + + try: + conn = db_connection(diary_path) + initialize_db(conn) + + headers = {"Cookie": f"memos.access-token={Config.memo_token}"} + query_string = urllib.parse.urlencode( + {"filter": f"creator=='users/1'&&tag_search==['{tag}']"} + ) + data = fetch_data(f"{Config.memo_url}api/v1/memos?{query_string}", headers) + + memos = data.get("memos", []) + if not memos: + sys.exit("No memos found.") + + if ( + input(f"There are {len(memos)} memos. Export them all? (Y/N): ") + .strip() + .upper() + != "Y" + ): + sys.exit("Export canceled.") + + for memo in memos: + create_time = memo["createTime"] + content = shlex.quote(memo["content"].replace(f"#{tag}", "").strip()) + + try: + result = subprocess.run( + f'printf "%s %s" "{convert_diary_date(create_time)}" {content} | jrnl', + shell=True, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + print(f"Error writing to journal: {e.stderr}") + continue + + closest_entry = fetch_geo(make_tz_unixtime(create_time), conn) + fetch_weather(closest_entry, make_tz_unixtime(create_time), conn) + + delete_entity(f"{Config.memo_url}/api/v1/{memo['name']}", headers) + + except Exception as e: + print(f"An error occurred: {e}") + + +def insert(): + conn = None + try: + if len(sys.argv) < 3 or sys.argv[1] != "insert": + sys.exit( + "Usage: script.py insert [bulk|single (default)] 'content'" + ) + + if len(sys.argv) == 5 and sys.argv[3] != "single": + sys.exit("Invalid usage for bulk insert.") + + diary_name = sys.argv[2] + insert_type = ( + "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single" + ) + diary_path = get_diary_path_by_name(diary_name).parent + + conn = db_connection(diary_path) + initialize_db(conn) + + if insert_type == "single": + content = sys.argv[4] + if not content: + print("There is no text") + sys.exit(1) + try: + subprocess.run( + ["jrnl", diary_name, content], + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + print(f"Error inserting single entry: {e.stderr}") + raise + + datenow = datetime.now(timezone.utc) + datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ") + closest_entry = fetch_geo(make_tz_unixtime(datenow_timestamp), conn) + fetch_weather(closest_entry, make_tz_unixtime(datenow_timestamp), conn) + + elif insert_type == "bulk": + fd, temp_file_path = tempfile.mkstemp() + os.close(fd) + + try: + subprocess.run(["nvim", temp_file_path], text=True, check=True) + with open(temp_file_path, "r") as tmp_file: + content = tmp_file.read() + + try: + subprocess.run( + f"cat {temp_file_path} | jrnl {diary_name} --import", + shell=True, + capture_output=True, + text=True, + check=True, + ) + except subprocess.CalledProcessError as e: + print(f"Error during bulk import: {e.stderr}") + raise + + datenow = datetime.now(timezone.utc) + datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ") + closest_entry = fetch_geo(make_tz_unixtime(datenow_timestamp), conn) + fetch_weather(closest_entry, make_tz_unixtime(datenow_timestamp), conn) + + finally: + os.remove(temp_file_path) + + except Exception as e: + print(f"An error occurred: {e}") + raise + + finally: + if conn: + conn.close() + print("Database connection closed.") + + +if __name__ == "__main__": + if "export" in sys.argv: + export() + elif "insert" in sys.argv: + insert() + else: + print("Unknown command") + sys.exit(1) diff --git a/global/scripts/bin/memos2jrnl.py b/global/scripts/bin/memos2jrnl.py deleted file mode 100755 index 36e0c97..0000000 --- a/global/scripts/bin/memos2jrnl.py +++ /dev/null @@ -1,295 +0,0 @@ -#!/usr/bin/env python3 - -import base64 -import json -import os -import re -import shlex -import sqlite3 -import subprocess -import sys -import urllib.parse -import urllib.request -from datetime import datetime, timedelta -from enum import IntEnum -from pathlib import Path - -DB_NAME = Path("metadata.db") -TZ = 1 - - -class MetadataType(IntEnum): - WEATHER = 1 - LOCATION = 2 - - -def get_diary_path_by_name(name: str): - """Retrieve the path of a diary by its name.""" - result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True) - if result.stderr: - sys.exit(f"Get diary name error: {result.stderr}") - - pattern = r"\*\s+(\w+)\s+->\s+(.+)" - matches = dict(re.findall(pattern, result.stdout.strip())) - diary_path = matches.get(name) - - if not diary_path or not Path(diary_path).exists(): - sys.exit(f"Diary '{name}' not found or path does not exist.") - return diary_path - - -def find_closest_entry(data, target_time): - """Find the entry closest to the target timestamp.""" - target_timestamp = int( - ( - datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) - ).timestamp() - ) - return min( - (entry for entry in data if "tst" in entry), - key=lambda e: abs(target_timestamp - e["tst"]), - default=None, - ), target_timestamp - - -def convert_diary_date(date_str): - """Convert date string to formatted diary date.""" - try: - dt = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") + timedelta(hours=TZ) - return dt.strftime("%d %b %Y at %H:%M:%S:") - except ValueError: - return None - - -def fetch_data(url, headers, data=None): - """Fetch data from the specified URL.""" - data = data or {} - method = "POST" if data else "GET" - encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None - - req = urllib.request.Request( - url=url, headers=headers, data=encoded_data, method=method - ) - req.add_header("Content-Type", "application/x-www-form-urlencoded") - - with urllib.request.urlopen(req) as response: - if response.status != 200: - sys.exit(f"Error: HTTP {response.status}") - return json.loads(response.read().decode("utf-8")) - - -def delete_entity(url, headers): - """Delete an entity using the DELETE method.""" - req = urllib.request.Request(url=url, headers=headers, method="DELETE") - with urllib.request.urlopen(req) as response: - if response.status != 200: - sys.exit(f"Error while deleting entity: HTTP {response.status}") - - -def db_connection(diary_path: Path): - """Establish a database connection and enable foreign keys.""" - conn = sqlite3.connect(diary_path / DB_NAME) - conn.execute("PRAGMA foreign_keys = ON;") - return conn - - -def initialize_db(conn: sqlite3.Connection): - """Initialize the database with necessary tables.""" - with conn: - conn.executescript( - """ - CREATE TABLE IF NOT EXISTS metadata ( - id INTEGER PRIMARY KEY, - unixtime INTEGER NOT NULL, - type INTEGER NOT NULL - ); - CREATE TABLE IF NOT EXISTS weather ( - id INTEGER PRIMARY KEY, - temp INTEGER NOT NULL, - temp_like INTEGER NOT NULL, - sunrise INTEGER NOT NULL, - sunset INTEGER NOT NULL, - icon TEXT NOT NULL DEFAULT 'none', - metadata_id INTEGER NOT NULL, - FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE - ); - CREATE TABLE IF NOT EXISTS location ( - id INTEGER PRIMARY KEY, - city TEXT NOT NULL, - lon TEXT NOT NULL, - lat TEXT NOT NULL, - tz TEXT NOT NULL, - metadata_id INTEGER NOT NULL, - FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE - ); - """ - ) - - -def insert_metadata( - conn: sqlite3.Connection, unixtime: int, metadata_type: MetadataType -): - """Insert metadata and return its ID.""" - try: - cursor = conn.cursor() - cursor.execute( - "INSERT INTO metadata(unixtime, type) VALUES(?, ?)", - [unixtime, metadata_type], - ) - conn.commit() - return cursor.lastrowid - except sqlite3.DatabaseError as e: - print(f"Error inserting metadata: {e}") - conn.rollback() - raise - - -def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int): - """Insert weather data into the database.""" - cursor = conn.cursor() - weather = weather[0] - cursor.execute( - """ - INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id) - VALUES(?, ?, ?, ?, ?, ?) - """, - [ - weather["temp"], - weather["feels_like"], - weather["sunrise"], - weather["sunset"], - weather["weather"][0]["icon"], - metadata_id, - ], - ) - conn.commit() - - -def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int): - """Insert location data into the database.""" - cursor = conn.cursor() - cursor.execute( - """ - INSERT INTO location(city, lon, lat, tz, metadata_id) - VALUES(?, ?, ?, ?, ?) - """, - [ - location["locality"], - location["lon"], - location["lat"], - location["tzname"], - metadata_id, - ], - ) - conn.commit() - - -def export(): - """Main export function.""" - memo_token = os.getenv("MEMOS_TOKEN") - memo_url = os.getenv("MEMOS_URL") - openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY") - owntracks_creds = os.getenv("OWNTRACKS_CREDS").encode() - owntracks_url = os.getenv("OWNTRACKS_URL") - geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS").split(",") - - if not memo_token or not memo_url: - sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.") - - if len(sys.argv) < 4 or sys.argv[1] != "export": - sys.exit("Usage: script.py export ") - - diary_name = sys.argv[2] - tag = sys.argv[3] - diary_full_path = Path(get_diary_path_by_name(diary_name)) - diary_path = diary_full_path.parent - - conn = None - try: - conn = db_connection(diary_path) - initialize_db(conn) - - headers = {"Cookie": f"memos.access-token={memo_token}"} - query_string = urllib.parse.urlencode( - {"filter": f"creator=='users/1'&&tag_search==['{tag}']"} - ) - data = fetch_data(f"{memo_url}api/v1/memos?{query_string}", headers) - - memos = data.get("memos", []) - if not memos: - sys.exit("No memos found.") - - if ( - input(f"There are {len(memos)} memos. Export them all? (Y/N): ") - .strip() - .upper() - != "Y" - ): - sys.exit("Export canceled.") - - for memo in memos: - create_time = memo["createTime"] - - # Check if entry exists - result = subprocess.run( - ["jrnl", "-on", create_time, "--format", "json"], - capture_output=True, - text=True, - ) - if result.stdout.strip(): - print(f"Skipping existing memo: {memo['name']}") - continue - - content = shlex.quote(memo["content"].replace(f"#{tag}", "").strip()) - result = subprocess.run( - f'printf "%s %s" "{convert_diary_date(memo["createTime"])}" {content} | jrnl', - shell=True, - capture_output=True, - text=True, - ) - if result.stderr: - print(f"There are some errors: {result.stderr}") - sys.exit(1) - - geo_url = f"{owntracks_url}/api/0/locations" - geo_headers = { - "Authorization": f"Basic {base64.b64encode(owntracks_creds).decode()}" - } - geo_response = fetch_data( - geo_url, - geo_headers, - data={ - "from": "1970-01-01", - "limit": 20, - "device": geo_device, - "user": geo_user, - }, - ) - closest_entry, create_time_unix = find_closest_entry( - geo_response.get("data", []), create_time - ) - print(f"Closest geo entry: {closest_entry}") - - metadata_id = insert_metadata(conn, create_time_unix, MetadataType.LOCATION) - insert_location(closest_entry, conn, metadata_id) - - weather_response = fetch_data( - f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={closest_entry['lat']}&lon={closest_entry['lon']}&dt={create_time_unix}&appid={openweathermap_api_key}&units=metric", - headers={}, - ) - print(f"Weather: {create_time_unix} - {weather_response}") - metadata_id = insert_metadata(conn, create_time_unix, MetadataType.WEATHER) - insert_weather(weather_response["data"], conn, metadata_id) - - delete_entity(f"{memo_url}/api/v1/{memo['name']}", headers) - - except Exception as e: - print(f"An error occurred: {e}") - finally: - if conn: - conn.close() - print("Database connection closed.") - - -if __name__ == "__main__": - export()