lint scripts

Ben Goldsworthy 2024-01-17 23:02:56 +01:00
parent 33e8270afe
commit 29592be6ce
3 changed files with 112 additions and 96 deletions

package.json

@@ -7,7 +7,7 @@
"deploy": "rsync -rP ./public/ ovhvps:~/catalogue/content",
"lint:json": "jsonlint ./**/*.json -s",
"lint:json:fix": "npm run lint:json -- -i",
"lint:py": "pylint ./scripts/*.py"
"lint:py": "pylint --disable=broad-exception-raised --disable=logging-fstring-interpolation ./scripts/*.py"
},
"devDependencies": {
"jsonlint": "^1.6.3"

scripts/add_item.py

@@ -1,19 +1,23 @@
# Script to add a new item to the log
"""
Add a new item to a media catalogue, using various APIs.
"""
from datetime import datetime
from dotenv import load_dotenv
import json
import logging
import os
import re
import time
from datetime import datetime
import requests
from urllib.request import urlopen
from dotenv import load_dotenv
def setup_logger(name="add_item"):
"""Set up the logger for console and file"""
logging.root.setLevel(logging.NOTSET)
logger = logging.getLogger(name)
logr = logging.getLogger(name)
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler("./logs/run.log")
@@ -27,10 +31,10 @@ def setup_logger(name="add_item"):
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
logger.addHandler(c_handler)
logger.addHandler(f_handler)
logr.addHandler(c_handler)
logr.addHandler(f_handler)
return logger
return logr
logger = setup_logger()
@@ -46,12 +50,12 @@ if "" == TVDB_API_KEY:
logger.error("TVDB API key not found")
def add_item_to_log(item_id, media_type, log):
def add_item_to_log(item_id, media_type, log) -> None:
"""Add a film, book, TV series or TV episode to a log"""
logger.info(f"Processing {item_id}")
item = import_by_id(item_id, media_type)
item: dict = import_by_id(item_id, media_type)
if log in ["log", "current"]:
# TODO - review this when moving from one log to another
@@ -74,7 +78,7 @@ def add_item_to_log(item_id, media_type, log):
# TODO - do this automatically
is_repeat = ""
while is_repeat not in ["y", "n"]:
is_repeat = input(f"Is this a repeat entry? [y/n]: ")
is_repeat = input("Is this a repeat entry? [y/n]: ")
if "y" == is_repeat:
item["is_repeat"] = True
item["added_by_id"] = item_id
@@ -84,7 +88,6 @@ def add_item_to_log(item_id, media_type, log):
item["comments"] = comments
# Validation step
correct = ""
print(f"{media_type} data to add:\n")
print(json.dumps(item, indent=4))
if "y" != input("\nDoes this look correct? [y]: "):
@@ -93,29 +96,31 @@ def add_item_to_log(item_id, media_type, log):
# Save changes
logger.info(f"Adding {media_type} to {log}")
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
log_items = json.load(log_file)
log_items.insert(0, item)
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Added {media_type} {item_id} to {log}")
def import_by_id(import_id, media_type):
def import_by_id(import_id, media_type) -> dict:
"""Import from the appropriate API by unique ID"""
if media_type in ["films", "tv-series"]:
return import_from_imdb_by_id(import_id, media_type)
elif media_type in ["tv-episodes"]:
if media_type in ["tv-episodes"]:
return #import_from_tvdb_by_id(import_id, media_type)
elif media_type in ["books"]:
if media_type in ["books"]:
return import_from_openlibrary_by_id(import_id, media_type)
def import_from_imdb_by_id(imdb_id, media_type):
def import_from_imdb_by_id(imdb_id, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"
@@ -125,6 +130,7 @@ def import_from_imdb_by_id(imdb_id, media_type):
api_url,
params={"external_source": "imdb_id"},
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
)
# Process the response
@@ -133,8 +139,7 @@ def import_from_imdb_by_id(imdb_id, media_type):
elif 429 == response.status_code:
time.sleep(2)
import_from_imdb_by_id(imdb_id, media_type)
return
return import_from_imdb_by_id(imdb_id, media_type)
else:
raise Exception(f"Error {response.status_code}: {response.text}")
@@ -161,20 +166,20 @@ def import_from_imdb_by_id(imdb_id, media_type):
try:
item = response_data[int(idx)]
except:
raise Exception(f"Index {idx} is invalid")
except Exception as exc:
raise Exception(f"Index {idx} is invalid") from exc
# Modify the returned result to add additional data
return cleanup_result(item, media_type)
def import_from_openlibrary_by_id(isbn, media_type):
def import_from_openlibrary_by_id(isbn, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
api_url = f"https://openlibrary.org/isbn/{isbn}"
# Sending API request
response = requests.get(api_url, headers={"accept": "application/json"})
response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
# Process the response
if 200 == response.status_code:
@@ -182,8 +187,7 @@ def import_from_openlibrary_by_id(isbn, media_type):
elif 429 == response.status_code:
time.sleep(2)
import_from_openlibrary_by_id(isbn, media_type)
return
return import_from_openlibrary_by_id(isbn, media_type)
else:
raise Exception(f"Error {response.status_code}: {response.text}")
@@ -199,7 +203,6 @@ def import_from_openlibrary_by_id(isbn, media_type):
if len(item["works"]) > 1:
raise Exception(f"Multiple works found for {isbn}")
else:
item["work"] = item["works"][0]
del item["works"]
@@ -207,8 +210,8 @@ def import_from_openlibrary_by_id(isbn, media_type):
return cleanup_result(item, media_type)
def import_from_openlibrary_by_ol_key(key):
"""Retrieves an item (author or work) from OpenLibrary using an OL key"""
def import_from_openlibrary_by_ol_key(key) -> dict:
"""Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key"""
_, mode, ol_id = key.split("/")
@@ -216,7 +219,7 @@ def import_from_openlibrary_by_ol_key(key):
api_url = f"https://openlibrary.org{key}"
# Sending API request
response = requests.get(api_url, headers={"accept": "application/json"})
response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
# Process the response
if 200 == response.status_code:
@@ -224,8 +227,7 @@ def import_from_openlibrary_by_ol_key(key):
elif 429 == response.status_code:
time.sleep(2)
import_from_openlibrary_by_ol_key(key)
return
return import_from_openlibrary_by_ol_key(key)
else:
raise Exception(f"Error {response.status_code}: {response.text}")
@@ -241,12 +243,12 @@ def import_from_openlibrary_by_ol_key(key):
return author
elif "works" == mode:
if "works" == mode:
work = {"id": ol_id, "title": item["title"]}
for key in ["first_publish_date", "subjects"]:
if key in item:
work[key] = item[key]
for result_key in ["first_publish_date", "subjects"]:
if result_key in item:
work[result_key] = item[result_key]
return work
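
The `key` → `result_key` rename above is more than style: the loop variable was shadowing the function's `key` parameter, so after the loop `key` would refer to the last field name rather than the OL key passed in. A hypothetical minimal illustration of the hazard:

def f(key):
    for key in ["first_publish_date", "subjects"]:  # rebinds the parameter
        pass
    return key  # now "subjects", not the caller's argument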
@@ -254,8 +256,9 @@ def import_from_openlibrary_by_ol_key(key):
raise Exception(f"Unknown OpenLibrary key '{mode}'")
def cleanup_result(item, media_type):
"""Process a film, TV series, TV episode or book returned by their respecitve APIs by removing unnecessary fields and adding others"""
def cleanup_result(item, media_type) -> dict:
"""Process a film, TV series, TV episode or book returned by their
respective APIs by removing unnecessary fields and adding others"""
for field_name in [
"adult", # TMDB
@@ -307,14 +310,12 @@ def cleanup_result(item, media_type):
if len(item[key]) > 1:
raise Exception("Multiple ISBN results")
else:
item[key] = item[key][0]
if "publish_places" in item:
if len(item["publish_places"]) > 1:
raise Exception("Multiple publish_places")
else:
item["published_in"] = item["publish_places"][0]
del item["publish_places"]
@@ -328,14 +329,14 @@ def cleanup_result(item, media_type):
del item["translation_of"]
else:
raise Exception(
f"translation_of '{item['translation_of']}' is different to work title '{item['work']['title']}'"
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
)
if "translated_from" in item:
if len(item["translated_from"]) > 1:
raise Exception("Multiple translated_from results")
else:
item["work"]["original_language"] = item["translated_from"][0][
"key"
].split("/")[2]
@@ -347,7 +348,9 @@ def cleanup_result(item, media_type):
return item
def main():
def main() -> None:
"""Prompt user to select media type and log to process"""
media_type = ""
while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
@@ -393,9 +396,8 @@ def main():
add_item_to_log(imdb_id, media_type, log)
except Exception as error:
except Exception:
logger.exception("Exception occurred")
print(error)
if __name__ == "__main__":
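
The switch from `print(error)` to a bare `logger.exception(...)` works because `Logger.exception` logs at ERROR level and appends the active traceback automatically when called inside an `except` block, so the separate print added nothing:

try:
    add_item_to_log(imdb_id, media_type, log)
except Exception:
    logger.exception("Exception occurred")  # message plus full traceback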

scripts/process_logs.py

@@ -1,10 +1,13 @@
from dotenv import load_dotenv
"""
Process logs derived from social cataloguing site data exports, using various APIs.
"""
import json
import os
import re
import requests
import time
from urllib.request import urlopen
import requests
from dotenv import load_dotenv
from add_item import cleanup_result, import_by_id, setup_logger
logger = setup_logger("process_logs")
@@ -20,12 +23,12 @@ if "" == TVDB_API_KEY:
logger.warning("TVDB API key not found")
def process_log(media_type, log):
def process_log(media_type, log) -> None:
"""Run through a log and call the appropriate API for each item found"""
logger.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
log_items = json.load(log_file)
log_item_values = {}
@@ -105,7 +108,11 @@ def process_log(media_type, log):
if re.search("tt[0-9]+", item["imdb_id"]) is not None:
log_items[i] = import_by_id(item["imdb_id"], media_type)
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
json.dump(log_items, log_file, indent=4)
else:
@@ -115,7 +122,11 @@ def process_log(media_type, log):
log_items[i] = new_log_item
if i % 15 == 0:
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
json.dump(log_items, log_file, indent=4)
if log_items[i] is not None:
@@ -124,29 +135,29 @@ def process_log(media_type, log):
except KeyError:
print(json.dumps(item, indent=4))
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Finished processing {media_type}/{log}")
def import_by_details(item, item_title, media_type):
def import_by_details(item, item_title, media_type) -> dict:
"""Import an item when lacking a unique identifier"""
if media_type in ["films", "tv-series"]:
return import_from_tmdb_by_details(item, item_title, media_type)
elif media_type in ["tv-episodes"]:
if media_type in ["tv-episodes"]:
return # import_from_tvdb_by_details(item, item_title, media_type)
elif media_type in ["books"]:
if media_type in ["books"]:
return # import_from_openlibrary_by_details(item, item_title, media_type)
elif media_type in ["games"]:
if media_type in ["games"]:
return # import_from_igdb_by_details(item, item_title, media_type)
def import_from_tmdb_by_details(item, item_title, media_type):
def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
"""Retrieve a film or TV series from TMDB using its title"""
logger.info(f"Processing {item_title}")
@@ -162,6 +173,7 @@ def import_from_tmdb_by_details(item, item_title, media_type):
"year": item["Release Year"] if "Release Year" in item else None,
},
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
)
# Process the response
@@ -169,7 +181,7 @@ def import_from_tmdb_by_details(item, item_title, media_type):
logger.debug(response.status_code)
elif 429 == response.status_code:
time.sleep(2)
import_from_tmdb_by_details(item)
return import_from_tmdb_by_details(item, item_title, media_type)
else:
logger.error(response.text)
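
Worth noting about the retry fix above: the old call `import_from_tmdb_by_details(item)` passed one argument to a three-parameter function, so the retry path would have raised a TypeError before this commit; the new line both forwards all three arguments and returns the result. A hypothetical one-liner showing the failure mode:

def g(item, item_title, media_type): ...
g({})  # TypeError: g() missing 2 required positional arguments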
@@ -178,7 +190,7 @@ def import_from_tmdb_by_details(item, item_title, media_type):
if 1 == len(response_data):
return cleanup_result(response_data[0], media_type)
elif 0 == len(response_data):
if 0 == len(response_data):
logger.warning(f"Returned no {media_type} for {item_title}")
elif 1 < len(response_data):
@@ -190,11 +202,11 @@ def import_from_tmdb_by_details(item, item_title, media_type):
filtered_response_data = [
result for result in response_data if result[title_key] == item_title
]
frd_len = len(filtered_response_data)
if 1 == len(filtered_response_data):
if 1 == frd_len:
return cleanup_result(response_data[0], media_type)
else:
logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
print(
json.dumps(
@@ -204,28 +216,31 @@ def import_from_tmdb_by_details(item, item_title, media_type):
indent=4,
)
)
last_index = len(filtered_response_data if frd_len > 0 else response_data) - 1
idx = input(
f"\nEnter the index of the result to use [0-{len(filtered_response_data if len(filtered_response_data) > 0 else response_data) - 1}]: "
f"\nEnter the index of the result to use [0-{last_index}]: "
)
if "" != idx:
try:
return cleanup_result(response_data[int(idx)], media_type)
except:
logger.error("Index invalid!")
print("Index invalid!")
except Exception as exc:
raise Exception("Index invalid") from exc
item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")
if "" != item["IMDB ID"]:
return import_by_id(item["IMDB ID"], media_type)
else:
logger.warning(f"Skipped {item_title}")
return item
def main():
def main() -> None:
"""Prompt user to select media type and log to process"""
media_type = ""
while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
@@ -257,9 +272,8 @@ def main():
process_log(media_type, log)
except Exception as error:
except Exception:
logger.exception("Exception occurred")
print(error)
if __name__ == "__main__":