from dotenv import load_dotenv import json import os import re import requests import time from urllib.request import urlopen from add_item import cleanup_result, import_by_id, setup_logger logger = setup_logger("process_logs") load_dotenv() TMDB_API_KEY = os.getenv("TMDB_API_KEY") TVDB_API_KEY = os.getenv("TVDB_API_KEY") if "" == TMDB_API_KEY: logger.warning("TMDB API key not found") if "" == TVDB_API_KEY: logger.warning("TVDB API key not found") def process_log(media_type, log): """Run through a log and call the appropriate API for each item found""" logger.info(f"Processing {media_type}/{log}…") with open(f"./data/{media_type}/{log}.json", "r") as log_file: log_items = json.load(log_file) log_item_values = {} for i, item in enumerate(log_items): try: if "id" not in item: if "films" == media_type: item_title = item["Title"] elif "tv-episodes" == media_type: item_title = item["Episode Title"] elif "tv-series" == media_type: item_title = item["Show Title"] logger.debug(f"Processing {item_title}…") # Rename pre-existing fields if "Date Added" in item: log_item_values["date_added"] = item["Date Added"] del item["Date Added"] if "Date Watched" in item: log_item_values["date_finished"] = item["Date Watched"] del item["Date Watched"] if "Rewatch" in item: log_item_values["is_repeat"] = item["Rewatch"] del item["Rewatch"] if "Comments" in item: log_item_values["comments"] = item["Comments"] del item["Comments"] if "Series Title" in item: log_item_values["series_title"] = item["Series Title"] del item["Series Title"] if "Episode Title" in item: log_item_values["name"] = item["Episode Title"] del item["Episode Title"] if "Episode Number" in item: if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None: season_no, _, episode_no = log_item_values[ "episode_number" ].split("x") elif ( re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None ): season_no, _, episode_no = log_item_values[ "episode_number" ].split("E") elif re.search("E[0-9]+", item["Episode Number"]) is not None: season_no = None episode_no = item["episode_number"][1:] else: logger.error( f"Invalid episode number format '{item['Episode Number']}'" ) return log_item_values["season_number"] = season_no log_item_values["episode_number"] = episode_no del item["Episode Number"] if "IMDB ID" in item and item["IMDB ID"] != "": new_log_item = import_by_id(item["IMDB ID"], media_type) else: new_log_item = import_by_details(item, item_title, media_type) if new_log_item is None: item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ") if re.search("tt[0-9]+", item["imdb_id"]) is not None: log_items[i] = import_by_id(item["imdb_id"], media_type) with open(f"./data/{media_type}/{log}.json", "w") as log_file: json.dump(log_items, log_file, indent=4) else: logger.warning(f"Skipped {item_title}") else: log_items[i] = new_log_item if i % 15 == 0: with open(f"./data/{media_type}/{log}.json", "w") as log_file: json.dump(log_items, log_file, indent=4) if log_items[i] is not None: log_items[i] |= log_item_values except KeyError: print(json.dumps(item, indent=4)) with open(f"./data/{media_type}/{log}.json", "w") as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"Finished processing {media_type}/{log}") def import_by_details(item, item_title, media_type): """Import an item when lacking a unique identifier""" if media_type in ["films", "tv-series"]: return import_from_tmdb_by_details(item, item_title, media_type) elif media_type in ["tv-episodes"]: return # import_from_tvdb_by_details(item, item_title, media_type) elif media_type in ["books"]: return # import_from_openlibrary_by_details(item, item_title, media_type) elif media_type in ["games"]: return # import_from_igdb_by_details(item, item_title, media_type) def import_from_tmdb_by_details(item, item_title, media_type): """Retrieve a film or TV series from TMDB using its title""" logger.info(f"Processing {item_title}…") api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}" # Sending API request response = requests.get( api_url, params={ "query": item_title, "include_adult": True, "year": item["Release Year"] if "Release Year" in item else None, }, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, ) # Process the response if 200 == response.status_code: logger.debug(response.status_code) elif 429 == response.status_code: time.sleep(2) import_from_tmdb_by_details(item) else: logger.error(response.text) response_data = json.loads(response.text)["results"] if 1 == len(response_data): return cleanup_result(response_data[0], media_type) elif 0 == len(response_data): logger.warning(f"Returned no {media_type} for {item_title}") elif 1 < len(response_data): if "films" == media_type: title_key = "title" elif "tv-series" == media_type: title_key = "name" filtered_response_data = [ result for result in response_data if result[title_key] == item_title ] if 1 == len(filtered_response_data): return cleanup_result(response_data[0], media_type) else: logger.warning(f"Returned more than one {media_type} for '{item_title}':\n") print( json.dumps( filtered_response_data if len(filtered_response_data) > 0 else response_data, indent=4, ) ) idx = input( f"\nEnter the index of the result to use [0-{len(filtered_response_data if len(filtered_response_data) > 0 else response_data) - 1}]: " ) if "" != idx: try: return cleanup_result(response_data[int(idx)], media_type) except: logger.error("Index invalid!") print("Index invalid!") item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ") if "" != item["IMDB ID"]: return import_by_id(item["IMDB ID"], media_type) else: logger.warning(f"Skipped {item_title}") return item def main(): media_type = "" while media_type not in ["films", "tv-episodes", "tv-series", "books"]: media_type = input("Select media type [films|tv-episodes|tv-series|books]: ") try: if "films" == media_type: log = "" while log not in ["log", "wishlist"]: log = input("Enter log to process [log|wishlist]: ") process_log(media_type, log) elif "books" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to process [log|current|wishlist]: ") # TODO elif "tv-episodes" == media_type: process_log(media_type, "log") # TODO elif "tv-series" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to process [log|current|wishlist]: ") process_log(media_type, log) except Exception as error: logger.exception("Exception occurred") print(error) if __name__ == "__main__": main()