""" Add a new item to a media catalogue, using various APIs: - TV series' and films using the TMDB API and IDs; - TV episodes using the TMDB API and TVDB IDs (because the TMDB API is difficult and a lot of TMDB records don't have IMDB IDs); - books using the OpenLibrary API and ISBNs; and - games using the GiantBomb API and IDs. """ import json import logging import os import re import time from datetime import datetime import requests from dotenv import load_dotenv authors = [] def setup_logger(name="add_item"): """Set up the logger for console and file""" logr = logging.getLogger(name) c_handler = logging.StreamHandler() f_handler = logging.FileHandler("./logs/run.log") logging.root.setLevel(logging.INFO) c_handler.setLevel(logging.INFO) f_handler.setLevel(logging.WARNING) c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s") f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") c_handler.setFormatter(c_format) f_handler.setFormatter(f_format) logr.addHandler(c_handler) logr.addHandler(f_handler) return logr logger = setup_logger() load_dotenv() TMDB_API_KEY = os.getenv("TMDB_API_KEY") if "" == TMDB_API_KEY: logger.error("TMDB API key not found") def return_if_exists(item_id: str, media_type: str, log: str) -> dict | None: """Returns an item if it exists in the requested log""" logger.info(f"Checking for '{item_id}' in '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) id_key = "id" if "books" == media_type: if re.search("OL[0-9]+[MW]", item_id) is not None: id_key = "ol_id" elif re.search("[0-9]{13}", item_id) is not None: id_key = "isbn_13" elif re.search("[0-9]{10}", item_id) is not None: id_key = "isbn_10" else: raise Exception("Invalid ID for book") existing_items = [ log_item for log_item in log_items if id_key in log_item and log_item[id_key] == item_id ] if len(existing_items) > 0: logger.info(f"Found item in '{log}'") return existing_items[-1] logger.info(f"'{item_id}' not found in '{log}'") def delete_existing(item_id: str, media_type: str, log: str) -> None: """Deletes an item from a log if it matches the ID""" logger.info(f"Deleting '{item_id}' from '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) id_key = "id" if "books" == media_type: if re.search("OL[0-9]+[MW]", item_id) is not None: id_key = "ol_id" elif re.search("[0-9]{13}", item_id) is not None: id_key = "isbn_13" elif re.search("[0-9]{10}", item_id) is not None: id_key = "isbn_10" else: raise Exception("Invalid ID for book") elif media_type in ["films", "tv-episodes"]: if re.search("tt[0-9]+", item_id) is not None: id_key = "isbn_id" elif re.search("[0-9]+", item_id) is not None: id_key = "tmdb_id" else: raise Exception("Invalid ID for film") old_len = len(log_items) log_items = [ log_item for log_item in log_items if id_key not in log_item or (id_key in log_item and log_item[id_key] != item_id) ] if len(log_items) < (old_len - 1): raise Exception("More than one deletion made, discarding…") elif len(log_items) == old_len: raise Exception("No item deleted, skipping…"s) with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"'{item_id}' deleted from '{log}'") def check_for_existing( item_id, media_type, log ) -> tuple[dict[dict, str] | None, str | None]: """ Check for an existing item in the current log, and pull the `date_added` etc. and mark it as a repeat if so. Otherwise, check for an existing item in the other logs, and move it to the specified log if so. """ logger.info(f"Checking for '{item_id}' in logs…") # Check in specified log existing_item = return_if_exists(item_id, media_type, log) if existing_item is not None: if "log" == log: existing_item["is_repeat"] = True return existing_item, None for log_to_check in [ p_log for p_log in ["log", "current", "wishlist"] if p_log != log ]: if ( "current" == log_to_check and media_type in ["books", "games", "tv-series"] ) or ( "wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"] ): existing_item = return_if_exists(item_id, media_type, log_to_check) if existing_item is not None: return existing_item, log_to_check return None, None def add_item_to_log(item_id: str, media_type: str, log: str) -> None: """Add a film, book, TV series or TV episode to a log""" logger.info(f"Processing {item_id}…") item: dict | None = None log_to_delete = None if media_type not in ["tv-episodes", "books"]: item, log_to_delete = check_for_existing(item_id, media_type, log) if item is None: item = import_by_id(item_id, media_type, log) if item is None: raise Exception("No item found") if "books" == media_type: new_item, log_to_delete = check_for_existing( item["work"]["ol_id"], media_type, log ) if new_item is None: new_item, log_to_delete = check_for_existing(item["ol_id"], media_type, log) if new_item is None: new_item, log_to_delete = check_for_existing( item["isbn_13"], media_type, log ) if new_item is None: new_item, log_to_delete = check_for_existing( item["isbn_10"], media_type, log ) item = new_item if new_item is not None else item if log in ["log", "current"]: if "date_started" not in item and media_type in ["books", "tv-series", "games"]: date_started = "" while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_started) is None: date_started = input("Enter date started [YYYY-MM-DD, t for today]: ") if "t" == date_started: date_started = datetime.today().strftime("%Y-%m-%d") item["date_started"] = date_started if "date_finished" not in item and "log" == log: date_finished = "" while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_finished) is None: date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ") if "t" == date_finished: date_finished = datetime.today().strftime("%Y-%m-%d") item["date_finished"] = date_finished if "is_repeat" not in item: is_repeat = "" while is_repeat not in ["y", "n"]: is_repeat = input("Is this a repeat entry? [y/n]: ") if "y" == is_repeat: item["is_repeat"] = True if "added_by_id" not in item: item["added_by_id"] = item_id if "comments" not in item: comments = input("Enter comments (optional): ") if "" != comments: item["comments"] = comments # Validation step print(f"{media_type} data to add:\n") print(json.dumps(item, indent=4)) if "y" != input("\nDoes this look correct? [y]: "): return # Save changes logger.info(f"Adding {media_type} to {log}…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) log_items.insert(0, item) with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"Added {media_type} {item_id} to {log}") if log_to_delete is not None: delete_existing(item_id, media_type, log_to_delete) def import_by_id(import_id, media_type, log) -> dict | None: """Import from the appropriate API by unique ID""" if media_type in ["films", "tv-series"]: return import_from_tmdb_by_id(import_id, media_type) if media_type in ["tv-episodes"]: return import_from_tmdb_by_external_id(import_id, media_type) if media_type in ["books"]: if "wishlist" == log: return import_from_openlibrary_by_ol_key(import_id) else: return import_from_openlibrary_by_isbn( "".join(re.findall(r"\d+", import_id)), media_type ) def import_from_tmdb_by_external_id(external_id, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID""" api_url = f"https://api.themoviedb.org/3/find/{external_id}" # Sending API request response = requests.get( api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, params={ "external_source": ( "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id" ) }, timeout=15, ) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) return import_from_tmdb_by_external_id(external_id, media_type) else: raise Exception(f"Error {response.status_code}: {response.text}") key = "" if "tv-episodes" == media_type: key = "tv_episode_results" elif "tv-series" == media_type: key = "tv_results" elif "films" == media_type: key = "movie_results" response_data = json.loads(response.text)[key][0] if response_data is None: raise Exception(f"Nothing found for TVDB ID {external_id}!") # Modify the returned result to add additional data return cleanup_result(response_data, media_type) def import_from_tmdb_by_id(tmdb_id, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an TMDB ID""" api_path = "movie" if "films" == media_type else "tv" api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}" # Sending API request response = requests.get( api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15 ) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) return import_from_tmdb_by_id(tmdb_id, media_type) else: raise Exception(f"Error {response.status_code}: {response.text}") response_data = json.loads(response.text) # Modify the returned result to add additional data return cleanup_result(response_data, media_type) def import_from_openlibrary_by_isbn(isbn, media_type) -> dict | None: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" logging.info(f"Importing '{isbn}'…") api_url = f"https://openlibrary.org/isbn/{isbn}" # Sending API request response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) return import_from_openlibrary_by_isbn(isbn, media_type) elif 404 == response.status_code: logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'") return None else: raise Exception(f"Error {response.status_code}: {response.text}") item = json.loads(response.text) for key in ["authors", "works"]: if key in item: for i, sub_item in enumerate(item[key]): item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"]) if "works" in item: if len(item["works"]) > 1: print(f"Multiple works found for {isbn}:") print(item["works"]) idx = input(f"Select ID to use [0-{len(item['works'])-1}]: ") item["works"][0] = item["works"][int(idx)] item["work"] = item["works"][0] del item["works"] # Rate limiting time.sleep(1) # Modify the returned result to add additional data return cleanup_result(item, media_type) def import_from_openlibrary_by_ol_key(key) -> dict | None: """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key""" if len(key.split("/")) == 1: key = f"/works/{key}" logger.info(f"Retrieving {key}…") _, mode, ol_id = key.split("/") cached_authors = [] if "authors" == mode: with open( "./scripts/caching/authors.json", "r", encoding="utf-8" ) as authors_cache: cached_authors = json.load(authors_cache) if mode in ["works", "authors"]: if "authors" == mode: matched_cached_authors = [ aut for aut in cached_authors if aut["ol_id"] == ol_id ] if len(matched_cached_authors) == 1: logging.info( f"Found cached author '{matched_cached_authors[0]['name']}'" ) return matched_cached_authors[0] api_url = f"https://openlibrary.org{key}" # Sending API request response = requests.get( api_url, headers={"accept": "application/json"}, timeout=15 ) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) import_from_openlibrary_by_ol_key(key) else: raise Exception(f"Error {response.status_code}: {response.text}") # Rate limiting time.sleep(1) item = json.loads(response.text) if "authors" == mode: author = {"ol_id": ol_id, "name": item["name"]} print(author) if "personal_name" in item: if item["name"] != item["personal_name"]: author["personal_name"] = item["personal_name"] logger.info(f"Caching author '{author['name']}'…") cached_authors.append(author) with open( "./scripts/caching/authors.json", "w", encoding="utf-8" ) as authors_cache: json.dump(cached_authors, authors_cache, indent=4) logger.info(f"Author '{author['name']}' cached!") return author if "works" == mode: work = {"ol_id": ol_id, "title": item["title"], "authors": []} if "authors" in item: for author in item["authors"]: work["authors"].append( import_from_openlibrary_by_ol_key(author["author"]["key"]) ) for result_key in ["first_publish_date", "subjects"]: if result_key in item: work[result_key] = item[result_key] work["date_added"] = datetime.today().strftime("%Y-%m-%d") return work else: raise Exception(f"Unknown OpenLibrary key '{mode}'") def cleanup_result(item, media_type) -> dict: """Process a film, TV series, TV episode or book returned by their respective APIs by removing unnecessary fields and adding others""" for field_name in [ "adult", # TMDB "backdrop_path", # TMDB "budget", # TMDB "copyright_date", # OpenLibrary "classifications", # OpenLibrary "created", # OpenLibrary "dewey_decimal_class", # OpenLibary "episode_type", # TMDB "first_sentence", # OpenLibrary "genre_ids", # TMDB "homepage", # TMDB "identifiers", # OpenLibrary "media_type", # TMDB "last_modified", # OpenLibrary "latest_revision", # OpenLibrary "lc_classifications", # OpenLibrary "lccn", # OpenLibrary "local_id", # OpenLibrary "notes", # OpenLibrary "ocaid", # OpenLibrary "oclc_numbers", # OpenLibrary "pagination", # OpenLibrary "physical_dimensions", # OpenLibrary "popularity", # TMDB "production_code", # TMDB "production_companies", # TMDB "publish_places", # OpenLibrary "revenue", # TMDB "revision", # OpenLibrary "runtime", # TMDB "source_records", # OpenLibrary "status", # TMDB "still_path", # TMDB "table_of_contents", # OpenLibrary "tagline", # TMDB "type", # OpenLibrary "uri_descriptions", # OpenLibrary "url", # OpenLibrary "video", # TMDB "vote_average", # TMDB "vote_count", # TMDB "weight", # OpenLibrary ]: if field_name in item: del item[field_name] if media_type in ["films", "tv-series", "tv-episodes"]: item["tmdb_id"] = item["id"] del item["id"] title_key = "name" if "tv-series" == media_type else "title" if f"original_{title_key}" in item and "original_language" in item: if ( item[f"original_{title_key}"] == item[title_key] and item["original_language"] == "en" ): del item[f"original_{title_key}"], item["original_language"] if "tv-episodes" == media_type: item["series"] = {"tmdb_id": item["show_id"]} del item["show_id"] if "books" == media_type: _, _, item["ol_id"] = item["key"].split("/") del item["key"] for key in ["isbn_10", "isbn_13"]: if key in item: if len(item[key]) > 1: logger.warning("Multiple ISBN results") item[key] = item[key][0] if "languages" in item: item["languages"] = [ lang["key"].split("/")[2] for lang in item["languages"] ] if "translation_of" in item: if not ( item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower() ): logger.warn( f"translation_of '{item['translation_of']}' \ is different to work title '{item['work']['title']}'" ) if "y" != input("Accept change? [y|n]: "): raise Exception( f"translation_of '{item['translation_of']}' \ is different to work title '{item['work']['title']}'" ) del item["translation_of"] if "translated_from" in item: if len(item["translated_from"]) > 1: raise Exception("Multiple translated_from results") item["work"]["original_language"] = item["translated_from"][0]["key"].split( "/" )[2] del item["translated_from"] if "date_added" not in item: item["date_added"] = datetime.today().strftime("%Y-%m-%d") return item def main() -> None: """Prompt user to select media type and log to process""" media_type = "" while media_type not in ["films", "tv-episodes", "tv-series", "books"]: media_type = input("Select media type [films|tv-episodes|tv-series|books]: ") try: item_id = "" log = "" if "films" == media_type: while log not in ["log", "wishlist"]: log = input("Enter log to update [log|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter TMDB ID: ") elif "books" == media_type: while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") while re.search("[0-9]+", item_id) is None: if "wishlist" == log: item_id = input("Enter OpenLibrary Work ID: ") else: item_id = "".join(re.findall(r"\d+", input("Enter ISBN: "))) elif "tv-episodes" == media_type: while re.search("(tt)?[0-9]+", item_id) is None: item_id = input("Enter TVDB or IMDB ID: ") elif "tv-series" == media_type: while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter TMDB ID: ") item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", item_id) if item_id_parsed is not None: add_item_to_log(item_id_parsed[0], media_type, log) except Exception: logger.exception("Exception occurred") if __name__ == "__main__": main()