"""
Process logs derived from social cataloguing site data exports, using various APIs.
"""

import json
import os
import re
import time
from typing import Optional

import requests
from dotenv import load_dotenv

from add_item import cleanup_result, import_by_id, setup_logger

logger = setup_logger("process_logs")

load_dotenv()

TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")

# `os.getenv` returns None (not "") for an unset variable, so test truthiness
# to catch both the unset and the empty case.
if not TMDB_API_KEY:
    logger.warning("TMDB API key not found")
if not TVDB_API_KEY:
    logger.warning("TVDB API key not found")

# Key whose presence marks an item as already imported, per media type.
_ID_KEYS = {
    "books": "ol_id",
    "films": "tmdb_id",
    "tv-series": "tmdb_id",
    "tv-episodes": "tmdb_id",
    "games": "gb_id",
}

# Export field holding the human-readable title, per media type.
_TITLE_KEYS = {
    "films": "Title",
    "books": "Title",
    "tv-episodes": "Episode Title",
    "tv-series": "Show Title",
}

# Accepted episode-number notations, tried in order: "1x02", "S01E02", "E02".
_EPISODE_NUMBER_PATTERNS = (
    re.compile(r"(?P<season>[0-9]+)x(?P<episode>[0-9]+)"),
    re.compile(r"S(?P<season>[0-9]+)E(?P<episode>[0-9]+)"),
    re.compile(r"E(?P<episode>[0-9]+)"),
)


def _write_log(media_type, log, log_items) -> None:
    """Write `log_items` back to the JSON file for `media_type`/`log`."""

    with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)


def _parse_episode_number(raw):
    """Split an episode number string into `(season, episode)` digit strings.

    `season` is None for the episode-only notation ("E02").

    Raises:
        ValueError: if `raw` matches none of the accepted notations.
    """

    for pattern in _EPISODE_NUMBER_PATTERNS:
        match = pattern.search(raw)
        if match is not None:
            groups = match.groupdict()
            return groups.get("season"), groups["episode"]

    raise ValueError(f"Invalid episode number format '{raw}'")


def _extract_log_values(item) -> dict:
    """Collect the user's own log data from an exported item.

    Capitalised export fields are removed from `item` and returned under their
    normalised names; fields already in normalised form are copied and left in
    place.

    Raises:
        ValueError: if "Episode Number" is in an unrecognised format.
        Exception: if "Date Read" and "Date Finished" are both present but
            disagree.
    """

    values = {}

    def move(old_key, new_key):
        if old_key in item:
            values[new_key] = item.pop(old_key)

    def copy(key):
        if key in item:
            values[key] = item[key]

    move("Date Added", "date_added")
    copy("date_added")

    move("Date Started", "date_started")
    copy("date_started")

    move("Date Finished", "date_finished")
    if "Date Read" in item:
        # "Date Finished" has already been moved out of `item`, so compare
        # against the collected value rather than looking it up again.
        if "date_finished" not in values:
            values["date_finished"] = item.pop("Date Read")
        elif values["date_finished"] == item["Date Read"]:
            del item["Date Read"]
        else:
            raise Exception(
                f"'Date Read' != 'Date Finished' for {item.get('Title')}"
            )
    copy("date_finished")

    move("Read Count", "read_count")
    copy("read_count")

    move("Date Watched", "date_finished")
    move("Rewatch", "is_repeat")
    move("Comments", "comments")
    move("Series Title", "series_title")
    move("Episode Title", "name")

    if "Episode Number" in item:
        season_no, episode_no = _parse_episode_number(item["Episode Number"])
        values["season_number"] = season_no
        values["episode_number"] = episode_no
        del item["Episode Number"]

    return values


def process_log(media_type, log) -> None:
    """Run through a log and call the appropriate API for each item found.

    Reads `./data/{media_type}/{log}.json`, and for every item that has neither
    the media type's ID key nor a "skip" flag, tries to replace it with the
    imported record (by IMDB ID, OpenLibrary work ID, ISBN, or title search),
    prompting on stdin where an identifier is missing. The user's own log
    fields are merged into the imported record. Progress is written back to the
    same file every third item and once more at the end.

    Args:
        media_type: one of "films", "tv-series", "tv-episodes", "books" or
            "games".
        log: name of the log file (without extension) inside the media type's
            data directory, e.g. "log" or "wishlist".

    Raises:
        ValueError: if an item needs processing but no title field is known
            for `media_type`.
    """

    logger.info(f"Processing {media_type}/{log}…")

    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
        log_items = json.load(log_file)

    id_key = _ID_KEYS.get(media_type, "")
    title_key = _TITLE_KEYS.get(media_type)

    for i, item in enumerate(log_items):
        if id_key in item or "skip" in item:
            continue

        if title_key is None:
            raise ValueError(f"No title field known for media type '{media_type}'")

        try:
            item_title = item[title_key]

            logger.info(f"Processing {item_title}…")

            # Rename pre-existing fields. Collected afresh for every item so
            # that one item's values can never leak into the next.
            try:
                log_item_values = _extract_log_values(item)
            except ValueError as exc:
                logger.error(str(exc))
                return

            # Reset per item: a failed lookup must not reuse the previous
            # item's result.
            new_log_item = None

            if "IMDB ID" in item and item["IMDB ID"] != "":
                new_log_item = import_by_id(item["IMDB ID"], media_type)

            elif "books" == media_type and "wishlist" == log:
                ol_work_id = re.search(
                    "OL[0-9]+W",
                    input(
                        f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}): "
                    ),
                )
                if ol_work_id is None:
                    logger.info("Skipping…")
                else:
                    try:
                        new_log_item = import_by_id(ol_work_id[0], media_type, log)
                    except Exception as exc:
                        logger.warning(f"Import failed for '{item_title}': {exc}")
                        logger.info("Skipping…")

            elif item.get("ISBN13") not in ("", None):
                new_log_item = import_by_id(item["ISBN13"], media_type, log)

            elif item.get("ISBN") not in ("", None):
                new_log_item = import_by_id(item["ISBN"], media_type, log)

            else:
                new_log_item = import_by_details(item, item_title, media_type)

            if new_log_item is None:
                if (
                    media_type in ["films", "tv-series", "tv-episodes"]
                    and "imdb_id" not in item
                ):
                    item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")

                    if re.search("tt[0-9]+", item["imdb_id"]) is not None:
                        log_items[i] = import_by_id(item["imdb_id"], media_type)
                        _write_log(media_type, log, log_items)

                elif "books" == media_type:
                    if "ISBN" not in item and "ISBN13" not in item:
                        item["ISBN"] = input(f"Enter ISBN for {item_title}: ")

                        if re.search("[0-9-]+", item["ISBN"]) is not None:
                            log_items[i] = import_by_id(item["ISBN"], media_type)
                            _write_log(media_type, log, log_items)

                        else:
                            logger.warning(f"Skipped '{item_title}'")
                            log_items[i]["skip"] = True

                else:
                    logger.warning(f"Skipped {item_title}")

            else:
                log_items[i] = new_log_item

            # Merge before the periodic save so the checkpoint on disk
            # already contains this item's log fields.
            if log_items[i] is not None:
                log_items[i] |= log_item_values

            if i % 3 == 0:
                _write_log(media_type, log, log_items)

        except KeyError:
            # An expected export field is missing: show the item and move on.
            print(json.dumps(item, indent=4))

    _write_log(media_type, log, log_items)

    logger.info(f"Finished processing {media_type}/{log}")


def import_by_details(item, item_title, media_type) -> Optional[dict]:
    """Import an item when lacking a unique identifier.

    Only films and TV series can currently be looked up by title; for every
    other media type this returns None.
    """

    if media_type in ["films", "tv-series"]:
        return import_from_tmdb_by_details(item, item_title, media_type)

    # TODO: title-based lookups for tv-episodes (TVDB), books (OpenLibrary)
    # and games (IGDB) are not implemented yet.
    return None


def import_from_tmdb_by_details(item, item_title, media_type) -> Optional[dict]:
    """Retrieve a film or TV series from TMDB using its title.

    Searches TMDB for `item_title` (narrowed by the item's "Release Year" when
    present). A single hit, or a single exact-title hit, is used directly;
    otherwise the candidates are printed and the user is asked to choose one
    by index, or to supply an IMDB ID instead.

    Returns:
        The cleaned-up TMDB result, the result of importing by the IMDB ID the
        user entered, the unmodified `item` if the user skipped both prompts,
        or None if TMDB answered with an error status.

    Raises:
        Exception: if the index entered by the user is not a valid index into
            the printed candidate list.
    """

    logger.info(f"Processing {item_title}…")

    api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}"

    # Sending API request, retrying for as long as TMDB rate-limits us
    while True:
        response = requests.get(
            api_url,
            params={
                "query": item_title,
                "include_adult": True,
                "year": item.get("Release Year"),
            },
            headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
            timeout=15,
        )

        if 429 != response.status_code:
            break

        time.sleep(2)

    # Process the response
    if 200 != response.status_code:
        # An error payload has no "results" to parse; let the caller fall
        # back to asking for an identifier.
        logger.error(response.text)
        return None

    logger.debug(response.status_code)

    response_data = json.loads(response.text)["results"]

    if 1 == len(response_data):
        return cleanup_result(response_data[0], media_type)

    if 0 == len(response_data):
        logger.warning(f"Returned no {media_type} for {item_title}")

    else:
        title_key = "title" if "films" == media_type else "name"

        exact_matches = [
            result for result in response_data if result[title_key] == item_title
        ]

        if 1 == len(exact_matches):
            return cleanup_result(exact_matches[0], media_type)

        # The list shown to the user is the one their index must refer to.
        candidates = exact_matches or response_data

        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
        print(json.dumps(candidates, indent=4))

        idx = input(
            f"\nEnter the index of the result to use [0-{len(candidates) - 1}]: "
        )

        if "" != idx:
            try:
                chosen = candidates[int(idx)]
            except (ValueError, IndexError) as exc:
                raise Exception("Index invalid") from exc

            return cleanup_result(chosen, media_type)

    item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")

    if "" != item["IMDB ID"]:
        return import_by_id(item["IMDB ID"], media_type)

    logger.warning(f"Skipped {media_type} '{item_title}'")
    return item


def main() -> None:
    """Prompt user to select media type and log to process.

    Any exception raised while processing is logged rather than propagated.
    """

    media_type = ""
    while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
        media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")

    try:
        if "films" == media_type:
            log = ""
            while log not in ["log", "wishlist"]:
                log = input("Enter log to process [log|wishlist]: ")

        elif "tv-episodes" == media_type:
            # Episodes only have a single log.
            log = "log"

        else:
            # books and tv-series share the same set of logs
            log = ""
            while log not in ["log", "current", "wishlist"]:
                log = input("Enter log to process [log|current|wishlist]: ")

        process_log(media_type, log)

    except Exception:
        logger.exception("Exception occurred")


if __name__ == "__main__":
    main()