640 lines
21 KiB
Python
640 lines
21 KiB
Python
"""
|
|
Add a new item to a media catalogue, using various APIs:
|
|
|
|
- TV series' and films using the TMDB API and IDs;
|
|
- TV episodes using the TMDB API and TVDB IDs (because the TMDB
|
|
API is difficult and a lot of TMDB records don't have IMDB IDs);
|
|
- books using the OpenLibrary API and ISBNs; and
|
|
- games using the GiantBomb API and IDs.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
import requests
|
|
from dotenv import load_dotenv
|
|
|
|
# NOTE(review): nothing in the visible code reads or writes this list —
# presumably a leftover; confirm before removing.
authors = []
|
|
|
|
|
|
def setup_logger(name="add_item"):
    """Set up the logger for console and file.

    The console handler emits INFO and above; the file handler writes
    WARNING and above to ``./logs/run.log``. The root logger level is set
    to INFO as part of the setup.

    Args:
        name: Name of the logger to configure.

    Returns:
        The configured ``logging.Logger``.
    """

    logr = logging.getLogger(name)

    # ``getLogger`` hands back the same object for the same name, so a second
    # call must not stack another pair of handlers (every record would then
    # be emitted twice).
    if logr.handlers:
        return logr

    # ``FileHandler`` does not create missing parent directories.
    os.makedirs("./logs", exist_ok=True)

    c_handler = logging.StreamHandler()
    f_handler = logging.FileHandler("./logs/run.log")

    logging.root.setLevel(logging.INFO)
    c_handler.setLevel(logging.INFO)
    f_handler.setLevel(logging.WARNING)

    c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
    f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

    c_handler.setFormatter(c_format)
    f_handler.setFormatter(f_format)

    logr.addHandler(c_handler)
    logr.addHandler(f_handler)

    return logr
|
|
|
|
|
|
logger = setup_logger()

load_dotenv()

TMDB_API_KEY = os.getenv("TMDB_API_KEY")

# ``os.getenv`` returns ``None`` (not "") when the variable is unset, so test
# for any falsy value to report both a missing and an empty key.
if not TMDB_API_KEY:
    logger.error("TMDB API key not found")
|
|
|
|
|
|
def return_if_exists(item_id: str, media_type: str, log: str) -> dict | None:
    """Look up an item by ID in a single log file.

    Reads ``./data/{media_type}/{log}.json`` and returns the last entry whose
    ID field equals ``item_id``, or ``None`` when no entry matches. For books
    the field compared depends on the shape of the ID (``ol_id``,
    ``isbn_13`` or ``isbn_10``); for every other media type it is ``id``.

    Raises:
        Exception: If ``media_type`` is "books" and ``item_id`` has none of
            the recognised shapes.
    """

    logger.info(f"Checking for '{item_id}' in '{log}'…")
    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
        entries = json.load(log_file)

    field = "id"
    if "books" == media_type:
        # Tried in order: the first pattern found anywhere in the ID wins.
        book_id_shapes = (
            ("OL[0-9]+[MW]", "ol_id"),
            ("[0-9]{13}", "isbn_13"),
            ("[0-9]{10}", "isbn_10"),
        )
        for pattern, candidate_field in book_id_shapes:
            if re.search(pattern, item_id) is not None:
                field = candidate_field
                break
        else:
            raise Exception("Invalid ID for book")

    matches = [
        entry for entry in entries if field in entry and entry[field] == item_id
    ]
    if matches:
        logger.info(f"Found item in '{log}'")
        return matches[-1]

    logger.info(f"'{item_id}' not found in '{log}'")
    return None
|
|
|
|
|
|
def delete_existing(item_id: str, media_type: str, log: str) -> None:
    """Delete the single entry matching ``item_id`` from a log file.

    Reads ``./data/{media_type}/{log}.json``, drops the entry whose ID field
    equals ``item_id`` and writes the file back. The field compared depends
    on the media type and on the shape of the ID.

    Raises:
        Exception: If the ID has no recognised shape for the media type, if
            no entry matched, or if more than one entry matched (the file is
            left untouched in all of these cases).
    """

    logger.info(f"Deleting '{item_id}' from '{log}'…")
    log_path = f"./data/{media_type}/{log}.json"
    with open(log_path, "r", encoding="utf-8") as log_file:
        log_items = json.load(log_file)

    id_key = "id"
    if "books" == media_type:
        if re.search("OL[0-9]+[MW]", item_id) is not None:
            id_key = "ol_id"
        elif re.search("[0-9]{13}", item_id) is not None:
            id_key = "isbn_13"
        elif re.search("[0-9]{10}", item_id) is not None:
            id_key = "isbn_10"
        else:
            raise Exception("Invalid ID for book")

    elif media_type in ["films", "tv-episodes"]:
        if re.search("tt[0-9]+", item_id) is not None:
            # IMDB-style ID ("tt…"). This branch used to compare against
            # "isbn_id", which looks like a copy-paste slip from the book
            # branch above.
            id_key = "imdb_id"
        elif re.search("[0-9]+", item_id) is not None:
            id_key = "tmdb_id"
        else:
            raise Exception("Invalid ID for film")

    old_len = len(log_items)
    log_items = [
        log_item
        for log_item in log_items
        if not (id_key in log_item and log_item[id_key] == item_id)
    ]

    # Exactly one entry must have gone; anything else is treated as an error
    # before the file is rewritten.
    if len(log_items) < (old_len - 1):
        raise Exception("More than one deletion made, discarding…")
    if len(log_items) == old_len:
        raise Exception("No item deleted, skipping…")

    with open(log_path, "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)
    logger.info(f"'{item_id}' deleted from '{log}'")
|
|
|
|
|
|
def check_for_existing(
    item_id, media_type, log
) -> tuple[dict | None, str | None]:
    """
    Check for an existing item in the current log, and pull the
    `date_added` etc. and mark it as a repeat if so.
    Otherwise, check for an existing item in the other logs, and move
    it to the specified log if so.

    Returns a ``(item, other_log)`` pair: ``item`` is the existing entry or
    ``None``; ``other_log`` names the log the entry was found in when that is
    not the requested one (so the caller can delete it there), else ``None``.
    """

    logger.info(f"Checking for '{item_id}' in logs…")

    # Check in specified log
    existing_item = return_if_exists(item_id, media_type, log)

    if existing_item is not None:
        if "log" == log:
            existing_item["is_repeat"] = True
        return existing_item, None

    # Media types searched in each of the other logs. "log" itself is never
    # searched as an "other" log. Insertion order fixes the search order:
    # "current" first, then "wishlist".
    searchable_media = {
        "current": ("books", "games", "tv-series"),
        "wishlist": ("books", "games", "films", "tv-series"),
    }
    for log_to_check, media_types in searchable_media.items():
        if log_to_check == log or media_type not in media_types:
            continue
        existing_item = return_if_exists(item_id, media_type, log_to_check)
        if existing_item is not None:
            return existing_item, log_to_check

    return None, None
|
|
|
|
|
|
def _prompt_for_date(prompt: str) -> str:
    """Ask until the answer contains a YYYY-MM-DD date; "t" means today."""

    answer = ""
    while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", answer) is None:
        answer = input(prompt)
        if "t" == answer:
            answer = datetime.today().strftime("%Y-%m-%d")
    return answer


def add_item_to_log(item_id: str, media_type: str, log: str) -> None:
    """Add a film, book, TV series or TV episode to a log.

    Reuses an existing entry when one is found in the logs, otherwise imports
    the item from the relevant API. Prompts for any missing dates, repeat
    flag and comments, asks for confirmation, then inserts the entry at the
    top of ``./data/{media_type}/{log}.json``. If the entry was found in a
    different log it is deleted from that log afterwards.

    Raises:
        Exception: If the item cannot be imported.
    """

    logger.info(f"Processing {item_id}…")

    item: dict | None = None
    log_to_delete = None
    # ID under which an existing entry was found. For books this can differ
    # from ``item_id`` (e.g. found by work ID while the user typed an ISBN),
    # and it is the one that has to be used when deleting the old entry.
    matched_id = item_id

    if media_type not in ["tv-episodes", "books"]:
        item, log_to_delete = check_for_existing(item_id, media_type, log)

    if item is None:
        item = import_by_id(item_id, media_type, log)
        if item is None:
            raise Exception("No item found")

    if "books" == media_type:
        # Look the book up under every identifier the imported record has.
        # A wishlist import is a work record with no "work" key, and editions
        # may lack either ISBN, so only use the keys that are present.
        candidate_ids = []
        if "work" in item and "ol_id" in item["work"]:
            candidate_ids.append(item["work"]["ol_id"])
        candidate_ids.extend(
            item[key] for key in ("ol_id", "isbn_13", "isbn_10") if key in item
        )

        for candidate_id in candidate_ids:
            existing, log_to_delete = check_for_existing(
                candidate_id, media_type, log
            )
            if existing is not None:
                item = existing
                matched_id = candidate_id
                break

    if log in ["log", "current"]:
        if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
            item["date_started"] = _prompt_for_date(
                "Enter date started [YYYY-MM-DD, t for today]: "
            )

        if "date_finished" not in item and "log" == log:
            item["date_finished"] = _prompt_for_date(
                "Enter date finished [YYYY-MM-DD, t for today]: "
            )

        # NOTE(review): the source's indentation was lost; the repeat prompt
        # is assumed to apply only to the "log"/"current" logs — confirm.
        if "is_repeat" not in item:
            is_repeat = ""
            while is_repeat not in ["y", "n"]:
                is_repeat = input("Is this a repeat entry? [y/n]: ")
            if "y" == is_repeat:
                item["is_repeat"] = True

    if "added_by_id" not in item:
        item["added_by_id"] = item_id

    if "comments" not in item:
        comments = input("Enter comments (optional): ")
        if "" != comments:
            item["comments"] = comments

    # Validation step
    print(f"{media_type} data to add:\n")
    print(json.dumps(item, indent=4))
    if "y" != input("\nDoes this look correct? [y]: "):
        return

    # Save changes
    logger.info(f"Adding {media_type} to {log}…")

    log_path = f"./data/{media_type}/{log}.json"
    with open(log_path, "r", encoding="utf-8") as log_file:
        log_items = json.load(log_file)

    log_items.insert(0, item)

    with open(log_path, "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)

    logger.info(f"Added {media_type} {item_id} to {log}")

    if log_to_delete is not None:
        delete_existing(matched_id, media_type, log_to_delete)
|
|
|
|
|
|
def import_by_id(import_id, media_type, log) -> dict | None:
|
|
"""Import from the appropriate API by unique ID"""
|
|
|
|
if media_type in ["films", "tv-series"]:
|
|
return import_from_tmdb_by_id(import_id, media_type)
|
|
|
|
if media_type in ["tv-episodes"]:
|
|
return import_from_tmdb_by_external_id(import_id, media_type)
|
|
|
|
if media_type in ["books"]:
|
|
if "wishlist" == log:
|
|
return import_from_openlibrary_by_ol_key(import_id)
|
|
|
|
else:
|
|
return import_from_openlibrary_by_isbn(
|
|
"".join(re.findall(r"\d+", import_id)), media_type
|
|
)
|
|
|
|
|
|
def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
    """Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID.

    IDs containing ``tt`` followed by digits are sent as IMDB IDs, anything
    else as a TVDB ID. A 429 response is retried after a two-second pause.

    Returns:
        The first matching result, passed through ``cleanup_result``.

    Raises:
        Exception: On any response other than 200/429, or when TMDB returns
            no result for the ID.
    """

    api_url = f"https://api.themoviedb.org/3/find/{external_id}"
    is_imdb_id = re.search("tt[0-9]+", external_id) is not None

    # Sending API request
    response = requests.get(
        api_url,
        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
        params={"external_source": "imdb_id" if is_imdb_id else "tvdb_id"},
        timeout=15,
    )

    # Process the response
    if 429 == response.status_code:
        time.sleep(2)
        return import_from_tmdb_by_external_id(external_id, media_type)

    if 200 != response.status_code:
        raise Exception(f"Error {response.status_code}: {response.text}")

    logger.debug(response.status_code)

    # The response holds one result list per kind of record.
    result_keys = {
        "tv-episodes": "tv_episode_results",
        "tv-series": "tv_results",
        "films": "movie_results",
    }
    key = result_keys.get(media_type, "")

    results = json.loads(response.text)[key]

    # An unknown ID comes back as an empty list; indexing it would raise a
    # bare IndexError instead of this explanatory error.
    if not results or results[0] is None:
        id_label = "IMDB" if is_imdb_id else "TVDB"
        raise Exception(f"Nothing found for {id_label} ID {external_id}!")

    # Modify the returned result to add additional data
    return cleanup_result(results[0], media_type)
|
|
|
|
|
|
def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
    """Retrieve a film or TV series from TMDB using a TMDB ID.

    Films are requested from the ``movie`` endpoint and everything else from
    the ``tv`` endpoint. A 429 response is retried after a two-second pause;
    any other non-200 response raises an ``Exception``. The decoded response
    is returned after passing through ``cleanup_result``.
    """

    endpoint = "tv"
    if "films" == media_type:
        endpoint = "movie"
    api_url = f"https://api.themoviedb.org/3/{endpoint}/{tmdb_id}"

    # Keep asking until TMDB stops answering "too many requests"
    while True:
        response = requests.get(
            api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15
        )
        if 429 != response.status_code:
            break
        time.sleep(2)

    if 200 != response.status_code:
        raise Exception(f"Error {response.status_code}: {response.text}")

    logger.debug(response.status_code)

    # Tidy the record before handing it back
    return cleanup_result(json.loads(response.text), media_type)
|
|
|
|
|
|
def import_from_openlibrary_by_isbn(isbn, media_type) -> dict | None:
    """Retrieve a book edition from OpenLibrary using its ISBN.

    The edition's author and work references are replaced by the records
    fetched for them, and the list of works is collapsed to a single
    ``work`` entry (the user chooses when there are several).

    Returns:
        The edition passed through ``cleanup_result``, or ``None`` when
        OpenLibrary answers 404 for the ISBN.

    Raises:
        Exception: On any response other than 200/404/429.
    """

    # Use the module's logger: calling ``logging.info`` logs via the root
    # logger and bypasses this script's handlers.
    logger.info(f"Importing '{isbn}'…")

    api_url = f"https://openlibrary.org/isbn/{isbn}"

    # Sending API request
    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)

    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)

    elif 429 == response.status_code:
        time.sleep(2)
        return import_from_openlibrary_by_isbn(isbn, media_type)

    elif 404 == response.status_code:
        logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'")
        return None

    else:
        raise Exception(f"Error {response.status_code}: {response.text}")

    item = json.loads(response.text)

    # Swap each author/work reference for the record it points to.
    for key in ["authors", "works"]:
        if key in item:
            item[key] = [
                import_from_openlibrary_by_ol_key(sub_item["key"])
                for sub_item in item[key]
            ]

    if "works" in item:
        works = item["works"]
        chosen = 0
        if len(works) > 1:
            print(f"Multiple works found for {isbn}:")
            print(works)
            # Re-prompt until the answer is a usable index instead of letting
            # ``int()`` or the list lookup raise on a typo.
            answer = ""
            while not (answer.isdecimal() and int(answer) < len(works)):
                answer = input(f"Select ID to use [0-{len(works)-1}]: ")
            chosen = int(answer)

        item["work"] = works[chosen]
        del item["works"]

    # Rate limiting
    time.sleep(1)

    # Modify the returned result to add additional data
    return cleanup_result(item, media_type)
|
|
|
|
|
|
def import_from_openlibrary_by_ol_key(key) -> dict | None:
    """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key.

    ``key`` is either a full key such as ``/works/OL1W`` or ``/authors/OL1A``,
    or a bare ID, which is treated as a work. Authors are served from
    ``./scripts/caching/authors.json`` when cached there exactly once, and
    newly fetched authors are appended to that cache. A work's authors are
    fetched recursively.

    Raises:
        Exception: If the key is neither a work nor an author key, or on any
            response other than 200/429.
    """

    if len(key.split("/")) == 1:
        key = f"/works/{key}"

    logger.info(f"Retrieving {key}…")
    _, mode, ol_id = key.split("/")

    if mode not in ["works", "authors"]:
        raise Exception(f"Unknown OpenLibrary key '{mode}'")

    cached_authors = []
    if "authors" == mode:
        with open(
            "./scripts/caching/authors.json", "r", encoding="utf-8"
        ) as authors_cache:
            cached_authors = json.load(authors_cache)

        matched_cached_authors = [
            aut for aut in cached_authors if aut["ol_id"] == ol_id
        ]
        if len(matched_cached_authors) == 1:
            logger.info(f"Found cached author '{matched_cached_authors[0]['name']}'")
            return matched_cached_authors[0]

    api_url = f"https://openlibrary.org{key}"

    # Sending API request
    response = requests.get(
        api_url, headers={"accept": "application/json"}, timeout=15
    )

    # Process the response
    if 429 == response.status_code:
        time.sleep(2)
        # Return the retry's result; without the ``return`` the rate-limit
        # response itself would be parsed below as if it were the record.
        return import_from_openlibrary_by_ol_key(key)

    if 200 != response.status_code:
        raise Exception(f"Error {response.status_code}: {response.text}")

    logger.debug(response.status_code)

    # Rate limiting
    time.sleep(1)

    item = json.loads(response.text)

    if "authors" == mode:
        author = {"ol_id": ol_id, "name": item["name"]}
        logger.debug(author)

        # Only keep the personal name when it adds information.
        if "personal_name" in item and item["name"] != item["personal_name"]:
            author["personal_name"] = item["personal_name"]

        logger.info(f"Caching author '{author['name']}'…")
        cached_authors.append(author)
        with open(
            "./scripts/caching/authors.json", "w", encoding="utf-8"
        ) as authors_cache:
            json.dump(cached_authors, authors_cache, indent=4)
        logger.info(f"Author '{author['name']}' cached!")

        return author

    # Only "works" is left at this point.
    work = {"ol_id": ol_id, "title": item["title"], "authors": []}

    if "authors" in item:
        for author in item["authors"]:
            work["authors"].append(
                import_from_openlibrary_by_ol_key(author["author"]["key"])
            )

    for result_key in ["first_publish_date", "subjects"]:
        if result_key in item:
            work[result_key] = item[result_key]

    work["date_added"] = datetime.today().strftime("%Y-%m-%d")

    return work
|
|
|
|
|
|
def cleanup_result(item, media_type) -> dict:
    """Process a film, TV series, TV episode or book returned by their
    respective APIs by removing unnecessary fields and adding others.

    ``item`` is modified in place and also returned. TMDB records have ``id``
    renamed to ``tmdb_id``; book records have their OpenLibrary key, ISBNs
    and language references flattened. ``date_added`` is set to today when
    absent.

    Raises:
        Exception: For a book whose ``translation_of`` differs from the work
            title and the user declines the change, or which lists more than
            one ``translated_from`` language.
    """

    unwanted_fields = (
        "adult",  # TMDB
        "backdrop_path",  # TMDB
        "budget",  # TMDB
        "copyright_date",  # OpenLibrary
        "classifications",  # OpenLibrary
        "created",  # OpenLibrary
        "dewey_decimal_class",  # OpenLibrary
        "episode_type",  # TMDB
        "first_sentence",  # OpenLibrary
        "genre_ids",  # TMDB
        "homepage",  # TMDB
        "identifiers",  # OpenLibrary
        "media_type",  # TMDB
        "last_modified",  # OpenLibrary
        "latest_revision",  # OpenLibrary
        "lc_classifications",  # OpenLibrary
        "lccn",  # OpenLibrary
        "local_id",  # OpenLibrary
        "notes",  # OpenLibrary
        "ocaid",  # OpenLibrary
        "oclc_numbers",  # OpenLibrary
        "pagination",  # OpenLibrary
        "physical_dimensions",  # OpenLibrary
        "popularity",  # TMDB
        "production_code",  # TMDB
        "production_companies",  # TMDB
        "publish_places",  # OpenLibrary
        "revenue",  # TMDB
        "revision",  # OpenLibrary
        "runtime",  # TMDB
        "source_records",  # OpenLibrary
        "status",  # TMDB
        "still_path",  # TMDB
        "table_of_contents",  # OpenLibrary
        "tagline",  # TMDB
        "type",  # OpenLibrary
        "uri_descriptions",  # OpenLibrary
        "url",  # OpenLibrary
        "video",  # TMDB
        "vote_average",  # TMDB
        "vote_count",  # TMDB
        "weight",  # OpenLibrary
    )
    for field_name in unwanted_fields:
        item.pop(field_name, None)

    if media_type in ["films", "tv-series", "tv-episodes"]:
        item["tmdb_id"] = item.pop("id")

        title_key = "name" if "tv-series" == media_type else "title"

        # The original title/language only carry information when they
        # differ from the English title.
        if f"original_{title_key}" in item and "original_language" in item:
            if (
                item[f"original_{title_key}"] == item[title_key]
                and item["original_language"] == "en"
            ):
                del item[f"original_{title_key}"], item["original_language"]

    if "tv-episodes" == media_type:
        item["series"] = {"tmdb_id": item.pop("show_id")}

    if "books" == media_type:
        # Keys look like "/books/OL123M"; keep only the trailing ID.
        _, _, item["ol_id"] = item["key"].split("/")
        del item["key"]

        for key in ["isbn_10", "isbn_13"]:
            if key in item:
                if len(item[key]) > 1:
                    logger.warning("Multiple ISBN results")

                item[key] = item[key][0]

        if "languages" in item:
            item["languages"] = [
                lang["key"].split("/")[2] for lang in item["languages"]
            ]

        if "translation_of" in item:
            # Compare only the part before any subtitle, ignoring case.
            if not (
                item["translation_of"].split(":")[0].lower()
                == item["work"]["title"].split(":")[0].lower()
            ):
                # Built with implicit concatenation: a backslash continuation
                # inside the literal would embed the source indentation in
                # the message.
                mismatch = (
                    f"translation_of '{item['translation_of']}' "
                    f"is different to work title '{item['work']['title']}'"
                )
                logger.warning(mismatch)
                if "y" != input("Accept change? [y|n]: "):
                    raise Exception(mismatch)
            del item["translation_of"]

        if "translated_from" in item:
            if len(item["translated_from"]) > 1:
                raise Exception("Multiple translated_from results")

            item["work"]["original_language"] = item["translated_from"][0]["key"].split(
                "/"
            )[2]
            del item["translated_from"]

    if "date_added" not in item:
        item["date_added"] = datetime.today().strftime("%Y-%m-%d")

    return item
|
|
|
|
|
|
def main() -> None:
    """Prompt user to select media type and log to process.

    Asks for the media type, the log to update (where the media type has
    more than one) and the item's ID, then hands over to
    ``add_item_to_log``. Any exception raised along the way is logged
    rather than propagated.
    """

    media_type = ""
    while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
        media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")

    try:
        item_id = ""
        log = ""
        if "films" == media_type:
            while log not in ["log", "wishlist"]:
                log = input("Enter log to update [log|wishlist]: ")

            while re.search("[0-9]+", item_id) is None:
                item_id = input("Enter TMDB ID: ")

        elif "books" == media_type:
            while log not in ["log", "current", "wishlist"]:
                log = input("Enter log to update [log|current|wishlist]: ")

            while re.search("[0-9]+", item_id) is None:
                if "wishlist" == log:
                    item_id = input("Enter OpenLibrary Work ID: ")
                else:
                    # Keep only the digits, dropping hyphens and spaces.
                    item_id = "".join(re.findall(r"\d+", input("Enter ISBN: ")))

        elif "tv-episodes" == media_type:
            # No log prompt for episodes. Without this the empty default
            # would make the item be written to "./data/tv-episodes/.json".
            # NOTE(review): assumes episodes are kept in "log" — confirm.
            log = "log"

            while re.search("(tt)?[0-9]+", item_id) is None:
                item_id = input("Enter TVDB or IMDB ID: ")

        elif "tv-series" == media_type:
            while log not in ["log", "current", "wishlist"]:
                log = input("Enter log to update [log|current|wishlist]: ")

            while re.search("[0-9]+", item_id) is None:
                item_id = input("Enter TMDB ID: ")

        # Pull the bare ID out of whatever the user typed around it.
        item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", item_id)
        if item_id_parsed is not None:
            add_item_to_log(item_id_parsed[0], media_type, log)

    except Exception:
        logger.exception("Exception occurred")
|
|
|
|
|
|
# Run the interactive prompt only when executed as a script, not on import.
if __name__ == "__main__":
    main()
|