ruff: enable and fix C4 ruleset

This commit is contained in:
Dima Gerasimov 2024-08-27 22:50:37 +01:00 committed by karlicoss
parent c08ddbc781
commit d244c7cc4e
19 changed files with 48 additions and 38 deletions

View file

@ -161,14 +161,14 @@ def git_repos_in(roots: List[Path]) -> List[Path]:
*roots,
]).decode('utf8').splitlines()
candidates = set(Path(o).resolve().absolute().parent for o in outputs)
candidates = {Path(o).resolve().absolute().parent for o in outputs}
# exclude stuff within .git dirs (can happen for submodules?)
candidates = {c for c in candidates if '.git' not in c.parts[:-1]}
candidates = {c for c in candidates if is_git_dir(c)}
repos = list(sorted(map(_git_root, candidates)))
repos = sorted(map(_git_root, candidates))
return repos

View file

@ -244,7 +244,7 @@ class ZipPath(zipfile_Path):
# see https://en.wikipedia.org/wiki/ZIP_(file_format)#Structure
dt = datetime(*self.root.getinfo(self.at).date_time)
ts = int(dt.timestamp())
params = dict(
params = dict( # noqa: C408
st_mode=0,
st_ino=0,
st_dev=0,

View file

@ -80,7 +80,7 @@ def get_files(
paths.append(src)
if sort:
paths = list(sorted(paths))
paths = sorted(paths)
if len(paths) == 0:
# todo make it conditionally defensive based on some global settings

View file

@ -60,8 +60,10 @@ class _A:
def test_freezer() -> None:
val = _A(x=dict(an_int=123, an_any=[1, 2, 3]))
val = _A(x={
'an_int': 123,
'an_any': [1, 2, 3],
})
af = Freezer(_A)
fval = af.freeze(val)

View file

@ -72,16 +72,16 @@ def fill(it: Iterable[Any], *, measurement: str, reset: bool=RESET_DEFAULT, dt_c
fields = filter_dict(d)
yield dict(
measurement=measurement,
yield {
'measurement': measurement,
# TODO maybe good idea to tag with database file/name? to inspect inconsistencies etc..
# hmm, so tags are autoindexed and might be faster?
# not sure what's the big difference though
# "fields are data and tags are metadata"
tags=tags,
time=dt,
fields=fields,
)
'tags': tags,
'time': dt,
'fields': fields,
}
from more_itertools import chunked
# "The optimal batch size is 5000 lines of line protocol."

View file

@ -222,7 +222,7 @@ def test_as_dataframe() -> None:
from .compat import fromisoformat
it = (dict(i=i, s=f'str{i}') for i in range(5))
it = ({'i': i, 's': f'str{i}'} for i in range(5))
with pytest.warns(UserWarning, match=r"No 'error' column") as record_warnings: # noqa: F841
df: DataFrameT = as_dataframe(it)
# todo test other error col policies

View file

@ -655,7 +655,7 @@ def test_wrap_unsortable() -> None:
# by default, wrap unsortable
res = list(select(_mixed_iter(), order_key="z"))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 2})
assert Counter(type(t).__name__ for t in res) == Counter({"_A": 4, "Unsortable": 2})
def test_disabled_wrap_unsorted() -> None:
@ -674,7 +674,7 @@ def test_drop_unsorted() -> None:
# test drop unsortable, should remove them before the 'sorted' call
res = list(select(_mixed_iter(), order_key="z", wrap_unsorted=False, drop_unsorted=True))
assert len(res) == 4
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4})
assert Counter(type(t).__name__ for t in res) == Counter({"_A": 4})
def test_drop_exceptions() -> None:
@ -705,7 +705,7 @@ def test_wrap_unsortable_with_error_and_warning() -> None:
# by default should wrap unsortable (error)
with pytest.warns(UserWarning, match=r"encountered exception"):
res = list(select(_mixed_iter_errors(), order_value=lambda o: isinstance(o, datetime)))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "_B": 2, "Unsortable": 1})
assert Counter(type(t).__name__ for t in res) == Counter({"_A": 4, "_B": 2, "Unsortable": 1})
# compare the returned error wrapped in the Unsortable
returned_error = next((o for o in res if isinstance(o, Unsortable))).obj
assert "Unhandled error!" == str(returned_error)
@ -717,7 +717,7 @@ def test_order_key_unsortable() -> None:
# both unsortable and items which dont match the order_by (order_key) in this case should be classified unsorted
res = list(select(_mixed_iter_errors(), order_key="z"))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 3})
assert Counter(type(t).__name__ for t in res) == Counter({"_A": 4, "Unsortable": 3})
def test_order_default_param() -> None:
@ -737,7 +737,7 @@ def test_no_recursive_unsortables() -> None:
# select to select as input, wrapping unsortables the first time, second should drop them
# reverse=True to send errors to the end, so the below order_key works
res = list(select(_mixed_iter_errors(), order_key="z", reverse=True))
assert Counter(map(lambda t: type(t).__name__, res)) == Counter({"_A": 4, "Unsortable": 3})
assert Counter(type(t).__name__ for t in res) == Counter({"_A": 4, "Unsortable": 3})
# drop_unsorted
dropped = list(select(res, order_key="z", drop_unsorted=True))

View file

@ -35,7 +35,7 @@ SqliteRowFactory = Callable[[sqlite3.Cursor, sqlite3.Row], Any]
def dict_factory(cursor, row):
fields = [column[0] for column in cursor.description]
return {key: value for key, value in zip(fields, row)}
return dict(zip(fields, row))
Factory = Union[SqliteRowFactory, Literal['row', 'dict']]

View file

@ -189,7 +189,7 @@ def fake_data(nights: int = 500) -> Iterator:
# TODO remove/deprecate it? I think used by timeline
def get_datas() -> List[Emfit]:
# todo ugh. run lint properly
return list(sorted(datas(), key=lambda e: e.start)) # type: ignore
return sorted(datas(), key=lambda e: e.start) # type: ignore
# TODO move away old entries if there is a diff??

View file

@ -51,12 +51,12 @@ def events() -> Iterable[Res[Event]]:
# a bit naughty and ad-hoc, but we will generify reading from tar.gz. once we have more examples
# another one is zulip archive
if last.is_dir():
files = list(sorted(last.glob('*.json'))) # looks like all files are in the root
files = sorted(last.glob('*.json')) # looks like all files are in the root
open_file = lambda f: f.open()
else:
# treat as .tar.gz
tfile = tarfile.open(last)
files = list(sorted(map(Path, tfile.getnames())))
files = sorted(map(Path, tfile.getnames()))
files = [p for p in files if len(p.parts) == 1 and p.suffix == '.json']
open_file = lambda p: notnone(tfile.extractfile(f'./{p}')) # NOTE odd, doesn't work without ./

View file

@ -83,9 +83,10 @@ def stats() -> Stats:
def fill_influxdb() -> None:
from my.core import influxdb
# todo needs to be more automatic
sd = (dict(
dt=x.dt,
track=x.track,
) for x in scrobbles())
sd = ({
'dt': x.dt,
'track': x.track,
} for x in scrobbles())
influxdb.fill(sd, measurement=__name__)

View file

@ -55,7 +55,7 @@ class Config(user_config):
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
res.append((dt, loc))
res = list(sorted(res, key=lambda p: p[0]))
res = sorted(res, key=lambda p: p[0])
return res

View file

@ -33,7 +33,7 @@ def iter_movies() -> Iterator[Movie]:
def get_movies() -> List[Movie]:
return list(sorted(iter_movies(), key=lambda m: m.created))
return sorted(iter_movies(), key=lambda m: m.created)
def test():

View file

@ -97,7 +97,7 @@ def get_annots(p: Path) -> List[Annotation]:
b = time.time()
with p.open('rb') as fo:
doc = pdfannots.process_file(fo, emit_progress_to=None)
annots = [a for a in doc.iter_annots()]
annots = list(doc.iter_annots())
# also has outlines are kinda like TOC, I don't really need them
a = time.time()
took = a - b

View file

@ -82,12 +82,13 @@ def fake_data(rows: int=1000) -> Iterator:
def fill_influxdb() -> None:
from .core import influxdb
it = (dict(
dt=e.dt,
duration_d=e.duration_s,
tags=dict(activity=e.activity),
) for e in entries() if isinstance(e, Entry)) # TODO handle errors in core.influxdb
from my.core import influxdb
it = ({
'dt': e.dt,
'duration_d': e.duration_s,
'tags': {'activity': e.activity},
} for e in entries() if isinstance(e, Entry)) # TODO handle errors in core.influxdb
influxdb.fill(it, measurement=__name__)

View file

@ -62,7 +62,7 @@ def test_get_annots() -> None:
"""
annotations = get_annots(testdata() / 'pdfs' / 'Information Architecture for the World Wide Web.pdf')
assert len(annotations) == 3
assert set([a.highlight for a in annotations]) == EXPECTED_HIGHLIGHTS
assert {a.highlight for a in annotations} == EXPECTED_HIGHLIGHTS
def test_annotated_pdfs_with_filelist() -> None:

View file

@ -105,7 +105,7 @@ class Tweet:
repls.append((fr, to, me['display_url']))
# todo not sure, maybe use media_url_https instead?
# for now doing this for compatibility with twint
repls = list(sorted(repls))
repls = sorted(repls)
parts = []
idx = 0
for fr, to, what in repls:

View file

@ -120,4 +120,4 @@ def _watched_legacy() -> Iterable[Watched]:
watches.append(Watched(url=url, title=title, when=dt))
# todo hmm they already come sorted.. wonder if should just rely on it..
return list(sorted(watches, key=lambda e: e.when))
return sorted(watches, key=lambda e: e.when)

View file

@ -1,3 +1,9 @@
lint.extend-select = [
"F", # flakes rules -- default, but extend just in case
"E", # pycodestyle -- default, but extend just in case
"C4", # flake8-comprehensions -- unnecessary list/map/dict calls
]
lint.ignore = [
### too opinionated style checks
"E501", # too long lines