#!/usr/bin/env python3

import base64

# import cProfile
import hashlib
import json
import logging
import os

# import pstats
import re
import shlex
import shutil
import sqlite3
import subprocess
import sys
import tempfile
import time
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from zoneinfo import ZoneInfo

DB_NAME = Path("metadata.db")
# TZ = 1
TZ = ZoneInfo("Europe/Warsaw")
TOOLS = ("jrnl", "sqlite3")
FILES = (".jpeg", ".jpg", ".png", ".gif")
FILES_PATH = Path("attachments")

for t in TOOLS:
    if not shutil.which(t):
        raise FileNotFoundError(f"'{t}' was not found on PATH")


class Config:
    memo_token = os.getenv("MEMOS_TOKEN")
    memo_url = os.getenv("MEMOS_URL")
    openweathermap_api_key = os.getenv("OPENWEATHER_APIKEY")
    owntracks_creds = os.getenv("OWNTRACKS_CREDS", "").encode()
    owntracks_url = os.getenv("OWNTRACKS_URL")
    geo_user, geo_device = os.getenv("OWNTRACKS_PARAMS", ",").split(",")

    @classmethod
    def validate(cls):
        if not cls.memo_token or not cls.memo_url:
            sys.exit("Missing MEMOS_TOKEN or MEMOS_URL environment variables.")
        elif not cls.openweathermap_api_key:
            sys.exit("Missing OPENWEATHER_APIKEY environment variable.")
        elif (  # TODO need more thorough checking
            not cls.owntracks_creds
            or not cls.owntracks_url
            or not cls.geo_user
            or not cls.geo_device
        ):
            sys.exit("Missing OwnTracks data")


Config.validate()
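
# The variables above come from the environment. An illustrative setup
# (values are placeholders, not real endpoints or tokens):
#   export MEMOS_TOKEN="..."
#   export MEMOS_URL="https://memos.example.com/"
#   export OPENWEATHER_APIKEY="..."
#   export OWNTRACKS_CREDS="user:password"
#   export OWNTRACKS_URL="https://owntracks.example.com"
#   export OWNTRACKS_PARAMS="username,devicename"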


def daytime(dt: datetime):
    """Return a part-of-day tag ("morning"/"night") or "none"."""
    hour = int(dt.strftime("%I"))  # 12-hour clock: 1..12

    am_pm = dt.strftime("%p")  # "AM"/"PM"

    part_of_day = "none"

    if am_pm == "AM":
        if 5 <= hour < 12:
            part_of_day = "morning"
        elif hour == 12 or hour < 5:  # 12 AM through 4:59 AM
            part_of_day = "night"

    return part_of_day
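
# Illustrative results (times in the script's TZ):
#   daytime(datetime(2025, 1, 1, 6, 30, tzinfo=TZ))  -> "morning"
#   daytime(datetime(2025, 1, 1, 0, 30, tzinfo=TZ))  -> "night"
#   daytime(datetime(2025, 1, 1, 15, 0, tzinfo=TZ))  -> "none" (no PM tags)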


def make_logger():
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[
            logging.FileHandler(Path.home() / ".diary_requests.log", mode="a"),
        ],
    )
    return logging.getLogger("urllib_logger")


def get_diary_path_by_name(name: str):
    result = subprocess.run(["jrnl", "--list"], capture_output=True, text=True)
    if result.stderr:
        sys.exit(f"Error retrieving diary name: {result.stderr}")

    matches = dict(re.findall(r"\*\s+(\w+)\s+->\s+(.+)", result.stdout.strip()))
    diary_path = matches.get(name)

    if not diary_path or not Path(diary_path).exists():
        sys.exit(f"Diary '{name}' not found or path does not exist.")
    return Path(diary_path)
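
# The regex above assumes `jrnl --list` output of the form (one line per journal):
#   * default -> /home/user/journal.txt
# The exact format may vary across jrnl versions.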


def make_tz_unixtime(target_time: str):
    return int(
        (
            datetime.strptime(target_time, "%Y-%m-%dT%H:%M:%SZ")
            .replace(tzinfo=ZoneInfo("UTC"))
            .astimezone(TZ)
        ).timestamp()
    )
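
# Example: a UTC timestamp string becomes epoch seconds (the timezone conversion
# changes only the wall-clock representation, not the epoch value):
#   make_tz_unixtime("2025-01-01T12:00:00Z")  -> 1735732800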


def find_closest_entry(data, target_timestamp: int):
    return min(
        (entry for entry in data if "tst" in entry),
        key=lambda e: abs(target_timestamp - e["tst"]),
        default=None,
    )
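
# Example with made-up OwnTracks-style records:
#   find_closest_entry([{"tst": 100}, {"tst": 250}], 240)  -> {"tst": 250}
#   find_closest_entry([], 240)                            -> None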


def convert_diary_date(date_str, to_str: bool = True):
    try:
        dt = (
            datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
            .replace(tzinfo=ZoneInfo("UTC"))
            .astimezone(TZ)
        )
        if to_str:
            return dt.strftime("%d %b %Y at %H:%M:%S:")
        return dt
    except ValueError:
        return None
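
# Example (Europe/Warsaw is UTC+1 in winter):
#   convert_diary_date("2025-01-01T12:00:00Z")         -> "01 Jan 2025 at 13:00:00:"
#   convert_diary_date("2025-01-01T12:00:00Z", False)  -> datetime in TZ
#   convert_diary_date("not-a-date")                   -> None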


def download_file(url, filename, diary_path, headers, metadata_id):
    ext = os.path.splitext(url)[-1]
    filename = f"id{metadata_id}_{int(time.time())}_{filename}"
    if not Path(diary_path / FILES_PATH).exists():
        Path(diary_path / FILES_PATH).mkdir(parents=True, exist_ok=True)
    filepath = Path(diary_path / FILES_PATH / filename)
    if ext not in FILES:
        print(f"File {url} is not supported")  # warn only; download anyway
    if filepath.exists():
        print(f"File {url} already exists")
    try:
        request = urllib.request.Request(url, headers=headers)
        with (
            urllib.request.urlopen(request) as response,
            open(filepath, "wb") as out_file,
        ):
            out_file.write(response.read())
    except Exception as e:
        sys.exit(str(e))

    return Path(FILES_PATH / filename)


def fetch_data(url, headers=None, data=None, rjson=True, log=True):
    logit = make_logger()
    headers = headers or {}  # avoid a mutable default argument
    method = "POST" if data else "GET"
    encoded_data = urllib.parse.urlencode(data).encode("utf-8") if data else None

    req = urllib.request.Request(url, headers=headers, data=encoded_data, method=method)
    req.add_header("Content-Type", "application/x-www-form-urlencoded")

    try:
        with urllib.request.urlopen(req) as response:
            if response.status != 200:
                logit.error(response.read())
                sys.exit(f"HTTP error {response.status}")
            raw = response.read()  # read once; the stream cannot be re-read
            try:
                response_data = raw.decode("utf-8")
            except UnicodeDecodeError as e:
                logit.error(e)
                response_data = raw
            if log:
                logit.info(response_data)
            if not rjson:
                return response_data
            return json.loads(response_data)
    except Exception as e:
        logit.error(str(e))
        raise
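
# Minimal usage sketch (hypothetical URL): a GET returns parsed JSON by default,
# and a POST is made whenever form data is supplied.
#   body = fetch_data("https://api.example.com/items", headers={"X-Token": "..."})
#   text = fetch_data("https://example.com", rjson=False, log=False)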


def delete_entity(url, headers):
    logit = make_logger()
    req = urllib.request.Request(url, headers=headers, method="DELETE")
    try:
        with urllib.request.urlopen(req) as response:
            if response.status != 200:
                logit.error(response.read())
                sys.exit(f"HTTP delete error {response.status}")
            logit.info(response.read().decode("utf-8"))
    except Exception as e:
        logit.error(str(e))
        raise


def db_connection(diary_path: Path):
    conn = sqlite3.connect(diary_path / DB_NAME)
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn


def initialize_db(conn: sqlite3.Connection):
    with conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS metadata (
                id INTEGER PRIMARY KEY,
                unixtime INTEGER NOT NULL
            );
            CREATE TABLE IF NOT EXISTS weather (
                id INTEGER PRIMARY KEY,
                temp INTEGER NOT NULL,
                temp_like INTEGER NOT NULL,
                sunrise INTEGER NOT NULL,
                sunset INTEGER NOT NULL,
                icon TEXT NOT NULL DEFAULT 'none',
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            CREATE TABLE IF NOT EXISTS attachments (
                id INTEGER PRIMARY KEY,
                filepath TEXT NOT NULL,
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            CREATE TABLE IF NOT EXISTS location (
                id INTEGER PRIMARY KEY,
                city TEXT NOT NULL,
                lon TEXT NOT NULL,
                lat TEXT NOT NULL,
                tz TEXT NOT NULL,
                metadata_id INTEGER NOT NULL,
                FOREIGN KEY (metadata_id) REFERENCES metadata (id) ON DELETE CASCADE
            );
            """
        )
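
# Schema note: metadata is the parent table; weather, attachments, and location
# all reference it with ON DELETE CASCADE, so (with PRAGMA foreign_keys = ON)
# deleting a metadata row also removes its child rows. For example:
#   DELETE FROM metadata WHERE id = 12;
#   -- also drops the weather/location/attachments rows for metadata_id 12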


def insert_metadata(
    conn: sqlite3.Connection,
    unixtime: int,
):
    cursor = conn.cursor()
    cursor.execute("INSERT INTO metadata(unixtime) VALUES(?)", [unixtime])
    conn.commit()
    return cursor.lastrowid


def remove_metadata(conn: sqlite3.Connection, metadata_id: int):
    cursor = conn.cursor()
    cursor.execute("DELETE FROM metadata WHERE id=?", (metadata_id,))
    conn.commit()


def insert_weather(weather: dict, conn: sqlite3.Connection, metadata_id: int):
    if isinstance(weather, list):
        weather = weather[0]
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            INSERT INTO weather(temp, temp_like, sunrise, sunset, icon, metadata_id)
            VALUES(?, ?, ?, ?, ?, ?)
            """,
            [
                weather["temp"],
                weather["feels_like"],
                weather["sunrise"],
                weather["sunset"],
                weather["weather"][0]["icon"],
                metadata_id,
            ],
        )
    except Exception as e:
        remove_metadata(conn, metadata_id)
        conn.rollback()
        print(e)
        raise
    conn.commit()


def insert_attach(filepath: str, conn: sqlite3.Connection, metadata_id: int):
    cursor = conn.cursor()
    try:
        cursor.execute(
            """
            INSERT INTO attachments(filepath, metadata_id)
            VALUES(?, ?)
            """,
            [
                filepath,
                metadata_id,
            ],
        )
    except Exception as e:
        remove_metadata(conn, metadata_id)
        conn.rollback()
        print(e)
        raise
    conn.commit()


def insert_location(location: dict, conn: sqlite3.Connection, metadata_id: int):
    cursor = conn.cursor()
    try:
        # sometimes it can't get the locality...
        locality = location.get("locality", "-")
        cursor.execute(
            """
            INSERT INTO location(city, lon, lat, tz, metadata_id)
            VALUES(?, ?, ?, ?, ?)
            """,
            [
                locality,
                location["lon"],
                location["lat"],
                location["tzname"],
                metadata_id,
            ],
        )
    except Exception as e:
        remove_metadata(conn, metadata_id)
        conn.rollback()
        print(e)
        raise
    conn.commit()


def fetch_geo(metadata_id: int, create_time_timestamp: int, conn: sqlite3.Connection):
    geo_url = f"{Config.owntracks_url}/api/0/locations"
    geo_headers = {
        "Authorization": f"Basic {base64.b64encode(Config.owntracks_creds).decode()}"
    }
    geo_response = fetch_data(
        geo_url,
        geo_headers,
        data={
            "from": datetime.fromtimestamp(int(time.time()) - 2592000).strftime(
                "%Y-%m-%d"
            ),  # 30 days back
            # "limit": 1000,
            "device": Config.geo_device,
            "user": Config.geo_user,
        },
    )
    closest_entry = find_closest_entry(
        geo_response.get("data", []), create_time_timestamp
    )
    if closest_entry is None:  # guard: min() returned its default
        remove_metadata(conn, metadata_id)
        sys.exit("No location data found.")
    if closest_entry["tst"] + (48 * 60**2) < time.time():  # older than 2 days
        print("")
        if (
            input("Do you really want to use such old geo data? (Y/N) ").strip().upper()
            != "Y"
        ):
            remove_metadata(conn, metadata_id)
            sys.exit("Operation canceled.")
    insert_location(closest_entry, conn, metadata_id)
    return closest_entry
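
# Assumed shape of the OwnTracks locations response consumed above (illustrative):
#   {"data": [{"tst": 1735732800, "lat": "52.23", "lon": "21.01",
#              "tzname": "Europe/Warsaw", "locality": "Warsaw", ...}, ...]}
# "tst" is a Unix timestamp; "locality" may be absent (see insert_location).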


def fetch_weather(
    metadata_id: int, closest_entry: dict, unixtime: int, conn: sqlite3.Connection
):
    weather_response = fetch_data(
        f"https://api.openweathermap.org/data/3.0/onecall/timemachine"
        f"?lat={closest_entry['lat']}&lon={closest_entry['lon']}"
        f"&dt={unixtime}&appid={Config.openweathermap_api_key}&units=metric",
        headers={},
    )
    insert_weather(weather_response["data"], conn, metadata_id)


def doctor():
    args_len = len(sys.argv)
    check_diary = False
    fix_diary = False
    limit = 50
    if args_len < 3:
        sys.exit(
            "Usage: script.py doctor <diary_name> [check_diary|check_and_fix_diary] [limit]"
        )
    if args_len >= 4 and sys.argv[3] == "check_diary":
        check_diary = True

    if args_len >= 5:
        limit = int(sys.argv[4])

    if args_len >= 4 and sys.argv[3] == "check_and_fix_diary":
        fix_diary = True
        check_diary = True

    diary_name = sys.argv[2]
    diary_path = get_diary_path_by_name(diary_name).parent
    diary_filename = get_diary_path_by_name(diary_name)

    # always make a backup first!
    make_backup(diary_filename, diary_path)

    conn = db_connection(diary_path)
    initialize_db(conn)

    cursor = conn.cursor()
    metadata = cursor.execute(
        "SELECT * FROM metadata ORDER BY id DESC LIMIT ?", (limit,)
    ).fetchall()
    for m in metadata:
        weather = cursor.execute(
            "SELECT * FROM weather WHERE metadata_id = ?", (m[0],)
        ).fetchall()
        location = cursor.execute(
            "SELECT * FROM location WHERE metadata_id = ?", (m[0],)
        ).fetchall()
        attachment = cursor.execute(
            "SELECT * FROM attachments WHERE metadata_id = ?", (m[0],)
        ).fetchall()

        if not weather:
            print(f"There is no weather info for {m[0]} - {m[1]}")
        if not location:
            print(f"There is no location info for {m[0]} - {m[1]}")
        # Not every entry has attachments...
        # if not attachment:
        #     print(f"There is no attachment info for {m[0]} - {m[1]}")

        if not weather and not location and not attachment:
            # delete the metadata row when no metadata of any kind exists for it
            print("An empty metadata row was deleted")
            remove_metadata(conn, m[0])

    # Close the connection before starting the multithreaded part of the script
    conn.close()

    if check_diary:

        def check_jrnl(metadata: list):
            dt = datetime.fromtimestamp(metadata[1], tz=TZ)
            diary_datetime = dt.strftime("%Y/%m/%d at %I:%M:%S %p")
            try:
                result = subprocess.run(
                    ["jrnl", diary_name, "-on", diary_datetime],
                    check=True,
                    capture_output=True,
                    text=True,
                )
                return diary_datetime, result
            except subprocess.CalledProcessError as e:
                print(e)
                raise

        def remove_attachment(metadata_id):
            """Remove files from the attachments directory by 'metadata_id', e.g. 'id12_1741177528_IMG_6141.jpeg'."""
            files = list(Path(diary_path / FILES_PATH).glob(f"id{metadata_id}_*"))
            if len(files) < 1:
                return

            if len(files) > 1:
                if (
                    input(f"There are {len(files)} files. Remove them all? (Y/N): ")
                    .strip()
                    .upper()
                    != "Y"
                ):
                    return

            print("There is a mistaken attachment; deleting it!")
            for file in files:
                file.unlink(missing_ok=True)

        def process_metadata(m):
            conn = db_connection(diary_path)
            diary_datetime, result = check_jrnl(metadata=m)

            if not result.stdout.strip():
                print(
                    f"There is some metadata that is not associated with a diary entry: {diary_datetime}."
                )
                if not fix_diary:
                    print(
                        "You can remove it automatically by running the script with the 'check_and_fix_diary' argument."
                    )
                else:
                    remove_metadata(conn, m[0])
                    remove_attachment(m[0])  # remove attachments as well
                    print("The problem was fixed.")
            conn.close()

        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = {executor.submit(process_metadata, m): m for m in metadata}

            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Thread error: {e}")
                    raise
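
# Illustrative doctor invocations (diary name is a placeholder):
#   ./script.py doctor mydiary                      # report missing metadata only
#   ./script.py doctor mydiary check_diary 100      # also verify the last 100 rows against jrnl
#   ./script.py doctor mydiary check_and_fix_diary  # verify and delete orphaned metadata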


def make_hash(file: Path):
    sha256_hash = hashlib.sha256()

    with open(file, "rb") as filed:
        sha256_hash.update(filed.read())

    return sha256_hash.hexdigest()


def make_backup(diary_name: Path, diary_path: Path):
    try:
        os.remove(diary_path / f"{diary_name}.bak")
    except FileNotFoundError:
        ...
    # shutil.copy2(diary_path / diary_name, diary_path / f"{diary_name}.bak")  # I can't make a backup while the diary is not encrypted...
    shutil.copy2(diary_path / DB_NAME, diary_path / f"{DB_NAME}.bak")


def export():
    if len(sys.argv) < 4 or sys.argv[1] != "export":
        sys.exit("Usage: script.py export <diary_name> <tag>")

    diary_name, tag = sys.argv[2], sys.argv[3]
    diary_path = get_diary_path_by_name(diary_name).parent
    diary_filename = get_diary_path_by_name(diary_name)

    # always make a backup first!
    make_backup(diary_filename, diary_path)

    try:
        conn = db_connection(diary_path)
        initialize_db(conn)

        headers = {"Cookie": f"memos.access-token={Config.memo_token}"}
        # query_string = urllib.parse.urlencode(
        #     {"filter": f"creator=='users/1'&&tag_search==['{tag}']"}
        # )
        query_string = urllib.parse.urlencode({"filter": f"tag in ['{tag}']"})
        data = fetch_data(
            f"{Config.memo_url}api/v1/users/1/memos?{query_string}", headers
        )

        memos = data.get("memos", [])
        if not memos:
            sys.exit("No memos found.")

        if (
            input(f"There are {len(memos)} memos. Export them all? (Y/N): ")
            .strip()
            .upper()
            != "Y"
        ):
            sys.exit("Export canceled.")

        for memo in memos:
            create_time = memo["createTime"]
            content = memo["content"].replace(f"#{tag}", "").strip()

            metadata_id = insert_metadata(conn, make_tz_unixtime(create_time))

            # attachments part
            memo_info = fetch_data(f"{Config.memo_url}/api/v1/{memo['name']}", headers)
            # check if there are resources
            if "resources" in memo_info:
                for resource in memo_info["resources"]:
                    # download files
                    url = f"{Config.memo_url}file/{resource['name']}/{resource['filename']}"
                    try:
                        filepath = download_file(
                            url=url,
                            diary_path=diary_path,
                            filename=resource["filename"],
                            headers=headers,
                            metadata_id=metadata_id,
                        )
                    except Exception:
                        remove_metadata(conn, metadata_id)
                        raise

                    insert_attach(
                        filepath=str(filepath),
                        conn=conn,
                        metadata_id=metadata_id,
                    )
                    content += "\n@attach_photo"

            closest_entry = fetch_geo(metadata_id, make_tz_unixtime(create_time), conn)
            fetch_weather(
                metadata_id, closest_entry, make_tz_unixtime(create_time), conn
            )

            try:
                diary_datetime = convert_diary_date(create_time)
                daytime_tag = daytime(convert_diary_date(create_time, False))

                if daytime_tag != "none":
                    content += f"\n@{daytime_tag}"

                content = shlex.quote(content)
                subprocess.run(
                    f'printf "%s %b" "{diary_datetime}" {content} | jrnl {diary_name}',
                    shell=True,
                    capture_output=True,
                    text=True,
                    check=True,
                )
                os.system("clear")
                print("Record has been inserted.")
            except subprocess.CalledProcessError as e:
                print(f"Error writing to journal: {e.stderr}")
                raise

            delete_entity(f"{Config.memo_url}/api/v1/{memo['name']}", headers)

    except Exception as e:
        print(f"An error occurred: {e}")
        raise
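
# Illustrative export run (names are placeholders): pulls memos tagged 'diary'
# from Memos, attaches geo/weather metadata, writes them into jrnl, then
# deletes the exported memos on the server.
#   ./script.py export mydiary diary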


def insert():
    conn = None
    try:
        if len(sys.argv) < 3 or sys.argv[1] != "insert":
            sys.exit(
                "Usage: script.py insert <diary_name> [bulk|single (default)] 'content'"
            )

        diary_name = sys.argv[2]
        insert_type = (
            "bulk" if len(sys.argv) > 3 and sys.argv[3] == "bulk" else "single"
        )
        diary_path = get_diary_path_by_name(diary_name).parent
        diary_filename = get_diary_path_by_name(diary_name)

        # always make a backup first!
        make_backup(diary_filename, diary_path)

        conn = db_connection(diary_path)
        initialize_db(conn)

        # generate and convert the current time
        datenow = datetime.now(timezone.utc)
        datenow_timestamp = datenow.strftime("%Y-%m-%dT%H:%M:%SZ")
        metadata_id = insert_metadata(conn, make_tz_unixtime(datenow_timestamp))

        # fetch geo data
        closest_entry = fetch_geo(
            metadata_id, make_tz_unixtime(datenow_timestamp), conn
        )
        # fetch weather data
        fetch_weather(
            metadata_id, closest_entry, make_tz_unixtime(datenow_timestamp), conn
        )

        if insert_type == "single":
            # Insert a string passed on the command line.
            diary_datetime = convert_diary_date(datenow_timestamp)
            daytime_tag = daytime(convert_diary_date(datenow_timestamp, False))

            content = sys.argv[4] if len(sys.argv) > 4 else ""

            if not content:  # check before quoting; shlex.quote("") is truthy
                print("There is no text")
                sys.exit(1)

            # insert the daytime tag
            if daytime_tag != "none":
                content += f"\n@{daytime_tag}"

            content = shlex.quote(content)

            try:
                subprocess.run(
                    f'printf "%s %s" "{diary_datetime}" {content} | jrnl {diary_name}',
                    shell=True,
                    capture_output=True,
                    text=True,
                    check=True,
                )
                os.system("clear")
                print("Record has been inserted.")
            except subprocess.CalledProcessError as e:
                print(f"Error inserting single entry: {e.stderr}")
                raise

        elif insert_type == "bulk":
            # Insert an entry written in your editor.
            fd, temp_file_path = tempfile.mkstemp()
            os.close(fd)

            original_hash = make_hash(Path(temp_file_path))

            subprocess.run(["nvim", temp_file_path], text=True, check=True)
            with open(temp_file_path, "r") as file:
                diary_datetime = convert_diary_date(datenow_timestamp)
                daytime_tag = daytime(convert_diary_date(datenow_timestamp, False))

                content = file.read()

                # insert the daytime tag
                if daytime_tag != "none":
                    content += f"\n@{daytime_tag}"

                # quote what was read above; re-reading the file would yield ""
                content = shlex.quote(content)

                if original_hash != make_hash(Path(temp_file_path)):
                    try:
                        subprocess.run(
                            f'printf "%s %s" "{diary_datetime}" {content} | jrnl {diary_name}',
                            shell=True,
                            capture_output=True,
                            text=True,
                            check=True,
                        )
                        os.system("clear")
                        print("Record has been inserted.")
                    except subprocess.CalledProcessError as e:
                        print(
                            f"Error during bulk import: {e.stderr}, file: {temp_file_path}"
                        )
                        raise

            os.remove(temp_file_path)

    except Exception as e:
        print(f"An error occurred: {e}")
        raise

    finally:
        if conn:
            conn.close()
            print("Database connection closed.")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("Usage: script.py [export|insert|doctor] ...")

    try:
        # I want to know if I'm connected...
        fetch_data(url="https://google.com", rjson=False, log=False)
    except Exception as e:
        print(f"Connection problem: {e}")
        raise

    try:
        # I want to know if the diary is encrypted...
        subprocess.run(
            ["jrnl", "--decrypt"],
            input=b"",  # empty input, so no password gets entered
        )
    except Exception as e:
        print(str(e))
        raise

    else:
        if sys.argv[1] == "export":
            export()
        elif sys.argv[1] == "insert":
            insert()
        elif sys.argv[1] == "doctor":
            # cProfile.run("doctor()", "output.prof")
            # stats = pstats.Stats("output.prof")
            # stats.strip_dirs().sort_stats("cumulative").print_stats(10)
            doctor()
        else:
            print("Unknown command")
            sys.exit(1)