diff --git a/scripts/add_item.py b/scripts/add_item.py index c618537..4e4b46d 100644 --- a/scripts/add_item.py +++ b/scripts/add_item.py @@ -1,5 +1,11 @@ """ -Add a new item to a media catalogue, using various APIs. +Add a new item to a media catalogue, using various APIs: + +- TV series' and films using the TMDB API and IDs; +- TV episodes using the TMDB API and TVDB IDs (because the TMDB + API is difficult and a lot of TMDB records don't have IMDB IDs); +- books using the OpenLibrary API and ISBNs; and +- games using the GiantBomb API and IDs. """ import json @@ -43,12 +49,9 @@ logger = setup_logger() load_dotenv() TMDB_API_KEY = os.getenv("TMDB_API_KEY") -TVDB_API_KEY = os.getenv("TVDB_API_KEY") if "" == TMDB_API_KEY: logger.error("TMDB API key not found") -if "" == TVDB_API_KEY: - logger.error("TVDB API key not found") def return_if_exists(item_id, media_type, log) -> dict | None: @@ -91,7 +94,12 @@ def delete_existing(item_id, media_type, log) -> None: def check_for_existing(item_id, media_type, log) -> dict[dict, str]: - """Check for an existing item and move it to the specified log if requested""" + """ + Check for an existing item in the current log, and pull the + `date_added` etc. and mark it as a repeat if so. + Otherwise, check for an existing item in the other logs, and move + it to the specified log if so. 
+    """

     logger.info(f"Checking for '{item_id}' in logs…")

@@ -134,6 +142,11 @@ def add_item_to_log(item_id, media_type, log) -> None:
     if item is None:
         raise Exception("No item found")

+    if "books" == media_type and "wishlist" != log:
+        existing, log_to_delete = check_for_existing(item['work']['ol_id'], media_type, log)
+        if existing is None:
+            existing, log_to_delete = check_for_existing(item['ol_id'], media_type, log)
+        item = existing if existing is not None else item
     if log in ["log", "current"]:
         if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
             date_started = ""
@@ -196,28 +209,28 @@ def import_by_id(import_id, media_type, log) -> dict:
         return import_from_tmdb_by_id(import_id, media_type)

     if media_type in ["tv-episodes"]:
-        return import_from_tmdb_by_imdb_id(import_id, media_type)
+        return import_from_tmdb_by_external_id(import_id, media_type)

     if media_type in ["books"]:
         if "wishlist" == log:
             return import_from_openlibrary_by_ol_key(import_id)

         else:
-            return import_from_openlibrary_by_id(
+            return import_from_openlibrary_by_isbn(
                 "".join(re.findall(r"\d+", import_id)), media_type
             )


-def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
-    """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
+def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
+    """Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID"""

-    api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"
+    api_url = f"https://api.themoviedb.org/3/find/{external_id}"

     # Sending API request
     response = requests.get(
         api_url,
         headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
-        params={"external_source": "imdb_id"},
+        params={"external_source": "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id"},
         timeout=15
     )

@@ -227,7 +240,7 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:

     elif 429 == response.status_code:
         time.sleep(2)
-        return import_from_tmdb_by_imdb_id(imdb_id, media_type)
+        return import_from_tmdb_by_external_id(external_id, media_type)

     else:
        raise Exception(f"Error {response.status_code}: {response.text}")

@@ -242,7 +255,7 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
     response_data = json.loads(response.text)[key][0]

     if response_data == None:
-        raise Exception(f"Nothing found for IMDB ID {imdb_id}!")
+        raise Exception(f"Nothing found for external ID {external_id}!")

     # Modify the returned result to add additional data
     return cleanup_result(response_data, media_type)
@@ -251,9 +264,6 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
 def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an TMDB ID"""

-    if "tv-episodes" == media_type:
-        raise Exception("TV Episodes are TODO!")
-
     api_path = "movie" if "films" == media_type else "tv"
     api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}"
@@ -279,7 +289,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:

     return cleanup_result(response_data, media_type)


-def import_from_openlibrary_by_id(isbn, media_type) -> dict:
+def import_from_openlibrary_by_isbn(isbn, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""

     logging.info(f"Importing '{isbn}'…")
@@ -295,7 +305,7 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:

     elif 429 == response.status_code:
         time.sleep(2)
-        return import_from_openlibrary_by_id(isbn, media_type)
+        return import_from_openlibrary_by_isbn(isbn, media_type)

     elif 404 == response.status_code:
         logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'")
@@ -377,7 +387,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
     item = json.loads(response.text)

     if "authors" == mode:
-        author = {"id": ol_id, "name": item["name"]}
+        author = {"ol_id": ol_id, "name": item["name"]}

         if "personal_name" in item:
             if item["name"] != item["personal_name"]:
@@ -394,7 +404,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
         return author

     if "works" == mode:
-        work = {"id": ol_id, "title": 
item["title"]}
+        work = {"ol_id": ol_id, "title": item["title"]}

         for result_key in ["first_publish_date", "subjects"]:
             if result_key in item:
@@ -437,6 +447,7 @@ def cleanup_result(item, media_type) -> dict:
         "popularity", # TMDB
         "production_code", # TMDB
         "production_companies", # TMDB
+        "publish_places", # OpenLibrary
         "revenue", # TMDB
         "revision", # OpenLibrary
         "runtime", # TMDB
@@ -456,8 +467,8 @@ def cleanup_result(item, media_type) -> dict:
         if field_name in item:
             del item[field_name]

-    if media_type in ["films", "tv-series"]:
-        item["id"] = item["tmdb_id"]
+    if media_type in ["films", "tv-series", "tv-episodes"]:
+        item["tmdb_id"] = item["id"]
         del item["id"]

     title_key = "name" if "tv-series" == media_type else "title"
@@ -469,6 +480,10 @@ def cleanup_result(item, media_type) -> dict:
     ):
         del item[f"original_{title_key}"], item["original_language"]

+    if "tv-episodes" == media_type:
+        item.setdefault('series', {})['tmdb_id'] = item['show_id']
+        del item['show_id']
+
     if "books" == media_type:
         _, _, item["ol_id"] = item["key"].split("/")
         del item["key"]
@@ -480,10 +495,6 @@ def cleanup_result(item, media_type) -> dict:

             item[key] = item[key][0]

-    if "publish_places" in item:
-        item["published_in"] = item["publish_places"]
-        del item["publish_places"]
-
     if "languages" in item:
         item["languages"] = [
             lang["key"].split("/")[2] for lang in item["languages"]
@@ -561,7 +572,7 @@ def main() -> None:
             while re.search("[0-9]+", item_id) is None:
                 item_id = input("Enter TMDB ID: ")

-            add_item_to_log(item_id, media_type, log)
+            add_item_to_log(re.search("[0-9]+", item_id)[0], media_type, log)

         except Exception:
             logger.exception("Exception occurred")
diff --git a/scripts/process_logs.py b/scripts/process_logs.py
index e5c4d3e..94f6438 100644
--- a/scripts/process_logs.py
+++ b/scripts/process_logs.py
@@ -33,9 +33,17 @@ def process_log(media_type, log) -> None:

     log_item_values = {}

+    id_key = ""
+    if "books" == media_type:
+        id_key = "ol_id"
+    elif media_type in ["films", "tv-series", 
"tv-episodes"]:
+        id_key = "tmdb_id"
+    elif "games" == media_type:
+        id_key = "gb_id"
+
     for i, item in enumerate(log_items):
         try:
-            if "id" not in item and "skip" not in item:
+            if id_key not in item and "skip" not in item:
                 if media_type in ["films", "books"]:
                     item_title = item["Title"]
                 elif "tv-episodes" == media_type:
@@ -50,10 +58,16 @@ def process_log(media_type, log) -> None:
                 log_item_values["date_added"] = item["Date Added"]
                 del item["Date Added"]

+            if "date_added" in item:
+                log_item_values["date_added"] = item["date_added"]
+
             if "Date Started" in item:
                 log_item_values["date_started"] = item["Date Started"]
                 del item["Date Started"]

+            if "date_started" in item:
+                log_item_values["date_started"] = item["date_started"]
+
             if "Date Finished" in item:
                 log_item_values["date_finished"] = item["Date Finished"]
                 del item["Date Finished"]
@@ -63,10 +77,16 @@ def process_log(media_type, log) -> None:
             else:
                 raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}")

+            if "date_finished" in item:
+                log_item_values["date_finished"] = item["date_finished"]
+
             if "Read Count" in item:
                 log_item_values["read_count"] = item["Read Count"]
                 del item["Read Count"]

+            if "read_count" in item:
+                log_item_values["read_count"] = item["read_count"]
+
             if "Date Watched" in item:
                 log_item_values["date_finished"] = item["Date Watched"]
                 del item["Date Watched"]
@@ -116,11 +136,18 @@ def process_log(media_type, log) -> None:

             if "IMDB ID" in item and item["IMDB ID"] != "":
                 new_log_item = import_by_id(item["IMDB ID"], media_type)

+            elif "books" == media_type and "wishlist" == log:
+                ol_work_id = re.search("OL[0-9]+W", input(f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}): "))
+                try:
+                    new_log_item = import_by_id(ol_work_id[0], media_type, log)
+                except Exception:
+                    logger.info("Skipping…")
+
             elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None:
-                new_log_item = import_by_id(item["ISBN13"], media_type)
+                new_log_item = import_by_id(item["ISBN13"], 
media_type, log)

            elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
-                new_log_item = import_by_id(item["ISBN"], media_type)
+                new_log_item = import_by_id(item["ISBN"], media_type, log)

            else:
                new_log_item = import_by_details(item, item_title, media_type)

@@ -163,7 +190,7 @@ def process_log(media_type, log) -> None:
             else:
                 log_items[i] = new_log_item

-            if i % 10 == 0:
+            if i % 3 == 0:
                 with open(
                     f"./data/{media_type}/{log}.json",
                     "w",