simplify and start adapting to data provider
This commit is contained in:
parent
78a8bbdc77
commit
c10141411b
1 changed files with 25 additions and 158 deletions
183
rtm/__init__.py
183
rtm/__init__.py
|
@ -1,23 +1,26 @@
|
|||
#!/usr/bin/env python3.6
|
||||
from config import IGNORED, BACKUPS_PATH
|
||||
from config import RTM_API_KEY, RTM_API_TOKEN, RTM_API_SECRET
|
||||
|
||||
from collections import deque
|
||||
#!/usr/bin/env python3
|
||||
# pip3 install icalendar
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from collections import deque
|
||||
from pathlib import Path
|
||||
from sys import argv
|
||||
from typing import Dict, List, Optional, TypeVar
|
||||
|
||||
from kython import *
|
||||
from kython.klogging import LazyLogger
|
||||
from kython import group_by_key
|
||||
from kython import kompress
|
||||
|
||||
import icalendar # type: ignore
|
||||
from icalendar.cal import Todo # type: ignore
|
||||
|
||||
|
||||
def extract_backup_date(s: str):
|
||||
s = s[len(BACKUPS_PATH + "/rtm_"):-4]
|
||||
s = s.replace('_', '-')
|
||||
return parse_date(s, dayfirst=False)
|
||||
logger = LazyLogger('rtm-provider')
|
||||
|
||||
|
||||
def get_last_backup():
|
||||
return max(Path('***REMOVED***').glob('*.ical*'))
|
||||
|
||||
|
||||
# TODO extract in a module to parse RTM's ical?
|
||||
|
@ -37,15 +40,17 @@ class MyTodo:
|
|||
[tags_str] = re.findall(r'\nTags:(.*?)\n', desc, flags=re.DOTALL)
|
||||
self.tags = [t.strip() for t in tags_str.split(',')] # TODO handle none?
|
||||
|
||||
# TODO use caching wrapper
|
||||
# TODO use decorator that stores cache in the object itself?
|
||||
def get_notes(self) -> List[str]:
|
||||
if self.notes is None:
|
||||
self._init_notes()
|
||||
return self.notes
|
||||
return self.notes # type: ignore
|
||||
|
||||
def get_tags(self) -> List[str]:
|
||||
if self.tags is None:
|
||||
self._init_tags()
|
||||
return self.tags
|
||||
return self.tags # type: ignore
|
||||
|
||||
def get_uid(self) -> str:
|
||||
return str(self.todo['UID'])
|
||||
|
@ -55,7 +60,8 @@ class MyTodo:
|
|||
|
||||
def get_status(self) -> str:
|
||||
if 'STATUS' not in self.todo:
|
||||
return None # TODO 'COMPLETED'?
|
||||
return None # type: ignore
|
||||
# TODO 'COMPLETED'?
|
||||
return str(self.todo['STATUS'])
|
||||
|
||||
def get_time(self):
|
||||
|
@ -84,10 +90,10 @@ class RtmBackup:
|
|||
self.revision = revision
|
||||
|
||||
@staticmethod
|
||||
def from_path(path: str) -> 'RtmBackup':
|
||||
with open(path, 'rb') as fo:
|
||||
def from_path(path: Path) -> 'RtmBackup':
|
||||
with kompress.open(path, 'rb') as fo:
|
||||
data = fo.read()
|
||||
revision = extract_backup_date(path)
|
||||
revision = 'TODO FIXME' # extract_backup_date(path)
|
||||
return RtmBackup(data, revision)
|
||||
|
||||
def get_all_todos(self) -> List[MyTodo]:
|
||||
|
@ -104,150 +110,11 @@ class RtmBackup:
|
|||
return group_by_key(todos, lambda todo: todo.get_title())
|
||||
|
||||
|
||||
# TODO move to kython?
|
||||
def group_by_any(items: List[T], key1, key2):
|
||||
kk1 = group_by_key(items, key1)
|
||||
kk2 = group_by_key(items, key2)
|
||||
|
||||
used: Set[T] = set()
|
||||
def bfs(i):
|
||||
queue = deque()
|
||||
res = []
|
||||
def register(i):
|
||||
if i not in used:
|
||||
res.append(i)
|
||||
used.add(i)
|
||||
queue.append(i)
|
||||
register(i)
|
||||
while len(queue) > 0:
|
||||
top = queue.popleft()
|
||||
for ni in kk1[key1(top)]:
|
||||
register(ni)
|
||||
for ni in kk2[key2(top)]:
|
||||
register(ni)
|
||||
return res
|
||||
|
||||
groups = []
|
||||
for i in items:
|
||||
if i not in used:
|
||||
groups.append(bfs(i))
|
||||
return groups
|
||||
|
||||
|
||||
def check_wiped_notes(backups: List[str]):
|
||||
all_todos = []
|
||||
for b in backups:
|
||||
backup = RtmBackup.from_path(b)
|
||||
all_todos.extend(backup.get_all_todos())
|
||||
|
||||
# first, let tasks with same titles or uids be in the same class. (if we rename a task, it retains UID)
|
||||
groups = group_by_any(all_todos, lambda k: k.get_title(), lambda k: k.get_uid())
|
||||
kk_map = {i: g for i, g in enumerate(groups)} # type: Dict[int, List[MyTodo]]
|
||||
|
||||
def has_safe_tag(todos: List[MyTodo]) -> bool:
|
||||
all_tags = set.union(*(set(todo.get_tags()) for todo in todos))
|
||||
return 'z_dn_safe' in all_tags or 'y_see_org' in all_tags
|
||||
|
||||
def has_safe_note(todo: MyTodo) -> bool:
|
||||
notes = todo.get_notes()
|
||||
for note in notes:
|
||||
if 'is_safe' in note:
|
||||
return True
|
||||
return False
|
||||
|
||||
kk_map = {kk: todos for kk, todos in kk_map.items() if not has_safe_tag(todos)}
|
||||
|
||||
def boring(todos: List[MyTodo]) -> bool:
|
||||
if len(todos) <= 1:
|
||||
return True
|
||||
|
||||
counts = [len(todo.get_notes()) for todo in todos]
|
||||
|
||||
if counts.count(counts[0]) == len(counts):
|
||||
return True
|
||||
|
||||
if all(a <= b for a, b in zip(counts, counts[1:])):
|
||||
# increasing
|
||||
return True
|
||||
|
||||
for tags in [todo.get_tags() for todo in todos]:
|
||||
if 'routine' in tags: # TODO FIXME 'safe' tag. If a task has it in ANY of the backups, it is considered safe. tag is better for completed
|
||||
return True
|
||||
|
||||
# good thing about safe note is if it disappears, we'll notice it!
|
||||
if has_safe_note(todos[-1]):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
bads = []
|
||||
for kk, todos in sorted(kk_map.items()):
|
||||
# TODO sorts by 1) competion 2) modified date
|
||||
todos = sorted(todos, key = MyTodo.alala_key)
|
||||
if not boring(todos):
|
||||
for todo in todos:
|
||||
bads.append(todo)
|
||||
|
||||
|
||||
if len(bads) != 0:
|
||||
logging.error('\n'.join(set(t.get_title() for t in bads)))
|
||||
for t in bads:
|
||||
logging.error(f"{t.revision} {t.get_title()} {t.get_uid()} {t.get_notes()}")
|
||||
|
||||
|
||||
def are_suspicious(l: List[MyTodo]) -> bool:
|
||||
if len(l) <= 1: # probably not repeating
|
||||
return False
|
||||
|
||||
all_tags = set.union(*(set(todo.get_tags()) for todo in l))
|
||||
if 'z_ac_safe' in all_tags:
|
||||
return False
|
||||
|
||||
suspicious = 0
|
||||
for c in l:
|
||||
if c.is_completed():
|
||||
suspicious += 1
|
||||
elif 'DUE' not in c.todo:
|
||||
suspicious += 1
|
||||
|
||||
return len(l) == suspicious
|
||||
|
||||
def check_accidentally_completed(path: str):
|
||||
backup = RtmBackup.from_path(path)
|
||||
|
||||
groups = backup.get_todos_by_title()
|
||||
|
||||
susp = []
|
||||
for k, g in sorted(groups.items()):
|
||||
if k in IGNORED:
|
||||
logging.info(k + " is ignored, skipping...")
|
||||
continue
|
||||
|
||||
if are_suspicious(g):
|
||||
susp.append(f'"{k}",')
|
||||
logging.error(k)
|
||||
|
||||
if len(susp) > 0:
|
||||
print("Suspicious:")
|
||||
for s in susp:
|
||||
print(s)
|
||||
else:
|
||||
print("Nothing suspicious!")
|
||||
|
||||
|
||||
def main():
|
||||
backups = sorted([os.path.join(BACKUPS_PATH, p) for p in os.listdir(BACKUPS_PATH) if p.endswith('ical')], key=extract_backup_date)
|
||||
last_backup = backups[-1]
|
||||
|
||||
check_accidentally_completed(last_backup)
|
||||
logging.info("Using " + last_backup + " for checking for accidentally completed notes")
|
||||
|
||||
backups = backups[:-5:5] + backups[-5:] # always include last 5 # TODO FIXME USE ALL?
|
||||
logging.info(f"Using {backups} for checking for wiped notes")
|
||||
|
||||
check_wiped_notes(backups)
|
||||
backup = RtmBackup.from_path(get_last_backup())
|
||||
for t in backup.get_all_todos():
|
||||
print(t)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
setup_logging()
|
||||
main()
|
||||
|
|
Loading…
Add table
Reference in a new issue