71 lines
2.1 KiB
Python
71 lines
2.1 KiB
Python
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
|
|
import requests
|
|
from lxml import etree
|
|
|
|
from pricehist import isocurrencies
|
|
from pricehist.price import Price
|
|
|
|
|
|
class ECB:
    """Price source backed by the European Central Bank reference-rate feeds.

    The metadata methods return fixed strings describing the source.
    ``symbols`` and ``fetch`` download one of two XML files from the ECB
    website (the full history or the last-90-days file) and read the
    elements carrying ``time``, ``currency`` and ``rate`` attributes.
    """

    # Seconds passed to ``requests.get`` as its timeout. Without one, a
    # stalled connection would block the caller indefinitely.
    _REQUEST_TIMEOUT = 30

    def id(self) -> str:
        """Return the short identifier of this source."""
        return "ecb"

    def name(self) -> str:
        """Return the human-readable name of this source."""
        return "European Central Bank"

    def description(self) -> str:
        """Return a one-line description of the data this source offers."""
        return "European Central Bank Euro foreign exchange reference rates"

    def source_url(self) -> str:
        """Return the URL of the web page describing the data."""
        return "https://www.ecb.europa.eu/stats/exchange/eurofxref/html/index.en.html"

    def start(self) -> str:
        """Return the earliest available date as an ISO ``YYYY-MM-DD`` string."""
        return "1999-01-04"

    def types(self) -> list:
        """Return the list of price types offered (only ``"reference"``)."""
        return ["reference"]

    def notes(self) -> str:
        """Return free-form notes about this source (currently empty)."""
        return ""

    def symbols(self) -> list:
        """Return one descriptive line per quote currency found in the feed.

        The full-history file is downloaded, so every currency code that
        appears anywhere in it is listed. Each line has the form
        ``"EUR/<code> Euro against <currency name>"`` and the lines are
        sorted by currency code.
        """
        data = self._raw_data(more_than_90_days=True)
        root = etree.fromstring(data)
        codes = sorted({node.attrib["currency"] for node in root.cssselect("[currency]")})
        # NOTE(review): ``by_code()`` is indexed by currency code and its
        # entries expose ``.name``; a code missing from it would raise here.
        iso = isocurrencies.by_code()
        return [f"EUR/{code} Euro against {iso[code].name}" for code in codes]

    def fetch(self, pair, type, start, end):
        """Return the prices for ``pair`` dated between ``start`` and ``end``.

        Args:
            pair: Currency pair written as ``"BASE/QUOTE"``. Only the quote
                is used to select rates; the base is passed through to the
                resulting prices unchecked.
            type: Price type. Accepted for interface compatibility; it is
                not consulted here.
            start: First date to include, as an ISO ``YYYY-MM-DD`` string.
            end: Last date to include, as an ISO ``YYYY-MM-DD`` string.

        Returns:
            A list of ``Price`` objects in reverse document order. The ECB
            files appear to list the newest day first, which would make the
            result oldest-first (TODO confirm).

        Raises:
            ValueError: If ``pair`` does not split into exactly two parts
                on ``"/"``.
        """
        base, quote = pair.split("/")

        # The short file is only used when ``start`` falls within the last
        # 85 days, which leaves a margin below the file's 90-day coverage.
        # ISO date strings order correctly under plain string comparison.
        almost_90_days_ago = str(datetime.now().date() - timedelta(days=85))
        data = self._raw_data(start < almost_90_days_ago)
        root = etree.fromstring(data)

        rows = []
        for day in root.cssselect("[time]"):
            date = day.attrib["time"]
            # A day that carries no rate for the quote (some currencies are
            # absent from the earliest data) matches nothing here, so it
            # simply contributes no row.
            for row in day.cssselect(f"[currency='{quote}']"):
                rows.append((date, Decimal(row.attrib["rate"])))
        # Appending and reversing once yields the same order as prepending
        # each row, without the quadratic cost of ``list.insert(0, ...)``.
        rows.reverse()

        return [Price(base, quote, d, r) for d, r in rows if start <= d <= end]

    def _raw_data(self, more_than_90_days=False) -> bytes:
        """Download one of the ECB XML files and return its raw bytes.

        Args:
            more_than_90_days: When true, fetch the full history file;
                otherwise fetch the file covering the last 90 days.

        Raises:
            requests.RequestException: If the request fails, times out, or
                the server answers with an HTTP error status.
        """
        url_base = "https://www.ecb.europa.eu/stats/eurofxref"
        if more_than_90_days:
            source_url = f"{url_base}/eurofxref-hist.xml"  # since 1999
        else:
            source_url = f"{url_base}/eurofxref-hist-90d.xml"  # last 90 days

        response = requests.get(source_url, timeout=self._REQUEST_TIMEOUT)
        # Fail loudly on 4xx/5xx instead of handing an error page to the
        # XML parser as if it were rate data.
        response.raise_for_status()
        return response.content
|