kompress.kopen improvements
- add tests; uniform handling for bytes/str: always return a utf8 str by default
parent c3a77b6256
commit 8b8a85e8c3
7 changed files with 52 additions and 24 deletions
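The gist of the change, as a usage sketch (the file paths here are made up for illustration; kopen comes from my.kython.kompress, as in the hunks below):

    from my.kython.kompress import kopen

    # compressed and plain files are now read as utf8 str by default, no .decode() at call sites
    text = kopen('/path/to/export.xz').read()                         # str, not bytes

    # for zip archives the member path is passed as an extra argument and decoded the same way
    member = kopen('/path/to/takeout.zip', 'path/in/archive').read()  # str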
@@ -136,7 +136,6 @@ def read_html(tpath: Path, file: str) -> Iterable[Parsed]:
         results.append((dt, url, title))
     parser = TakeoutHTMLParser(callback=cb)
     with kopen(tpath, file) as fo:
-        # TODO careful, wht if it's a string already? make asutf method?
-        data = fo.read().decode('utf8')
+        data = fo.read()
     parser.feed(data)
     return results
@@ -3,37 +3,54 @@ Various helpers for compression
 """
 import pathlib
 from pathlib import Path
-from typing import Union
+from typing import Union, IO
+import io
 
 PathIsh = Union[Path, str]
 
 
-def _zstd_open(path: Path):
+def _zstd_open(path: Path, *args, **kwargs):
     import zstandard as zstd # type: ignore
-    fh = path.open('rb')
+    fh = path.open(*args, **kwargs)
     dctx = zstd.ZstdDecompressor()
     reader = dctx.stream_reader(fh)
     return reader
 
 
-def kopen(path: PathIsh, *args, **kwargs): # TODO is it bytes stream??
-    # TODO allow passing in mode?
+# TODO returns protocol that we can call 'read' against?
+# TODO use the 'dependent type' trick?
+def kopen(path: PathIsh, *args, mode: str='rt', **kwargs) -> IO[str]:
+    # TODO handle mode in *rags?
+    encoding = kwargs.get('encoding', 'utf8')
+    kwargs['encoding'] = encoding
+
     pp = Path(path)
     suf = pp.suffix
     if suf in {'.xz'}:
         import lzma
-        return lzma.open(pp, *args, **kwargs)
+        return lzma.open(pp, mode, *args, **kwargs)
     elif suf in {'.zip'}:
+        # eh. this behaviour is a bit dodgy...
         from zipfile import ZipFile
-        return ZipFile(pp).open(*args, **kwargs)
+        zfile = ZipFile(pp)
+
+        [subpath] = args # meh?
+
+        ## oh god... https://stackoverflow.com/a/5639960/706389
+        ifile = zfile.open(subpath, mode='r')
+        ifile.readable = lambda: True  # type: ignore
+        ifile.writable = lambda: False # type: ignore
+        ifile.seekable = lambda: False # type: ignore
+        ifile.read1    = ifile.read    # type: ignore
+        # TODO pass all kwargs here??
+        return io.TextIOWrapper(ifile, encoding=encoding)
     elif suf in {'.lz4'}:
         import lz4.frame # type: ignore
-        return lz4.frame.open(str(pp))
+        return lz4.frame.open(str(pp), mode, *args, **kwargs)
     elif suf in {'.zstd'}:
-        return _zstd_open(pp)
+        return _zstd_open(pp, mode, *args, **kwargs)
     else:
-        kwargs['encoding'] = 'utf-8'
-        return pp.open(*args, **kwargs)
+        return pp.open(mode, *args, **kwargs)
 
 
 import typing
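The tests further down import kexists alongside kopen, but its definition is not part of the visible hunks. A minimal sketch of what such a helper could look like (an assumption, placed in the same module so kopen and PathIsh are in scope), simply probing kopen and reporting whether the subpath resolves:

    def kexists(path: PathIsh, subpath: str) -> bool:
        # assumed implementation: success iff kopen can open the subpath inside the archive
        try:
            kopen(path, subpath)
            return True
        except Exception:
            return False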
@@ -60,7 +77,7 @@ class CPath(BasePath):
         return kopen(str(self))
 
 
-open = kopen # TODO remove?
+open = kopen # TODO deprecate
 
 
 # meh
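Since CPath.open delegates to kopen(str(self)) (the context line in the hunk above), compressed files also read as text through the Path-like interface. A small usage sketch, with a made-up path:

    from my.kython.kompress import CPath

    # .open() goes through kopen, so .read() yields decompressed utf8 text
    data = CPath('/path/to/file.xz').open().read()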
@@ -7,7 +7,7 @@ from collections import deque
 from datetime import datetime
 from itertools import islice
 from pathlib import Path
-from typing import Any, Collection, Deque, Iterable, Iterator, List, NamedTuple, Optional, Sequence
+from typing import Any, Collection, Deque, Iterable, Iterator, List, NamedTuple, Optional, Sequence, IO
 import pytz
 
 # pip3 install geopy
@@ -107,6 +107,7 @@ _LOCATION_JSON = 'Takeout/Location History/Location History.json'
 # TODO hope they are sorted... (could assert for it)
 @mcachew(cache_path, chunk_by=10000, logger=logger)
 def _iter_locations(path: Path, start=0, stop=None) -> Iterator[Location]:
+    ctx: IO[str]
     if path.suffix == '.json':
         ctx = path.open('r')
     else: # must be a takeout archive
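The new ctx: IO[str] annotation above makes both branches produce the same text-stream type. A sketch of that shape, using a hypothetical helper name; the takeout branch is an assumption, since the else body is not shown in the hunk:

    from pathlib import Path
    from typing import IO

    from my.kython.kompress import kopen

    _LOCATION_JSON = 'Takeout/Location History/Location History.json'

    def _open_locations(path: Path) -> IO[str]:  # hypothetical helper mirroring _iter_locations' ctx
        if path.suffix == '.json':
            return path.open('r')        # plain json export
        # assumed: takeout archives go through kopen, which now yields a text stream
        return kopen(path, _LOCATION_JSON)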
@@ -18,7 +18,7 @@ import icalendar # type: ignore
 from icalendar.cal import Todo # type: ignore
 
 
-logger = LazyLogger('my.rtm')
+logger = LazyLogger(__name__)
 
 
 # TODO extract in a module to parse RTM's ical?
@@ -98,8 +98,8 @@ class DAL:
 
 
 def dal():
-    last = get_files(config.export_path, glob='*.ical.xz')[-1]
-    with kopen(last, 'rb') as fo:
+    last = get_files(config.export_path)[-1]
+    with kopen(last) as fo:
         data = fo.read()
     return DAL(data=data, revision='TODO')
 
@@ -117,7 +117,7 @@ class ZipExport:
         path += '.js'
 
         with kompress.kopen(self.epath, path) as fo:
-            ddd = fo.read().decode('utf8')
+            ddd = fo.read()
         start = ddd.index('[')
         ddd = ddd[start:]
         for j in json.loads(ddd):
@@ -3,8 +3,9 @@ from subprocess import check_call
 import gzip
 import lzma
 import io
+import zipfile
 
-from my.kython.kompress import kopen
+from my.kython.kompress import kopen, kexists
 
 
 import pytest # type: ignore
@@ -15,6 +16,8 @@ def prepare(tmp_path: Path):
     with (tmp_path / 'file.xz').open('wb') as f:
         with lzma.open(f, 'w') as lzf:
             lzf.write(b'compressed text')
+    with zipfile.ZipFile(tmp_path / 'file.zip', 'w') as zf:
+        zf.writestr('path/in/archive', 'data in zip')
     try:
         yield None
     finally:
@@ -24,12 +27,18 @@ def prepare(tmp_path: Path):
 def test_kopen(prepare, tmp_path: Path) -> None:
+    "Plaintext handled transparently"
     assert kopen(tmp_path / 'file'   ).read() == 'just plaintext'
-    assert kopen(tmp_path / 'file.xz').read() == b'compressed text' # FIXME make this str
+    assert kopen(tmp_path / 'file.xz').read() == 'compressed text'
+
+    "For zips behaviour is a bit different (not sure about all this, tbh...)"
     assert kopen(tmp_path / 'file.zip', 'path/in/archive').read() == 'data in zip'
 
 
-def test_kexists(tmp_path: Path) -> None:
-    # TODO
-    raise RuntimeError
+def test_kexists(prepare, tmp_path: Path) -> None:
+    assert kexists(str(tmp_path / 'file.zip'), 'path/in/archive')
+    assert not kexists(str(tmp_path / 'file.zip'), 'path/notin/archive')
+
+    # TODO not sure about this?
+    assert not kexists(tmp_path / 'nosuchzip.zip', 'path/in/archive')
 
 
 def test_cpath():
@@ -1,2 +1,4 @@
 # ugh. workaround for https://github.com/pytest-dev/pytest/issues/1927
 from my.reddit import *
+
+# TODO for reddit test, patch up to take every 10th archive or something; but make sure it's deterministic