From 5d25e3bb7488315c5a0b76895623585b8cb8b45b Mon Sep 17 00:00:00 2001
From: Ben Goldsworthy
Date: Tue, 23 Jan 2024 18:57:22 +0000
Subject: [PATCH] add autodetection of duplicate entries

---
 package.json            |   2 +-
 scripts/add_item.py     | 212 +++++++++++++++++++++++++++-------------
 scripts/process_logs.py |  82 +++++++++++-----
 3 files changed, 206 insertions(+), 90 deletions(-)

diff --git a/package.json b/package.json
index 9e0d060..3418ae3 100644
--- a/package.json
+++ b/package.json
@@ -6,7 +6,7 @@
     "build": "rm -rf ./public/ && snap run hugo --templateMetrics --templateMetricsHints",
     "deploy": "rsync -rP ./public/ ovhvps:~/catalogue/content",
     "add": "python ./scripts/add_item.py",
-    "process": "python ./scripts/process_items.py",
+    "process": "python ./scripts/process_logs.py",
     "lint:json": "jsonlint ./**/*.json -s",
     "lint:json:fix": "npm run lint:json -- -i",
     "lint:py": "pylint --disable=broad-exception-raised --disable=logging-fstring-interpolation ./scripts/*.py"
diff --git a/scripts/add_item.py b/scripts/add_item.py
index 91db34a..bee5e93 100644
--- a/scripts/add_item.py
+++ b/scripts/add_item.py
@@ -11,19 +11,19 @@ from datetime import datetime
 import requests
 from dotenv import load_dotenv
 
+authors = []
 
 def setup_logger(name="add_item"):
     """Set up the logger for console and file"""
 
-    logging.root.setLevel(logging.NOTSET)
-
     logr = logging.getLogger(name)
 
     c_handler = logging.StreamHandler()
     f_handler = logging.FileHandler("./logs/run.log")
+    logging.root.setLevel(logging.INFO)
     c_handler.setLevel(logging.INFO)
-    f_handler.setLevel(logging.ERROR)
+    f_handler.setLevel(logging.WARNING)
 
     c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
     f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -50,16 +50,73 @@ if "" == TVDB_API_KEY:
     logger.error("TVDB API key not found")
 
 
+def return_if_exists(item_id, media_type, log) -> dict | None:
+    """Returns an item if it exists in the requested log"""
+
+    logger.info(f"Checking for '{item_id}' in '{log}'…")
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+        log_items = json.load(log_file)
+
+    existing_items = [log_item for log_item in log_items if "id" in log_item and log_item['id'] == int(item_id)]
+    if len(existing_items) > 0:
+        logger.info(f"Found item in '{log}'")
+        return existing_items[-1]
+    logger.info(f"'{item_id}' not found in '{log}'")
+
+
+def delete_existing(item_id, media_type, log) -> None:
+    """Deletes an item from a log if it matches the ID"""
+
+    logger.info(f"Deleting '{item_id}' from '{log}'…")
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+        log_items = json.load(log_file)
+
+    old_len = len(log_items)
+    log_items = [log_item for log_item in log_items if "id" not in log_item or log_item['id'] != int(item_id)]
+    if len(log_items) < (old_len - 1):
+        raise Exception("More than one deletion made, discarding…")
+
+    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
+        json.dump(log_items, log_file, indent=4)
+    logger.info(f"'{item_id}' deleted from '{log}'")
+
+
+def check_for_existing(item_id, media_type, log) -> tuple[dict | None, str | None]:
+    """Checks for an existing item and returns it, along with the log it was found in (if any)"""
+
+    logger.info(f"Checking for '{item_id}' in logs…")
+
+    # Check in specified log
+    existing_item = return_if_exists(item_id, media_type, log)
+
+    if existing_item is not None:
+        if "log" == log:
+            existing_item["is_repeat"] = True
+        return existing_item, None
+
+    for log_to_check in [p_log for p_log in ["log", "current", "wishlist"] if p_log != log]:
+        if ("current" == log_to_check and media_type in ["books", "games", "tv-series"]) or ("wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"]):
+            existing_item = return_if_exists(item_id, media_type, log_to_check)
+            if existing_item is not None:
+                return existing_item, log_to_check
+
+    return None, None
+
+
 def add_item_to_log(item_id, media_type, log) -> None:
     """Add a film, book, TV series or TV episode to a log"""
 
     logger.info(f"Processing {item_id}…")
 
-    item: dict = import_by_id(item_id, media_type)
+    item, log_to_delete = check_for_existing(item_id, media_type, log)
+
+    if item is None:
+        item = import_by_id(item_id, media_type)
+        if item is None:
+            raise Exception("No item found")
 
     if log in ["log", "current"]:
-        # TODO - review this when moving from one log to another
-        if media_type in ["books", "tv-series", "games"]:
+        if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
             date_started = ""
             while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_started) is None:
                 date_started = input("Enter date started [YYYY-MM-DD, t for today]: ")
@@ -67,7 +124,7 @@ def add_item_to_log(item_id, media_type, log) -> None:
                 date_started = datetime.today().strftime("%Y-%m-%d")
             item["date_started"] = date_started
 
-        if "log" == log:
+        if "date_finished" not in item and "log" == log:
             date_finished = ""
             while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_finished) is None:
                 date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ")
@@ -75,17 +132,20 @@ def add_item_to_log(item_id, media_type, log) -> None:
                 date_finished = datetime.today().strftime("%Y-%m-%d")
             item["date_finished"] = date_finished
 
-        # TODO - do this automatically
-        is_repeat = ""
-        while is_repeat not in ["y", "n"]:
-            is_repeat = input("Is this a repeat entry? [y/n]: ")
-        if "y" == is_repeat:
-            item["is_repeat"] = True
-        item["added_by_id"] = item_id
+        if "is_repeat" not in item:
+            is_repeat = ""
+            while is_repeat not in ["y", "n"]:
+                is_repeat = input("Is this a repeat entry? [y/n]: ")
[y/n]: ") + if "y" == is_repeat: + item["is_repeat"] = True - comments = input("Enter comments (optional): ") - if "" != comments: - item["comments"] = comments + if "added_by_id" not in item: + item["added_by_id"] = item_id + + if "comments" not in item: + comments = input("Enter comments (optional): ") + if "" != comments: + item["comments"] = comments # Validation step print(f"{media_type} data to add:\n") @@ -106,12 +166,15 @@ def add_item_to_log(item_id, media_type, log) -> None: logger.info(f"Added {media_type} {item_id} to {log}") + if log_to_delete is not None: + delete_existing(item_id, media_type, log_to_delete) + def import_by_id(import_id, media_type) -> dict: """Import from the appropriate API by unique ID""" if media_type in ["films", "tv-series"]: - return import_from_imdb_by_id(import_id, media_type) + return import_from_tmdb_by_id(import_id, media_type) if media_type in ["tv-episodes"]: return #import_from_tvdb_by_id(import_id, media_type) @@ -120,15 +183,15 @@ def import_by_id(import_id, media_type) -> dict: return import_from_openlibrary_by_id(import_id, media_type) -def import_from_imdb_by_id(imdb_id, media_type) -> dict: +def import_from_tmdb_by_id(tmdb_id, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" - api_url = f"https://api.themoviedb.org/3/find/{imdb_id}" + api_path = "movie" if "films" == media_type else "tv" + api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}" # Sending API request response = requests.get( api_url, - params={"external_source": "imdb_id"}, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15 ) @@ -139,35 +202,21 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict: elif 429 == response.status_code: time.sleep(2) - return import_from_imdb_by_id(imdb_id, media_type) + return import_from_tmdb_by_id(tmdb_id, media_type) else: raise Exception(f"Error {response.status_code}: {response.text}") - if "films" == media_type: - results_key = "movie_results" - elif "tv-episodes" == media_type: - results_key = "TODO" - elif "tv-series" == media_type: - results_key = "tv_results" + if "tv-episodes" == media_type: + raise Exception("TV Episodes are TODO!") - response_data = json.loads(response.text)[results_key] + response_data = json.loads(response.text) if 1 == len(response_data): item = response_data[0] elif 0 == len(response_data): - raise Exception(f"Returned no results for {imdb_id}") - - elif 1 < len(response_data): - logger.warning(f"Returned more than one {media_type} for ID '{imdb_id}'\n") - print(json.dumps(response_data, indent=4)) - idx = input("\nEnter the index of the result to use: ") - try: - item = response_data[int(idx)] - - except Exception as exc: - raise Exception(f"Index {idx} is invalid") from exc + raise Exception(f"Returned no results for {tmdb_id}") # Modify the returned result to add additional data return cleanup_result(item, media_type) @@ -176,6 +225,8 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict: def import_from_openlibrary_by_id(isbn, media_type) -> dict: """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" + logging.info(f"Importing '{isbn}'…") + api_url = f"https://openlibrary.org/isbn/{isbn}" # Sending API request @@ -189,6 +240,9 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict: time.sleep(2) return import_from_openlibrary_by_id(isbn, media_type) + elif 404 == response.status_code: + logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'") + return None else: raise Exception(f"Error 
{response.status_code}: {response.text}") @@ -199,6 +253,7 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict: for i, sub_item in enumerate(item[key]): item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"]) + if "works" in item: if len(item["works"]) > 1: raise Exception(f"Multiple works found for {isbn}") @@ -206,6 +261,9 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict: item["work"] = item["works"][0] del item["works"] + # Rate limiting + time.sleep(1) + # Modify the returned result to add additional data return cleanup_result(item, media_type) @@ -213,9 +271,20 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict: def import_from_openlibrary_by_ol_key(key) -> dict: """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key""" + logger.info(f"Retrieving {key}…") _, mode, ol_id = key.split("/") + if "authors" == mode: + with open(f"./scripts/caching/authors.json", "r", encoding='utf-8') as authors_cache: + cached_authors = json.load(authors_cache) + if mode in ["works", "authors"]: + if "authors" == mode: + matched_cached_authors = [aut for aut in cached_authors if aut['id'] == ol_id] + if len(matched_cached_authors) == 1: + logging.info(f"Found cached author '{matched_cached_authors[0]['name']}'") + return matched_cached_authors[0] + api_url = f"https://openlibrary.org{key}" # Sending API request @@ -227,11 +296,14 @@ def import_from_openlibrary_by_ol_key(key) -> dict: elif 429 == response.status_code: time.sleep(2) - return import_from_openlibrary_by_ol_key(key) + import_from_openlibrary_by_ol_key(key) else: raise Exception(f"Error {response.status_code}: {response.text}") + # Rate limiting + time.sleep(1) + item = json.loads(response.text) if "authors" == mode: @@ -241,6 +313,16 @@ def import_from_openlibrary_by_ol_key(key) -> dict: if item["name"] != item["personal_name"]: author["personal_name"] = item["personal_name"] + logger.info(f"Caching author '{author['name']}'…") + cached_authors.append(author) + with open( + f"./scripts/caching/authors.json", + "w", + encoding='utf-8' + ) as authors_cache: + json.dump(cached_authors, authors_cache, indent=4) + logger.info(f"Author '{author['name']}' cached!") + return author if "works" == mode: @@ -266,6 +348,7 @@ def cleanup_result(item, media_type) -> dict: "copyright_date", # OpenLibrary "classifications", # OpenLibrary "created", # OpenLibrary + "dewey_decimal_class", # OpenLibary "episode_type", # TMDB "first_sentence", # OpenLibrary "genre_ids", # TMDB @@ -274,19 +357,27 @@ def cleanup_result(item, media_type) -> dict: "last_modified", # OpenLibrary "latest_revision", # OpenLibrary "lc_classifications", # OpenLibrary + "lccn", # OpenLibrary "local_id", # OpenLibrary + "notes", # OpenLibrary "ocaid", # OpenLibrary "oclc_numbers", # OpenLibrary + "pagination", # OpenLibrary + "physical_dimensions", # OpenLibrary "popularity", # TMDB "production_code", # TMDB "revision", # OpenLibrary "runtime", # TMDB "source_records", # OpenLibrary "still_path", # TMDB + "table_of_contents", # OpenLibrary "type", # OpenLibrary + "uri_descriptions", # OpenLibrary + "url", # OpenLibrary "video", # TMDB "vote_average", # TMDB "vote_count", # TMDB + "weight", # OpenLibrary ]: if field_name in item: del item[field_name] @@ -308,15 +399,12 @@ def cleanup_result(item, media_type) -> dict: for key in ["isbn_10", "isbn_13"]: if key in item: if len(item[key]) > 1: - raise Exception("Multiple ISBN results") + logger.warning("Multiple ISBN results") item[key] = item[key][0] if "publish_places" 
-        if len(item["publish_places"]) > 1:
-            raise Exception("Multiple publish_places")
-
-        item["published_in"] = item["publish_places"][0]
+        item["published_in"] = item["publish_places"]
         del item["publish_places"]
 
     if "languages" in item:
@@ -325,7 +413,7 @@ def cleanup_result(item, media_type) -> dict:
     ]
 
     if "translation_of" in item:
-        if item["translation_of"] == item["work"]["title"]:
+        if item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower():
             del item["translation_of"]
         else:
             raise Exception(
@@ -356,45 +444,37 @@ def main() -> None:
     media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
 
     try:
+        item_id = ""
         if "films" == media_type:
             log = ""
             while log not in ["log", "wishlist"]:
                 log = input("Enter log to update [log|wishlist]: ")
 
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
-
-            add_item_to_log(imdb_id, media_type, log)
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TMDB ID: ")
 
         elif "books" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to update [log|current|wishlist]: ")
 
-            isbn = ""
-            while re.search("[0-9]+", isbn) is None:
-                isbn = input("Enter ISBN: ")
-
-            add_item_to_log(isbn, media_type, log)
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter ISBN: ")
 
         elif "tv-episodes" == media_type:
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
-
-            add_item_to_log(imdb_id, media_type, "log")
+            log = "log"
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TVDB ID: ")
 
         elif "tv-series" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to update [log|current|wishlist]: ")
 
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TMDB ID: ")
 
-            add_item_to_log(imdb_id, media_type, log)
+        add_item_to_log(item_id, media_type, log)
 
     except Exception:
         logger.exception("Exception occurred")
diff --git a/scripts/process_logs.py b/scripts/process_logs.py
index 7135a3b..e5c4d3e 100644
--- a/scripts/process_logs.py
+++ b/scripts/process_logs.py
@@ -35,21 +35,38 @@ def process_log(media_type, log) -> None:
 
     for i, item in enumerate(log_items):
         try:
-            if "id" not in item:
-                if "films" == media_type:
+            if "id" not in item and "skip" not in item:
+                if media_type in ["films", "books"]:
                     item_title = item["Title"]
                 elif "tv-episodes" == media_type:
                     item_title = item["Episode Title"]
                 elif "tv-series" == media_type:
                     item_title = item["Show Title"]
 
-                logger.debug(f"Processing {item_title}…")
+                logger.info(f"Processing {item_title}…")
 
                 # Rename pre-existing fields
                 if "Date Added" in item:
                     log_item_values["date_added"] = item["Date Added"]
                     del item["Date Added"]
 
+                if "Date Started" in item:
+                    log_item_values["date_started"] = item["Date Started"]
+                    del item["Date Started"]
+
+                if "Date Finished" in item:
+                    log_item_values["date_finished"] = item["Date Finished"]
+                    del item["Date Finished"]
+                if "Date Read" in item:
+                    if log_item_values.get("date_finished") == item["Date Read"]:
+                        del item["Date Read"]
+                    else:
+                        raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}")
 
+                if "Read Count" in item:
+                    log_item_values["read_count"] = item["Read Count"]
+                    del item["Read Count"]
+
                 if "Date Watched" in item:
                     log_item_values["date_finished"] = item["Date Watched"]
                     del item["Date Watched"]
@@ -99,21 +116,46 @@ def process_log(media_type, log) -> None:
                 if "IMDB ID" in item and item["IMDB ID"] != "":
                     new_log_item = import_by_id(item["IMDB ID"], media_type)
 
+                elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None:
+                    new_log_item = import_by_id(item["ISBN13"], media_type)
+
+                elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
+                    new_log_item = import_by_id(item["ISBN"], media_type)
+
                 else:
                     new_log_item = import_by_details(item, item_title, media_type)
 
                 if new_log_item is None:
-                    item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
+                    if media_type in ["films", "tv-series", "tv-episodes"] and "imdb_id" not in item:
+                        item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
 
-                    if re.search("tt[0-9]+", item["imdb_id"]) is not None:
-                        log_items[i] = import_by_id(item["imdb_id"], media_type)
+                        if re.search("tt[0-9]+", item["imdb_id"]) is not None:
+                            log_items[i] = import_by_id(item["imdb_id"], media_type)
 
-                    with open(
-                        f"./data/{media_type}/{log}.json",
-                        "w",
-                        encoding='utf-8'
-                    ) as log_file:
-                        json.dump(log_items, log_file, indent=4)
+                        with open(
+                            f"./data/{media_type}/{log}.json",
+                            "w",
+                            encoding='utf-8'
+                        ) as log_file:
+                            json.dump(log_items, log_file, indent=4)
+
+                    elif "books" == media_type:
+                        if "ISBN" not in item:
+                            item["ISBN"] = input(f"Enter ISBN for {item_title}: ")
+
+                        if re.search("[0-9-]+", item["ISBN"]) is not None:
+                            log_items[i] = import_by_id(item["ISBN"], media_type)
+
+                        with open(
+                            f"./data/{media_type}/{log}.json",
+                            "w",
+                            encoding='utf-8'
+                        ) as log_file:
+                            json.dump(log_items, log_file, indent=4)
+
+                    else:
+                        logger.warning(f"Skipped '{item_title}'")
+                        log_items[i]["skip"] = True
 
                 else:
                     logger.warning(f"Skipped {item_title}")
@@ -121,7 +163,7 @@ def process_log(media_type, log) -> None:
             else:
                 log_items[i] = new_log_item
 
-            if i % 15 == 0:
+            if i % 10 == 0:
                 with open(
                     f"./data/{media_type}/{log}.json",
                     "w",
@@ -234,7 +276,7 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
             if "" != item["IMDB ID"]:
                 return import_by_id(item["IMDB ID"], media_type)
 
-    logger.warning(f"Skipped {item_title}")
+    logger.warning(f"Skipped {media_type} '{item_title}'")
     return item
@@ -251,26 +293,20 @@ def main() -> None:
         while log not in ["log", "wishlist"]:
             log = input("Enter log to process [log|wishlist]: ")
 
-            process_log(media_type, log)
-
         elif "books" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to process [log|current|wishlist]: ")
 
-            # TODO
-
-        elif "tv-episodes" == media_type:
-            process_log(media_type, "log")
-
-            # TODO
+        elif "tv-episodes" == media_type:
+            log = "log"
 
         elif "tv-series" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to process [log|current|wishlist]: ")
 
-            process_log(media_type, log)
+        process_log(media_type, log)
 
     except Exception:
         logger.exception("Exception occurred")
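
Editor's note: below is a condensed, dependency-free sketch of the duplicate-detection contract this patch introduces. It is hypothetical illustration, not part of the patch: the in-memory `logs` dict and the sample wishlist entry stand in for the `./data/<media_type>/<log>.json` files, and the function bodies are simplified from `check_for_existing()` and `return_if_exists()` in `scripts/add_item.py` (media-type restrictions and file I/O omitted). Requires Python 3.10+ for the `X | None` annotations.

    # Hypothetical sketch of the duplicate-autodetection flow.
    # An item found in the target log is flagged as a repeat; an item found
    # in another log is returned along with that log's name so the caller
    # (add_item_to_log) can delete it there after re-adding it.

    logs = {
        "log": [],
        "current": [],
        "wishlist": [{"id": 603, "title": "The Matrix"}],
    }

    def return_if_exists(item_id, log) -> dict | None:
        """Return the last item in `log` whose ID matches, if any"""
        matches = [entry for entry in logs[log] if entry.get("id") == int(item_id)]
        return matches[-1] if matches else None

    def check_for_existing(item_id, log) -> tuple[dict | None, str | None]:
        """Return (existing item, log to delete it from)"""
        existing = return_if_exists(item_id, log)
        if existing is not None:
            if "log" == log:
                existing["is_repeat"] = True  # re-logging the same ID is a repeat
            return existing, None

        # Not in the target log; look for it in the other logs
        for other_log in [lg for lg in ["log", "current", "wishlist"] if lg != log]:
            existing = return_if_exists(item_id, other_log)
            if existing is not None:
                return existing, other_log
        return None, None

    # Moving a wishlist item to the log: the item is reused (no API re-import)
    # and "wishlist" is reported as the log to clean up afterwards.
    item, log_to_delete = check_for_existing("603", "log")
    print(item, log_to_delete)  # {'id': 603, 'title': 'The Matrix'} wishlist

Note the ordering in the real `add_item_to_log()`: the item is written to the target log first and only then removed from the source log via `delete_existing()`, so an interrupted run duplicates an entry rather than losing it.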