twitter: use import_source and proper merging for tweets from different sources

+ use proper datetime_aware for created_at
This commit is contained in:
Dima Gerasimov 2022-02-08 20:31:41 +00:00 committed by karlicoss
parent afdf9d4334
commit b9852f45cf
6 changed files with 87 additions and 37 deletions

View file

@ -1,3 +1,5 @@
from my.core import __NOT_HPI_MODULE__
from datetime import datetime from datetime import datetime
from typing import Iterator, Optional, TYPE_CHECKING from typing import Iterator, Optional, TYPE_CHECKING
@ -35,7 +37,9 @@ class Message(Protocol):
from itertools import chain from itertools import chain
from more_itertools import unique_everseen from more_itertools import unique_everseen
from my.core import Res from my.core import warn_if_empty, Res
@warn_if_empty
def _merge_messages(*sources: Iterator[Res[Message]]) -> Iterator[Res[Message]]: def _merge_messages(*sources: Iterator[Res[Message]]) -> Iterator[Res[Message]]:
# todo might be nice to dump some stats for debugging, e.g. how many were overlapping? # todo might be nice to dump some stats for debugging, e.g. how many were overlapping?
def key(r: Res[Message]): def key(r: Res[Message]):

View file

@ -1,22 +1,51 @@
""" """
Unified Twitter data (merged from the archive and periodic updates) Unified Twitter data (merged from the archive and periodic updates)
""" """
from typing import Iterator
from ..core import Res
from ..core.source import import_source
from .common import merge_tweets, Tweet
# NOTE: you can comment out the sources you don't need # NOTE: you can comment out the sources you don't need
from . import twint, archive src_twint = import_source(module_name=f'my.twitter.twint')
src_archive = import_source(module_name=f'my.twitter.archive')
from .common import merge_tweets
def tweets(): @src_twint
def _tweets_twint() -> Iterator[Res[Tweet]]:
from . import twint as src
return src.tweets()
@src_archive
def _tweets_archive() -> Iterator[Res[Tweet]]:
from . import archive as src
return src.tweets()
@src_twint
def _likes_twint() -> Iterator[Res[Tweet]]:
from . import twint as src
return src.likes()
@src_archive
def _likes_archive() -> Iterator[Res[Tweet]]:
from . import archive as src
return src.likes()
def tweets() -> Iterator[Res[Tweet]]:
yield from merge_tweets( yield from merge_tweets(
twint .tweets(), _tweets_twint(),
archive.tweets(), _tweets_archive(),
) )
def likes(): def likes() -> Iterator[Res[Tweet]]:
yield from merge_tweets( yield from merge_tweets(
twint .likes(), _likes_twint(),
archive.likes(), _likes_archive(),
) )
# TODO maybe to avoid all the boilerplate above could use some sort of module Protocol?

View file

@ -18,7 +18,7 @@ except ImportError as e:
from dataclasses import dataclass from dataclasses import dataclass
from ..core import Paths from ..core import Paths, Res, datetime_aware
@dataclass @dataclass
class twitter_archive(user_config): class twitter_archive(user_config):
@ -32,7 +32,7 @@ config = make_config(twitter_archive)
from datetime import datetime from datetime import datetime
from typing import List, Optional, Iterable, NamedTuple, Sequence from typing import List, Optional, NamedTuple, Sequence, Iterator
from pathlib import Path from pathlib import Path
import json import json
@ -61,7 +61,7 @@ class Tweet(NamedTuple):
return self.raw['id_str'] return self.raw['id_str']
@property @property
def created_at(self) -> datetime: def created_at(self) -> datetime_aware:
dts = self.raw['created_at'] dts = self.raw['created_at']
return datetime.strptime(dts, '%a %b %d %H:%M:%S %z %Y') return datetime.strptime(dts, '%a %b %d %H:%M:%S %z %Y')
@ -159,12 +159,12 @@ class ZipExport:
[acc] = self.raw('account') [acc] = self.raw('account')
return acc['username'] return acc['username']
def tweets(self) -> Iterable[Tweet]: def tweets(self) -> Iterator[Tweet]:
for r in self.raw('tweet'): for r in self.raw('tweet'):
yield Tweet(r, screen_name=self.screen_name()) yield Tweet(r, screen_name=self.screen_name())
def likes(self) -> Iterable[Like]: def likes(self) -> Iterator[Like]:
# TODO ugh. would be nice to unify Tweet/Like interface # TODO ugh. would be nice to unify Tweet/Like interface
# however, takeout only got tweetId, full text and url # however, takeout only got tweetId, full text and url
for r in self.raw('like'): for r in self.raw('like'):
@ -172,18 +172,18 @@ class ZipExport:
# todo not sure about list and sorting? although can't hurt considering json is not iterative? # todo not sure about list and sorting? although can't hurt considering json is not iterative?
def tweets() -> Iterable[Tweet]: def tweets() -> Iterator[Res[Tweet]]:
for inp in inputs(): for inp in inputs():
yield from sorted(ZipExport(inp).tweets(), key=lambda t: t.dt) yield from sorted(ZipExport(inp).tweets(), key=lambda t: t.dt)
def likes() -> Iterable[Like]: def likes() -> Iterator[Res[Like]]:
for inp in inputs(): for inp in inputs():
yield from ZipExport(inp).likes() yield from ZipExport(inp).likes()
def stats(): from ..core import stat, Stats
from ..core import stat def stats() -> Stats:
return { return {
**stat(tweets), **stat(tweets),
**stat(likes), **stat(likes),

View file

@ -1,12 +1,21 @@
from my.core import __NOT_HPI_MODULE__
from itertools import chain from itertools import chain
from typing import Iterator, Any
from more_itertools import unique_everseen from more_itertools import unique_everseen
from ..core import warn_if_empty, __NOT_HPI_MODULE__
# TODO add proper Protocol for Tweet
Tweet = Any
from my.core import warn_if_empty, Res
@warn_if_empty @warn_if_empty
def merge_tweets(*sources): def merge_tweets(*sources: Iterator[Res[Tweet]]) -> Iterator[Res[Tweet]]:
yield from unique_everseen( def key(r: Res[Tweet]):
chain(*sources), if isinstance(r, Exception):
key=lambda t: t.id_str, return str(r)
) else:
return r.id_str
yield from unique_everseen(chain(*sources), key=key)

View file

@ -7,11 +7,12 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Iterator, Sequence, Optional, Dict from typing import Iterator, Sequence, Optional, Dict
import pytz
from my.config import twitter as user_config from my.config import twitter as user_config
from ..core import Paths from ..core import Paths, Res, datetime_aware
@dataclass @dataclass
class config(user_config.talon): class config(user_config.talon):
# path[s]/glob to the exported sqlite databases # path[s]/glob to the exported sqlite databases
@ -28,8 +29,7 @@ def inputs() -> Sequence[Path]:
@dataclass(unsafe_hash=True) @dataclass(unsafe_hash=True)
class Tweet: class Tweet:
id_str: str id_str: str
# TODO figure out if utc created_at: datetime_aware
created_at: datetime
screen_name: str screen_name: str
text: str text: str
urls: Sequence[str] urls: Sequence[str]
@ -45,7 +45,6 @@ class _IsFavorire:
from typing import Union from typing import Union
from ..core.error import Res
from ..core.dataset import connect_readonly from ..core.dataset import connect_readonly
Entity = Union[_IsTweet, _IsFavorire] Entity = Union[_IsTweet, _IsFavorire]
def _entities() -> Iterator[Res[Entity]]: def _entities() -> Iterator[Res[Entity]]:
@ -86,9 +85,17 @@ def _process_favorite_tweets(db) -> Iterator[Res[Entity]]:
def _parse_tweet(row) -> Tweet: def _parse_tweet(row) -> Tweet:
# TODO row['retweeter'] if not empty, would be user's name and means retweet? # TODO row['retweeter'] if not empty, would be user's name and means retweet?
# screen name would be the actual tweet's author # screen name would be the actual tweet's author
# ok so looks like it's tz aware..
# https://github.com/klinker24/talon-for-twitter-android/blob/c3b0612717ba3ea93c0cae6d907d7d86d640069e/app/src/main/java/com/klinker/android/twitter_l/data/sq_lite/FavoriteTweetsDataSource.java#L95
# uses https://docs.oracle.com/javase/7/docs/api/java/util/Date.html#getTime()
# and it's created here, so looks like it's properly parsed from the api
# https://github.com/Twitter4J/Twitter4J/blob/8376fade8d557896bb9319fb46e39a55b134b166/twitter4j-core/src/internal-json/java/twitter4j/ParseUtil.java#L69-L79
created_at = datetime.fromtimestamp(row['time'] / 1000, tz=pytz.utc)
return Tweet( return Tweet(
id_str=str(row['tweet_id']), id_str=str(row['tweet_id']),
created_at=datetime.fromtimestamp(row['time'] / 1000), created_at=created_at,
screen_name=row['screen_name'], screen_name=row['screen_name'],
text=row['text'], text=row['text'],
# todo hmm text sometimes is trimmed with ellipsis? at least urls # todo hmm text sometimes is trimmed with ellipsis? at least urls

View file

@ -5,6 +5,7 @@ Twitter data (tweets and favorites). Uses [[https://github.com/twintproject/twin
REQUIRES = ['dataset'] REQUIRES = ['dataset']
from ..core.common import Paths from ..core.common import Paths
from ..core.error import Res
from dataclasses import dataclass from dataclasses import dataclass
from my.config import twint as user_config from my.config import twint as user_config
@ -21,10 +22,10 @@ config = make_config(twint)
from datetime import datetime from datetime import datetime
from typing import NamedTuple, Iterable, List from typing import NamedTuple, Iterator, List
from pathlib import Path from pathlib import Path
from ..core.common import get_files, LazyLogger, Json from ..core.common import get_files, LazyLogger, Json, datetime_aware
from ..core.time import abbr_to_timezone from ..core.time import abbr_to_timezone
log = LazyLogger(__name__) log = LazyLogger(__name__)
@ -42,7 +43,7 @@ class Tweet(NamedTuple):
return self.row['id_str'] return self.row['id_str']
@property @property
def created_at(self) -> datetime: def created_at(self) -> datetime_aware:
seconds = self.row['created_at'] / 1000 seconds = self.row['created_at'] / 1000
tz_abbr = self.row['timezone'] tz_abbr = self.row['timezone']
tz = abbr_to_timezone(tz_abbr) tz = abbr_to_timezone(tz_abbr)
@ -97,20 +98,20 @@ def _get_db():
return connect_readonly(db_path) return connect_readonly(db_path)
def tweets() -> Iterable[Tweet]: def tweets() -> Iterator[Res[Tweet]]:
db = _get_db() db = _get_db()
res = db.query(_QUERY.format(where='F.tweet_id IS NULL')) res = db.query(_QUERY.format(where='F.tweet_id IS NULL'))
yield from map(Tweet, res) yield from map(Tweet, res)
def likes() -> Iterable[Tweet]: def likes() -> Iterator[Res[Tweet]]:
db = _get_db() db = _get_db()
res = db.query(_QUERY.format(where='F.tweet_id IS NOT NULL')) res = db.query(_QUERY.format(where='F.tweet_id IS NOT NULL'))
yield from map(Tweet, res) yield from map(Tweet, res)
def stats(): from ..core import stat, Stats
from ..core import stat def stats() -> Stats:
return { return {
**stat(tweets), **stat(tweets),
**stat(likes), **stat(likes),