.dotfiles/global/scripts/bin/diary.py
2025-01-17 21:19:43 +01:00

522 lines
16 KiB
Python
Executable file

#!/usr/bin/env python3
import base64
import hashlib
import json
import logging
import os
import re
import shlex
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import urllib.parse
import urllib.request
from datetime import datetime, timedelta, timezone
from pathlib import Path
DB_NAME = Path("metadata.db")
TZ = 1
class Config:
    """Runtime configuration, read once from environment variables at import."""

    # Memos server access token and base URL (both required; see validate()).
    memo_token = os.getenv("MEMOS_TOKEN")
    memo_url = os.getenv("MEMOS_URL")
    # OpenWeatherMap API key used by fetch_weather().
    openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY")
    # OwnTracks HTTP basic-auth credentials, pre-encoded to bytes for base64.
    owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode()
    owntracks_url = os.getenv("OWNTRACKS_URL")
    # OWNTRACKS_PARAMS is expected as "user,device"; the "," default makes
    # both fields empty strings rather than raising when the var is unset.
    geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS", ",").split(",")

    @classmethod
    def validate(cls):
        # Only the Memos settings are hard requirements; the geo/weather
        # settings fail later, at request time, if missing.
        if not cls.memo_token or not cls.memo_url:
            sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.")


# Validate eagerly so the script aborts before doing any work.
Config.validate()
def make_logger():
    """Configure append-only file logging and return the request logger.

    basicConfig is a no-op after the first effective call, so invoking this
    repeatedly (as the fetch helpers do) is harmless.
    """
    log_path = Path.home() / ".diary_requests.log"
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler(log_path, mode="a")],
    )
    return logging.getLogger("urllib_logger")
def get_diary_path_by_name(name: str):
    """Resolve a jrnl diary name to its journal file path via `jrnl --list`.

    Exits with a message if jrnl reports an error or the named diary is
    missing / points at a nonexistent file.
    """
    listing = subprocess.run(["jrnl", "--list"], capture_output=True, text=True)
    if listing.stderr:
        sys.exit(f"Error retrieving diary name: {listing.stderr}")
    # jrnl lists diaries as "* <name> -> <path>" lines.
    known = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", listing.stdout.strip()))
    found = known.get(name)
    if not found or not Path(found).exists():
        sys.exit(f"Diary '{name}' not found or path does not exist.")
    return Path(found)
def make_tz_unixtime(target_time: str):
    """Parse a "%Y-%m-%dT%H:%M:%SZ" string, shift it by TZ hours, and return
    the resulting unix timestamp as an int.

    Note: the parsed datetime is naive, so .timestamp() interprets it in the
    machine's local timezone — this mirrors the rest of the script.
    """
    parsed = datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ")
    shifted = parsed + timedelta(hours=TZ)
    return int(shifted.timestamp())
def find_closest_entry(data, target_timestamp: int):
    """Return the entry whose "tst" value is nearest to target_timestamp.

    Entries without a "tst" key are skipped; returns None when no entry
    qualifies. Ties keep the earliest qualifying entry, matching min().
    """
    best = None
    best_gap = None
    for entry in data:
        if "tst" not in entry:
            continue
        gap = abs(target_timestamp - entry["tst"])
        if best_gap is None or gap < best_gap:
            best, best_gap = entry, gap
    return best
def convert_diary_date(date_str):
    """Format a "%Y-%m-%dT%H:%M:%SZ" timestamp as a diary entry header.

    Returns e.g. "17 Jan 2025 at 21:19:43:" (note the trailing colon used by
    the jrnl pipeline), shifted by TZ hours; None when parsing fails.
    """
    try:
        parsed = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
    except ValueError:
        return None
    return (parsed + timedelta(hours=TZ)).strftime("%d %b %Y at %H:%M:%S:")
def fetch_data(url, headers=None, data=None, rjson=True, log=True):
    """Perform an HTTP request and return the response body.

    GET when `data` is None, otherwise POST with the dict form-encoded.
    Returns parsed JSON when `rjson` is true, else the raw text. Exits on a
    non-200 status; logs (and re-raises) any other failure.

    Fix: `headers` previously used a mutable default argument (`{}`), which
    is shared across calls; it is now None with a per-call fallback.
    """
    logit = make_logger()
    method = "POST" if data else "GET"
    encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None
    req = urllib.request.Request(
        url, headers=headers or {}, data=encoded_data, method=method
    )
    req.add_header("Content-Type", "application/x-www-form-urlencoded")
    try:
        with urllib.request.urlopen(req) as response:
            if response.status != 200:
                logit.error(response.read())
                sys.exit(f"HTTP error {response.status}")
            response_data = response.read().decode("utf-8")
            if log:
                logit.info(response_data)
            if not rjson:
                return response_data
            return json.loads(response_data)
    except Exception as e:
        logit.error(str(e))
        raise
def delete_entity(url, headers):
    """Issue an HTTP DELETE to `url`.

    Exits on a non-200 status; logs the response body on success and logs
    (and re-raises) any other failure.
    """
    logit = make_logger()
    request = urllib.request.Request(url, headers=headers, method="DELETE")
    try:
        with urllib.request.urlopen(request) as response:
            if response.status == 200:
                logit.info(response.read().decode("utf-8"))
            else:
                logit.error(response.read())
                sys.exit(f"HTTP delete error {response.status}")
    except Exception as e:
        logit.error(str(e))
        raise
def db_connection(diary_path: Path):
    """Open the metadata SQLite database inside `diary_path`.

    Foreign-key enforcement is switched on so metadata deletions cascade to
    the weather and location tables.
    """
    connection = sqlite3.connect(diary_path / DB_NAME)
    connection.execute("PRAGMA foreign_keys = ON;")
    return connection
def initialize_db(conn: sqlite3.Connection):
    """Create the metadata/weather/location schema if it does not exist.

    Weather and location rows reference metadata(id) with ON DELETE CASCADE,
    so removing a metadata row cleans up its satellite rows.
    """
    schema = """
    CREATE TABLE IF NOT EXISTS metadata (
        id INTEGER PRIMARY KEY,
        unixtime INTEGER NOT NULL
    );
    CREATE TABLE IF NOT EXISTS weather (
        id INTEGER PRIMARY KEY,
        temp INTEGER NOT NULL,
        temp_like INTEGER NOT NULL,
        sunrise INTEGER NOT NULL,
        sunset INTEGER NOT NULL,
        icon TEXT NOT NULL DEFAULT 'none',
        metadata_id INTEGER NOT NULL,
        FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
    );
    CREATE TABLE IF NOT EXISTS location (
        id INTEGER PRIMARY KEY,
        city TEXT NOT NULL,
        lon TEXT NOT NULL,
        lat TEXT NOT NULL,
        tz TEXT NOT NULL,
        metadata_id INTEGER NOT NULL,
        FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
    );
    """
    with conn:
        conn.executescript(schema)
def insert_metadata(
    conn: sqlite3.Connection,
    unixtime: int,
):
    """Insert a metadata row for `unixtime` and return its new row id."""
    cur = conn.execute("INSERT INTO metadata(unixtime) VALUES(?)", (unixtime,))
    conn.commit()
    return cur.lastrowid
def remove_metadata(conn: sqlite3.Connection, metadata_id: int):
    """Delete the metadata row with id `metadata_id` and commit.

    With foreign keys enabled, linked weather/location rows cascade away.
    """
    conn.execute("DELETE FROM metadata WHERE id=?", (metadata_id,))
    conn.commit()
def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int):
    """Insert one weather observation linked to `metadata_id`.

    `weather` may be a single observation dict or a list of them (the
    OpenWeatherMap timemachine response carries a list); only the first
    element is stored. Expected keys: temp, feels_like, sunrise, sunset,
    and weather[0]["icon"].

    Fix: on failure the original code called remove_metadata() (which
    commits) BEFORE conn.rollback(), so that commit could persist the
    partially-failed transaction. Rollback now happens first, then the
    orphan metadata row is removed, then the error is re-raised.
    """
    if isinstance(weather, list):
        weather = weather[0]
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id)
            VALUES(?, ?, ?, ?, ?, ?)
            """,
            [
                weather["temp"],
                weather["feels_like"],
                weather["sunrise"],
                weather["sunset"],
                weather["weather"][0]["icon"],
                metadata_id,
            ],
        )
    except Exception as e:
        conn.rollback()  # undo the failed insert before anything commits
        remove_metadata(conn, metadata_id)  # drop the now-orphaned metadata row
        print(e)
        raise
    conn.commit()
def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int):
    """Insert one location row linked to `metadata_id`.

    Expected keys (OwnTracks location record): locality, lon, lat, tzname.

    Fix: on failure the original code called remove_metadata() (which
    commits) BEFORE conn.rollback(), so that commit could persist the
    partially-failed transaction. Rollback now happens first, then the
    orphan metadata row is removed, then the error is re-raised.
    """
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            INSERT INTO location(city, lon, lat, tz, metadata_id)
            VALUES(?, ?, ?, ?, ?)
            """,
            [
                location["locality"],
                location["lon"],
                location["lat"],
                location["tzname"],
                metadata_id,
            ],
        )
    except Exception as e:
        conn.rollback()  # undo the failed insert before anything commits
        remove_metadata(conn, metadata_id)  # drop the now-orphaned metadata row
        print(e)
        raise
    conn.commit()
def fetch_geo(metadata_id: int, create_time_timestamp: int, conn: sqlite3.Connection):
    """Fetch OwnTracks locations, store the one closest in time, return it.

    Queries the OwnTracks HTTP API (basic auth from Config), picks the entry
    whose "tst" is nearest to `create_time_timestamp`, and persists it via
    insert_location().

    Fix: when the API returns no usable entries, find_closest_entry() yields
    None, which previously surfaced as an opaque TypeError deep inside
    insert_location(). Now the orphan metadata row is removed and a clear
    ValueError is raised instead.
    """
    geo_url = f"{Config.owntracks_url}/api/0/locations"
    geo_headers = {
        "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}"
    }
    geo_response = fetch_data(
        geo_url,
        geo_headers,
        data={
            "from": "1970-01-01",
            "limit": 20,
            "device": Config.geo_device,
            "user": Config.geo_user,
        },
    )
    closest_entry = find_closest_entry(
        geo_response.get("data", []), create_time_timestamp
    )
    if closest_entry is None:
        remove_metadata(conn, metadata_id)
        raise ValueError("No OwnTracks location data available for this time.")
    insert_location(closest_entry, conn, metadata_id)
    return closest_entry
def fetch_weather(
    metadata_id: int, closest_entry: dict, unixtime: int, conn: sqlite3.Connection
):
    """Fetch historical weather for the entry's coordinates and persist it.

    Uses the OpenWeatherMap One Call "timemachine" endpoint with the lat/lon
    taken from the stored OwnTracks entry, then writes the first observation
    via insert_weather().
    """
    endpoint = (
        "https://api.openweathermap.org/data/3.0/onecall/timemachine"
        f"?lat={closest_entry['lat']}&lon={closest_entry['lon']}&dt={unixtime}"
        f"&appid={Config.openweathermap_api_key}&units=metric"
    )
    weather_response = fetch_data(endpoint, headers={})
    insert_weather(weather_response["data"], conn, metadata_id)
def doctor():
    """Consistency check between the metadata DB and the jrnl diary.

    CLI: script.py doctor <diary_name> [check_diary|check_and_fix_diary]

    Reports metadata rows missing weather or location info. With
    "check_diary" it also looks for metadata rows that have no matching jrnl
    entry; "check_and_fix_diary" additionally deletes such orphans.
    """
    args_len = len(sys.argv)
    check_diary = False
    fix_diary = False
    if args_len < 3:
        sys.exit("Usage: script.py doctor <diary_name>")
    if args_len >= 4 and sys.argv[3] == "check_diary":
        check_diary = True
    if args_len >= 4 and sys.argv[3] == "check_and_fix_diary":
        fix_diary = True
        check_diary = True
    diary_name = sys.argv[2]
    diary_path = get_diary_path_by_name(diary_name).parent
    diary_filename = get_diary_path_by_name(diary_name)
    # do backup ever!
    make_backup(diary_filename, diary_path)
    conn = db_connection(diary_path)
    initialize_db(conn)
    cursor = conn.cursor()
    metadata = cursor.execute("SELECT * FROM metadata").fetchall()
    for m in metadata:
        # m[0] is the metadata row id, m[1] the stored unixtime.
        weather = cursor.execute(
            "SELECT * FROM weather WHERE metadata_id = ?", (m[0],)
        ).fetchall()
        location = cursor.execute(
            "SELECT * FROM location WHERE metadata_id = ?", (m[0],)
        ).fetchall()
        if not weather:
            print(f"There is no weather info about {m[0]} - {m[1]}")
        if not location:
            print(f"There is no location info about {m[0]} - {m[1]}")
        if check_diary:
            # NOTE(review): unixtime was stored already shifted by TZ hours
            # (see make_tz_unixtime) and is shifted again here via tzinfo —
            # confirm the double offset is intended before changing either.
            dt = datetime.fromtimestamp(m[1], tz=timezone(timedelta(hours=TZ)))
            # jrnl's "-on" filter expects this 12-hour format.
            diary_datetime = dt.strftime("%Y/%m/%d at %I:%M:%S %p")
            # print(diary_datetime)
            try:
                result = subprocess.run(
                    ["jrnl", diary_name, "-on", diary_datetime],
                    check=True,
                    capture_output=True,
                    text=True,
                )
                # Empty stdout means jrnl found no entry at that timestamp.
                if not result.stdout:
                    print(
                        f"There is some metadata that is not associated with a diary entity: {diary_datetime}."
                    )
                    if not fix_diary:
                        print(
                            "You can automatically remove it by running the scripts with 'check_and_fix_diary' argument."
                        )
                    else:
                        remove_metadata(conn, m[0])
                        print("The problem was fixed.")
            except subprocess.CalledProcessError as e:
                print(e)
                raise
def make_hash(file: Path):
    """Return the hex SHA-256 digest of the file's contents.

    Accepts anything open() accepts (callers pass both Path and str).
    """
    with open(file, "rb") as handle:
        return hashlib.sha256(handle.read()).hexdigest()
def make_backup(diary_name: str, diary_path: Path):
    """Back up the metadata database next to the diary.

    `diary_name` is annotated str but callers pass the full diary Path;
    either works because it is only used in f-string path building. The
    diary file itself is intentionally NOT copied (see comment below), only
    metadata.db is backed up to metadata.db.bak.
    """
    # Drop any stale backup; missing_ok replaces the old try/except
    # FileNotFoundError dance (behavior is identical).
    (diary_path / f"{diary_name}.bak").unlink(missing_ok=True)
    # shutil.copy2(diary_path / diary_name, diary_path / f"{diary_name}.bak") # I can't make backup when the diary is not encrypted...
    shutil.copy2(diary_path / DB_NAME, diary_path / f"{DB_NAME}.bak")
def export():
    """Export tagged memos from a Memos server into a jrnl diary.

    CLI: script.py export <diary_name> <tag>

    For each memo carrying #<tag>: records metadata (timestamp), location
    (OwnTracks) and weather (OpenWeatherMap) in the local DB, appends the
    memo text to the jrnl diary, then deletes the memo from the server.
    Asks for interactive confirmation before exporting.
    """
    if len(sys.argv) < 4 or sys.argv[1] != "export":
        sys.exit("Usage: script.py export <diary_name> <tag>")
    diary_name, tag = sys.argv[2], sys.argv[3]
    diary_path = get_diary_path_by_name(diary_name).parent
    diary_filename = get_diary_path_by_name(diary_name)
    # do backup ever!
    make_backup(diary_filename, diary_path)
    try:
        conn = db_connection(diary_path)
        initialize_db(conn)
        headers = {"Cookie": f"memos.access-token={Config.memo_token}"}
        # CEL-style filter understood by the Memos API.
        query_string = urllib.parse.urlencode(
            {"filter": f"creator=='users/1'&&tag_search==['{tag}']"}
        )
        data = fetch_data(f"{Config.memo_url}api/v1/memos?{query_string}", headers)
        memos = data.get("memos", [])
        if not memos:
            sys.exit("No memos found.")
        if (
            input(f"There are {len(memos)} memos. Export them all? (Y/N): ")
            .strip()
            .upper()
            != "Y"
        ):
            sys.exit("Export canceled.")
        for memo in memos:
            create_time = memo["createTime"]
            # Strip the tag marker and shell-quote for the printf pipeline below.
            content = shlex.quote(memo["content"].replace(f"#{tag}", "").strip())
            metadata_id = insert_metadata(conn, make_tz_unixtime(create_time))
            closest_entry = fetch_geo(metadata_id, make_tz_unixtime(create_time), conn)
            fetch_weather(
                metadata_id, closest_entry, make_tz_unixtime(create_time), conn
            )
            try:
                # jrnl reads the entry from stdin; the date header makes jrnl
                # file it under the memo's creation time.
                subprocess.run(
                    f'printf "%s %s" "{convert_diary_date(create_time)}" {content} | jrnl {diary_name}',
                    shell=True,
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Error writing to journal: {e.stderr}")
                continue
            # NOTE(review): this URL inserts "/" after memo_url while the list
            # request above does not — confirm memo_url's trailing-slash
            # convention (one of the two likely produces a double/missing slash).
            delete_entity(f"{Config.memo_url}/api/v1/{memo['name']}", headers)
    except Exception as e:
        print(f"An error occurred: {e}")
        raise
def insert():
    """Insert a new diary entry stamped with current time, location, weather.

    CLI: script.py insert <diary_name> [bulk|single (default)] 'content'

    "single" takes the entry text from the command line; "bulk" opens nvim
    on a temp file and imports it only if the file was actually modified.
    Metadata/geo/weather rows are recorded before the jrnl write.
    """
    conn = None
    try:
        if len(sys.argv) < 3 or sys.argv[1] != "insert":
            sys.exit(
                "Usage: script.py insert <diary_name> [bulk|single (default)] 'content'"
            )
        # TODO is this really need?
        # if len(sys.argv) == 5 and sys.argv[3] != "single":
        #     sys.exit("Invalid usage for bulk insert.")
        diary_name = sys.argv[2]
        # do backup ever!
        insert_type = (
            "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single"
        )
        diary_path = get_diary_path_by_name(diary_name).parent
        diary_filename = get_diary_path_by_name(diary_name)
        # do backup ever!
        make_backup(diary_filename, diary_path)
        conn = db_connection(diary_path)
        initialize_db(conn)
        # generating and converting current time
        datenow = datetime.now(timezone.utc)
        datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ")
        metadata_id = insert_metadata(conn, make_tz_unixtime(datenow_timestamp))
        # fetching geo-data
        closest_entry = fetch_geo(
            metadata_id, make_tz_unixtime(datenow_timestamp), conn
        )
        # fetching weather data
        fetch_weather(
            metadata_id, closest_entry, make_tz_unixtime(datenow_timestamp), conn
        )
        if insert_type == "single":
            """
            Inserting a string from the terminal
            """
            # NOTE(review): content is read from argv[4], so "single" must be
            # given explicitly at argv[3]; running
            # `script.py insert <name> 'text'` would hit an IndexError here.
            content = shlex.quote(sys.argv[4])
            if not content:
                print("There is no text")
                sys.exit(1)
            try:
                # jrnl reads the entry from stdin, prefixed by the date header.
                subprocess.run(
                    f'printf "%s %s" "{convert_diary_date(datenow_timestamp)}" {content} | jrnl {diary_name}',
                    shell=True,
                    capture_output=True,
                    text=True,
                    check=True,
                )
            except subprocess.CalledProcessError as e:
                print(f"Error inserting single entry: {e.stderr}")
                raise
        elif insert_type == "bulk":
            """
            Inserting entry from your editor
            """
            fd, temp_file_path = tempfile.mkstemp()
            os.close(fd)
            # Hash of the empty temp file; compared after editing to detect
            # whether the user actually wrote anything.
            hash = make_hash(Path(temp_file_path))
            subprocess.run(["nvim", temp_file_path], text=True, check=True)
            with open(temp_file_path, "r") as file:
                content = shlex.quote(file.read())
            if hash != make_hash(temp_file_path):
                try:
                    subprocess.run(
                        f'printf "%s %s" "{convert_diary_date(datenow_timestamp)}" {content} | jrnl {diary_name}',
                        shell=True,
                        capture_output=True,
                        text=True,
                        check=True,
                    )
                except subprocess.CalledProcessError as e:
                    print(
                        f"Error during bulk import: {e.stderr}, file: {temp_file_path}"
                    )
                    raise
            os.remove(temp_file_path)
    except Exception as e:
        print(f"An error occurred: {e}")
        raise
    finally:
        if conn:
            conn.close()
            print("Database connection closed.")
if __name__ == "__main__":
    # Fix: guard argv before indexing — running the script with no arguments
    # previously raised an IndexError at sys.argv[1].
    if len(sys.argv) < 2:
        sys.exit("Usage: script.py [export|insert|doctor] ...")
    try:
        # Cheap connectivity probe before doing any real work.
        fetch_data(url="https://google.com", rjson=False, log=False)
    except Exception as e:
        print(f"Connection problem: {e}")
        raise
    else:
        if sys.argv[1] == "export":
            export()
        elif sys.argv[1] == "insert":
            insert()
        elif sys.argv[1] == "doctor":
            doctor()
        else:
            print("Unknown command")
            sys.exit(1)