AudibleSeriesChecker/connectors/audnexus_connector.py

40 lines
1.1 KiB
Python

from ratelimit import limits, sleep_and_retry
import requests
import json
import os
class AudNexusConnector:
@sleep_and_retry
@limits(calls=100, period=60)
def request(self, url):
return requests.get(url, {"update": 0, "seedAuthors": 0})
def get_book_from_asin(self, book_asin):
endpoint = f"https://api.audnex.us/books/{book_asin}"
response = self.request(endpoint)
response.raise_for_status()
return response.json()
class AudNexusConnectorMock(AudNexusConnector):
def __init__(self):
super().__init__()
self.directory = "dumps/audnexus"
if not os.path.exists(self.directory):
os.makedirs(self.directory)
def get_book_from_asin(self, book_asin):
path = f"{self.directory}/book_{book_asin}.json"
try:
with open(path, "r") as f:
data = json.load(f)
return data
except FileNotFoundError:
data = AudNexusConnector.get_book_from_asin(self, book_asin)
with open(path, "w+") as f:
json.dump(data, f, indent=4)
return data