Merge branch 'master' into location-fallback

This commit is contained in:
seanbreckenridge 2023-02-22 23:02:37 -08:00 committed by GitHub
commit f05e81cee5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 626 additions and 232 deletions

View file

@ -50,12 +50,12 @@ jobs:
- run: bash scripts/ci/run - run: bash scripts/ci/run
- if: matrix.platform == 'ubuntu-latest' # no need to compute coverage for other platforms - if: matrix.platform == 'ubuntu-latest' # no need to compute coverage for other platforms
uses: actions/upload-artifact@v2 uses: actions/upload-artifact@v3
with: with:
name: .coverage.mypy-misc_${{ matrix.platform }}_${{ matrix.python-version }} name: .coverage.mypy-misc_${{ matrix.platform }}_${{ matrix.python-version }}
path: .coverage.mypy-misc/ path: .coverage.mypy-misc/
- if: matrix.platform == 'ubuntu-latest' # no need to compute coverage for other platforms - if: matrix.platform == 'ubuntu-latest' # no need to compute coverage for other platforms
uses: actions/upload-artifact@v2 uses: actions/upload-artifact@v3
with: with:
name: .coverage.mypy-core_${{ matrix.platform }}_${{ matrix.python-version }} name: .coverage.mypy-core_${{ matrix.platform }}_${{ matrix.python-version }}
path: .coverage.mypy-core/ path: .coverage.mypy-core/

View file

@ -13,7 +13,7 @@ import pandas as pd # type: ignore
import orgparse import orgparse
from my.config import blood as config from my.config import blood as config # type: ignore[attr-defined]
class Entry(NamedTuple): class Entry(NamedTuple):

View file

@ -10,7 +10,7 @@ from ..core.error import Res, set_error_datetime, extract_error_datetime
from .. import orgmode from .. import orgmode
from my.config import weight as config from my.config import weight as config # type: ignore[attr-defined]
log = LazyLogger('my.body.weight') log = LazyLogger('my.body.weight')

View file

@ -4,4 +4,4 @@ warnings.high('my.books.kobo is deprecated! Please use my.kobo instead!')
from ..core.util import __NOT_HPI_MODULE__ from ..core.util import __NOT_HPI_MODULE__
from ..kobo import * from ..kobo import * # type: ignore[no-redef]

View file

@ -1,11 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from my.config import codeforces as config from my.config import codeforces as config # type: ignore[attr-defined]
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import NamedTuple from typing import NamedTuple
import json import json
from typing import Dict, Iterator from typing import Dict, Iterator
from ..core import get_files, Res, unwrap from ..core import get_files, Res, unwrap
from ..core.compat import cached_property from ..core.compat import cached_property
from ..core.konsume import ignore, wrap from ..core.konsume import ignore, wrap

View file

@ -1,11 +1,13 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from my.config import topcoder as config from my.config import topcoder as config # type: ignore[attr-defined]
from datetime import datetime from datetime import datetime
from typing import NamedTuple from typing import NamedTuple
import json import json
from typing import Dict, Iterator from typing import Dict, Iterator
from ..core import get_files, Res, unwrap, Json from ..core import get_files, Res, unwrap, Json
from ..core.compat import cached_property from ..core.compat import cached_property
from ..core.error import Res, unwrap from ..core.error import Res, unwrap

View file

@ -14,8 +14,14 @@ from my.core import init
### ###
from datetime import tzinfo
from pathlib import Path
from typing import List
from my.core import Paths, PathIsh from my.core import Paths, PathIsh
class hypothesis: class hypothesis:
# expects outputs from https://github.com/karlicoss/hypexport # expects outputs from https://github.com/karlicoss/hypexport
# (it's just the standard Hypothes.is export format) # (it's just the standard Hypothes.is export format)
@ -141,9 +147,14 @@ class hackernews:
export_path: Paths export_path: Paths
class materialistic:
export_path: Paths
class fbmessenger: class fbmessenger:
class fbmessengerexport: class fbmessengerexport:
export_db: PathIsh export_db: PathIsh
facebook_id: Optional[str]
class android: class android:
export_path: Paths export_path: Paths
@ -156,8 +167,87 @@ class twitter:
class talon: class talon:
export_path: Paths export_path: Paths
class twint:
export_path: Paths
class browser: class browser:
class export: class export:
export_path: Paths = '' export_path: Paths = ''
class active_browser: class active_browser:
export_path: Paths = '' export_path: Paths = ''
class telegram:
class telegram_backup:
export_path: PathIsh = ''
class demo:
data_path: Paths
username: str
timezone: tzinfo
class simple:
count: int
class vk_messages_backup:
storage_path: Path
class kobo:
export_path: Paths
class feedly:
export_path: Paths
class feedbin:
export_path: Paths
class taplog:
export_path: Paths
class lastfm:
export_path: Paths
class rescuetime:
export_path: Paths
class runnerup:
export_path: Paths
class emfit:
export_path: Path
timezone: tzinfo
excluded_sids: List[str]
class foursquare:
export_path: Paths
class rtm:
export_path: Paths
class imdb:
export_path: Paths
class roamresearch:
export_path: Paths
username: str

View file

@ -344,8 +344,8 @@ def _requires(modules: Sequence[str]) -> Sequence[str]:
reqs = mod.requires reqs = mod.requires
if reqs is None: if reqs is None:
error(f"Module {mod.name} has no REQUIRES specification") warning(f"Module {mod.name} has no REQUIRES specification")
sys.exit(1) continue
for r in reqs: for r in reqs:
if r not in res: if r not in res:
res.append(r) res.append(r)
@ -369,6 +369,10 @@ def module_install(*, user: bool, module: Sequence[str], parallel: bool=False) -
requirements = _requires(module) requirements = _requires(module)
if len(requirements) == 0:
warning('requirements list is empty, no need to install anything')
return
pre_cmd = [ pre_cmd = [
sys.executable, '-m', 'pip', sys.executable, '-m', 'pip',
'install', 'install',

View file

@ -28,7 +28,7 @@ F = TypeVar('F')
from contextlib import contextmanager from contextlib import contextmanager
from typing import Iterator from typing import Iterator
@contextmanager @contextmanager
def override_config(config: F) -> Iterator[F]: def _override_config(config: F) -> Iterator[F]:
''' '''
Temporary override for config's parameters, useful for testing/fake data/etc. Temporary override for config's parameters, useful for testing/fake data/etc.
''' '''
@ -44,12 +44,53 @@ def override_config(config: F) -> Iterator[F]:
delattr(config, k) delattr(config, k)
# helper for tests? not sure if could be useful elsewhere import importlib
import sys
from typing import Optional, Set
ModuleRegex = str
@contextmanager @contextmanager
def tmp_config(): def _reload_modules(modules: ModuleRegex) -> Iterator[None]:
import my.config as C def loaded_modules() -> Set[str]:
with override_config(C): return {name for name in sys.modules if re.fullmatch(modules, name)}
yield C # todo not sure?
modules_before = loaded_modules()
for m in modules_before:
importlib.reload(sys.modules[m])
try:
yield
finally:
modules_after = loaded_modules()
for m in modules_after:
if m in modules_before:
# was previously loaded, so need to reload to pick up old config
importlib.reload(sys.modules[m])
else:
# wasn't previously loaded, so need to unload it
# otherwise it might fail due to missing config etc
sys.modules.pop(m, None)
from contextlib import ExitStack
import re
@contextmanager
def tmp_config(*, modules: Optional[ModuleRegex]=None, config=None):
    '''
    Temporarily apply overrides to my.config, optionally reloading modules so they pick the overrides up.

    modules: regex for module names; it is handed to _reload_modules, which matches it against
             the names in sys.modules with re.fullmatch.
    config : object (typically a class) whose non-dunder attributes are copied onto my.config.

    Either both or neither of modules/config must be passed, otherwise AssertionError is raised.
    Yields the (overridden) my.config object.
    '''
    # Explicit raise instead of a bare 'assert': the check survives 'python -O' and explains what went wrong.
    # The exception type is kept as AssertionError so the observable behaviour stays the same.
    if (modules is None) != (config is None):
        raise AssertionError(
            f"'modules' and 'config' must be passed together or not at all (got modules={modules!r}, config={config!r})"
        )

    import my.config
    # Context managers are exited in reverse order: the config override is undone first,
    # and only then module_reload_stack unwinds, so modules are reloaded against the old config.
    with ExitStack() as module_reload_stack, _override_config(my.config) as new_config:
        if config is not None:
            # skip dunder attributes (__module__, __dict__, etc.) that every class carries
            overrides = {k: v for k, v in vars(config).items() if not k.startswith('__')}
            for k, v in overrides.items():
                setattr(new_config, k, v)

        if modules is not None:
            # entered only after the overrides are in place, so the reload sees the new values
            module_reload_stack.enter_context(_reload_modules(modules))
        yield new_config
def test_tmp_config() -> None: def test_tmp_config() -> None:
@ -63,3 +104,8 @@ def test_tmp_config() -> None:
# todo hmm. not sure what should do about new properties?? # todo hmm. not sure what should do about new properties??
assert not hasattr(c, 'extra') assert not hasattr(c, 'extra')
assert c.google != 'whatever' assert c.google != 'whatever'
###
# todo properly deprecate, this isn't really meant for public use
override_config = _override_config

View file

@ -123,8 +123,8 @@ from contextlib import contextmanager as ctx
@ctx @ctx
def _reset_config() -> Iterator[Config]: def _reset_config() -> Iterator[Config]:
# todo maybe have this decorator for the whole of my.config? # todo maybe have this decorator for the whole of my.config?
from .cfg import override_config from .cfg import _override_config
with override_config(config) as cc: with _override_config(config) as cc:
cc.enabled_modules = None cc.enabled_modules = None
cc.disabled_modules = None cc.disabled_modules = None
cc.cache_dir = None cc.cache_dir = None

View file

@ -1,17 +1,19 @@
from .common import assert_subpackage; assert_subpackage(__name__) from .common import assert_subpackage; assert_subpackage(__name__)
from contextlib import contextmanager
from pathlib import Path from pathlib import Path
import shutil import shutil
import sqlite3 import sqlite3
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
from typing import Tuple, Any, Iterator, Callable, Optional, Union
from .common import PathIsh from .common import PathIsh, assert_never
from .compat import Literal
def sqlite_connect_immutable(db: PathIsh) -> sqlite3.Connection: def sqlite_connect_immutable(db: PathIsh) -> sqlite3.Connection:
# https://www.sqlite.org/draft/uri.html#uriimmutable
return sqlite3.connect(f'file:{db}?immutable=1', uri=True) return sqlite3.connect(f'file:{db}?immutable=1', uri=True)
@ -30,6 +32,44 @@ def test_sqlite_connect_immutable(tmp_path: Path) -> None:
conn.execute('DROP TABLE testtable') conn.execute('DROP TABLE testtable')
SqliteRowFactory = Callable[[sqlite3.Cursor, sqlite3.Row], Any]
def dict_factory(cursor: sqlite3.Cursor, row: Any) -> dict:
    """
    Row factory that turns a result row into a dict keyed by column name.

    Column names are taken from the first element of each entry in cursor.description
    and paired positionally with the values in row.
    """
    fields = (column[0] for column in cursor.description)
    return dict(zip(fields, row))
Factory = Union[SqliteRowFactory, Literal['row', 'dict']]
@contextmanager
def sqlite_connection(db: PathIsh, *, immutable: bool=False, row_factory: Optional[Factory]=None) -> Iterator[sqlite3.Connection]:
    """
    Context manager that opens the sqlite database at db via a 'file:' URI and closes it on exit.

    immutable  : if True, asserts that db exists and appends '?immutable=1' to the URI.
    row_factory: None (leave sqlite3's default rows), a callable taking (cursor, row),
                 'row' (use sqlite3.Row) or 'dict' (use dict_factory).

    The body runs inside 'with conn:', i.e. within sqlite3's transaction handling;
    the connection itself is always closed afterwards.
    """
    dbp = f'file:{db}'
    # https://www.sqlite.org/draft/uri.html#uriimmutable
    if immutable:
        # assert results in nicer error than sqlite3.OperationalError
        assert Path(db).exists(), db
        dbp = f'{dbp}?immutable=1'
    row_factory_: Any = None
    if row_factory is not None:
        if callable(row_factory):
            row_factory_ = row_factory
        elif row_factory == 'row':
            row_factory_ = sqlite3.Row
        elif row_factory == 'dict':
            row_factory_ = dict_factory
        else:
            # Pass the unhandled value, so the failure names it and a type checker can verify exhaustiveness.
            # NOTE(review): assumes assert_never from .common takes the offending value, like typing.assert_never -- confirm
            assert_never(row_factory)

    conn = sqlite3.connect(dbp, uri=True)
    try:
        conn.row_factory = row_factory_
        with conn:
            yield conn
    finally:
        # Connection context manager isn't actually closing the connection, only keeps transaction
        conn.close()
# TODO come up with a better name? # TODO come up with a better name?
# NOTE: this is tested by tests/sqlite.py::test_sqlite_read_with_wal # NOTE: this is tested by tests/sqlite.py::test_sqlite_read_with_wal
def sqlite_copy_and_open(db: PathIsh) -> sqlite3.Connection: def sqlite_copy_and_open(db: PathIsh) -> sqlite3.Connection:
@ -52,8 +92,6 @@ def sqlite_copy_and_open(db: PathIsh) -> sqlite3.Connection:
return dest return dest
from typing import Tuple, Any, Iterator
# NOTE hmm, so this kinda works # NOTE hmm, so this kinda works
# V = TypeVar('V', bound=Tuple[Any, ...]) # V = TypeVar('V', bound=Tuple[Any, ...])
# def select(cols: V, rest: str, *, db: sqlite3.Connetion) -> Iterator[V]: # def select(cols: V, rest: str, *, db: sqlite3.Connetion) -> Iterator[V]:

View file

@ -3,6 +3,11 @@
Consumes data exported by https://github.com/karlicoss/emfitexport Consumes data exported by https://github.com/karlicoss/emfitexport
""" """
REQUIRES = [
'git+https://github.com/karlicoss/emfitexport',
]
from pathlib import Path from pathlib import Path
from typing import Dict, List, Iterable, Any, Optional from typing import Dict, List, Iterable, Any, Optional
@ -140,16 +145,20 @@ def stats() -> Stats:
from contextlib import contextmanager from contextlib import contextmanager
from typing import Iterator from typing import Iterator
@contextmanager @contextmanager
def fake_data(nights: int=500) -> Iterator[None]: def fake_data(nights: int=500) -> Iterator:
from ..core.cfg import override_config from my.core.cfg import tmp_config
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
with override_config(config) as cfg, TemporaryDirectory() as td: with TemporaryDirectory() as td:
tdir = Path(td) tdir = Path(td)
cfg.export_path = tdir
gen = dal.FakeData() gen = dal.FakeData()
gen.fill(tdir, count=nights) gen.fill(tdir, count=nights)
yield
class override:
class emfit:
export_path = tdir
with tmp_config(modules=__name__, config=override) as cfg:
yield cfg
# TODO remove/deprecate it? I think used by timeline # TODO remove/deprecate it? I think used by timeline

View file

@ -87,20 +87,24 @@ def stats() -> Stats:
# TODO make sure it's possible to 'advise' functions and override stuff # TODO make sure it's possible to 'advise' functions and override stuff
from contextlib import contextmanager from contextlib import contextmanager
from typing import Iterator
@contextmanager @contextmanager
def fake_data(count: int=100): def fake_data(count: int=100) -> Iterator:
from .core.cfg import override_config from my.core.cfg import tmp_config
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
import json import json
with override_config(endomondo) as cfg, TemporaryDirectory() as td: with TemporaryDirectory() as td:
tdir = Path(td) tdir = Path(td)
cfg.export_path = tdir
# todo would be nice to somehow expose the generator so it's possible to hack from the outside?
fd = dal.FakeData() fd = dal.FakeData()
data = fd.generate(count=count) data = fd.generate(count=count)
jf = tdir / 'data.json' jf = tdir / 'data.json'
jf.write_text(json.dumps(data)) jf.write_text(json.dumps(data))
yield class override:
class endomondo:
export_path = tdir
with tmp_config(modules=__name__, config=override) as cfg:
# todo would be nice to somehow expose the generator so it's possible to hack from the outside?
yield cfg

View file

@ -3,25 +3,37 @@ Messenger data from Android app database (in =/data/data/com.facebook.orca/datab
""" """
from __future__ import annotations from __future__ import annotations
REQUIRES = ['dataset']
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Iterator, Sequence, Optional, Dict from pathlib import Path
import sqlite3
from typing import Iterator, Sequence, Optional, Dict, Union, List
from more_itertools import unique_everseen
from my.core import get_files, Paths, datetime_naive, Res, assert_never, LazyLogger, make_config
from my.core.error import echain
from my.core.sqlite import sqlite_connection
from my.config import fbmessenger as user_config from my.config import fbmessenger as user_config
from ..core import Paths logger = LazyLogger(__name__)
@dataclass @dataclass
class config(user_config.android): class Config(user_config.android):
# paths[s]/glob to the exported sqlite databases # paths[s]/glob to the exported sqlite databases
export_path: Paths export_path: Paths
facebook_id: Optional[str] = None
# hmm. this is necessary for default value (= None) to work
# otherwise Config.facebook_id is always None..
config = make_config(Config)
from ..core import get_files
from pathlib import Path
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
return get_files(config.export_path) return get_files(config.export_path)
@ -35,10 +47,9 @@ class Sender:
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class Thread: class Thread:
id: str id: str
name: Optional[str] name: Optional[str] # isn't set for groups or one to one messages
# todo not sure about order of fields... # todo not sure about order of fields...
from ..core import datetime_naive
@dataclass @dataclass
class _BaseMessage: class _BaseMessage:
id: str id: str
@ -63,77 +74,92 @@ class Message(_BaseMessage):
reply_to: Optional[Message] reply_to: Optional[Message]
import json
from typing import Union
from ..core import Res, assert_never
from ..core.dataset import connect_readonly, DatabaseT
Entity = Union[Sender, Thread, _Message] Entity = Union[Sender, Thread, _Message]
def _entities() -> Iterator[Res[Entity]]: def _entities() -> Iterator[Res[Entity]]:
for f in inputs(): dbs = inputs()
with connect_readonly(f) as db: for i, f in enumerate(dbs):
logger.debug(f'processing {f} {i}/{len(dbs)}')
with sqlite_connection(f, immutable=True, row_factory='row') as db:
try:
yield from _process_db(db) yield from _process_db(db)
def _process_db(db: DatabaseT) -> Iterator[Res[Entity]]:
# works both for GROUP:group_id and ONE_TO_ONE:other_user:your_user
threadkey2id = lambda key: key.split(':')[1]
for r in db['threads'].find():
try:
yield Thread(
id=threadkey2id(r['thread_key']),
name=r['name'],
)
except Exception as e: except Exception as e:
yield e yield echain(RuntimeError(f'While processing {f}'), cause=e)
continue
for r in db['messages'].find(order_by='timestamp_ms'):
mtype: int = r['msg_type']
if mtype == -1:
# likely immediately deleted or something? doesn't have any data at all
continue
user_id = None def _normalise_user_id(ukey: str) -> str:
try: # trying to match messages.author from fbchat
# todo could use thread_users?
sj = json.loads(r['sender'])
ukey: str = sj['user_key']
prefix = 'FACEBOOK:' prefix = 'FACEBOOK:'
assert ukey.startswith(prefix), ukey assert ukey.startswith(prefix), ukey
user_id = ukey[len(prefix):] return ukey[len(prefix):]
yield Sender(
id=user_id,
name=sj['name'], def _normalise_thread_id(key) -> str:
# works both for GROUP:group_id and ONE_TO_ONE:other_user:your_user
return key.split(':')[1]
def _process_db(db: sqlite3.Connection) -> Iterator[Res[Entity]]:
senders: Dict[str, Sender] = {}
for r in db.execute('''SELECT * FROM thread_users'''):
# for messaging_actor_type == 'REDUCED_MESSAGING_ACTOR', name is None
# but they are still referenced, so need to keep
name = r['name'] or '<NAME UNAVAILABLE>'
user_key = r['user_key']
s = Sender(
id=_normalise_user_id(user_key),
name=name,
) )
except Exception as e: senders[user_key] = s
yield e yield s
self_id = config.facebook_id
thread_users: Dict[str, List[Sender]] = {}
for r in db.execute('SELECT * from thread_participants'):
thread_key = r['thread_key']
user_key = r['user_key']
if self_id is not None and user_key == f'FACEBOOK:{self_id}':
# exclude yourself, otherwise it's just spammy to show up in all participants
continue continue
thread_id = None ll = thread_users.get(thread_key)
try: if ll is None:
thread_id = threadkey2id(r['thread_key']) ll = []
except Exception as e: thread_users[thread_key] = ll
yield e ll.append(senders[user_key])
continue
try: for r in db.execute('SELECT * FROM threads'):
assert user_id is not None thread_key = r['thread_key']
assert thread_id is not None thread_type = thread_key.split(':')[0]
if thread_type == 'MONTAGE': # no idea what this is?
continue
name = r['name'] # seems that it's only set for some groups
if name is None:
users = thread_users[thread_key]
name = ', '.join([u.name for u in users])
yield Thread(
id=_normalise_thread_id(thread_key),
name=name,
)
for r in db.execute('''
SELECT *, json_extract(sender, "$.user_key") AS user_key FROM messages
WHERE msg_type NOT IN (
-1, /* these don't have any data at all, likely immediately deleted or something? */
2 /* these are 'left group' system messages, also a bit annoying since they might reference nonexistent users */
)
ORDER BY timestamp_ms /* they aren't in order in the database, so need to sort */
'''):
yield _Message( yield _Message(
id=r['msg_id'], id=r['msg_id'],
dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000), dt=datetime.fromtimestamp(r['timestamp_ms'] / 1000),
# is_incoming=False, TODO?? # is_incoming=False, TODO??
text=r['text'], text=r['text'],
thread_id=thread_id, thread_id=_normalise_thread_id(r['thread_key']),
sender_id=user_id, sender_id=_normalise_user_id(r['user_key']),
reply_to_id=r['message_replied_to_id'] reply_to_id=r['message_replied_to_id']
) )
except Exception as e:
yield e
from more_itertools import unique_everseen
def messages() -> Iterator[Res[Message]]: def messages() -> Iterator[Res[Message]]:
senders: Dict[str, Sender] = {} senders: Dict[str, Sender] = {}
msgs: Dict[str, Message] = {} msgs: Dict[str, Message] = {}
@ -150,12 +176,12 @@ def messages() -> Iterator[Res[Message]]:
continue continue
if isinstance(x, _Message): if isinstance(x, _Message):
reply_to_id = x.reply_to_id reply_to_id = x.reply_to_id
try: # hmm, reply_to be missing due to the synthetic nature of export, so have to be defensive
sender = senders[x.sender_id] reply_to = None if reply_to_id is None else msgs.get(reply_to_id)
# hmm, reply_to be missing due to the synthetic nature of export
# also would be interesting to merge together entities rather than resuling messages from different sources.. # also would be interesting to merge together entities rather than resuling messages from different sources..
# then the merging thing could be moved to common? # then the merging thing could be moved to common?
reply_to = None if reply_to_id is None else msgs[reply_to_id] try:
sender = senders[x.sender_id]
thread = threads[x.thread_id] thread = threads[x.thread_id]
except Exception as e: except Exception as e:
yield e yield e

View file

@ -7,10 +7,13 @@ REQUIRES = [
'git+https://github.com/karlicoss/fbmessengerexport', 'git+https://github.com/karlicoss/fbmessengerexport',
] ]
from contextlib import ExitStack, contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Iterator from typing import Iterator
from my.core import PathIsh, Res, stat, Stats
from my.core.warnings import high
from my.config import fbmessenger as user_config from my.config import fbmessenger as user_config
import fbmessengerexport.dal as messenger import fbmessengerexport.dal as messenger
@ -22,7 +25,6 @@ _new_section = getattr(user_config, 'fbmessengerexport', None)
_old_attr = getattr(user_config, 'export_db', None) _old_attr = getattr(user_config, 'export_db', None)
if _new_section is None and _old_attr is not None: if _new_section is None and _old_attr is not None:
from my.core.warnings import high
high("""DEPRECATED! Please modify your fbmessenger config to look like: high("""DEPRECATED! Please modify your fbmessenger config to look like:
class fbmessenger: class fbmessenger:
@ -35,24 +37,26 @@ class fbmessenger:
### ###
from ..core import PathIsh
@dataclass @dataclass
class config(user_config.fbmessengerexport): class config(user_config.fbmessengerexport):
export_db: PathIsh export_db: PathIsh
def _dal() -> messenger.DAL: @contextmanager
return messenger.DAL(config.export_db) def _dal() -> Iterator[messenger.DAL]:
model = messenger.DAL(config.export_db)
with ExitStack() as stack:
if hasattr(model, '__dal__'): # defensive to support legacy fbmessengerexport
stack.enter_context(model)
yield model
from ..core import Res
def messages() -> Iterator[Res[messenger.Message]]: def messages() -> Iterator[Res[messenger.Message]]:
model = _dal() with _dal() as model:
for t in model.iter_threads(): for t in model.iter_threads():
yield from t.iter_messages() yield from t.iter_messages()
from ..core import stat, Stats
def stats() -> Stats: def stats() -> Stats:
return stat(messages) return stat(messages)
@ -75,11 +79,9 @@ def dump_chat_history(where: PathIsh) -> None:
p = Path(where) p = Path(where)
assert not p.exists() or p.is_dir() assert not p.exists() or p.is_dir()
model = _dal()
from shutil import rmtree from shutil import rmtree
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
with TemporaryDirectory() as tdir: with TemporaryDirectory() as tdir, _dal() as model:
td = Path(tdir) td = Path(tdir)
_dump_helper(model, td) _dump_helper(model, td)

View file

@ -5,13 +5,15 @@ from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Iterator, Sequence, Optional, Dict from pathlib import Path
from typing import Iterator, Sequence, Optional
from my.core import get_files, Paths, Res
from my.core.sqlite import sqlite_connection
from my.config import hackernews as user_config from my.config import hackernews as user_config
from ..core import Paths
@dataclass @dataclass
class config(user_config.dogsheep): class config(user_config.dogsheep):
# paths[s]/glob to the dogsheep database # paths[s]/glob to the dogsheep database
@ -20,8 +22,6 @@ class config(user_config.dogsheep):
# todo so much boilerplate... really need some common wildcard imports?... # todo so much boilerplate... really need some common wildcard imports?...
# at least for stuff which realistically is used in each module like get_files/Sequence/Paths/dataclass/Iterator/Optional # at least for stuff which realistically is used in each module like get_files/Sequence/Paths/dataclass/Iterator/Optional
from ..core import get_files
from pathlib import Path
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
return get_files(config.export_path) return get_files(config.export_path)
@ -44,15 +44,15 @@ class Item:
@property @property
def permalink(self) -> str: def permalink(self) -> str:
return hackernews_link(self.id) return hackernews_link(self.id)
# TODO hmm kinda annoying that permalink isn't getting serialized
# maybe won't be such a big problem if we used hpi query directly on objects, without jsons?
# so we could just take .permalink thing
from ..core.error import Res
from ..core.dataset import connect_readonly
def items() -> Iterator[Res[Item]]: def items() -> Iterator[Res[Item]]:
f = max(inputs()) f = max(inputs())
with connect_readonly(f) as db: with sqlite_connection(f, immutable=True, row_factory='row') as conn:
items = db['items'] for r in conn.execute('SELECT * FROM items ORDER BY time'):
for r in items.all(order_by='time'):
yield Item( yield Item(
id=r['id'], id=r['id'],
type=r['type'], type=r['type'],

View file

@ -1,20 +1,17 @@
""" """
[[https://play.google.com/store/apps/details?id=io.github.hidroh.materialistic][Materialistic]] app for Hackernews [[https://play.google.com/store/apps/details?id=io.github.hidroh.materialistic][Materialistic]] app for Hackernews
""" """
from datetime import datetime, timezone
REQUIRES = ['dataset'] from pathlib import Path
from datetime import datetime
from typing import Any, Dict, Iterator, NamedTuple, Sequence from typing import Any, Dict, Iterator, NamedTuple, Sequence
import pytz from my.core import get_files
from my.core.sqlite import sqlite_connection
from my.config import materialistic as config from my.config import materialistic as config
# todo migrate config to my.hackernews.materialistic # todo migrate config to my.hackernews.materialistic
from ..core import get_files
from pathlib import Path
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
return get_files(config.export_path) return get_files(config.export_path)
@ -28,7 +25,7 @@ class Saved(NamedTuple):
@property @property
def when(self) -> datetime: def when(self) -> datetime:
ts = int(self.row['time']) / 1000 ts = int(self.row['time']) / 1000
return datetime.fromtimestamp(ts, tz=pytz.utc) return datetime.fromtimestamp(ts, tz=timezone.utc)
@property @property
def uid(self) -> str: def uid(self) -> str:
@ -47,13 +44,11 @@ class Saved(NamedTuple):
return hackernews_link(self.uid) return hackernews_link(self.uid)
from ..core.dataset import connect_readonly
def raw() -> Iterator[Row]: def raw() -> Iterator[Row]:
last = max(inputs()) last = max(inputs())
with connect_readonly(last) as db: with sqlite_connection(last, immutable=True, row_factory='dict') as conn:
saved = db['saved'] yield from conn.execute('SELECT * FROM saved ORDER BY time')
# TODO wonder if it's 'save time' or creation time? # TODO wonder if it's 'save time' or creation time?
yield from saved.all(order_by='time')
def saves() -> Iterator[Saved]: def saves() -> Iterator[Saved]:

View file

@ -119,15 +119,17 @@ def _entities() -> Iterator[Res[Union[User, _Message]]]:
# todo use TypedDict? # todo use TypedDict?
for f in inputs(): for f in inputs():
with sqlite_connect_immutable(f) as db: with sqlite_connect_immutable(f) as db:
for (self_uid, thread_json) in select(('user_id', 'thread_info'), 'FROM threads', db=db): for (self_uid, thread_json) in select(('user_id', 'thread_info'), 'FROM threads', db=db):
j = json.loads(thread_json) j = json.loads(thread_json)
# todo in principle should leave the thread attached to the message? # todo in principle should leave the thread attached to the message?
# since thread is a group of users? # since thread is a group of users?
# inviter usually contains our own user # inviter usually contains our own user
for r in [j['inviter'], *j['recipients']]: for r in [j['inviter'], *j['recipients']]:
# id disappeared and seems that pk_id is in use now (around december 2022)
uid = r.get('id') or r.get('pk_id')
assert uid is not None
yield User( yield User(
id=str(r['id']), # for some reason it's int in the db id=str(uid), # for some reason it's int in the db
full_name=r['full_name'], full_name=r['full_name'],
username=r['username'], username=r['username'],
) )

View file

@ -10,7 +10,7 @@ from ..core.common import LazyLogger
logger = LazyLogger(__name__) logger = LazyLogger(__name__)
from my.config import jawbone as config from my.config import jawbone as config # type: ignore[attr-defined]
BDIR = config.export_dir BDIR = config.export_dir

View file

@ -85,7 +85,7 @@ def iter_useful(data_file: str):
# TODO <<< hmm. these files do contain deep and light sleep?? # TODO <<< hmm. these files do contain deep and light sleep??
# also steps stats?? # also steps stats??
from my.config import jawbone as config from my.config import jawbone as config # type: ignore[attr-defined]
p = config.export_dir / 'old_csv' p = config.export_dir / 'old_csv'
# TODO with_my? # TODO with_my?
@ -95,7 +95,7 @@ files = [
p / "2017.csv", p / "2017.csv",
] ]
from kython import concat, parse_date from kython import concat, parse_date # type: ignore
useful = concat(*(list(iter_useful(str(f))) for f in files)) useful = concat(*(list(iter_useful(str(f))) for f in files))
# for u in useful: # for u in useful:
@ -108,7 +108,7 @@ dates = [parse_date(u.date, yearfirst=True, dayfirst=False) for u in useful]
# TODO filter outliers? # TODO filter outliers?
# TODO don't need this anymore? it's gonna be in dashboards package # TODO don't need this anymore? it's gonna be in dashboards package
from kython.plotting import plot_timestamped from kython.plotting import plot_timestamped # type: ignore
for attr, lims, mavg, fig in [ # type: ignore for attr, lims, mavg, fig in [ # type: ignore
('light', (0, 400), 5, None), ('light', (0, 400), 5, None),
('deep', (0, 600), 5, None), ('deep', (0, 600), 5, None),

View file

@ -19,7 +19,7 @@ from ..core.common import LazyLogger, mcachew, fastermime
from ..core.error import Res, sort_res_by from ..core.error import Res, sort_res_by
from ..core.cachew import cache_dir from ..core.cachew import cache_dir
from my.config import photos as config from my.config import photos as config # type: ignore[attr-defined]
logger = LazyLogger(__name__) logger = LazyLogger(__name__)

View file

@ -58,22 +58,27 @@ def stats() -> Stats:
# basically, hack config and populate it with fake data? fake data generated by DAL, but the rest is handled by this? # basically, hack config and populate it with fake data? fake data generated by DAL, but the rest is handled by this?
from typing import Iterator
from contextlib import contextmanager from contextlib import contextmanager
from typing import Iterator
# todo take seed, or what? # todo take seed, or what?
@contextmanager @contextmanager
def fake_data(rows: int=1000) -> Iterator[None]: def fake_data(rows: int=1000) -> Iterator:
# todo also disable cachew automatically for such things? # todo also disable cachew automatically for such things?
from .core.cachew import disabled_cachew from my.core.cfg import tmp_config
from .core.cfg import override_config from my.core.cachew import disabled_cachew
from tempfile import TemporaryDirectory from tempfile import TemporaryDirectory
with disabled_cachew(), override_config(config) as cfg, TemporaryDirectory() as td:
tdir = Path(td)
cfg.export_path = tdir
f = tdir / 'rescuetime.json'
import json import json
with disabled_cachew(), TemporaryDirectory() as td:
tdir = Path(td)
f = tdir / 'rescuetime.json'
f.write_text(json.dumps(dal.fake_data_generator(rows=rows))) f.write_text(json.dumps(dal.fake_data_generator(rows=rows)))
yield
class override:
class rescuetime:
export_path = tdir
with tmp_config(modules=__name__, config=override) as cfg:
yield cfg
# TODO ok, now it's something that actually could run on CI! # TODO ok, now it's something that actually could run on CI!
# todo would be kinda nice if doctor could run against the fake data, to have a basic health check of the module? # todo would be kinda nice if doctor could run against the fake data, to have a basic health check of the module?

View file

@ -13,7 +13,7 @@ from typing import Iterable
from .core import Res, get_files from .core import Res, get_files
from .core.common import isoparse, Json from .core.common import isoparse, Json
import tcxparser import tcxparser # type: ignore[import]
from my.config import runnerup as config from my.config import runnerup as config

21
my/simple.py Normal file
View file

@ -0,0 +1,21 @@
'''
Just a demo module for testing and documentation purposes
'''
from dataclasses import dataclass
from typing import Iterator
from my.core import make_config
from my.config import simple as user_config
@dataclass
class simple(user_config):
    # Config section of this demo module; extends the user's my.config.simple
    # how many integers items() yields
    count: int
# module-level config object; items() reads config.count from it
config = make_config(simple)
def items() -> Iterator[int]:
    '''
    Yield the integers 0, 1, ..., config.count - 1
    '''
    for i in range(config.count):
        yield i

View file

@ -1,11 +1,11 @@
''' '''
[[https://play.google.com/store/apps/details?id=com.waterbear.taglog][Taplog]] app data [[https://play.google.com/store/apps/details?id=com.waterbear.taglog][Taplog]] app data
''' '''
from datetime import datetime from datetime import datetime
from typing import NamedTuple, Dict, Optional, Iterable from typing import NamedTuple, Dict, Optional, Iterable
from .core import get_files from my.core import get_files, stat, Stats
from my.core.sqlite import sqlite_connection
from my.config import taplog as user_config from my.config import taplog as user_config
@ -46,10 +46,9 @@ class Entry(NamedTuple):
def entries() -> Iterable[Entry]: def entries() -> Iterable[Entry]:
last = max(get_files(user_config.export_path)) last = max(get_files(user_config.export_path))
from .core.dataset import connect_readonly with sqlite_connection(last, immutable=True, row_factory='dict') as db:
db = connect_readonly(last)
# todo is it sorted by timestamp? # todo is it sorted by timestamp?
for row in db['Log'].all(): for row in db.execute('SELECT * FROM Log'):
yield Entry(row) yield Entry(row)
@ -60,6 +59,5 @@ def by_button(button: str) -> Iterable[Entry]:
yield e yield e
from .core import stat, Stats
def stats() -> Stats: def stats() -> Stats:
return stat(entries) return stat(entries)

View file

@ -0,0 +1,103 @@
"""
Telegram data via [fabianonline/telegram_backup](https://github.com/fabianonline/telegram_backup) tool
"""
from dataclasses import dataclass
from datetime import datetime, timezone
import sqlite3
from typing import Dict, Iterator, Optional
from my.core import datetime_aware, PathIsh
from my.core.sqlite import sqlite_connection
from my.config import telegram as user_config
@dataclass
class config(user_config.telegram_backup):
    # Settings of this module; extends the telegram_backup section of the user's my.config.telegram
    # path to the export database.sqlite
    export_path: PathIsh
@dataclass
class Chat:
    # A conversation; messages() creates these from rows of the 'chats' table as well as of the 'users' table
    id: str
    name: Optional[str]
    # not all users have a short handle, and groups don't seem to have them either?
    # TODO hmm, some groups do have one -- perhaps it's just the backup tool that doesn't dump them??
    handle: Optional[str]
    # not sure if a 'type' field is needed?
@dataclass
class User:
    # Sender of a message; _message_from_row fills it from the id and name of the sender's Chat entry
    id: str
    name: Optional[str]
@dataclass
class Message:
    # NOTE: message id is NOT unique globally -- only with respect to chat!
    id: int
    time: datetime_aware
    chat: Chat
    sender: User
    text: str

    @property
    def permalink(self) -> str:
        """
        Link of the form https://t.me/<chat>/<message id>,
        where <chat> is the handle of the chat, or the chat id if it has no handle.
        """
        handle = self.chat.handle
        # FIXME add c/
        clink = f'{handle}' if handle is not None else str(self.chat.id)
        # NOTE: deep links to messages don't seem to work for private conversations, sadly
        #   see https://core.telegram.org/api/links#message-links
        # NOTE: doesn't look like this works with private groups at all, doesn't even jump into it
        return f'https://t.me/{clink}/{self.id}'
# lookup table keyed by Chat.id; messages() fills it from both the 'chats' and the 'users' tables
Chats = Dict[str, Chat]
def _message_from_row(r: sqlite3.Row, *, chats: Chats) -> Message:
    """
    Build a Message from a single row of the 'messages' table.

    The 'time' column is interpreted as a unix timestamp and converted to an aware UTC datetime.
    Both 'source_id' and 'sender_id' are resolved through ``chats``; an unknown id raises KeyError.
    """
    when = datetime.fromtimestamp(r['time'], tz=timezone.utc)
    source = chats[r['source_id']]
    # senders are looked up in the same table as chats; only id and name are carried over into User
    author = chats[r['sender_id']]
    return Message(
        id=r['message_id'],
        time=when,
        chat=source,
        sender=User(id=author.id, name=author.name),
        text=r['text'],
    )
def messages() -> Iterator[Message]:
    """
    Yield the messages stored in the telegram_backup export database.

    Opens ``config.export_path`` via ``sqlite_connection`` (with ``immutable=True``), builds an
    id -> Chat lookup from the 'chats' and 'users' tables, and then streams the rows of the
    'messages' table, skipping the ones of type 'service_message' and 'empty_message'.

    The connection stays open until the generator is exhausted or closed.
    """
    # NOTE: SQL string literals must be single-quoted. Double quotes denote identifiers; sqlite only
    # falls back to treating them as strings as a legacy quirk, which can be switched off at build time
    # (in which case the double-quoted version of this query fails with 'no such column')
    messages_query = "SELECT * FROM messages WHERE message_type NOT IN ('service_message', 'empty_message')"

    with sqlite_connection(config.export_path, immutable=True, row_factory='row') as db:
        chats: Chats = {}
        for r in db.execute('SELECT * FROM chats'):
            # rows of the 'chats' table are stored without a handle
            chat = Chat(id=r['id'], name=r['name'], handle=None)
            assert chat.id not in chats
            chats[chat.id] = chat

        # users go into the same lookup table, since messages refer to them by id as well
        for r in db.execute('SELECT * FROM users'):
            first = r["first_name"]
            last = r["last_name"]
            name: Optional[str]
            if first is not None and last is not None:
                name = f'{first} {last}'
            else:
                # whichever part is present (if any)
                name = first or last

            chat = Chat(id=r['id'], name=name, handle=r['username'])
            assert chat.id not in chats
            chats[chat.id] = chat

        # TODO order by? not sure
        for r in db.execute(messages_query):
            # seems like the only remaining have message_type = 'message'
            yield _message_from_row(r, chats=chats)

View file

@ -3,19 +3,21 @@ Tinder data from Android app database (in =/data/data/com.tinder/databases/tinde
""" """
from __future__ import annotations from __future__ import annotations
REQUIRES = ['dataset']
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from itertools import chain from itertools import chain
from pathlib import Path from pathlib import Path
import sqlite3
from typing import Sequence, Iterator, Union, Dict, List, Mapping from typing import Sequence, Iterator, Union, Dict, List, Mapping
from more_itertools import unique_everseen from more_itertools import unique_everseen
from my.core import Paths, get_files, Res, assert_never, stat, Stats, datetime_aware from my.core import Paths, get_files, Res, assert_never, stat, Stats, datetime_aware, LazyLogger
from my.core.dataset import connect_readonly, DatabaseT from my.core.sqlite import sqlite_connection
logger = LazyLogger(__name__)
from my.config import tinder as user_config from my.config import tinder as user_config
@ -39,7 +41,7 @@ class _BaseMatch:
id: str id: str
@dataclass @dataclass(unsafe_hash=True)
class _Match(_BaseMatch): class _Match(_BaseMatch):
person_id: str person_id: str
@ -59,7 +61,7 @@ class _BaseMessage:
text: str text: str
@dataclass @dataclass(unsafe_hash=True)
class _Message(_BaseMessage): class _Message(_BaseMessage):
match_id: str match_id: str
from_id: str from_id: str
@ -73,6 +75,8 @@ class Message(_BaseMessage):
to: Person to: Person
# todo hmm I have a suspicion it might be cumulative?
# although still possible that the user might remove/install app back, so need to keep that in mind
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
return get_files(config.export_path) return get_files(config.export_path)
@ -82,41 +86,46 @@ Entity = Union[Person, Match, Message]
def _entities() -> Iterator[Res[_Entity]]: def _entities() -> Iterator[Res[_Entity]]:
for db_file in inputs(): dbs = inputs()
with connect_readonly(db_file) as db: for i, db_file in enumerate(dbs):
logger.debug(f'processing {db_file} {i}/{len(dbs)}')
with sqlite_connection(db_file, immutable=True, row_factory='row') as db:
yield from _handle_db(db) yield from _handle_db(db)
def _handle_db(db: DatabaseT) -> Iterator[Res[_Entity]]: def _handle_db(db: sqlite3.Connection) -> Iterator[Res[_Entity]]:
# profile_user_view contains our own user id # profile_user_view contains our own user id
for row in chain(db['profile_user_view'], db['match_person']): for row in chain(
db.execute('SELECT * FROM profile_user_view'),
db.execute('SELECT * FROM match_person'),
):
try: try:
yield _parse_person(row) yield _parse_person(row)
except Exception as e: except Exception as e:
# todo attach error contex? # todo attach error contex?
yield e yield e
for row in db['match']: for row in db.execute('SELECT * FROM match'):
try: try:
yield _parse_match(row) yield _parse_match(row)
except Exception as e: except Exception as e:
yield e yield e
for row in db['message']: for row in db.execute('SELECT * FROM message'):
try: try:
yield _parse_msg(row) yield _parse_msg(row)
except Exception as e: except Exception as e:
yield e yield e
def _parse_person(row) -> Person: def _parse_person(row: sqlite3.Row) -> Person:
return Person( return Person(
id=row['id'], id=row['id'],
name=row['name'], name=row['name'],
) )
def _parse_match(row) -> _Match: def _parse_match(row: sqlite3.Row) -> _Match:
return _Match( return _Match(
id=row['id'], id=row['id'],
person_id=row['person_id'], person_id=row['person_id'],
@ -124,7 +133,7 @@ def _parse_match(row) -> _Match:
) )
def _parse_msg(row) -> _Message: def _parse_msg(row: sqlite3.Row) -> _Message:
# note it also has raw_message_data -- not sure which is best to use.. # note it also has raw_message_data -- not sure which is best to use..
sent = row['sent_date'] sent = row['sent_date']
return _Message( return _Message(

View file

@ -12,7 +12,7 @@ except ImportError as ie:
# must be caused by something else # must be caused by something else
raise ie raise ie
try: try:
from my.config import twitter as user_config # type: ignore[misc] from my.config import twitter as user_config # type: ignore[misc,assignment]
except ImportError: except ImportError:
raise ie # raise the original exception.. must be something else raise ie # raise the original exception.. must be something else
else: else:

View file

@ -4,31 +4,32 @@ Twitter data from Talon app database (in =/data/data/com.klinker.android.twitter
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime from datetime import datetime, timezone
import re import re
from typing import Iterator, Sequence, Optional, Dict import sqlite3
from typing import Iterator, Sequence, Union
import pytz from more_itertools import unique_everseen
from my.core import Paths, Res, datetime_aware, get_files
from my.core.sqlite import sqlite_connection
from .common import TweetId, permalink
from my.config import twitter as user_config from my.config import twitter as user_config
from ..core import Paths, Res, datetime_aware
@dataclass @dataclass
class config(user_config.talon): class config(user_config.talon):
# paths[s]/glob to the exported sqlite databases # paths[s]/glob to the exported sqlite databases
export_path: Paths export_path: Paths
from ..core import get_files
from pathlib import Path from pathlib import Path
def inputs() -> Sequence[Path]: def inputs() -> Sequence[Path]:
return get_files(config.export_path) return get_files(config.export_path)
from .common import TweetId, permalink
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class Tweet: class Tweet:
id_str: TweetId id_str: TweetId
@ -51,8 +52,6 @@ class _IsFavorire:
tweet: Tweet tweet: Tweet
from typing import Union
from ..core.dataset import connect_readonly
Entity = Union[_IsTweet, _IsFavorire] Entity = Union[_IsTweet, _IsFavorire]
def _entities() -> Iterator[Res[Entity]]: def _entities() -> Iterator[Res[Entity]]:
for f in inputs(): for f in inputs():
@ -67,35 +66,36 @@ def _process_one(f: Path) -> Iterator[Res[Entity]]:
fname = f.name fname = f.name
handler = handlers.get(fname) handler = handlers.get(fname)
if handler is None: if handler is None:
yield RuntimeError(f"Coulnd't find handler for {fname}") yield RuntimeError(f"Could not find handler for {fname}")
return return
with connect_readonly(f) as db: with sqlite_connection(f, immutable=True, row_factory='row') as db:
yield from handler(db) yield from handler(db)
def _process_user_tweets(db) -> Iterator[Res[Entity]]: def _process_user_tweets(db: sqlite3.Connection) -> Iterator[Res[Entity]]:
# dunno why it's called 'lists' # dunno why it's called 'lists'
for r in db['lists'].all(order_by='time'): for r in db.execute('SELECT * FROM lists ORDER BY time'):
try: try:
yield _IsTweet(_parse_tweet(r)) yield _IsTweet(_parse_tweet(r))
except Exception as e: except Exception as e:
yield e yield e
def _process_favorite_tweets(db) -> Iterator[Res[Entity]]: def _process_favorite_tweets(db: sqlite3.Connection) -> Iterator[Res[Entity]]:
for r in db['favorite_tweets'].all(order_by='time'): for r in db.execute('SELECT * FROM favorite_tweets ORDER BY time'):
try: try:
yield _IsFavorire(_parse_tweet(r)) yield _IsFavorire(_parse_tweet(r))
except Exception as e: except Exception as e:
yield e yield e
def _parse_tweet(row) -> Tweet:
def _parse_tweet(row: sqlite3.Row) -> Tweet:
# ok so looks like it's tz aware.. # ok so looks like it's tz aware..
# https://github.com/klinker24/talon-for-twitter-android/blob/c3b0612717ba3ea93c0cae6d907d7d86d640069e/app/src/main/java/com/klinker/android/twitter_l/data/sq_lite/FavoriteTweetsDataSource.java#L95 # https://github.com/klinker24/talon-for-twitter-android/blob/c3b0612717ba3ea93c0cae6d907d7d86d640069e/app/src/main/java/com/klinker/android/twitter_l/data/sq_lite/FavoriteTweetsDataSource.java#L95
# uses https://docs.oracle.com/javase/7/docs/api/java/util/Date.html#getTime() # uses https://docs.oracle.com/javase/7/docs/api/java/util/Date.html#getTime()
# and it's created here, so looks like it's properly parsed from the api # and it's created here, so looks like it's properly parsed from the api
# https://github.com/Twitter4J/Twitter4J/blob/8376fade8d557896bb9319fb46e39a55b134b166/twitter4j-core/src/internal-json/java/twitter4j/ParseUtil.java#L69-L79 # https://github.com/Twitter4J/Twitter4J/blob/8376fade8d557896bb9319fb46e39a55b134b166/twitter4j-core/src/internal-json/java/twitter4j/ParseUtil.java#L69-L79
created_at = datetime.fromtimestamp(row['time'] / 1000, tz=pytz.utc) created_at = datetime.fromtimestamp(row['time'] / 1000, tz=timezone.utc)
text = row['text'] text = row['text']
# try explanding URLs.. sadly there are no positions in the db # try explanding URLs.. sadly there are no positions in the db
@ -132,7 +132,6 @@ def _parse_tweet(row) -> Tweet:
) )
from more_itertools import unique_everseen
def tweets() -> Iterator[Res[Tweet]]: def tweets() -> Iterator[Res[Tweet]]:
for x in unique_everseen(_entities()): for x in unique_everseen(_entities()):
if isinstance(x, Exception): if isinstance(x, Exception):
@ -140,6 +139,7 @@ def tweets() -> Iterator[Res[Tweet]]:
elif isinstance(x, _IsTweet): elif isinstance(x, _IsTweet):
yield x.tweet yield x.tweet
def likes() -> Iterator[Res[Tweet]]: def likes() -> Iterator[Res[Tweet]]:
for x in unique_everseen(_entities()): for x in unique_everseen(_entities()):
if isinstance(x, Exception): if isinstance(x, Exception):

View file

@ -1,12 +1,16 @@
""" """
Twitter data (tweets and favorites). Uses [[https://github.com/twintproject/twint][Twint]] data export. Twitter data (tweets and favorites). Uses [[https://github.com/twintproject/twint][Twint]] data export.
""" """
REQUIRES = ['dataset']
from ..core.common import Paths
from ..core.error import Res
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import NamedTuple, Iterator, List
from my.core import Paths, Res, get_files, LazyLogger, Json, datetime_aware, stat, Stats
from my.core.cfg import make_config
from my.core.sqlite import sqlite_connection
from my.config import twint as user_config from my.config import twint as user_config
# TODO move to twitter.twint config structure # TODO move to twitter.twint config structure
@ -17,16 +21,9 @@ class twint(user_config):
#### ####
from ..core.cfg import make_config
config = make_config(twint) config = make_config(twint)
from datetime import datetime, timezone
from typing import NamedTuple, Iterator, List
from pathlib import Path
from ..core.common import get_files, LazyLogger, Json, datetime_aware
log = LazyLogger(__name__) log = LazyLogger(__name__)
@ -110,25 +107,19 @@ WHERE {where}
ORDER BY T.created_at ORDER BY T.created_at
''' '''
def _get_db():
from ..core.dataset import connect_readonly
db_path = get_db_path()
return connect_readonly(db_path)
def tweets() -> Iterator[Res[Tweet]]: def tweets() -> Iterator[Res[Tweet]]:
db = _get_db() with sqlite_connection(get_db_path(), immutable=True, row_factory='row') as db:
res = db.query(_QUERY.format(where='F.tweet_id IS NULL')) res = db.execute(_QUERY.format(where='F.tweet_id IS NULL'))
yield from map(Tweet, res) yield from map(Tweet, res)
def likes() -> Iterator[Res[Tweet]]: def likes() -> Iterator[Res[Tweet]]:
db = _get_db() with sqlite_connection(get_db_path(), immutable=True, row_factory='row') as db:
res = db.query(_QUERY.format(where='F.tweet_id IS NOT NULL')) res = db.execute(_QUERY.format(where='F.tweet_id IS NOT NULL'))
yield from map(Tweet, res) yield from map(Tweet, res)
from ..core import stat, Stats
def stats() -> Stats: def stats() -> Stats:
return { return {
**stat(tweets), **stat(tweets),

View file

@ -3,7 +3,8 @@ from datetime import datetime
import json import json
from typing import NamedTuple, Iterable, Sequence, Optional from typing import NamedTuple, Iterable, Sequence, Optional
from my.config import vk as config
from my.config import vk as config # type: ignore[attr-defined]
class Favorite(NamedTuple): class Favorite(NamedTuple):

View file

@ -1,13 +1,26 @@
#!/usr/bin/env python3
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Iterator, Any
from more_itertools import one from more_itertools import one
import pytest # type: ignore import pytest
# the real Measurement type is only imported for the type checker;
# at runtime the name is merely an alias for Any
if TYPE_CHECKING:
    from my.bluemaestro import Measurement
else:
    Measurement = Any
def ok_measurements() -> Iterator[Measurement]:
    """
    Iterate over my.bluemaestro.measurements(), asserting that none of the items is an Exception.
    """
    # imported at call time rather than when this module is imported
    from my.bluemaestro import measurements as all_measurements

    for item in all_measurements():
        assert not isinstance(item, Exception)
        yield item
def test() -> None: def test() -> None:
from my.bluemaestro import measurements res2020 = [m for m in ok_measurements() if '2020' in str(m.dt)]
res2020 = [m for m in measurements() if '2020' in str(m.dt)]
tp = [x for x in res2020 if x.temp == 2.1] tp = [x for x in res2020 if x.temp == 2.1]
assert len(tp) > 0 assert len(tp) > 0
@ -24,8 +37,7 @@ def test() -> None:
def test_old_db() -> None: def test_old_db() -> None:
from my.bluemaestro import measurements res = list(ok_measurements())
res = list(measurements())
r1 = one(x for x in res if x.dt.strftime('%Y%m%d %H:%M:%S') == '20181003 09:07:00') r1 = one(x for x in res if x.dt.strftime('%Y%m%d %H:%M:%S') == '20181003 09:07:00')
r2 = one(x for x in res if x.dt.strftime('%Y%m%d %H:%M:%S') == '20181003 09:19:00') r2 = one(x for x in res if x.dt.strftime('%Y%m%d %H:%M:%S') == '20181003 09:19:00')

View file

@ -4,7 +4,7 @@ from datetime import date, time
# todo private test.. move away # todo private test.. move away
def test_tz() -> None: def test_tz() -> None:
from my.jawbone import sleeps_by_date from my.jawbone import sleeps_by_date # type: ignore[attr-defined]
sleeps = sleeps_by_date() sleeps = sleeps_by_date()
for s in sleeps.values(): for s in sleeps.values():
assert s.sleep_start.tzinfo is not None assert s.sleep_start.tzinfo is not None

View file

@ -23,7 +23,8 @@ def test_with_error(with_config, tmp_path: Path) -> None:
g = root / 'garbage.pdf' g = root / 'garbage.pdf'
g.write_text('garbage') g.write_text('garbage')
from my.config import pdfs from my.config import pdfs
del pdfs.roots # meh. otherwise legacy config value 'wins' # meh. otherwise legacy config value 'wins'
del pdfs.roots # type: ignore[attr-defined]
pdfs.paths = (root,) pdfs.paths = (root,)
from my.pdfs import annotations from my.pdfs import annotations

View file

@ -13,7 +13,7 @@ from more_itertools import ilen
def test_location_perf() -> None: def test_location_perf() -> None:
# 2.80 s for 10 iterations and 10K points # 2.80 s for 10 iterations and 10K points
# TODO try switching to jq and see how it goes? not sure.. # TODO try switching to jq and see how it goes? not sure..
print(ilen(islice(LT.iter_locations(), 0, 10000))) print(ilen(islice(LT.iter_locations(), 0, 10000))) # type: ignore
# in theory should support any HTML takeout file? # in theory should support any HTML takeout file?

33
tests/test_tmp_config.py Normal file
View file

@ -0,0 +1,33 @@
from pathlib import Path
import tempfile
from my.core.cfg import tmp_config
import pytest
def _init_default_config() -> None:
    """
    Install a baseline ``simple`` section (with ``count = 5``) into ``my.config``.
    """
    import my.config as user_config

    class default_config:
        count = 5

    user_config.simple = default_config  # type: ignore[attr-defined,assignment,misc]
def test_tmp_config() -> None:
    ## ugh. ideally this would be on the top level (would be a better test)
    ## but pytest imports everything first, executes hooks, and some reset_modules() fixtures mess stuff up
    ## later would be nice to be a bit more careful about them
    _init_default_config()
    from my.simple import items
    ##

    def item_count() -> int:
        return len(list(items()))

    # baseline value installed by _init_default_config
    assert item_count() == 5

    class config:
        class simple:
            count = 3

    with tmp_config(modules='my.simple', config=config):
        # the override is in effect while the context manager is active..
        assert item_count() == 3

    # .. and the baseline value is back once it has exited
    assert item_count() == 5