From 29592be6ceb95cb17e941612b3a0f1619a0369ef Mon Sep 17 00:00:00 2001
From: Ben Goldsworthy
Date: Wed, 17 Jan 2024 23:02:56 +0100
Subject: [PATCH] lint scripts

---
 package.json            |   2 +-
 scripts/add_item.py     | 112 ++++++++++++++++++++--------------
 scripts/process_logs.py |  94 +++++++++++++++++++--------------
 3 files changed, 112 insertions(+), 96 deletions(-)

diff --git a/package.json b/package.json
index b75b631..1d58027 100644
--- a/package.json
+++ b/package.json
@@ -7,7 +7,7 @@
     "deploy": "rsync -rP ./public/ ovhvps:~/catalogue/content",
     "lint:json": "jsonlint ./**/*.json -s",
     "lint:json:fix": "npm run lint:json -- -i",
-    "lint:py": "pylint ./scripts/*.py"
+    "lint:py": "pylint --disable=broad-exception-raised --disable=logging-fstring-interpolation ./scripts/*.py"
   },
   "devDependencies": {
     "jsonlint": "^1.6.3"
diff --git a/scripts/add_item.py b/scripts/add_item.py
index 3c3d868..91db34a 100644
--- a/scripts/add_item.py
+++ b/scripts/add_item.py
@@ -1,19 +1,23 @@
-# Script to add a new item to the log
+"""
+Add a new item to a media catalogue, using various APIs.
+"""
 
-from datetime import datetime
-from dotenv import load_dotenv
 import json
 import logging
 import os
 import re
+import time
+from datetime import datetime
 import requests
-from urllib.request import urlopen
+from dotenv import load_dotenv
 
 
 def setup_logger(name="add_item"):
+    """Set up the logger for console and file"""
+
     logging.root.setLevel(logging.NOTSET)
 
-    logger = logging.getLogger(name)
+    logr = logging.getLogger(name)
 
     c_handler = logging.StreamHandler()
     f_handler = logging.FileHandler("./logs/run.log")
@@ -27,10 +31,10 @@ def setup_logger(name="add_item"):
     c_handler.setFormatter(c_format)
     f_handler.setFormatter(f_format)
 
-    logger.addHandler(c_handler)
-    logger.addHandler(f_handler)
+    logr.addHandler(c_handler)
+    logr.addHandler(f_handler)
 
-    return logger
+    return logr
 
 
 logger = setup_logger()
@@ -46,12 +50,12 @@ if "" == TVDB_API_KEY:
     logger.error("TVDB API key not found")
 
 
-def add_item_to_log(item_id, media_type, log):
+def add_item_to_log(item_id, media_type, log) -> None:
     """Add a film, book, TV series or TV episode to a log"""
 
     logger.info(f"Processing {item_id}…")
 
-    item = import_by_id(item_id, media_type)
+    item: dict = import_by_id(item_id, media_type)
 
     if log in ["log", "current"]:
         # TODO - review this when moving from one log to another
@@ -74,7 +78,7 @@
     # TODO - do this automatically
     is_repeat = ""
     while is_repeat not in ["y", "n"]:
-        is_repeat = input(f"Is this a repeat entry? [y/n]: ")
+        is_repeat = input("Is this a repeat entry? [y/n]: ")
     if "y" == is_repeat:
         item["is_repeat"] = True
     item["added_by_id"] = item_id
@@ -84,7 +88,6 @@
         item["comments"] = comments
 
     # Validation step
-    correct = ""
     print(f"{media_type} data to add:\n")
     print(json.dumps(item, indent=4))
     if "y" != input("\nDoes this look correct? [y]: "):
@@ -93,29 +96,31 @@
     # Save changes
     logger.info(f"Adding {media_type} to {log}…")
 
-    with open(f"./data/{media_type}/{log}.json", "r") as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
         log_items = json.load(log_file)
 
     log_items.insert(0, item)
 
-    with open(f"./data/{media_type}/{log}.json", "w") as log_file:
+    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
         json.dump(log_items, log_file, indent=4)
 
     logger.info(f"Added {media_type} {item_id} to {log}")
 
 
-def import_by_id(import_id, media_type):
+def import_by_id(import_id, media_type) -> dict:
+    """Import from the appropriate API by unique ID"""
+
     if media_type in ["films", "tv-series"]:
         return import_from_imdb_by_id(import_id, media_type)
 
-    elif media_type in ["tv-episodes"]:
-        return  # import_from_tvdb_by_id(import_id, media_type)
+    if media_type in ["tv-episodes"]:
+        return  #import_from_tvdb_by_id(import_id, media_type)
 
-    elif media_type in ["books"]:
+    if media_type in ["books"]:
         return import_from_openlibrary_by_id(import_id, media_type)
 
 
-def import_from_imdb_by_id(imdb_id, media_type):
+def import_from_imdb_by_id(imdb_id, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
 
     api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"
@@ -125,6 +130,7 @@
         api_url,
         params={"external_source": "imdb_id"},
         headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
+        timeout=15
     )
 
     # Process the response
@@ -133,8 +139,7 @@
 
     elif 429 == response.status_code:
         time.sleep(2)
-        import_from_imdb_by_id(imdb_id, media_type)
-        return
+        return import_from_imdb_by_id(imdb_id, media_type)
 
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
@@ -161,20 +166,20 @@
 
         try:
             item = response_data[int(idx)]
-        except:
-            raise Exception(f"Index {idx} is invalid")
+        except Exception as exc:
+            raise Exception(f"Index {idx} is invalid") from exc
 
     # Modify the returned result to add additional data
     return cleanup_result(item, media_type)
 
 
-def import_from_openlibrary_by_id(isbn, media_type):
+def import_from_openlibrary_by_id(isbn, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
 
     api_url = f"https://openlibrary.org/isbn/{isbn}"
 
     # Sending API request
-    response = requests.get(api_url, headers={"accept": "application/json"})
+    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
 
     # Process the response
     if 200 == response.status_code:
@@ -182,8 +187,7 @@
         item = json.loads(response.text)
     elif 429 == response.status_code:
         time.sleep(2)
-        import_from_openlibrary_by_id(isbn, media_type)
-        return
+        return import_from_openlibrary_by_id(isbn, media_type)
 
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
@@ -199,16 +203,15 @@
 
     if len(item["works"]) > 1:
         raise Exception(f"Multiple works found for {isbn}")
-    else:
-        item["work"] = item["works"][0]
-        del item["works"]
+    item["work"] = item["works"][0]
+    del item["works"]
 
     # Modify the returned result to add additional data
     return cleanup_result(item, media_type)
 
 
-def import_from_openlibrary_by_ol_key(key):
+def import_from_openlibrary_by_ol_key(key) -> dict:
     """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key"""
 
     _, mode, ol_id = key.split("/")
 
@@ -216,7 +219,7 @@
     api_url = f"https://openlibrary.org{key}"
 
     # Sending API request
-    response = requests.get(api_url, headers={"accept": "application/json"})
+    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
 
     # Process the response
     if 200 == response.status_code:
@@ -224,8 +227,7 @@
         item = json.loads(response.text)
     elif 429 == response.status_code:
         time.sleep(2)
-        import_from_openlibrary_by_ol_key(key)
-        return
+        return import_from_openlibrary_by_ol_key(key)
 
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
@@ -241,12 +243,12 @@
 
         return author
 
-    elif "works" == mode:
+    if "works" == mode:
         work = {"id": ol_id, "title": item["title"]}
 
-        for key in ["first_publish_date", "subjects"]:
-            if key in item:
-                work[key] = item[key]
+        for result_key in ["first_publish_date", "subjects"]:
+            if result_key in item:
+                work[result_key] = item[result_key]
 
         return work
 
@@ -254,8 +256,9 @@
     raise Exception(f"Unknown OpenLibrary key '{mode}'")
 
 
-def cleanup_result(item, media_type):
-    """Process a film, TV series, TV episode or book returned by their respecitve APIs by removing unnecessary fields and adding others"""
+def cleanup_result(item, media_type) -> dict:
+    """Process a film, TV series, TV episode or book returned by their
+    respective APIs by removing unnecessary fields and adding others"""
 
     for field_name in [
         "adult",  # TMDB
@@ -307,16 +310,14 @@
         if len(item[key]) > 1:
             raise Exception("Multiple ISBN results")
-        else:
-            item[key] = item[key][0]
+        item[key] = item[key][0]
 
     if "publish_places" in item:
         if len(item["publish_places"]) > 1:
             raise Exception("Multiple publish_places")
-        else:
-            item["published_in"] = item["publish_places"][0]
-            del item["publish_places"]
+        item["published_in"] = item["publish_places"][0]
+        del item["publish_places"]
 
     if "languages" in item:
         item["languages"] = [
@@ -328,18 +329,18 @@
 
             del item["translation_of"]
         else:
            raise Exception(
-                f"translation_of '{item['translation_of']}' is different to work title '{item['work']['title']}'"
+                f"translation_of '{item['translation_of']}' \
+                    is different to work title '{item['work']['title']}'"
            )
 
     if "translated_from" in item:
         if len(item["translated_from"]) > 1:
             raise Exception("Multiple translated_from results")
-        else:
-            item["work"]["original_language"] = item["translated_from"][0][
-                "key"
-            ].split("/")[2]
-            del item["translated_from"]
+        item["work"]["original_language"] = item["translated_from"][0][
+            "key"
+        ].split("/")[2]
+        del item["translated_from"]
 
     if "date_added" not in item:
         item["date_added"] = datetime.today().strftime("%Y-%m-%d")
@@ -347,7 +348,9 @@
     return item
 
 
-def main():
+def main() -> None:
+    """Prompt user to select media type and log to process"""
+
     media_type = ""
     while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
         media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
@@ -393,9 +396,8 @@
 
         add_item_to_log(imdb_id, media_type, log)
 
-    except Exception as error:
+    except Exception:
         logger.exception("Exception occurred")
-        print(error)
 
 
 if __name__ == "__main__":
diff --git a/scripts/process_logs.py b/scripts/process_logs.py
index f0d3817..7135a3b 100644
--- a/scripts/process_logs.py
+++ b/scripts/process_logs.py
@@ -1,10 +1,13 @@
-from dotenv import load_dotenv
+"""
+Process logs derived from social cataloguing site data exports, using various APIs.
+"""
+
 import json
 import os
 import re
-import requests
 import time
-from urllib.request import urlopen
+import requests
+from dotenv import load_dotenv
 from add_item import cleanup_result, import_by_id, setup_logger
 
 logger = setup_logger("process_logs")
@@ -20,12 +23,12 @@ if "" == TVDB_API_KEY:
     logger.warning("TVDB API key not found")
 
 
-def process_log(media_type, log):
+def process_log(media_type, log) -> None:
     """Run through a log and call the appropriate API for each item found"""
 
     logger.info(f"Processing {media_type}/{log}…")
 
-    with open(f"./data/{media_type}/{log}.json", "r") as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
         log_items = json.load(log_file)
 
     log_item_values = {}
@@ -105,7 +108,11 @@
             if re.search("tt[0-9]+", item["imdb_id"]) is not None:
                 log_items[i] = import_by_id(item["imdb_id"], media_type)
 
-                with open(f"./data/{media_type}/{log}.json", "w") as log_file:
+                with open(
+                    f"./data/{media_type}/{log}.json",
+                    "w",
+                    encoding='utf-8'
+                ) as log_file:
                     json.dump(log_items, log_file, indent=4)
 
             else:
@@ -115,7 +122,11 @@
                 log_items[i] = new_log_item
 
                 if i % 15 == 0:
-                    with open(f"./data/{media_type}/{log}.json", "w") as log_file:
+                    with open(
+                        f"./data/{media_type}/{log}.json",
+                        "w",
+                        encoding='utf-8'
+                    ) as log_file:
                         json.dump(log_items, log_file, indent=4)
 
         if log_items[i] is not None:
@@ -124,29 +135,29 @@
         except KeyError:
             print(json.dumps(item, indent=4))
 
-    with open(f"./data/{media_type}/{log}.json", "w") as log_file:
+    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
         json.dump(log_items, log_file, indent=4)
 
     logger.info(f"Finished processing {media_type}/{log}")
 
 
-def import_by_details(item, item_title, media_type):
+def import_by_details(item, item_title, media_type) -> dict:
     """Import an item when lacking a unique identifier"""
 
     if media_type in ["films", "tv-series"]:
         return import_from_tmdb_by_details(item, item_title, media_type)
 
-    elif media_type in ["tv-episodes"]:
+    if media_type in ["tv-episodes"]:
         return  # import_from_tvdb_by_details(item, item_title, media_type)
 
-    elif media_type in ["books"]:
+    if media_type in ["books"]:
         return  # import_from_openlibrary_by_details(item, item_title, media_type)
 
-    elif media_type in ["games"]:
+    if media_type in ["games"]:
         return  # import_from_igdb_by_details(item, item_title, media_type)
 
 
-def import_from_tmdb_by_details(item, item_title, media_type):
+def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
     """Retrieve a film or TV series from TMDB using its title"""
 
     logger.info(f"Processing {item_title}…")
@@ -162,6 +173,7 @@
             "year": item["Release Year"] if "Release Year" in item else None,
         },
         headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
+        timeout=15
    )
 
     # Process the response
@@ -169,7 +181,7 @@
     if 200 == response.status_code:
         logger.debug(response.status_code)
     elif 429 == response.status_code:
         time.sleep(2)
-        import_from_tmdb_by_details(item)
+        return import_from_tmdb_by_details(item, item_title, media_type)
     else:
         logger.error(response.text)
@@ -178,7 +190,7 @@
     if 1 == len(response_data):
         return cleanup_result(response_data[0], media_type)
 
-    elif 0 == len(response_data):
+    if 0 == len(response_data):
         logger.warning(f"Returned no {media_type} for {item_title}")
 
     elif 1 < len(response_data):
@@ -190,42 +202,45 @@
         filtered_response_data = [
             result for result in response_data if result[title_key] == item_title
         ]
+        frd_len = len(filtered_response_data)
 
-        if 1 == len(filtered_response_data):
+        if 1 == frd_len:
             return cleanup_result(response_data[0], media_type)
 
-        else:
-            logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
-            print(
-                json.dumps(
-                    filtered_response_data
-                    if len(filtered_response_data) > 0
-                    else response_data,
-                    indent=4,
-                )
-            )
+        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
+        print(
+            json.dumps(
+                filtered_response_data
+                if len(filtered_response_data) > 0
+                else response_data,
+                indent=4,
+            )
+        )
 
-            idx = input(
-                f"\nEnter the index of the result to use [0-{len(filtered_response_data if len(filtered_response_data) > 0 else response_data) - 1}]: "
-            )
+        last_index = len(filtered_response_data if frd_len > 0 else response_data) - 1
+        idx = input(
+            f"\nEnter the index of the result to use [0-{last_index}]: "
+        )
 
-            if "" != idx:
-                try:
-                    return cleanup_result(response_data[int(idx)], media_type)
-                except:
-                    logger.error("Index invalid!")
-                    print("Index invalid!")
+        if "" != idx:
+            try:
+                return cleanup_result(response_data[int(idx)], media_type)
+
+            except Exception as exc:
+                raise Exception("Index invalid") from exc
 
     item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")
 
     if "" != item["IMDB ID"]:
         return import_by_id(item["IMDB ID"], media_type)
-    else:
-        logger.warning(f"Skipped {item_title}")
-        return item
+
+    logger.warning(f"Skipped {item_title}")
+    return item
 
 
-def main():
+def main() -> None:
+    """Prompt user to select media type and log to process"""
+
     media_type = ""
     while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
         media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
@@ -257,9 +272,8 @@
 
         process_log(media_type, log)
 
-    except Exception as error:
+    except Exception:
         logger.exception("Exception occurred")
-        print(error)
 
 
 if __name__ == "__main__":