Extract sources and source from cli.

This commit is contained in:
Chris Berkhout 2021-05-28 18:13:07 +02:00
parent e27e78d47a
commit c1c2ab39c2
3 changed files with 56 additions and 55 deletions

View file

@ -3,7 +3,6 @@ import logging
import shutil import shutil
import sys import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
from textwrap import TextWrapper
from pricehist import __version__, outputs, sources from pricehist import __version__, outputs, sources
from pricehist.fetch import fetch from pricehist.fetch import fetch
@ -29,9 +28,15 @@ def cli(args=None, output_file=sys.stdout):
if args.version: if args.version:
print(f"pricehist v{__version__}", file=output_file) print(f"pricehist v{__version__}", file=output_file)
elif args.command == "sources": elif args.command == "sources":
print(cmd_sources(args), file=output_file) result = sources.formatted()
print(result, file=output_file)
elif args.command == "source" and args.symbols:
result = sources.by_id[args.source].format_symbols()
print(result, file=output_file)
elif args.command == "source": elif args.command == "source":
print(cmd_source(args), file=output_file) total_width = shutil.get_terminal_size().columns
result = sources.by_id[args.source].format_info(total_width)
print(result, file=output_file)
elif args.command == "fetch": elif args.command == "fetch":
source = sources.by_id[args.source] source = sources.by_id[args.source]
output = outputs.by_type[args.output] output = outputs.by_type[args.output]
@ -53,57 +58,6 @@ def cli(args=None, output_file=sys.stdout):
logging.debug(f"Finished pricehist run at {datetime.now()}.") logging.debug(f"Finished pricehist run at {datetime.now()}.")
def _format_pairs(pairs, gap=4):
width = max([len(a) for a, b in pairs])
lines = [a.ljust(width + gap) + b for a, b in pairs]
return "\n".join(lines)
def cmd_sources(args):
return _format_pairs([(s.id(), s.name()) for k, s in sorted(sources.by_id.items())])
def cmd_source(args):
def fmt_field(key, value, key_width, total_width, force=True):
separator = " : "
initial_indent = key + (" " * (key_width - len(key))) + separator
subsequent_indent = " " * len(initial_indent)
wrapper = TextWrapper(
width=total_width,
drop_whitespace=True,
initial_indent=initial_indent,
subsequent_indent=subsequent_indent,
break_long_words=force,
)
first, *rest = value.split("\n")
first_output = wrapper.wrap(first)
wrapper.initial_indent = subsequent_indent
rest_output = sum([wrapper.wrap(line) if line else ["\n"] for line in rest], [])
output = "\n".join(first_output + rest_output)
if output != "":
return output
else:
return None
source = sources.by_id[args.identifier]
if args.symbols:
return _format_pairs(source.symbols())
else:
k_width = 11
total_width = shutil.get_terminal_size().columns
parts = [
fmt_field("ID", source.id(), k_width, total_width),
fmt_field("Name", source.name(), k_width, total_width),
fmt_field("Description", source.description(), k_width, total_width),
fmt_field("URL", source.source_url(), k_width, total_width, force=False),
fmt_field("Start", source.start(), k_width, total_width),
fmt_field("Types", ", ".join(source.types()), k_width, total_width),
fmt_field("Notes", source.notes(), k_width, total_width),
]
return "\n".join(filter(None, parts))
def build_parser(): def build_parser():
def valid_date(s): def valid_date(s):
if s == "today": if s == "today":
@ -170,7 +124,7 @@ def build_parser():
formatter_class=formatter, formatter_class=formatter,
) )
source_parser.add_argument( source_parser.add_argument(
"identifier", "source",
metavar="SOURCE", metavar="SOURCE",
type=str, type=str,
choices=sources.by_id.keys(), choices=sources.by_id.keys(),

View file

@ -3,3 +3,9 @@ from .coinmarketcap import CoinMarketCap
from .ecb import ECB from .ecb import ECB
by_id = {source.id(): source for source in [CoinDesk(), CoinMarketCap(), ECB()]} by_id = {source.id(): source for source in [CoinDesk(), CoinMarketCap(), ECB()]}
def formatted():
width = max([len(k) for k, v in by_id.items()])
lines = [k.ljust(width + 4) + v.name() for k, v in by_id.items()]
return "\n".join(lines)

View file

@ -1,5 +1,6 @@
import logging import logging
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from textwrap import TextWrapper
import curlify import curlify
@ -47,3 +48,43 @@ class BaseSource(ABC):
curl = curlify.to_curl(response.request, compressed=True) curl = curlify.to_curl(response.request, compressed=True)
logging.debug(f"Request to {self.id()}: {curl}") logging.debug(f"Request to {self.id()}: {curl}")
return response return response
def format_symbols(self) -> str:
symbols = self.symbols()
width = max([len(sym) for sym, desc in symbols])
lines = [sym.ljust(width + 4) + desc for sym, desc in symbols]
return "\n".join(lines)
def format_info(self, total_width=80) -> str:
k_width = 11
parts = [
self._fmt_field("ID", self.id(), k_width, total_width),
self._fmt_field("Name", self.name(), k_width, total_width),
self._fmt_field("Description", self.description(), k_width, total_width),
self._fmt_field("URL", self.source_url(), k_width, total_width, False),
self._fmt_field("Start", self.start(), k_width, total_width),
self._fmt_field("Types", ", ".join(self.types()), k_width, total_width),
self._fmt_field("Notes", self.notes(), k_width, total_width),
]
return "\n".join(filter(None, parts))
def _fmt_field(self, key, value, key_width, total_width, force=True):
separator = " : "
initial_indent = key + (" " * (key_width - len(key))) + separator
subsequent_indent = " " * len(initial_indent)
wrapper = TextWrapper(
width=total_width,
drop_whitespace=True,
initial_indent=initial_indent,
subsequent_indent=subsequent_indent,
break_long_words=force,
)
first, *rest = value.split("\n")
first_output = wrapper.wrap(first)
wrapper.initial_indent = subsequent_indent
rest_output = sum([wrapper.wrap(line) if line else ["\n"] for line in rest], [])
output = "\n".join(first_output + rest_output)
if output != "":
return output
else:
return None