"""
Add a new item to a media catalogue, using various APIs.

Films and TV series are looked up on TMDB by IMDB ID, books on OpenLibrary by
ISBN. The user is prompted for log-specific details, shown the resulting
record for confirmation, and the record is then inserted at the top of
``./data/<media_type>/<log>.json``.
"""

import json
import logging
import os
import re
import time
from datetime import datetime

import requests
from dotenv import load_dotenv

LOG_FILE_PATH = "./logs/run.log"

# HTTP behaviour shared by every API call in this module.
REQUEST_TIMEOUT = 15
RATE_LIMIT_RETRY_DELAY = 2
MAX_RATE_LIMIT_RETRIES = 5

# Input validation patterns; always applied with ``fullmatch`` so that stray
# text around an otherwise valid value is rejected rather than sent to an API.
_DATE_PATTERN = re.compile(r"[0-9]{4}-[0-9]{2}-[0-9]{2}")
_IMDB_ID_PATTERN = re.compile(r"tt[0-9]+")
_ISBN_PATTERN = re.compile(r"[0-9][0-9-]*[0-9Xx]")

# Logs that exist for each media type, in the order they are offered.
_LOGS_BY_MEDIA_TYPE = {
    "films": ["log", "wishlist"],
    "tv-episodes": ["log"],
    "tv-series": ["log", "current", "wishlist"],
    "books": ["log", "current", "wishlist"],
}

# Key of the TMDB "find" response that holds results for each media type.
_TMDB_RESULT_KEYS = {
    "films": "movie_results",
    "tv-series": "tv_results",
    "tv-episodes": "tv_episode_results",
}

# Fields returned by the APIs that are not stored in the catalogue.
_UNWANTED_FIELDS = (
    "adult",  # TMDB
    "backdrop_path",  # TMDB
    "copyright_date",  # OpenLibrary
    "classifications",  # OpenLibrary
    "created",  # OpenLibrary
    "episode_type",  # TMDB
    "first_sentence",  # OpenLibrary
    "genre_ids",  # TMDB
    "identifiers",  # OpenLibrary
    "media_type",  # TMDB
    "last_modified",  # OpenLibrary
    "latest_revision",  # OpenLibrary
    "lc_classifications",  # OpenLibrary
    "local_id",  # OpenLibrary
    "ocaid",  # OpenLibrary
    "oclc_numbers",  # OpenLibrary
    "popularity",  # TMDB
    "production_code",  # TMDB
    "revision",  # OpenLibrary
    "runtime",  # TMDB
    "source_records",  # OpenLibrary
    "still_path",  # TMDB
    "type",  # OpenLibrary
    "video",  # TMDB
    "vote_average",  # TMDB
    "vote_count",  # TMDB
)


def setup_logger(name="add_item"):
    """Set up the logger for console and file.

    INFO and above go to the console; ERROR and above are also appended to
    ``./logs/run.log``.

    Args:
        name: Name of the logger to configure.

    Returns:
        The configured ``logging.Logger``.
    """

    logging.root.setLevel(logging.NOTSET)

    logr = logging.getLogger(name)

    # Calling this twice must not attach a second pair of handlers, otherwise
    # every message would be emitted twice.
    if logr.handlers:
        return logr

    # FileHandler opens the file immediately and fails if the directory is
    # missing, so make sure it exists first.
    os.makedirs(os.path.dirname(LOG_FILE_PATH), exist_ok=True)

    c_handler = logging.StreamHandler()
    f_handler = logging.FileHandler(LOG_FILE_PATH)

    c_handler.setLevel(logging.INFO)
    f_handler.setLevel(logging.ERROR)

    c_handler.setFormatter(
        logging.Formatter("%(name)s - %(levelname)s - %(message)s")
    )
    f_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )

    logr.addHandler(c_handler)
    logr.addHandler(f_handler)

    return logr


logger = setup_logger()
load_dotenv()

TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")

# os.getenv returns None (not "") for an unset variable, so test truthiness to
# catch both the missing and the empty case.
if not TMDB_API_KEY:
    logger.error("TMDB API key not found")
if not TVDB_API_KEY:
    logger.error("TVDB API key not found")


def _prompt_for_choice(prompt, choices) -> str:
    """Ask the user `prompt` until the answer is one of `choices`."""

    answer = ""
    while answer not in choices:
        answer = input(prompt)
    return answer


def _prompt_for_match(prompt, pattern) -> str:
    """Ask the user `prompt` until the whole (stripped) answer matches `pattern`."""

    while True:
        answer = input(prompt).strip()
        if pattern.fullmatch(answer):
            return answer


def _prompt_for_date(prompt) -> str:
    """Ask the user `prompt` until they give a real YYYY-MM-DD date or "t" (today)."""

    while True:
        answer = input(prompt).strip()

        if "t" == answer:
            return datetime.today().strftime("%Y-%m-%d")

        if _DATE_PATTERN.fullmatch(answer) is None:
            continue

        # The pattern only checks the shape; strptime rejects impossible
        # dates such as 2023-13-45.
        try:
            datetime.strptime(answer, "%Y-%m-%d")
        except ValueError:
            continue
        return answer


def _get_json(api_url, **request_kwargs):
    """GET `api_url` and return the decoded JSON body.

    A 429 (rate-limited) response is retried after a short pause, up to
    ``MAX_RATE_LIMIT_RETRIES`` times, instead of recursing without bound.

    Raises:
        Exception: On any non-200 response once retries are exhausted.
    """

    retries = 0
    while True:
        response = requests.get(api_url, timeout=REQUEST_TIMEOUT, **request_kwargs)

        if 200 == response.status_code:
            logger.debug(response.status_code)
            return json.loads(response.text)

        if 429 == response.status_code and retries < MAX_RATE_LIMIT_RETRIES:
            retries += 1
            time.sleep(RATE_LIMIT_RETRY_DELAY)
            continue

        raise Exception(f"Error {response.status_code}: {response.text}")


def add_item_to_log(item_id, media_type, log) -> None:
    """Add a film, book, TV series or TV episode to a log.

    Fetches the item, prompts for the details the chosen log needs, shows the
    record for confirmation and, if confirmed, inserts it at the start of
    ``./data/<media_type>/<log>.json``.

    Args:
        item_id: IMDB ID (films, TV) or ISBN (books) of the item.
        media_type: One of "films", "tv-series", "tv-episodes" or "books".
        log: Name of the log to update ("log", "current" or "wishlist").

    Raises:
        Exception: If the item cannot be imported or the log file cannot be
            read or written.
    """

    logger.info("Processing %s…", item_id)

    item: dict = import_by_id(item_id, media_type)

    if log in ["log", "current"]:
        # TODO - review this when moving from one log to another
        if media_type in ["books", "tv-series", "games"]:
            item["date_started"] = _prompt_for_date(
                "Enter date started [YYYY-MM-DD, t for today]: "
            )

        if "log" == log:
            item["date_finished"] = _prompt_for_date(
                "Enter date finished [YYYY-MM-DD, t for today]: "
            )

        # TODO - do this automatically
        is_repeat = _prompt_for_choice("Is this a repeat entry? [y/n]: ", ["y", "n"])
        if "y" == is_repeat:
            item["is_repeat"] = True

    item["added_by_id"] = item_id

    comments = input("Enter comments (optional): ")
    if "" != comments:
        item["comments"] = comments

    # Validation step
    print(f"{media_type} data to add:\n")
    print(json.dumps(item, indent=4))

    if "y" != input("\nDoes this look correct? [y]: "):
        return

    # Save changes
    logger.info("Adding %s to %s…", media_type, log)

    log_path = f"./data/{media_type}/{log}.json"

    with open(log_path, "r", encoding='utf-8') as log_file:
        log_items = json.load(log_file)

    # Newest entries are kept at the top of the log.
    log_items.insert(0, item)

    with open(log_path, "w", encoding='utf-8') as log_file:
        json.dump(log_items, log_file, indent=4)

    logger.info("Added %s %s to %s", media_type, item_id, log)


def import_by_id(import_id, media_type) -> dict:
    """Import from the appropriate API by unique ID.

    Args:
        import_id: IMDB ID for films and TV series, ISBN for books.
        media_type: One of "films", "tv-series", "tv-episodes" or "books".

    Returns:
        The cleaned-up item as a dict.

    Raises:
        NotImplementedError: For "tv-episodes", whose import is not written yet.
        ValueError: For any other unrecognised media type.
    """

    if media_type in ["films", "tv-series"]:
        return import_from_imdb_by_id(import_id, media_type)

    if media_type in ["tv-episodes"]:
        # Fail loudly here: returning None would only surface later as an
        # unrelated TypeError when the caller tries to fill in the item.
        # TODO - import_from_tvdb_by_id(import_id, media_type)
        raise NotImplementedError("Importing tv-episodes is not supported yet")

    if media_type in ["books"]:
        return import_from_openlibrary_by_id(import_id, media_type)

    raise ValueError(f"Unknown media type '{media_type}'")


def import_from_imdb_by_id(imdb_id, media_type) -> dict:
    """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID.

    If TMDB returns several matches the user is asked to pick one by index.

    Args:
        imdb_id: IMDB ID of the item (e.g. "tt0111161").
        media_type: One of "films", "tv-series" or "tv-episodes".

    Returns:
        The matching TMDB record after ``cleanup_result``.

    Raises:
        Exception: If the media type is unsupported, the request fails, no
            result is returned, or the chosen index is invalid.
    """

    # Check the media type before spending an API request on it.
    if media_type not in _TMDB_RESULT_KEYS:
        raise Exception(f"Cannot import {media_type} from TMDB")
    results_key = _TMDB_RESULT_KEYS[media_type]

    api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"

    response_data = _get_json(
        api_url,
        params={"external_source": "imdb_id"},
        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
    )[results_key]

    if not response_data:
        raise Exception(f"Returned no results for {imdb_id}")

    if 1 == len(response_data):
        item = response_data[0]
    else:
        logger.warning(
            "Returned more than one %s for ID '%s'\n", media_type, imdb_id
        )
        print(json.dumps(response_data, indent=4))

        idx = input("\nEnter the index of the result to use: ")
        try:
            chosen = int(idx)
            # A negative index would silently count from the end of the list.
            if chosen < 0:
                raise IndexError(chosen)
            item = response_data[chosen]
        except (ValueError, IndexError) as exc:
            raise Exception(f"Index {idx} is invalid") from exc

    # Modify the returned result to add additional data
    return cleanup_result(item, media_type)


def import_from_openlibrary_by_id(isbn, media_type) -> dict:
    """Retrieve a book edition from OpenLibrary using an ISBN.

    The edition's author and work references are replaced by the records
    fetched for them, and the single work is stored under "work".

    Args:
        isbn: ISBN of the edition.
        media_type: Media type passed through to ``cleanup_result``.

    Returns:
        The edition record after ``cleanup_result``.

    Raises:
        Exception: If a request fails or the edition lists several works.
    """

    api_url = f"https://openlibrary.org/isbn/{isbn}"

    item = _get_json(api_url, headers={"accept": "application/json"})

    # Expand each {"key": "/authors/…"} style reference into the real record.
    for key in ["authors", "works"]:
        if key in item:
            item[key] = [
                import_from_openlibrary_by_ol_key(sub_item["key"])
                for sub_item in item[key]
            ]

    if "works" in item:
        if len(item["works"]) > 1:
            raise Exception(f"Multiple works found for {isbn}")

        item["work"] = item["works"][0]
        del item["works"]

    # Modify the returned result to add additional data
    return cleanup_result(item, media_type)


def import_from_openlibrary_by_ol_key(key) -> dict:
    """Retrieve an item (author or work, NOT edition) from OpenLibrary using an OL key.

    Args:
        key: OpenLibrary key of the form "/authors/<id>" or "/works/<id>".

    Returns:
        For authors: "id", "name" and, when it differs from the name,
        "personal_name". For works: "id", "title" and, when present,
        "first_publish_date" and "subjects".

    Raises:
        Exception: If the key is of another kind or the request fails.
    """

    _, mode, ol_id = key.split("/")

    if mode not in ["works", "authors"]:
        raise Exception(f"Unknown OpenLibrary key '{mode}'")

    api_url = f"https://openlibrary.org{key}"

    item = _get_json(api_url, headers={"accept": "application/json"})

    if "authors" == mode:
        author = {"id": ol_id, "name": item["name"]}

        # Only keep the personal name when it adds information.
        if "personal_name" in item and item["name"] != item["personal_name"]:
            author["personal_name"] = item["personal_name"]

        return author

    work = {"id": ol_id, "title": item["title"]}

    for result_key in ["first_publish_date", "subjects"]:
        if result_key in item:
            work[result_key] = item[result_key]

    return work


def cleanup_result(item, media_type) -> dict:
    """Process a film, TV series, TV episode or book returned by their
    respective APIs by removing unnecessary fields and adding others.

    The dict is modified in place and also returned.

    Args:
        item: Record returned by TMDB or OpenLibrary.
        media_type: One of "films", "tv-series", "tv-episodes" or "books".

    Returns:
        The same dict, cleaned up and with "date_added" set.

    Raises:
        Exception: If a book has several ISBNs of one kind, several publish
            places or source languages, or a "translation_of" that differs
            from the work title.
    """

    for field_name in _UNWANTED_FIELDS:
        item.pop(field_name, None)

    if media_type in ["films", "tv-series"]:
        title_key = "name" if "tv-series" == media_type else "title"
        original_key = f"original_{title_key}"

        # The original title and language are redundant for an English item
        # whose original title is the same as its title.
        if (
            original_key in item
            and "original_language" in item
            and item[original_key] == item.get(title_key)
            and item["original_language"] == "en"
        ):
            del item[original_key], item["original_language"]

    if "books" == media_type:
        # "/books/<id>" -> "<id>"
        _, _, item["id"] = item["key"].split("/")
        del item["key"]

        for key in ["isbn_10", "isbn_13"]:
            if key in item:
                if len(item[key]) > 1:
                    raise Exception("Multiple ISBN results")

                item[key] = item[key][0]

        if "publish_places" in item:
            if len(item["publish_places"]) > 1:
                raise Exception("Multiple publish_places")

            item["published_in"] = item["publish_places"][0]
            del item["publish_places"]

        if "languages" in item:
            # "/languages/<code>" -> "<code>"
            item["languages"] = [
                lang["key"].split("/")[2] for lang in item["languages"]
            ]

        if "translation_of" in item:
            if item["translation_of"] == item["work"]["title"]:
                del item["translation_of"]
            else:
                # Adjacent literals rather than a backslash continuation, which
                # would put the source indentation inside the message.
                raise Exception(
                    f"translation_of '{item['translation_of']}' "
                    f"is different to work title '{item['work']['title']}'"
                )

        if "translated_from" in item:
            if len(item["translated_from"]) > 1:
                raise Exception("Multiple translated_from results")

            source_language = item["translated_from"][0]["key"].split("/")[2]
            item["work"]["original_language"] = source_language
            del item["translated_from"]

    if "date_added" not in item:
        item["date_added"] = datetime.today().strftime("%Y-%m-%d")

    return item


def main() -> None:
    """Prompt user to select media type and log to process.

    Any exception raised while prompting or adding the item is logged rather
    than propagated.
    """

    media_type = _prompt_for_choice(
        "Select media type [films|tv-episodes|tv-series|books]: ",
        ["films", "tv-episodes", "tv-series", "books"],
    )

    try:
        logs = _LOGS_BY_MEDIA_TYPE[media_type]

        # Media types with a single log need no prompt.
        if len(logs) > 1:
            log = _prompt_for_choice(
                f"Enter log to update [{'|'.join(logs)}]: ", logs
            )
        else:
            log = logs[0]

        if "books" == media_type:
            item_id = _prompt_for_match("Enter ISBN: ", _ISBN_PATTERN)
        else:
            item_id = _prompt_for_match("Enter IMDB ID: ", _IMDB_ID_PATTERN)

        add_item_to_log(item_id, media_type, log)

    except Exception:
        logger.exception("Exception occurred")


if __name__ == "__main__":
    main()