""" Process logs derived from social cataloguing site data exports, using various APIs. """ import json import os import re import time import requests from slugify import slugify from dotenv import load_dotenv from add_item import cleanup_result, import_by_id, setup_logger logger = setup_logger("process_logs") load_dotenv() TMDB_API_KEY = os.getenv("TMDB_API_KEY") TVDB_API_KEY = os.getenv("TVDB_API_KEY") if "" == TMDB_API_KEY: logger.warning("TMDB API key not found") if "" == TVDB_API_KEY: logger.warning("TVDB API key not found") def process_log(media_type, log) -> None: """Run through a log and call the appropriate API for each item found""" logger.info(f"Processing {media_type}/{log}…") with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file: log_items = json.load(log_file) log_item_values = {} id_key = "" if "books" == media_type: id_key = "ol_id" elif media_type in ["films", "tv-series", "tv-episodes"]: id_key = "tmdb_id" elif "games" == media_type: id_key = "gb_id" for i, item in enumerate(log_items): if id_key not in item:# and "skip" not in item: if media_type in ["films", "books"]: item_title = item["Title"] if "Title" in item else item["title"] elif "tv-episodes" == media_type: item_title = item["Episode Title"] elif "tv-series" == media_type: item_title = item["Show Title"] logger.info(f"Processing {item_title} ({item['Author'] if "Author" in item else item["authors"] if "authors" in item else "No Author"})…") # Rename pre-existing fields if "Date Added" in item: log_item_values["date_added"] = item["Date Added"] del item["Date Added"] if "date_added" in item: log_item_values["date_added"] = item["date_added"] if "Date Started" in item: log_item_values["date_started"] = item["Date Started"] del item["Date Started"] if "date_started" in item: log_item_values["date_started"] = item["date_started"] if "Date Finished" in item: log_item_values["date_finished"] = item["Date Finished"] del item["Date Finished"] if "Date Read" in item: if item["Date Finished"] == item["Date Read"]: del item["Date Read"] else: raise Exception( f"'Date Read' != 'Date Finished' for {item['Title']}" ) if "date_finished" in item: log_item_values["date_finished"] = item["date_finished"] if "Read Count" in item: log_item_values["read_count"] = item["Read Count"] del item["Read Count"] if "read_count" in item: log_item_values["read_count"] = item["read_count"] if "Date Watched" in item: log_item_values["date_finished"] = item["Date Watched"] del item["Date Watched"] if "Rewatch" in item: log_item_values["is_repeat"] = item["Rewatch"] del item["Rewatch"] if "Comments" in item: log_item_values["comments"] = item["Comments"] del item["Comments"] if "Series Title" in item: log_item_values["series_title"] = item["Series Title"] del item["Series Title"] if "Episode Title" in item: log_item_values["name"] = item["Episode Title"] del item["Episode Title"] if "Episode Number" in item: if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None: season_no, _, episode_no = log_item_values[ "episode_number" ].split("x") elif ( re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None ): season_no, _, episode_no = log_item_values[ "episode_number" ].split("E") elif re.search("E[0-9]+", item["Episode Number"]) is not None: season_no = None episode_no = item["episode_number"][1:] else: logger.error( f"Invalid episode number format '{item['Episode Number']}'" ) return log_item_values["season_number"] = season_no log_item_values["episode_number"] = episode_no del item["Episode Number"] if 
"IMDB ID" in item and item["IMDB ID"] != "": new_log_item = import_by_id(item["IMDB ID"], media_type) elif "books" == media_type and "wishlist" == log: new_log_item = import_by_details(item, item_title, media_type) if new_log_item is None: ol_work_id = input( f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author'] if "Author" in item else item["authors"] if "authors" in item else "No Author"}), or 'd' to delete the record: " ) if 'd' == ol_work_id: logger.info("Deleting…") del log_items[i] continue ol_work_id = re.search("OL[0-9]+W", ol_work_id) try: new_log_item = import_by_id(ol_work_id[0], media_type, log) except: new_log_item = item new_log_item["skip"] = True logger.info("Skipping…") elif ( "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None ): new_log_item = import_by_id(item["ISBN13"], media_type, log) elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None: new_log_item = import_by_id(item["ISBN13"], media_type, log) else: new_log_item = import_by_details(item, item_title, media_type) if new_log_item is None: if ( media_type in ["films", "tv-series", "tv-episodes"] and "imdb_id" not in item ): item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ") if re.search("tt[0-9]+", item["imdb_id"]) is not None: log_items[i] = import_by_id(item["imdb_id"], media_type) with open( f"./data/{media_type}/{log}.json", "w", encoding="utf-8" ) as log_file: json.dump(log_items, log_file, indent=4) elif "books" == media_type: if "ISBN" not in item and "ISBN13" not in item: item["ISBN"] = input(f"Enter ISBN for {item_title}: ") if re.search("[0-9-]+", item["ISBN"]) is not None: log_items[i] = import_by_id(item["ISBN"], media_type) with open( f"./data/{media_type}/{log}.json", "w", encoding="utf-8", ) as log_file: json.dump(log_items, log_file, indent=4) else: logger.warning(f"Skipped '{item_title}'") log_items[i]["skip"] = True else: logger.warning(f"Skipped {item_title}") else: log_items[i] = new_log_item if i % 3 == 0: with open( f"./data/{media_type}/{log}.json", "w", encoding="utf-8" ) as log_file: json.dump(log_items, log_file, indent=4) logger.info("Saved…") if log_items[i] is not None: log_items[i] |= log_item_values with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file: json.dump(log_items, log_file, indent=4) logger.info(f"Finished processing {media_type}/{log}") def import_by_details(item, item_title, media_type) -> dict: """Import an item when lacking a unique identifier""" if media_type in ["films", "tv-series"]: return import_from_tmdb_by_details(item, item_title, media_type) if media_type in ["tv-episodes"]: return # import_from_tvdb_by_details(item, item_title, media_type) if media_type in ["books"]: return import_from_openlibrary_by_details(item, item_title, media_type) if media_type in ["games"]: return # import_from_igdb_by_details(item, item_title, media_type) def import_from_openlibrary_by_details(item, item_title, media_type) -> dict | None: """Retrieve a book from OpenLibrary using a title and author name""" logger.info(f"Importing '{item_title}'…") api_url = f"https://openlibrary.org/search.json?title={slugify((item['Title'] if "Title" in item else item["title"]).split(':')[0], separator='%20')}&author={slugify((item['Author'] if "Author" in item else item["authors"] if "authors" in item else "No Author"), separator='%20')}" # Sending API request response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15) # Process the response if 200 == response.status_code: 
def import_from_openlibrary_by_details(item, item_title, media_type) -> dict | None:
    """Retrieve a book from OpenLibrary using a title and author name"""

    logger.info(f"Importing '{item_title}'…")

    title = (item["Title"] if "Title" in item else item["title"]).split(":")[0]
    author = item.get("Author", item.get("authors", "No Author"))
    api_url = (
        "https://openlibrary.org/search.json"
        f"?title={slugify(title, separator='%20')}"
        f"&author={slugify(author, separator='%20')}"
    )

    # Send the API request
    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)

    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)
    elif 429 == response.status_code:
        time.sleep(2)
        return import_from_openlibrary_by_details(item, item_title, media_type)
    elif 404 == response.status_code:
        logger.error(f"{response.status_code}: Not Found for title '{item_title}'")
        return None
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")

    results = json.loads(response.text)
    logger.info(
        f"Found {results['num_found']} result{'s' if results['num_found'] != 1 else ''}…"
    )

    if 0 < results["num_found"]:
        result = results["docs"][0]

        if 1 == results["num_found"]:
            logger.info(f"Selecting OL ID {result['key']}…")
            item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
            if item_id_parsed is not None:
                return import_by_id(item_id_parsed[0], "books", "wishlist")

        elif result["title"] == title and result["author_name"][0] == author:
            logger.info(f"First result ({result['key']}) is a match!")
            item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
            if item_id_parsed is not None:
                return import_by_id(item_id_parsed[0], "books", "wishlist")

        else:
            print(
                json.dumps(
                    {
                        k: result.get(k, None)
                        for k in ("author_name", "title", "first_publish_year")
                    },
                    indent=4,
                )
            )
            is_correct = input("Is this the correct result? [y/n]: ")
            if "y" == is_correct:
                logger.info(f"Selecting OL ID {result['key']}…")
                item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
                if item_id_parsed is not None:
                    return import_by_id(item_id_parsed[0], "books", "wishlist")

    logger.info("Returning nothing…")
    return None


def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
    """Retrieve a film or TV series from TMDB using its title"""

    logger.info(f"Processing {item_title}…")

    api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}"

    # Send the API request
    response = requests.get(
        api_url,
        params={
            "query": item_title,
            "include_adult": True,
            "year": item["Release Year"] if "Release Year" in item else None,
        },
        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
        timeout=15,
    )

    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)
    elif 429 == response.status_code:
        time.sleep(2)
        return import_from_tmdb_by_details(item, item_title, media_type)
    else:
        logger.error(response.text)
        raise Exception(f"Error {response.status_code}: {response.text}")

    response_data = json.loads(response.text)["results"]

    if 1 == len(response_data):
        return cleanup_result(response_data[0], media_type)

    if 0 == len(response_data):
        logger.warning(f"Returned no {media_type} for {item_title}")

    elif 1 < len(response_data):
        title_key = "title" if "films" == media_type else "name"

        filtered_response_data = [
            result for result in response_data if result[title_key] == item_title
        ]
        frd_len = len(filtered_response_data)
        if 1 == frd_len:
            return cleanup_result(filtered_response_data[0], media_type)

        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
        results_to_print = filtered_response_data if frd_len > 0 else response_data
        print(json.dumps(results_to_print, indent=4))

        last_index = len(results_to_print) - 1
        idx = input(f"\nEnter the index of the result to use [0-{last_index}]: ")
        if "" != idx:
            try:
                # Index into the same list that was printed above
                return cleanup_result(results_to_print[int(idx)], media_type)
            except Exception as exc:
                raise Exception("Index invalid") from exc

        item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")
        if "" != item["IMDB ID"]:
            return import_by_id(item["IMDB ID"], media_type)

        logger.warning(f"Skipped {media_type} '{item_title}'")

    return item
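
# Example (hypothetical item data, for illustration only):
#
#     import_from_tmdb_by_details({"Release Year": "1972"}, "Solaris", "films")
#     import_from_openlibrary_by_details(
#         {"Title": "Solaris", "Author": "Stanisław Lem"}, "Solaris", "books"
#     )
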
"""Prompt user to select media type and log to process""" media_type = "" while media_type not in ["films", "tv-episodes", "tv-series", "books"]: media_type = input("Select media type [films|tv-episodes|tv-series|books]: ") try: if "films" == media_type: log = "" while log not in ["log", "wishlist"]: log = input("Enter log to process [log|wishlist]: ") elif "books" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to process [log|current|wishlist]: ") elif "tv-series" == media_type: log = "log" elif "tv-series" == media_type: log = "" while log not in ["log", "current", "wishlist"]: log = input("Enter log to process [log|current|wishlist]: ") process_log(media_type, log) except Exception: logger.exception("Exception occurred") if __name__ == "__main__": main()