245 lines
7.6 KiB
Python
245 lines
7.6 KiB
Python
import alive_progress
|
|
import requests
|
|
import connectors
|
|
import argparse
|
|
import logging
|
|
import config
|
|
|
|
|
|
class Book(dict):
|
|
def __init__(self, asin=""):
|
|
self.asin = asin
|
|
self.title = ""
|
|
self.series = dict()
|
|
|
|
def __bool__(self):
|
|
return self.asin != ""
|
|
|
|
def __repr__(self):
|
|
return f"Book(asin='{self.asin}', series='{self.series}')"
|
|
|
|
|
|
class BookCollection(dict):
|
|
def __init__(self, series_name, books: list[Book] = []):
|
|
self.__name__ = series_name
|
|
for book in books:
|
|
self.add(book)
|
|
|
|
def add(self, book: Book):
|
|
sequence = book.series[self.__name__]
|
|
keys = expand_range(sequence)
|
|
|
|
for key in keys:
|
|
self.setdefault(key, [])
|
|
self[key].append(book.asin)
|
|
|
|
def get_first_book(self):
|
|
firt_key = list(self.keys())[0]
|
|
return self[firt_key][0]
|
|
|
|
def __bool__(self):
|
|
return len(self.keys()) > 0 and self.get_first_book() != None
|
|
|
|
|
|
def expand_range(part):
|
|
"""Expands a range string (e.g., "1-10") or float sequence into a list of numbers."""
|
|
try:
|
|
if "-" in part and not part.startswith("-"):
|
|
start, end = map(int, part.split("-"))
|
|
if start >= end:
|
|
return [] # Handle invalid ranges (start >= end)
|
|
return list(range(start, end + 1))
|
|
else:
|
|
float_val = float(part)
|
|
return [float_val]
|
|
except ValueError:
|
|
return [] # Handle non-numeric input or invalid format
|
|
|
|
|
|
def process_sequence(books):
|
|
"""Groups books by ASIN, handling sequence ranges (including floats)."""
|
|
books_sequence = {}
|
|
for book in books:
|
|
asin = book["asin"]
|
|
sequence = book.get("sequence", "")
|
|
|
|
if sequence:
|
|
keys = expand_range(sequence.split(", ")[0])
|
|
else:
|
|
keys = [float(book.get("sort", "1")) * -1]
|
|
|
|
for key in keys:
|
|
if key not in books_sequence:
|
|
books_sequence[key] = []
|
|
books_sequence[key].append(asin)
|
|
|
|
keys = sorted(books_sequence.keys(), key=lambda x: float(x))
|
|
ordered_sequence = {}
|
|
for key in keys:
|
|
ordered_sequence[key] = books_sequence[key]
|
|
return ordered_sequence
|
|
|
|
|
|
def process_audible_serie(books, serie_name):
|
|
processed_books = BookCollection(serie_name)
|
|
|
|
for json in books:
|
|
if book["relationship_type"] == "series":
|
|
book = Book(json["asin"])
|
|
book.series.setdefault(serie_name, json["sequence"])
|
|
book.series.setdefault(serie_name, f"-{json['sort']}")
|
|
processed_books.add(book)
|
|
|
|
return processed_books
|
|
|
|
|
|
def process_abs_serie(books, series_name):
|
|
processed_books = BookCollection(series_name)
|
|
|
|
for index, json in enumerate(books):
|
|
meta = json["media"]["metadata"]
|
|
|
|
if meta["asin"] == None:
|
|
logger.debug(
|
|
"ASIN missing: %s (%s by %s)",
|
|
meta["title"],
|
|
series_name,
|
|
meta["authorName"],
|
|
)
|
|
|
|
book = Book(meta["asin"])
|
|
for name in meta["seriesName"].split(", "):
|
|
try:
|
|
[name, sequence] = name.split(" #")
|
|
except ValueError:
|
|
logger.debug("Serie missing sequence: %s (%s)", meta["title"], name)
|
|
sequence = f"-{index + 1}"
|
|
|
|
book.series[name] = sequence
|
|
processed_books.add(book)
|
|
return processed_books
|
|
|
|
|
|
def get_serie_asin(first_book_asin, series_name):
|
|
audnexus_first_book = audnexus.get_book_from_asin(first_book_asin)
|
|
|
|
primary = audnexus_first_book.get("seriesPrimary", {"name": ""})
|
|
secondary = audnexus_first_book.get("seriesSecondary", {"name": ""})
|
|
|
|
if primary["name"].casefold() == series_name.casefold():
|
|
return primary["asin"]
|
|
elif secondary["name"].casefold() == series_name.casefold():
|
|
return secondary["asin"]
|
|
else:
|
|
audible_first_book = audible.get_produce_from_asin(first_book_asin)
|
|
|
|
if "series" not in audible_first_book:
|
|
return None
|
|
|
|
series = audible_first_book.get("series", [])
|
|
series_matching_sequence = [x for x in series if x["sequence"] == "1"]
|
|
|
|
if len(series_matching_sequence) == 1:
|
|
return series_matching_sequence[0]["asin"]
|
|
|
|
# TODO: search by name
|
|
return series[0]["asin"]
|
|
|
|
|
|
def main():
|
|
|
|
libraries = abs.get_library_ids()
|
|
|
|
for library in libraries:
|
|
series = abs.get_series_by_library_id(library["id"])
|
|
|
|
for serie in alive_progress.alive_it(series):
|
|
series_name = serie["name"]
|
|
abs_book_sequence = process_abs_serie(serie["books"], series_name)
|
|
|
|
if not abs_book_sequence:
|
|
logger.debug("No ASINs found for series: %s", series_name)
|
|
continue
|
|
|
|
first_book_asin = abs_book_sequence.get_first_book()
|
|
series_asin = get_serie_asin(first_book_asin, series_name)
|
|
|
|
if not series_asin:
|
|
logger.debug("Serie does not exist: %s", series_name)
|
|
continue
|
|
|
|
audible_serie = audible.get_produce_from_asin(series_asin)
|
|
audible_book_sequence = process_sequence(audible_serie["relationships"])
|
|
|
|
if len(abs_book_sequence) >= len(audible_book_sequence):
|
|
continue
|
|
|
|
missing_keys = set(
|
|
[
|
|
key
|
|
for key in audible_book_sequence.keys()
|
|
if key not in abs_book_sequence
|
|
]
|
|
)
|
|
|
|
# Separate missing and soon-to-be-released books
|
|
missing_books = []
|
|
soon_to_release_books = []
|
|
|
|
for key in missing_keys:
|
|
try:
|
|
audnexus.get_book_from_asin(audible_book_sequence[key][0])
|
|
missing_books.append(key)
|
|
|
|
except requests.exceptions.HTTPError:
|
|
logger.debug("%s Book %d is yet to be released", series_name, key)
|
|
soon_to_release_books.append(key)
|
|
|
|
msgs = []
|
|
|
|
if missing_books:
|
|
msgs.append(f"{len(missing_books)} book.s missing")
|
|
if soon_to_release_books:
|
|
msgs.append(f"{len(soon_to_release_books)} book.s yet to be released")
|
|
|
|
for i, msg in enumerate(msgs):
|
|
logger.info(
|
|
"%s - %s",
|
|
series_name if i == 0 else "".ljust(len(series_name)),
|
|
msg,
|
|
)
|
|
|
|
# TODO: add input to choose which library is to be scaned
|
|
break
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("-d", "--dev", action="store_true")
|
|
parser.add_argument("-v", "--verbose", action="store_true")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.dev:
|
|
abs = connectors.ABSConnectorMock(config.ABS_API_URL, config.ABS_API_TOKEN)
|
|
audible = connectors.AudibleConnectorMock(config.AUDIBLE_AUTH_FILE)
|
|
audnexus = connectors.AudNexusConnectorMock()
|
|
else:
|
|
abs = connectors.ABSConnector(config.ABS_API_URL, config.ABS_API_TOKEN)
|
|
audible = connectors.AudibleConnector(config.AUDIBLE_AUTH_FILE)
|
|
audnexus = connectors.AudNexusConnector()
|
|
|
|
logging.basicConfig(
|
|
filename="log",
|
|
filemode="w",
|
|
format="%(levelname)s - %(message)s" if args.verbose else "%(message)s",
|
|
)
|
|
logging.getLogger("httpx").setLevel(logging.WARNING)
|
|
logging.getLogger("audible").setLevel(logging.WARNING)
|
|
logging.getLogger("urllib3").setLevel(logging.WARNING)
|
|
logging.getLogger("httpcore").setLevel(logging.WARNING)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
logger.setLevel(logging.DEBUG if args.verbose else logging.INFO)
|
|
main()
|