From 7864163f5ab07ba31db85cbdd30bc81ac41b967a Mon Sep 17 00:00:00 2001 From: Ben Goldsworthy Date: Fri, 15 Mar 2024 20:57:39 +0100 Subject: [PATCH] fix: search pre-existing items --- scripts/add_item.py | 124 +++++++++++++++++++++++++++++++------------- 1 file changed, 87 insertions(+), 37 deletions(-) diff --git a/scripts/add_item.py b/scripts/add_item.py index 49c3229..2637abd 100644 --- a/scripts/add_item.py +++ b/scripts/add_item.py @@ -54,17 +54,28 @@ if "" == TMDB_API_KEY: logger.error("TMDB API key not found") -def return_if_exists(item_id, media_type, log) -> dict | None: +def return_if_exists(item_id: str, media_type: str, log: str) -> dict | None: """Returns an item if it exists in the requested log""" logger.info(f"Checking for '{item_id}' in '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) + id_key = "id" + if "books" == media_type: + if re.search("OL[0-9]+[MW]", item_id) is not None: + id_key = "ol_id" + elif re.search("[0-9]{13}", item_id) is not None: + id_key = "isbn_13" + elif re.search("[0-9]{10}", item_id) is not None: + id_key = "isbn_10" + else: + raise Exception("Invalid ID for book") + existing_items = [ log_item for log_item in log_items - if "id" in log_item and log_item["id"] == item_id + if id_key in log_item and log_item[id_key] == item_id ] if len(existing_items) > 0: logger.info(f"Found item in '{log}'") @@ -72,18 +83,38 @@ def return_if_exists(item_id, media_type, log) -> dict | None: logger.info(f"'{item_id}' not found in '{log}'") -def delete_existing(item_id, media_type, log) -> None: +def delete_existing(item_id: str, media_type: str, log: str) -> None: """Deletes an item from a log if it matches the ID""" logger.info(f"Deleting '{item_id}' from '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) + id_key = "id" + if "books" == media_type: + if re.search("OL[0-9]+[MW]", item_id) is not None: + id_key = "ol_id" + elif re.search("[0-9]{13}", item_id) is not None: + id_key = "isbn_13" + elif re.search("[0-9]{10}", item_id) is not None: + id_key = "isbn_10" + else: + raise Exception("Invalid ID for book") + + elif media_type in ["films", "tv-episodes"]: + if re.search("tt[0-9]+", item_id) is not None: + id_key = "isbn_id" + elif re.search("[0-9]+", item_id) is not None: + id_key = "tmdb_id" + else: + raise Exception("Invalid ID for film") + old_len = len(log_items) log_items = [ log_item for log_item in log_items - if "id" not in log_item or ("id" in log_item and log_item["id"] != item_id) + if id_key not in log_item + or (id_key in log_item and log_item[id_key] != item_id) ] if len(log_items) < (old_len - 1): raise Exception("More than one deletion made, discarding…") @@ -93,7 +124,9 @@ def delete_existing(item_id, media_type, log) -> None: logger.info(f"'{item_id}' deleted from '{log}'") -def check_for_existing(item_id, media_type, log) -> dict[dict, str]: +def check_for_existing( + item_id, media_type, log +) -> tuple[dict[dict, str] | None, str | None]: """ Check for an existing item in the current log, and pull the `date_added` etc. and mark it as a repeat if so. @@ -127,14 +160,14 @@ def check_for_existing(item_id, media_type, log) -> dict[dict, str]: return None, None -def add_item_to_log(item_id, media_type, log) -> None: +def add_item_to_log(item_id: str, media_type: str, log: str) -> None: """Add a film, book, TV series or TV episode to a log""" logger.info(f"Processing {item_id}…") - item = None + item: dict | None = None log_to_delete = None - if "tv-episodes" != media_type and ("books" != media_type and "wishlist" != log): + if media_type not in ["tv-episodes", "books"]: item, log_to_delete = check_for_existing(item_id, media_type, log) if item is None: @@ -142,10 +175,21 @@ def add_item_to_log(item_id, media_type, log) -> None: if item is None: raise Exception("No item found") - if "books" == media_type and "wishlist" != log: - item, log_to_delete = check_for_existing(item['work']['ol_id'], media_type, log) - if item is None: - item, log_to_delete = check_for_existing(item['ol_id'], media_type, log) + if "books" == media_type: + new_item, log_to_delete = check_for_existing( + item["work"]["ol_id"], media_type, log + ) + if new_item is None: + new_item, log_to_delete = check_for_existing(item["ol_id"], media_type, log) + if new_item is None: + new_item, log_to_delete = check_for_existing( + item["isbn_13"], media_type, log + ) + if new_item is None: + new_item, log_to_delete = check_for_existing( + item["isbn_10"], media_type, log + ) + item = new_item if new_item is not None else item if log in ["log", "current"]: if "date_started" not in item and media_type in ["books", "tv-series", "games"]: @@ -202,7 +246,7 @@ def add_item_to_log(item_id, media_type, log) -> None: delete_existing(item_id, media_type, log_to_delete) -def import_by_id(import_id, media_type, log) -> dict: +def import_by_id(import_id, media_type, log) -> dict | None: """Import from the appropriate API by unique ID""" if media_type in ["films", "tv-series"]: @@ -230,8 +274,12 @@ def import_from_tmdb_by_external_id(external_id, media_type) -> dict: response = requests.get( api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, - params={"external_source": "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id"}, - timeout=15 + params={ + "external_source": ( + "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id" + ) + }, + timeout=15, ) # Process the response @@ -254,7 +302,7 @@ def import_from_tmdb_by_external_id(external_id, media_type) -> dict: key = "movie_results" response_data = json.loads(response.text)[key][0] - if response_data == None: + if response_data is None: raise Exception(f"Nothing found for TVDB ID {external_id}!") # Modify the returned result to add additional data @@ -289,7 +337,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict: return cleanup_result(response_data, media_type) -def import_from_openlibrary_by_isbn(isbn, media_type) -> dict: +def import_from_openlibrary_by_isbn(isbn, media_type) -> dict | None: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" logging.info(f"Importing '{isbn}'…") @@ -337,18 +385,19 @@ def import_from_openlibrary_by_isbn(isbn, media_type) -> dict: return cleanup_result(item, media_type) -def import_from_openlibrary_by_ol_key(key) -> dict: +def import_from_openlibrary_by_ol_key(key) -> dict | None: """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key""" - if (len(key.split("/")) == 1): + if len(key.split("/")) == 1: key = f"/works/{key}" logger.info(f"Retrieving {key}…") _, mode, ol_id = key.split("/") + cached_authors = [] if "authors" == mode: with open( - f"./scripts/caching/authors.json", "r", encoding="utf-8" + "./scripts/caching/authors.json", "r", encoding="utf-8" ) as authors_cache: cached_authors = json.load(authors_cache) @@ -396,7 +445,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict: logger.info(f"Caching author '{author['name']}'…") cached_authors.append(author) with open( - f"./scripts/caching/authors.json", "w", encoding="utf-8" + "./scripts/caching/authors.json", "w", encoding="utf-8" ) as authors_cache: json.dump(cached_authors, authors_cache, indent=4) logger.info(f"Author '{author['name']}' cached!") @@ -408,7 +457,9 @@ def import_from_openlibrary_by_ol_key(key) -> dict: if "authors" in item: for author in item["authors"]: - work["authors"].append(import_from_openlibrary_by_ol_key(author["author"]["key"])) + work["authors"].append( + import_from_openlibrary_by_ol_key(author["author"]["key"]) + ) for result_key in ["first_publish_date", "subjects"]: if result_key in item: @@ -429,7 +480,7 @@ def cleanup_result(item, media_type) -> dict: for field_name in [ "adult", # TMDB "backdrop_path", # TMDB - "budget", # TMDB + "budget", # TMDB "copyright_date", # OpenLibrary "classifications", # OpenLibrary "created", # OpenLibrary @@ -437,7 +488,7 @@ def cleanup_result(item, media_type) -> dict: "episode_type", # TMDB "first_sentence", # OpenLibrary "genre_ids", # TMDB - "homepage", # TMDB + "homepage", # TMDB "identifiers", # OpenLibrary "media_type", # TMDB "last_modified", # OpenLibrary @@ -452,16 +503,16 @@ def cleanup_result(item, media_type) -> dict: "physical_dimensions", # OpenLibrary "popularity", # TMDB "production_code", # TMDB - "production_companies", # TMDB - "publish_places", # OpenLibrary - "revenue", # TMDB + "production_companies", # TMDB + "publish_places", # OpenLibrary + "revenue", # TMDB "revision", # OpenLibrary "runtime", # TMDB "source_records", # OpenLibrary - "status", # TMDB + "status", # TMDB "still_path", # TMDB "table_of_contents", # OpenLibrary - "tagline", # TMDB + "tagline", # TMDB "type", # OpenLibrary "uri_descriptions", # OpenLibrary "url", # OpenLibrary @@ -487,8 +538,8 @@ def cleanup_result(item, media_type) -> dict: del item[f"original_{title_key}"], item["original_language"] if "tv-episodes" == media_type: - item['series'] = { 'tmdb_id': item['show_id'] } - del item['show_id'] + item["series"] = {"tmdb_id": item["show_id"]} + del item["show_id"] if "books" == media_type: _, _, item["ol_id"] = item["key"].split("/") @@ -515,7 +566,7 @@ def cleanup_result(item, media_type) -> dict: f"translation_of '{item['translation_of']}' \ is different to work title '{item['work']['title']}'" ) - if 'y' != input("Accept change? [y|n]: "): + if "y" != input("Accept change? [y|n]: "): raise Exception( f"translation_of '{item['translation_of']}' \ is different to work title '{item['work']['title']}'" @@ -546,8 +597,8 @@ def main() -> None: try: item_id = "" + log = "" if "films" == media_type: - log = "" while log not in ["log", "wishlist"]: log = input("Enter log to update [log|wishlist]: ") @@ -555,7 +606,6 @@ def main() -> None: item_id = input("Enter TMDB ID: ") elif "books" == media_type: - log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") @@ -566,19 +616,19 @@ def main() -> None: item_id = "".join(re.findall(r"\d+", input("Enter ISBN: "))) elif "tv-episodes" == media_type: - log = "log" while re.search("(tt)?[0-9]+", item_id) is None: item_id = input("Enter TVDB or IMDB ID: ") elif "tv-series" == media_type: - log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter TMDB ID: ") - add_item_to_log(re.search("(OL|tt)?[0-9]+[WMA]?", item_id)[0], media_type, log) + item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", item_id) + if item_id_parsed is not None: + add_item_to_log(item_id_parsed[0], media_type, log) except Exception: logger.exception("Exception occurred")