From b4d64ccd34d8875893f3c89da04ccad89e33505a Mon Sep 17 00:00:00 2001
From: Ben Goldsworthy
Date: Sat, 3 Feb 2024 23:25:14 +0000
Subject: [PATCH] fix script

---
 scripts/add_item.py | 127 +++++++++++++++++++++++++++-----------------
 1 file changed, 79 insertions(+), 48 deletions(-)

diff --git a/scripts/add_item.py b/scripts/add_item.py
index bee5e93..e86d070 100644
--- a/scripts/add_item.py
+++ b/scripts/add_item.py
@@ -13,6 +13,7 @@ from dotenv import load_dotenv
 
 authors = []
 
+
 def setup_logger(name="add_item"):
     """Set up the logger for console and file"""
 
@@ -50,14 +51,18 @@ if "" == TVDB_API_KEY:
     logger.error("TVDB API key not found")
 
 
-def return_if_exists(item_id, media_type, log) -> dict|None:
+def return_if_exists(item_id, media_type, log) -> dict | None:
     """Returns an item if it exists in the requested log"""
 
     logger.info(f"Checking for '{item_id}' in '{log}'…")
-    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
         log_items = json.load(log_file)
 
-    existing_items = [log_item for log_item in log_items if "id" in log_item and log_item['id'] == int(item_id)]
+    existing_items = [
+        log_item
+        for log_item in log_items
+        if "id" in log_item and log_item["id"] == int(item_id)
+    ]
     if len(existing_items) > 0:
         logger.info(f"Found item in '{log}'")
         return existing_items[-1]
@@ -68,15 +73,19 @@ def delete_existing(item_id, media_type, log) -> None:
     """Deletes an item from a log if it matches the ID"""
 
     logger.info(f"Deleting '{item_id}' from '{log}'…")
-    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
         log_items = json.load(log_file)
 
     old_len = len(log_items)
-    log_items = [log_item for log_item in log_items if "id" not in log_item or ("id" in log_item and log_item['id'] != int(item_id))]
+    log_items = [
+        log_item
+        for log_item in log_items
+        if "id" not in log_item or ("id" in log_item and log_item["id"] != int(item_id))
+    ]
     if len(log_items) < (old_len - 1):
         raise Exception("More than one deletion made, discarding…")
 
-    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
         json.dump(log_items, log_file, indent=4)
     logger.info(f"'{item_id}' deleted from '{log}'")
 
@@ -94,8 +103,15 @@ def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
             existing_item["is_repeat"] = True
         return existing_item, None
 
-    for log_to_check in [p_log for p_log in ["log", "current", "wishlist"] if p_log != log]:
-        if ("current" == log_to_check and media_type in ["books", "games", "tv-series"]) or ("wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"]):
+    for log_to_check in [
+        p_log for p_log in ["log", "current", "wishlist"] if p_log != log
+    ]:
+        if (
+            "current" == log_to_check and media_type in ["books", "games", "tv-series"]
+        ) or (
+            "wishlist" == log_to_check
+            and media_type in ["books", "games", "films", "tv-series"]
+        ):
             existing_item = return_if_exists(item_id, media_type, log_to_check)
             if existing_item is not None:
                 return existing_item, log_to_check
@@ -156,12 +172,12 @@ def add_item_to_log(item_id, media_type, log) -> None:
 
     # Save changes
     logger.info(f"Adding {media_type} to {log}…")
-    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
         log_items = json.load(log_file)
 
     log_items.insert(0, item)
 
-    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
         json.dump(log_items, log_file, indent=4)
 
     logger.info(f"Added {media_type} {item_id} to {log}")
@@ -177,10 +193,12 @@ def import_by_id(import_id, media_type) -> dict:
         return import_from_tmdb_by_id(import_id, media_type)
 
     if media_type in ["tv-episodes"]:
-        return #import_from_tvdb_by_id(import_id, media_type)
+        return  # import_from_tvdb_by_id(import_id, media_type)
 
     if media_type in ["books"]:
-        return import_from_openlibrary_by_id(import_id, media_type)
+        return import_from_openlibrary_by_id(
+            "".join(re.findall(r"\d+", import_id)), media_type
+        )
 
 
 def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
@@ -191,9 +209,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
 
     # Sending API request
     response = requests.get(
-        api_url,
-        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
-        timeout=15
+        api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15
     )
 
     # Process the response
@@ -212,14 +228,8 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
 
     response_data = json.loads(response.text)
 
-    if 1 == len(response_data):
-        item = response_data[0]
-
-    elif 0 == len(response_data):
-        raise Exception(f"Returned no results for {tmdb_id}")
-
     # Modify the returned result to add additional data
-    return cleanup_result(item, media_type)
+    return cleanup_result(response_data, media_type)
 
 
 def import_from_openlibrary_by_id(isbn, media_type) -> dict:
@@ -253,10 +263,12 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
             for i, sub_item in enumerate(item[key]):
                 item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"])
 
-
     if "works" in item:
         if len(item["works"]) > 1:
-            raise Exception(f"Multiple works found for {isbn}")
+            print(f"Multiple works found for {isbn}:")
+            print(item["works"])
+            idx = input(f"Select ID to use [0-{len(item['works'])-1}]: ")
+            item["works"][0] = item["works"][int(idx)]
 
         item["work"] = item["works"][0]
         del item["works"]
@@ -275,20 +287,28 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
     _, mode, ol_id = key.split("/")
 
     if "authors" == mode:
-        with open(f"./scripts/caching/authors.json", "r", encoding='utf-8') as authors_cache:
+        with open(
+            f"./scripts/caching/authors.json", "r", encoding="utf-8"
+        ) as authors_cache:
            cached_authors = json.load(authors_cache)
 
     if mode in ["works", "authors"]:
         if "authors" == mode:
-            matched_cached_authors = [aut for aut in cached_authors if aut['id'] == ol_id]
+            matched_cached_authors = [
+                aut for aut in cached_authors if aut["id"] == ol_id
+            ]
             if len(matched_cached_authors) == 1:
-                logging.info(f"Found cached author '{matched_cached_authors[0]['name']}'")
+                logging.info(
+                    f"Found cached author '{matched_cached_authors[0]['name']}'"
+                )
                 return matched_cached_authors[0]
 
         api_url = f"https://openlibrary.org{key}"
 
         # Sending API request
-        response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
+        response = requests.get(
+            api_url, headers={"accept": "application/json"}, timeout=15
+        )
 
         # Process the response
         if 200 == response.status_code:
@@ -316,9 +336,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
             logger.info(f"Caching author '{author['name']}'…")
             cached_authors.append(author)
             with open(
-                f"./scripts/caching/authors.json",
-                "w",
-                encoding='utf-8'
+                f"./scripts/caching/authors.json", "w", encoding="utf-8"
             ) as authors_cache:
                 json.dump(cached_authors, authors_cache, indent=4)
             logger.info(f"Author '{author['name']}' cached!")
@@ -345,39 +363,45 @@ def cleanup_result(item, media_type) -> dict:
     for field_name in [
         "adult",  # TMDB
         "backdrop_path",  # TMDB
+        "budget",  # TMDB
         "copyright_date",  # OpenLibrary
         "classifications",  # OpenLibrary
         "created",  # OpenLibrary
-        "dewey_decimal_class", # OpenLibary
+        "dewey_decimal_class",  # OpenLibary
         "episode_type",  # TMDB
         "first_sentence",  # OpenLibrary
         "genre_ids",  # TMDB
+        "homepage",  # TMDB
         "identifiers",  # OpenLibrary
         "media_type",  # TMDB
         "last_modified",  # OpenLibrary
         "latest_revision",  # OpenLibrary
         "lc_classifications",  # OpenLibrary
-        "lccn", # OpenLibrary
+        "lccn",  # OpenLibrary
         "local_id",  # OpenLibrary
-        "notes", # OpenLibrary
+        "notes",  # OpenLibrary
         "ocaid",  # OpenLibrary
         "oclc_numbers",  # OpenLibrary
-        "pagination", # OpenLibrary
-        "physical_dimensions", # OpenLibrary
+        "pagination",  # OpenLibrary
+        "physical_dimensions",  # OpenLibrary
         "popularity",  # TMDB
         "production_code",  # TMDB
+        "production_companies",  # TMDB
+        "revenue",  # TMDB
         "revision",  # OpenLibrary
         "runtime",  # TMDB
         "source_records",  # OpenLibrary
+        "status",  # TMDB
         "still_path",  # TMDB
-        "table_of_contents", # OpenLibrary
+        "table_of_contents",  # OpenLibrary
+        "tagline",  # TMDB
         "type",  # OpenLibrary
-        "uri_descriptions", # OpenLibrary
-        "url", # OpenLibrary
+        "uri_descriptions",  # OpenLibrary
+        "url",  # OpenLibrary
         "video",  # TMDB
         "vote_average",  # TMDB
         "vote_count",  # TMDB
-        "weight", # OpenLibrary
+        "weight",  # OpenLibrary
     ]:
         if field_name in item:
             del item[field_name]
@@ -413,21 +437,28 @@ def cleanup_result(item, media_type) -> dict:
     ]
 
     if "translation_of" in item:
-        if item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower():
-            del item["translation_of"]
-        else:
-            raise Exception(
+        if not (
+            item["translation_of"].split(":")[0].lower()
+            == item["work"]["title"].split(":")[0].lower()
+        ):
+            logger.warn(
                 f"translation_of '{item['translation_of']}' \
 is different to work title '{item['work']['title']}'"
             )
+            if 'y' != input("Accept change? [y|n]: "):
+                raise Exception(
+                    f"translation_of '{item['translation_of']}' \
+ is different to work title '{item['work']['title']}'"
+                )
+        del item["translation_of"]
 
     if "translated_from" in item:
         if len(item["translated_from"]) > 1:
             raise Exception("Multiple translated_from results")
 
-        item["work"]["original_language"] = item["translated_from"][0][
-            "key"
-        ].split("/")[2]
+        item["work"]["original_language"] = item["translated_from"][0]["key"].split(
+            "/"
+        )[2]
         del item["translated_from"]
 
     if "date_added" not in item:
@@ -459,7 +490,7 @@ def main() -> None:
             log = input("Enter log to update [log|current|wishlist]: ")
 
             while re.search("[0-9]+", item_id) is None:
-                item_id = input("Enter ISBN: ")
+                item_id = "".join(re.findall(r"\d+", input("Enter ISBN: ")))
 
         elif "tv-episodes" == media_type:
             log = "log"