#!/usr/bin/env python3 import base64 # import cProfile import hashlib import json import logging import os # import pstats import re import shlex import shutil import sqlite3 import subprocess import sys import tempfile import time import urllib.parse import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from pathlib import Path from zoneinfo import ZoneInfo DB_NAME = Path("metadata.db") # TZ = 1 TZ = ZoneInfo("Europe/Warsaw") TOOLS = ("jrnl", "sqlite3") FILES = (".jpeg", ".jpg", ".png", ".gif") FILES_PATH = Path("attachments") for t in TOOLS: if not shutil.which(t): raise FileNotFoundError(f"There is no existing path for '{t}'") class Config: memo_token = os.getenv("MEMOS_TOKEN") memo_url = os.getenv("MEMOS_URL") openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY") owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode() owntracks_url = os.getenv("OWNTRACKS_URL") geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS", ",").split(",") @classmethod def validate(cls): if not cls.memo_token or not cls.memo_url: sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.") elif not cls.openweathermap_api_key: sys.exit("Missing openweather api key") elif ( # TODO need more complex checking not cls.owntracks_creds or not cls.owntracks_url or not cls.geo_user or not cls.geo_device ): sys.exit("Missing OwnTracks data") Config.validate() def daytime(dt: datetime): hour = int(dt.strftime("%I")) am_pm = dt.strftime("%p") # "AM"/"PM" part_of_day = "none" if am_pm == "AM": if 5 <= hour < 12: part_of_day = "morning" elif 12 <= hour < 5: part_of_day = "night" return part_of_day def make_logger(): logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s", handlers=[ logging.FileHandler(Path.home() / ".diary_requests.log", mode="a"), ], ) return logging.getLogger("urllib_logger") def get_diary_path_by_name(name: str): result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True) if result.stderr: sys.exit(f"Error retrieving diary name: {result.stderr}") matches = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", result.stdout.strip())) diary_path = matches.get(name) if not diary_path or not Path(diary_path).exists(): sys.exit(f"Diary '{name}' not found or path does not exist.") return Path(diary_path) def make_tz_unixtime(target_time: str): return int( ( datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ") .replace(tzinfo=ZoneInfo("UTC")) .astimezone(TZ) ).timestamp() ) def find_closest_entry(data, target_timestamp: int): return min( (entry for entry in data if "tst" in entry), key=lambda e: abs(target_timestamp - e["tst"]), default=None, ) def convert_diary_date(date_str, to_str: bool = True): try: dt = ( datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ") .replace(tzinfo=ZoneInfo("UTC")) .astimezone(TZ) ) if to_str: return dt.strftime("%d %b %Y at %H:%M:%S:") return dt except ValueError: return None def download_file(url, filename, diary_path, headers, metadata_id): ext = os.path.splitext(url)[-1] filename = f"id{metadata_id}_{int(time.time())}_{filename}" if not Path(diary_path / FILES_PATH).exists(): Path(diary_path / FILES_PATH).mkdir(parents=True, exist_ok=True) filepath = Path(diary_path / FILES_PATH / filename) if ext not in FILES: print(f"File {url} is not supporting") if filepath.exists(): print(f"File {url} exists") try: request = urllib.request.Request(url, headers=headers) with ( urllib.request.urlopen(request) as response, open(filepath, "wb") as out_file, ): out_file.write(response.read()) except Exception as e: sys.exit(str(e)) return Path(FILES_PATH / filename) def fetch_data(url, headers={}, data=None, rjson=True, log=True): logit = make_logger() method = "POST" if data else "GET" encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None req = urllib.request.Request(url, headers=headers, data=encoded_data, method=method) req.add_header("Content-Type", "application/x-www-form-urlencoded") try: with urllib.request.urlopen(req) as response: if response.status != 200: logit.error(response.read()) sys.exit(f"HTTP error {response.status}") try: response_data = response.read().decode("utf-8") except UnicodeDecodeError as e: logit.error(e) response_data = response.read() logit.info(response_data) if log else None if not rjson: return response_data return json.loads(response_data) except Exception as e: logit.error(str(e)) raise def delete_entity(url, headers): logit = make_logger() req = urllib.request.Request(url, headers=headers, method="DELETE") try: with urllib.request.urlopen(req) as response: if response.status != 200: logit.error(response.read()) sys.exit(f"HTTP delete error {response.status}") logit.info(response.read().decode("utf-8")) except Exception as e: logit.error(str(e)) raise def db_connection(diary_path: Path): conn = sqlite3.connect(diary_path / DB_NAME) conn.execute("PRAGMA foreign_keys = ON;") return conn def initialize_db(conn: sqlite3.Connection): with conn: conn.executescript( """ CREATE TABLE IF NOT EXISTS metadata ( id INTEGER PRIMARY KEY, unixtime INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS weather ( id INTEGER PRIMARY KEY, temp INTEGER NOT NULL, temp_like INTEGER NOT NULL, sunrise INTEGER NOT NULL, sunset INTEGER NOT NULL, icon TEXT NOT NULL DEFAULT 'none', metadata_id INTEGER NOT NULL, FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS attachments ( id INTEGER PRIMARY KEY, filepath TEXT NOT NULL, metadata_id INTEGER NOT NULL, FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE ); CREATE TABLE IF NOT EXISTS location ( id INTEGER PRIMARY KEY, city TEXT NOT NULL, lon TEXT NOT NULL, lat TEXT NOT NULL, tz TEXT NOT NULL, metadata_id INTEGER NOT NULL, FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE ); """ ) def insert_metadata( conn: sqlite3.Connection, unixtime: int, ): cursor = conn.cursor() cursor.execute("INSERT INTO metadata(unixtime) VALUES(?)", [unixtime]) conn.commit() return cursor.lastrowid def remove_metadata(conn: sqlite3.Connection, metadata_id: int): cursor = conn.cursor() cursor.execute("DELETE FROM metadata WHERE id=?", (metadata_id,)) conn.commit() def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int): if isinstance(weather, list): weather = weather[0] cursor = conn.cursor() try: cursor.execute( """ INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id) VALUES(?, ?, ?, ?, ?, ?) """, [ weather["temp"], weather["feels_like"], weather["sunrise"], weather["sunset"], weather["weather"][0]["icon"], metadata_id, ], ) except Exception as e: remove_metadata(conn, metadata_id) conn.rollback() print(e) raise conn.commit() def insert_attach(filepath: str, conn: sqlite3.Connection, metadata_id: int): cursor = conn.cursor() try: cursor.execute( """ INSERT INTO attachments(filepath, metadata_id) VALUES(?, ?) """, [ filepath, metadata_id, ], ) except Exception as e: remove_metadata(conn, metadata_id) conn.rollback() print(e) raise conn.commit() def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int): cursor = conn.cursor() try: # sometimes it can't get locality.... locality = location["locality"] if "locality" in location else "-" cursor.execute( """ INSERT INTO location(city, lon, lat, tz, metadata_id) VALUES(?, ?, ?, ?, ?) """, [ locality, location["lon"], location["lat"], location["tzname"], metadata_id, ], ) except Exception as e: remove_metadata(conn, metadata_id) conn.rollback() print(e) raise conn.commit() def fetch_geo(metadata_id: int, create_time_timestamp: int, conn: sqlite3.Connection): geo_url = f"{Config.owntracks_url}/api/0/locations" geo_headers = { "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}" } geo_response = fetch_data( geo_url, geo_headers, data={ "from": datetime.fromtimestamp(int(time.time()) - 2592000).strftime( "%Y-%m-%d" ), # 30 days # "limit": 1000, "device": Config.geo_device, "user": Config.geo_user, }, ) closest_entry = find_closest_entry( geo_response.get("data", []), create_time_timestamp ) if closest_entry["tst"] + (48 * 60**2) < time.time(): # 2 days print("") if ( input("Do you really want to use such old geo data? (Y/N) ").strip().upper() != "Y" ): remove_metadata(conn, metadata_id) sys.exit("Operation canceled.") insert_location(closest_entry, conn, metadata_id) return closest_entry def fetch_weather( metadata_id: int, closest_entry: dict, unixtime: int, conn: sqlite3.Connection ): weather_response = fetch_data( f"https://api.openweathermap.org/data/3.0/onecall/timemachine?lat={closest_entry['lat']}&lon={closest_entry['lon']}&dt={unixtime}&appid={Config.openweathermap_api_key}&units=metric", headers={}, ) insert_weather(weather_response["data"], conn, metadata_id) def doctor(): args_len = len(sys.argv) check_diary = False fix_diary = False limit = 50 if args_len < 3: sys.exit("Usage: script.py doctor ") if args_len >= 4 and sys.argv[3] == "check_diary": check_diary = True if args_len >= 5: limit = int(sys.argv[4]) if args_len >= 4 and sys.argv[3] == "check_and_fix_diary": fix_diary = True check_diary = True diary_name = sys.argv[2] diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) conn = db_connection(diary_path) initialize_db(conn) cursor = conn.cursor() metadata = cursor.execute( f"SELECT * FROM metadata ORDER BY id DESC LIMIT {limit}" ).fetchall() for m in metadata: weather = cursor.execute( "SELECT * FROM weather WHERE metadata_id = ?", (m[0],) ).fetchall() location = cursor.execute( "SELECT * FROM location WHERE metadata_id = ?", (m[0],) ).fetchall() attachment = cursor.execute( "SELECT * FROM attachments WHERE metadata_id = ?", (m[0],) ).fetchall() if not weather: print(f"There is no weather info about {m[0]} - {m[1]}") if not location: print(f"There is no location info about {m[0]} - {m[1]}") # I can't have attachments in every entity... # if not attachment: # print(f"There is no attachment info about {m[0]} - {m[1]}") if not weather and not location and not attachment: # delete metadata entry if any of metadata type is not exists print("An empty metadata was deleted") remove_metadata(conn, m[0]) # Close the connection before starting the multithreading part of the script conn.close() if check_diary: def check_jrnl(metadata: list): dt = datetime.fromtimestamp(metadata[1], tz=TZ) diary_datetime = dt.strftime("%Y/%m/%d at %I:%M:%S %p") try: result = subprocess.run( ["jrnl", diary_name, "-on", diary_datetime], check=True, capture_output=True, text=True, ) return diary_datetime, result except subprocess.CalledProcessError as e: print(e) raise def remove_attachment(metadata_id): """Remove file from attachments directory by 'metadata_id' - 'id12_1741177528_IMG_6141.jpeg'""" files = list(Path(diary_path / FILES_PATH).glob(f"id{metadata_id}_*")) if len(files) < 1: return if len(files) > 1: if ( input(f"There are {len(files)} files. Remove them all? (Y/N): ") .strip() .upper() != "Y" ): return print("There are mistaken attachment, I will delete it!") for file in files: file.unlink(missing_ok=True) def process_metadata(m): conn = db_connection(diary_path) diary_datetime, result = check_jrnl(metadata=m) if not result.stdout.strip(): print( f"There is some metadata that is not associated with a diary entity: {diary_datetime}." ) if not fix_diary: print( "You can automatically remove it by running the scripts with 'check_and_fix_diary' argument." ) else: remove_metadata(conn, m[0]) remove_attachment(m[0]) # remove attachments print("The problem was fixed.") conn.close() with ThreadPoolExecutor(max_workers=10) as executor: futures = {executor.submit(process_metadata, m): m for m in metadata} for future in as_completed(futures): try: future.result() except Exception as e: print(f"Thread error: {e}") raise def make_hash(file: Path): sha256_hash = hashlib.sha256() with open(file, "rb") as filed: sha256_hash.update(filed.read()) return sha256_hash.hexdigest() def make_backup(diary_name: str, diary_path: Path): try: os.remove(diary_path / f"{diary_name}.bak") except FileNotFoundError: ... # shutil.copy2(diary_path / diary_name, diary_path / f"{diary_name}.bak") # I can't make backup when the diary is not encrypted... shutil.copy2(diary_path / DB_NAME, diary_path / f"{DB_NAME}.bak") def export(): if len(sys.argv) < 4 or sys.argv[1] != "export": sys.exit("Usage: script.py export ") diary_name, tag = sys.argv[2], sys.argv[3] diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) try: conn = db_connection(diary_path) initialize_db(conn) headers = {"Cookie": f"memos.access-token={Config.memo_token}"} # query_string = urllib.parse.urlencode( # {"filter": f"creator=='users/1'&&tag_search==['{tag}']"} # ) query_string = urllib.parse.urlencode({"filter": f"tag in ['{tag}']"}) data = fetch_data( f"{Config.memo_url}api/v1/users/1/memos?{query_string}", headers ) memos = data.get("memos", []) if not memos: sys.exit("No memos found.") if ( input(f"There are {len(memos)} memos. Export them all? (Y/N): ") .strip() .upper() != "Y" ): sys.exit("Export canceled.") for memo in memos: create_time = memo["createTime"] content = memo["content"].replace(f"#{tag}", "").strip() metadata_id = insert_metadata(conn, make_tz_unixtime(create_time)) # attachments part memo_info = fetch_data(f"{Config.memo_url}/api/v1/{memo['name']}", headers) # check if there are resources if "resources" in memo_info: for resource in memo_info["resources"]: # download files url = f"{Config.memo_url}file/{resource['name']}/{resource['filename']}" try: filepath = download_file( url=url, diary_path=diary_path, filename=resource["filename"], headers=headers, metadata_id=metadata_id, ) except Exception: remove_metadata(conn, metadata_id) raise insert_attach( filepath=str(filepath), conn=conn, metadata_id=metadata_id, ) content += "\n@attach_photo" closest_entry = fetch_geo(metadata_id, make_tz_unixtime(create_time), conn) fetch_weather( metadata_id, closest_entry, make_tz_unixtime(create_time), conn ) try: diary_datetime = convert_diary_date(create_time) daytime_tag = daytime(convert_diary_date(create_time, False)) if daytime_tag != "none": content += f"\n@{daytime_tag}" content = shlex.quote(content) subprocess.run( f'printf "%s %b" "{diary_datetime}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print(f"Error writing to journal: {e.stderr}") raise delete_entity(f"{Config.memo_url}/api/v1/{memo['name']}", headers) except Exception as e: print(f"An error occurred: {e}") raise def insert(): conn = None try: if len(sys.argv) < 3 or sys.argv[1] != "insert": sys.exit( "Usage: script.py insert [bulk|single (default)] 'content'" ) diary_name = sys.argv[2] insert_type = ( "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single" ) diary_path = get_diary_path_by_name(diary_name).parent diary_filename = get_diary_path_by_name(diary_name) # do backup ever! make_backup(diary_filename, diary_path) conn = db_connection(diary_path) initialize_db(conn) # generating and converting current time datenow = datetime.now(timezone.utc) datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ") metadata_id = insert_metadata(conn, make_tz_unixtime(datenow_timestamp)) # fetching geo-data closest_entry = fetch_geo( metadata_id, make_tz_unixtime(datenow_timestamp), conn ) # fetching weather data fetch_weather( metadata_id, closest_entry, make_tz_unixtime(datenow_timestamp), conn ) if insert_type == "single": """ Inserting a string from the terminal """ diary_datetime = convert_diary_date(datenow_timestamp) daytime_tag = daytime(convert_diary_date(datenow_timestamp, False)) content = sys.argv[4] # inserting daytime tag if daytime_tag != "none": content += f"\n@{daytime_tag}" content = shlex.quote(content) if not content: print("There is no text") sys.exit(1) try: subprocess.run( f'printf "%s %s" "{diary_datetime}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print(f"Error inserting single entry: {e.stderr}") raise elif insert_type == "bulk": """ Inserting entry from your editor """ fd, temp_file_path = tempfile.mkstemp() os.close(fd) hash = make_hash(Path(temp_file_path)) subprocess.run(["nvim", temp_file_path], text=True, check=True) with open(temp_file_path, "r") as file: diary_datetime = convert_diary_date(datenow_timestamp) daytime_tag = daytime(diary_datetime) content = file.read() # inserting daytime tag if daytime_tag != "none": content += f"\n@{daytime_tag}" content = shlex.quote(file.read()) if hash != make_hash(temp_file_path): try: subprocess.run( f'printf "%s %s" "{diary_datetime}" {content} | jrnl {diary_name}', shell=True, capture_output=True, text=True, check=True, ) os.system("clear") print("Record has been inserted.") except subprocess.CalledProcessError as e: print( f"Error during bulk import: {e.stderr}, file: {temp_file_path}" ) raise os.remove(temp_file_path) except Exception as e: print(f"An error occurred: {e}") raise finally: if conn: conn.close() print("Database connection closed.") if __name__ == "__main__": try: # I want to know if I'm connected... fetch_data(url="https://google.com", rjson=False, log=False) ... except Exception as e: print(f"Connection problem: {e}") raise try: # I want to know if diary is encrypted... subprocess.run( ["jrnl", "--decrypt"], input=b"", # Пустой ввод, чтобы не ввести пароль ) except Exception as e: print(str(e)) raise else: if sys.argv[1] == "export": export() elif sys.argv[1] == "insert": insert() elif sys.argv[1] == "doctor": # cProfile.run("doctor()", "output.prof") # stats = pstats.Stats("output.prof") # stats.strip_dirs().sort_stats("cumulative").print_stats(10) doctor() else: print("Unknown command") sys.exit(1)