use TVDB IDs for TV episodes, process wishlisted books as OpenLibrary works

This commit is contained in:
Ben Goldsworthy 2024-02-25 19:53:44 +01:00
parent 0b3cc113a7
commit f9f43d4bbf
Signed by: Rumperuu
SSH key fingerprint: SHA256:e5XfzNOr9UvWpEzyLfw0GtTMZWIFh3NmxH+/qQIi3xE
2 changed files with 69 additions and 31 deletions

View file

@ -1,5 +1,11 @@
""" """
Add a new item to a media catalogue, using various APIs. Add a new item to a media catalogue, using various APIs:
- TV series' and films using the TMDB API and IDs;
- TV episodes using the TMDB API and TVDB IDs (because the TMDB
API is difficult and a lot of TMDB records don't have IMDB IDs);
- books using the OpenLibrary API and ISBNs; and
- games using the GiantBomb API and IDs.
""" """
import json import json
@ -43,12 +49,9 @@ logger = setup_logger()
load_dotenv() load_dotenv()
TMDB_API_KEY = os.getenv("TMDB_API_KEY") TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")
if "" == TMDB_API_KEY: if "" == TMDB_API_KEY:
logger.error("TMDB API key not found") logger.error("TMDB API key not found")
if "" == TVDB_API_KEY:
logger.error("TVDB API key not found")
def return_if_exists(item_id, media_type, log) -> dict | None: def return_if_exists(item_id, media_type, log) -> dict | None:
@ -91,7 +94,12 @@ def delete_existing(item_id, media_type, log) -> None:
def check_for_existing(item_id, media_type, log) -> dict[dict, str]: def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
"""Check for an existing item and move it to the specified log if requested""" """
Check for an existing item in the current log, and pull the
`date_added` etc. and mark it as a repeat if so.
Otherwise, check for an existing item in the other logs, and move
it to the specified log if so.
"""
logger.info(f"Checking for '{item_id}' in logs…") logger.info(f"Checking for '{item_id}' in logs…")
@ -134,6 +142,11 @@ def add_item_to_log(item_id, media_type, log) -> None:
if item is None: if item is None:
raise Exception("No item found") raise Exception("No item found")
if "books" == media_type and "wishlist" != log:
item, log_to_delete = check_for_existing(item['work']['ol_id'], media_type, log)
if item is None:
item, log_to_delete = check_for_existing(item['ol_id'], media_type, log)
if log in ["log", "current"]: if log in ["log", "current"]:
if "date_started" not in item and media_type in ["books", "tv-series", "games"]: if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
date_started = "" date_started = ""
@ -196,28 +209,28 @@ def import_by_id(import_id, media_type, log) -> dict:
return import_from_tmdb_by_id(import_id, media_type) return import_from_tmdb_by_id(import_id, media_type)
if media_type in ["tv-episodes"]: if media_type in ["tv-episodes"]:
return import_from_tmdb_by_imdb_id(import_id, media_type) return import_from_tmdb_by_external_id(import_id, media_type)
if media_type in ["books"]: if media_type in ["books"]:
if "wishlist" == log: if "wishlist" == log:
return import_from_openlibrary_by_ol_key(import_id) return import_from_openlibrary_by_ol_key(import_id)
else: else:
return import_from_openlibrary_by_id( return import_from_openlibrary_by_isbn(
"".join(re.findall(r"\d+", import_id)), media_type "".join(re.findall(r"\d+", import_id)), media_type
) )
def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict: def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" """Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID"""
api_url = f"https://api.themoviedb.org/3/find/{imdb_id}" api_url = f"https://api.themoviedb.org/3/find/{external_id}"
# Sending API request # Sending API request
response = requests.get( response = requests.get(
api_url, api_url,
headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
params={"external_source": "imdb_id"}, params={"external_source": "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id"},
timeout=15 timeout=15
) )
@ -227,7 +240,7 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
elif 429 == response.status_code: elif 429 == response.status_code:
time.sleep(2) time.sleep(2)
return import_from_tmdb_by_imdb_id(imdb_id, media_type) return import_from_tmdb_by_external_id(external_id, media_type)
else: else:
raise Exception(f"Error {response.status_code}: {response.text}") raise Exception(f"Error {response.status_code}: {response.text}")
@ -242,7 +255,7 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
response_data = json.loads(response.text)[key][0] response_data = json.loads(response.text)[key][0]
if response_data == None: if response_data == None:
raise Exception(f"Nothing found for IMDB ID {imdb_id}!") raise Exception(f"Nothing found for TVDB ID {external_id}!")
# Modify the returned result to add additional data # Modify the returned result to add additional data
return cleanup_result(response_data, media_type) return cleanup_result(response_data, media_type)
@ -251,9 +264,6 @@ def import_from_tmdb_by_imdb_id(imdb_id, media_type) -> dict:
def import_from_tmdb_by_id(tmdb_id, media_type) -> dict: def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an TMDB ID""" """Retrieve a film, TV show or TV episode from TMDB using an TMDB ID"""
if "tv-episodes" == media_type:
raise Exception("TV Episodes are TODO!")
api_path = "movie" if "films" == media_type else "tv" api_path = "movie" if "films" == media_type else "tv"
api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}" api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}"
@ -279,7 +289,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
return cleanup_result(response_data, media_type) return cleanup_result(response_data, media_type)
def import_from_openlibrary_by_id(isbn, media_type) -> dict: def import_from_openlibrary_by_isbn(isbn, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
logging.info(f"Importing '{isbn}'") logging.info(f"Importing '{isbn}'")
@ -295,7 +305,7 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
elif 429 == response.status_code: elif 429 == response.status_code:
time.sleep(2) time.sleep(2)
return import_from_openlibrary_by_id(isbn, media_type) return import_from_openlibrary_by_isbn(isbn, media_type)
elif 404 == response.status_code: elif 404 == response.status_code:
logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'") logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'")
@ -377,7 +387,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
item = json.loads(response.text) item = json.loads(response.text)
if "authors" == mode: if "authors" == mode:
author = {"id": ol_id, "name": item["name"]} author = {"ol_id": ol_id, "name": item["name"]}
if "personal_name" in item: if "personal_name" in item:
if item["name"] != item["personal_name"]: if item["name"] != item["personal_name"]:
@ -394,7 +404,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
return author return author
if "works" == mode: if "works" == mode:
work = {"id": ol_id, "title": item["title"]} work = {"ol_id": ol_id, "title": item["title"]}
for result_key in ["first_publish_date", "subjects"]: for result_key in ["first_publish_date", "subjects"]:
if result_key in item: if result_key in item:
@ -437,6 +447,7 @@ def cleanup_result(item, media_type) -> dict:
"popularity", # TMDB "popularity", # TMDB
"production_code", # TMDB "production_code", # TMDB
"production_companies", # TMDB "production_companies", # TMDB
"publish_places", # OpenLibrary
"revenue", # TMDB "revenue", # TMDB
"revision", # OpenLibrary "revision", # OpenLibrary
"runtime", # TMDB "runtime", # TMDB
@ -456,8 +467,8 @@ def cleanup_result(item, media_type) -> dict:
if field_name in item: if field_name in item:
del item[field_name] del item[field_name]
if media_type in ["films", "tv-series"]: if media_type in ["films", "tv-series", "tv-episodes"]:
item["id"] = item["tmdb_id"] item["tmdb_id"] = item["id"]
del item["id"] del item["id"]
title_key = "name" if "tv-series" == media_type else "title" title_key = "name" if "tv-series" == media_type else "title"
@ -469,6 +480,10 @@ def cleanup_result(item, media_type) -> dict:
): ):
del item[f"original_{title_key}"], item["original_language"] del item[f"original_{title_key}"], item["original_language"]
if "tv-episodes" == media_type:
item['series']['tmdb_id'] = item['show_id']
del item['show_id']
if "books" == media_type: if "books" == media_type:
_, _, item["ol_id"] = item["key"].split("/") _, _, item["ol_id"] = item["key"].split("/")
del item["key"] del item["key"]
@ -480,10 +495,6 @@ def cleanup_result(item, media_type) -> dict:
item[key] = item[key][0] item[key] = item[key][0]
if "publish_places" in item:
item["published_in"] = item["publish_places"]
del item["publish_places"]
if "languages" in item: if "languages" in item:
item["languages"] = [ item["languages"] = [
lang["key"].split("/")[2] for lang in item["languages"] lang["key"].split("/")[2] for lang in item["languages"]
@ -561,7 +572,7 @@ def main() -> None:
while re.search("[0-9]+", item_id) is None: while re.search("[0-9]+", item_id) is None:
item_id = input("Enter TMDB ID: ") item_id = input("Enter TMDB ID: ")
add_item_to_log(item_id, media_type, log) add_item_to_log(re.search("[0-9]+", item_id)[0], media_type, log)
except Exception: except Exception:
logger.exception("Exception occurred") logger.exception("Exception occurred")

View file

@ -33,9 +33,17 @@ def process_log(media_type, log) -> None:
log_item_values = {} log_item_values = {}
id_key = ""
if "books" == media_type:
id_key = "ol_id"
elif media_type in ["films", "tv-series", "tv-episodes"]:
id_key = "tmdb_id"
elif "games" == media_type:
id_key = "gb_id"
for i, item in enumerate(log_items): for i, item in enumerate(log_items):
try: try:
if "id" not in item and "skip" not in item: if id_key not in item and "skip" not in item:
if media_type in ["films", "books"]: if media_type in ["films", "books"]:
item_title = item["Title"] item_title = item["Title"]
elif "tv-episodes" == media_type: elif "tv-episodes" == media_type:
@ -50,10 +58,16 @@ def process_log(media_type, log) -> None:
log_item_values["date_added"] = item["Date Added"] log_item_values["date_added"] = item["Date Added"]
del item["Date Added"] del item["Date Added"]
if "date_added" in item:
log_item_values["date_added"] = item["date_added"]
if "Date Started" in item: if "Date Started" in item:
log_item_values["date_started"] = item["Date Started"] log_item_values["date_started"] = item["Date Started"]
del item["Date Started"] del item["Date Started"]
if "date_started" in item:
log_item_values["date_started"] = item["date_started"]
if "Date Finished" in item: if "Date Finished" in item:
log_item_values["date_finished"] = item["Date Finished"] log_item_values["date_finished"] = item["Date Finished"]
del item["Date Finished"] del item["Date Finished"]
@ -63,10 +77,16 @@ def process_log(media_type, log) -> None:
else: else:
raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}") raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}")
if "date_finished" in item:
log_item_values["date_finished"] = item["date_finished"]
if "Read Count" in item: if "Read Count" in item:
log_item_values["read_count"] = item["Read Count"] log_item_values["read_count"] = item["Read Count"]
del item["Read Count"] del item["Read Count"]
if "read_count" in item:
log_item_values["read_count"] = item["read_count"]
if "Date Watched" in item: if "Date Watched" in item:
log_item_values["date_finished"] = item["Date Watched"] log_item_values["date_finished"] = item["Date Watched"]
del item["Date Watched"] del item["Date Watched"]
@ -116,11 +136,18 @@ def process_log(media_type, log) -> None:
if "IMDB ID" in item and item["IMDB ID"] != "": if "IMDB ID" in item and item["IMDB ID"] != "":
new_log_item = import_by_id(item["IMDB ID"], media_type) new_log_item = import_by_id(item["IMDB ID"], media_type)
elif "books" == media_type and "wishlist" == log:
ol_work_id = re.search("OL[0-9]+W", input(f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}): "))
try:
new_log_item = import_by_id(ol_work_id[0], media_type, log)
except:
logger.info("Skipping…")
elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None: elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None:
new_log_item = import_by_id(item["ISBN13"], media_type) new_log_item = import_by_id(item["ISBN13"], media_type, log)
elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None: elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
new_log_item = import_by_id(item["ISBN"], media_type) new_log_item = import_by_id(item["ISBN13"], media_type, log)
else: else:
new_log_item = import_by_details(item, item_title, media_type) new_log_item = import_by_details(item, item_title, media_type)
@ -163,7 +190,7 @@ def process_log(media_type, log) -> None:
else: else:
log_items[i] = new_log_item log_items[i] = new_log_item
if i % 10 == 0: if i % 3 == 0:
with open( with open(
f"./data/{media_type}/{log}.json", f"./data/{media_type}/{log}.json",
"w", "w",