""" Add a new item to a media catalogue, using various APIs. """ import json import logging import os import re import time from datetime import datetime import requests from dotenv import load_dotenv authors = [] def setup_logger(name="add_item"): """Set up the logger for console and file""" logr = logging.getLogger(name) c_handler = logging.StreamHandler() f_handler = logging.FileHandler("./logs/run.log") logging.root.setLevel(logging.INFO) c_handler.setLevel(logging.INFO) f_handler.setLevel(logging.WARNING) c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s") f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") c_handler.setFormatter(c_format) f_handler.setFormatter(f_format) logr.addHandler(c_handler) logr.addHandler(f_handler) return logr logger = setup_logger() load_dotenv() TMDB_API_KEY = os.getenv("TMDB_API_KEY") TVDB_API_KEY = os.getenv("TVDB_API_KEY") if "" == TMDB_API_KEY: logger.error("TMDB API key not found") if "" == TVDB_API_KEY: logger.error("TVDB API key not found") def return_if_exists(item_id, media_type, log) -> dict|None: """Returns an item if it exists in the requested log""" logger.info(f"Checking for '{item_id}' in '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file: log_items = json.load(log_file) existing_items = [log_item for log_item in log_items if "id" in log_item and log_item['id'] == int(item_id)] if len(existing_items) > 0: logger.info(f"Found item in '{log}'") return existing_items[-1] logger.info(f"'{item_id}' not found in '{log}'") def delete_existing(item_id, media_type, log) -> None: """Deletes an item from a log if it matches the ID""" logger.info(f"Deleting '{item_id}' from '{log}'…") with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file: log_items = json.load(log_file) old_len = len(log_items) log_items = [log_item for log_item in log_items if "id" not in log_item or ("id" in log_item and log_item['id'] != int(item_id))] if len(log_items) < (old_len - 1): raise Exception("More than one deletion made, discarding…") with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"'{item_id}' deleted from '{log}'") def check_for_existing(item_id, media_type, log) -> dict[dict, str]: """Check for an existing item and move it to the specified log if requested""" logger.info(f"Checking for '{item_id}' in logs…") # Check in specified log existing_item = return_if_exists(item_id, media_type, log) if existing_item is not None: if "log" == log: existing_item["is_repeat"] = True return existing_item, None for log_to_check in [p_log for p_log in ["log", "current", "wishlist"] if p_log != log]: if ("current" == log_to_check and media_type in ["books", "games", "tv-series"]) or ("wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"]): existing_item = return_if_exists(item_id, media_type, log_to_check) if existing_item is not None: return existing_item, log_to_check return None, None def add_item_to_log(item_id, media_type, log) -> None: """Add a film, book, TV series or TV episode to a log""" logger.info(f"Processing {item_id}…") item, log_to_delete = check_for_existing(item_id, media_type, log) if item is None: item = import_by_id(item_id, media_type) if item is None: raise Exception("No item found") if log in ["log", "current"]: if "date_started" not in item and media_type in ["books", "tv-series", "games"]: date_started = "" while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_started) is None: date_started = input("Enter date started [YYYY-MM-DD, t for today]: ") if "t" == date_started: date_started = datetime.today().strftime("%Y-%m-%d") item["date_started"] = date_started if "date_finished" not in item and "log" == log: date_finished = "" while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_finished) is None: date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ") if "t" == date_finished: date_finished = datetime.today().strftime("%Y-%m-%d") item["date_finished"] = date_finished if "is_repeat" not in item: is_repeat = "" while is_repeat not in ["y", "n"]: is_repeat = input("Is this a repeat entry? [y/n]: ") if "y" == is_repeat: item["is_repeat"] = True if "added_by_id" not in item: item["added_by_id"] = item_id if "comments" not in item: comments = input("Enter comments (optional): ") if "" != comments: item["comments"] = comments # Validation step print(f"{media_type} data to add:\n") print(json.dumps(item, indent=4)) if "y" != input("\nDoes this look correct? [y]: "): return # Save changes logger.info(f"Adding {media_type} to {log}…") with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file: log_items = json.load(log_file) log_items.insert(0, item) with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"Added {media_type} {item_id} to {log}") if log_to_delete is not None: delete_existing(item_id, media_type, log_to_delete) def import_by_id(import_id, media_type) -> dict: """Import from the appropriate API by unique ID""" if media_type in ["films", "tv-series"]: return import_from_tmdb_by_id(import_id, media_type) if media_type in ["tv-episodes"]: return #import_from_tvdb_by_id(import_id, media_type) if media_type in ["books"]: return import_from_openlibrary_by_id(import_id, media_type) def import_from_tmdb_by_id(tmdb_id, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" api_path = "movie" if "films" == media_type else "tv" api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}" # Sending API request response = requests.get( api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15 ) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) return import_from_tmdb_by_id(tmdb_id, media_type) else: raise Exception(f"Error {response.status_code}: {response.text}") if "tv-episodes" == media_type: raise Exception("TV Episodes are TODO!") response_data = json.loads(response.text) if 1 == len(response_data): item = response_data[0] elif 0 == len(response_data): raise Exception(f"Returned no results for {tmdb_id}") # Modify the returned result to add additional data return cleanup_result(item, media_type) def import_from_openlibrary_by_id(isbn, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" logging.info(f"Importing '{isbn}'…") api_url = f"https://openlibrary.org/isbn/{isbn}" # Sending API request response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) return import_from_openlibrary_by_id(isbn, media_type) elif 404 == response.status_code: logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'") return None else: raise Exception(f"Error {response.status_code}: {response.text}") item = json.loads(response.text) for key in ["authors", "works"]: if key in item: for i, sub_item in enumerate(item[key]): item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"]) if "works" in item: if len(item["works"]) > 1: raise Exception(f"Multiple works found for {isbn}") item["work"] = item["works"][0] del item["works"] # Rate limiting time.sleep(1) # Modify the returned result to add additional data return cleanup_result(item, media_type) def import_from_openlibrary_by_ol_key(key) -> dict: """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key""" logger.info(f"Retrieving {key}…") _, mode, ol_id = key.split("/") if "authors" == mode: with open(f"./scripts/caching/authors.json", "r", encoding='utf-8') as authors_cache: cached_authors = json.load(authors_cache) if mode in ["works", "authors"]: if "authors" == mode: matched_cached_authors = [aut for aut in cached_authors if aut['id'] == ol_id] if len(matched_cached_authors) == 1: logging.info(f"Found cached author '{matched_cached_authors[0]['name']}'") return matched_cached_authors[0] api_url = f"https://openlibrary.org{key}" # Sending API request response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) import_from_openlibrary_by_ol_key(key) else: raise Exception(f"Error {response.status_code}: {response.text}") # Rate limiting time.sleep(1) item = json.loads(response.text) if "authors" == mode: author = {"id": ol_id, "name": item["name"]} if "personal_name" in item: if item["name"] != item["personal_name"]: author["personal_name"] = item["personal_name"] logger.info(f"Caching author '{author['name']}'…") cached_authors.append(author) with open( f"./scripts/caching/authors.json", "w", encoding='utf-8' ) as authors_cache: json.dump(cached_authors, authors_cache, indent=4) logger.info(f"Author '{author['name']}' cached!") return author if "works" == mode: work = {"id": ol_id, "title": item["title"]} for result_key in ["first_publish_date", "subjects"]: if result_key in item: work[result_key] = item[result_key] return work else: raise Exception(f"Unknown OpenLibrary key '{mode}'") def cleanup_result(item, media_type) -> dict: """Process a film, TV series, TV episode or book returned by their respective APIs by removing unnecessary fields and adding others""" for field_name in [ "adult", # TMDB "backdrop_path", # TMDB "copyright_date", # OpenLibrary "classifications", # OpenLibrary "created", # OpenLibrary "dewey_decimal_class", # OpenLibary "episode_type", # TMDB "first_sentence", # OpenLibrary "genre_ids", # TMDB "identifiers", # OpenLibrary "media_type", # TMDB "last_modified", # OpenLibrary "latest_revision", # OpenLibrary "lc_classifications", # OpenLibrary "lccn", # OpenLibrary "local_id", # OpenLibrary "notes", # OpenLibrary "ocaid", # OpenLibrary "oclc_numbers", # OpenLibrary "pagination", # OpenLibrary "physical_dimensions", # OpenLibrary "popularity", # TMDB "production_code", # TMDB "revision", # OpenLibrary "runtime", # TMDB "source_records", # OpenLibrary "still_path", # TMDB "table_of_contents", # OpenLibrary "type", # OpenLibrary "uri_descriptions", # OpenLibrary "url", # OpenLibrary "video", # TMDB "vote_average", # TMDB "vote_count", # TMDB "weight", # OpenLibrary ]: if field_name in item: del item[field_name] if media_type in ["films", "tv-series"]: title_key = "name" if "tv-series" == media_type else "title" if f"original_{title_key}" in item and "original_language" in item: if ( item[f"original_{title_key}"] == item[title_key] and item["original_language"] == "en" ): del item[f"original_{title_key}"], item["original_language"] if "books" == media_type: _, _, item["id"] = item["key"].split("/") del item["key"] for key in ["isbn_10", "isbn_13"]: if key in item: if len(item[key]) > 1: logger.warning("Multiple ISBN results") item[key] = item[key][0] if "publish_places" in item: item["published_in"] = item["publish_places"] del item["publish_places"] if "languages" in item: item["languages"] = [ lang["key"].split("/")[2] for lang in item["languages"] ] if "translation_of" in item: if item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower(): del item["translation_of"] else: raise Exception( f"translation_of '{item['translation_of']}' \ is different to work title '{item['work']['title']}'" ) if "translated_from" in item: if len(item["translated_from"]) > 1: raise Exception("Multiple translated_from results") item["work"]["original_language"] = item["translated_from"][0][ "key" ].split("/")[2] del item["translated_from"] if "date_added" not in item: item["date_added"] = datetime.today().strftime("%Y-%m-%d") return item def main() -> None: """Prompt user to select media type and log to process""" media_type = "" while media_type not in ["films", "tv-episodes", "tv-series", "books"]: media_type = input("Select media type [films|tv-episodes|tv-series|books]: ") try: item_id = "" if "films" == media_type: log = "" while log not in ["log", "wishlist"]: log = input("Enter log to update [log|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter TMDB ID: ") elif "books" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter ISBN: ") elif "tv-episodes" == media_type: log = "log" while re.search("[0-9]+", item_id) is None: item_id = input("Enter TVDB ID: ") elif "tv-series" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to update [log|current|wishlist]: ") while re.search("[0-9]+", item_id) is None: item_id = input("Enter TMDB ID: ") add_item_to_log(item_id, media_type, log) except Exception: logger.exception("Exception occurred") if __name__ == "__main__": main()