fix: search pre-existing items

This commit is contained in:
Ben Goldsworthy 2024-03-15 20:57:39 +01:00
parent 1d6374010d
commit 7864163f5a
Signed by: Rumperuu
SSH Key Fingerprint: SHA256:e5XfzNOr9UvWpEzyLfw0GtTMZWIFh3NmxH+/qQIi3xE
1 changed files with 87 additions and 37 deletions

View File

@ -54,17 +54,28 @@ if "" == TMDB_API_KEY:
logger.error("TMDB API key not found")
def return_if_exists(item_id, media_type, log) -> dict | None:
def return_if_exists(item_id: str, media_type: str, log: str) -> dict | None:
"""Returns an item if it exists in the requested log"""
logger.info(f"Checking for '{item_id}' in '{log}'")
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
id_key = "id"
if "books" == media_type:
if re.search("OL[0-9]+[MW]", item_id) is not None:
id_key = "ol_id"
elif re.search("[0-9]{13}", item_id) is not None:
id_key = "isbn_13"
elif re.search("[0-9]{10}", item_id) is not None:
id_key = "isbn_10"
else:
raise Exception("Invalid ID for book")
existing_items = [
log_item
for log_item in log_items
if "id" in log_item and log_item["id"] == item_id
if id_key in log_item and log_item[id_key] == item_id
]
if len(existing_items) > 0:
logger.info(f"Found item in '{log}'")
@ -72,18 +83,38 @@ def return_if_exists(item_id, media_type, log) -> dict | None:
logger.info(f"'{item_id}' not found in '{log}'")
def delete_existing(item_id, media_type, log) -> None:
def delete_existing(item_id: str, media_type: str, log: str) -> None:
"""Deletes an item from a log if it matches the ID"""
logger.info(f"Deleting '{item_id}' from '{log}'")
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
id_key = "id"
if "books" == media_type:
if re.search("OL[0-9]+[MW]", item_id) is not None:
id_key = "ol_id"
elif re.search("[0-9]{13}", item_id) is not None:
id_key = "isbn_13"
elif re.search("[0-9]{10}", item_id) is not None:
id_key = "isbn_10"
else:
raise Exception("Invalid ID for book")
elif media_type in ["films", "tv-episodes"]:
if re.search("tt[0-9]+", item_id) is not None:
id_key = "isbn_id"
elif re.search("[0-9]+", item_id) is not None:
id_key = "tmdb_id"
else:
raise Exception("Invalid ID for film")
old_len = len(log_items)
log_items = [
log_item
for log_item in log_items
if "id" not in log_item or ("id" in log_item and log_item["id"] != item_id)
if id_key not in log_item
or (id_key in log_item and log_item[id_key] != item_id)
]
if len(log_items) < (old_len - 1):
raise Exception("More than one deletion made, discarding…")
@ -93,7 +124,9 @@ def delete_existing(item_id, media_type, log) -> None:
logger.info(f"'{item_id}' deleted from '{log}'")
def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
def check_for_existing(
item_id, media_type, log
) -> tuple[dict[dict, str] | None, str | None]:
"""
Check for an existing item in the current log, and pull the
`date_added` etc. and mark it as a repeat if so.
@ -127,14 +160,14 @@ def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
return None, None
def add_item_to_log(item_id, media_type, log) -> None:
def add_item_to_log(item_id: str, media_type: str, log: str) -> None:
"""Add a film, book, TV series or TV episode to a log"""
logger.info(f"Processing {item_id}")
item = None
item: dict | None = None
log_to_delete = None
if "tv-episodes" != media_type and ("books" != media_type and "wishlist" != log):
if media_type not in ["tv-episodes", "books"]:
item, log_to_delete = check_for_existing(item_id, media_type, log)
if item is None:
@ -142,10 +175,21 @@ def add_item_to_log(item_id, media_type, log) -> None:
if item is None:
raise Exception("No item found")
if "books" == media_type and "wishlist" != log:
item, log_to_delete = check_for_existing(item['work']['ol_id'], media_type, log)
if item is None:
item, log_to_delete = check_for_existing(item['ol_id'], media_type, log)
if "books" == media_type:
new_item, log_to_delete = check_for_existing(
item["work"]["ol_id"], media_type, log
)
if new_item is None:
new_item, log_to_delete = check_for_existing(item["ol_id"], media_type, log)
if new_item is None:
new_item, log_to_delete = check_for_existing(
item["isbn_13"], media_type, log
)
if new_item is None:
new_item, log_to_delete = check_for_existing(
item["isbn_10"], media_type, log
)
item = new_item if new_item is not None else item
if log in ["log", "current"]:
if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
@ -202,7 +246,7 @@ def add_item_to_log(item_id, media_type, log) -> None:
delete_existing(item_id, media_type, log_to_delete)
def import_by_id(import_id, media_type, log) -> dict:
def import_by_id(import_id, media_type, log) -> dict | None:
"""Import from the appropriate API by unique ID"""
if media_type in ["films", "tv-series"]:
@ -230,8 +274,12 @@ def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
response = requests.get(
api_url,
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
params={"external_source": "imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id"},
timeout=15
params={
"external_source": (
"imdb_id" if re.search("tt[0-9]+", external_id) else "tvdb_id"
)
},
timeout=15,
)
# Process the response
@ -254,7 +302,7 @@ def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
key = "movie_results"
response_data = json.loads(response.text)[key][0]
if response_data == None:
if response_data is None:
raise Exception(f"Nothing found for TVDB ID {external_id}!")
# Modify the returned result to add additional data
@ -289,7 +337,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
return cleanup_result(response_data, media_type)
def import_from_openlibrary_by_isbn(isbn, media_type) -> dict:
def import_from_openlibrary_by_isbn(isbn, media_type) -> dict | None:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
logging.info(f"Importing '{isbn}'")
@ -337,18 +385,19 @@ def import_from_openlibrary_by_isbn(isbn, media_type) -> dict:
return cleanup_result(item, media_type)
def import_from_openlibrary_by_ol_key(key) -> dict:
def import_from_openlibrary_by_ol_key(key) -> dict | None:
"""Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key"""
if (len(key.split("/")) == 1):
if len(key.split("/")) == 1:
key = f"/works/{key}"
logger.info(f"Retrieving {key}")
_, mode, ol_id = key.split("/")
cached_authors = []
if "authors" == mode:
with open(
f"./scripts/caching/authors.json", "r", encoding="utf-8"
"./scripts/caching/authors.json", "r", encoding="utf-8"
) as authors_cache:
cached_authors = json.load(authors_cache)
@ -396,7 +445,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
logger.info(f"Caching author '{author['name']}'")
cached_authors.append(author)
with open(
f"./scripts/caching/authors.json", "w", encoding="utf-8"
"./scripts/caching/authors.json", "w", encoding="utf-8"
) as authors_cache:
json.dump(cached_authors, authors_cache, indent=4)
logger.info(f"Author '{author['name']}' cached!")
@ -408,7 +457,9 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
if "authors" in item:
for author in item["authors"]:
work["authors"].append(import_from_openlibrary_by_ol_key(author["author"]["key"]))
work["authors"].append(
import_from_openlibrary_by_ol_key(author["author"]["key"])
)
for result_key in ["first_publish_date", "subjects"]:
if result_key in item:
@ -429,7 +480,7 @@ def cleanup_result(item, media_type) -> dict:
for field_name in [
"adult", # TMDB
"backdrop_path", # TMDB
"budget", # TMDB
"budget", # TMDB
"copyright_date", # OpenLibrary
"classifications", # OpenLibrary
"created", # OpenLibrary
@ -437,7 +488,7 @@ def cleanup_result(item, media_type) -> dict:
"episode_type", # TMDB
"first_sentence", # OpenLibrary
"genre_ids", # TMDB
"homepage", # TMDB
"homepage", # TMDB
"identifiers", # OpenLibrary
"media_type", # TMDB
"last_modified", # OpenLibrary
@ -452,16 +503,16 @@ def cleanup_result(item, media_type) -> dict:
"physical_dimensions", # OpenLibrary
"popularity", # TMDB
"production_code", # TMDB
"production_companies", # TMDB
"publish_places", # OpenLibrary
"revenue", # TMDB
"production_companies", # TMDB
"publish_places", # OpenLibrary
"revenue", # TMDB
"revision", # OpenLibrary
"runtime", # TMDB
"source_records", # OpenLibrary
"status", # TMDB
"status", # TMDB
"still_path", # TMDB
"table_of_contents", # OpenLibrary
"tagline", # TMDB
"tagline", # TMDB
"type", # OpenLibrary
"uri_descriptions", # OpenLibrary
"url", # OpenLibrary
@ -487,8 +538,8 @@ def cleanup_result(item, media_type) -> dict:
del item[f"original_{title_key}"], item["original_language"]
if "tv-episodes" == media_type:
item['series'] = { 'tmdb_id': item['show_id'] }
del item['show_id']
item["series"] = {"tmdb_id": item["show_id"]}
del item["show_id"]
if "books" == media_type:
_, _, item["ol_id"] = item["key"].split("/")
@ -515,7 +566,7 @@ def cleanup_result(item, media_type) -> dict:
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
)
if 'y' != input("Accept change? [y|n]: "):
if "y" != input("Accept change? [y|n]: "):
raise Exception(
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
@ -546,8 +597,8 @@ def main() -> None:
try:
item_id = ""
log = ""
if "films" == media_type:
log = ""
while log not in ["log", "wishlist"]:
log = input("Enter log to update [log|wishlist]: ")
@ -555,7 +606,6 @@ def main() -> None:
item_id = input("Enter TMDB ID: ")
elif "books" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
log = input("Enter log to update [log|current|wishlist]: ")
@ -566,19 +616,19 @@ def main() -> None:
item_id = "".join(re.findall(r"\d+", input("Enter ISBN: ")))
elif "tv-episodes" == media_type:
log = "log"
while re.search("(tt)?[0-9]+", item_id) is None:
item_id = input("Enter TVDB or IMDB ID: ")
elif "tv-series" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
log = input("Enter log to update [log|current|wishlist]: ")
while re.search("[0-9]+", item_id) is None:
item_id = input("Enter TMDB ID: ")
add_item_to_log(re.search("(OL|tt)?[0-9]+[WMA]?", item_id)[0], media_type, log)
item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", item_id)
if item_id_parsed is not None:
add_item_to_log(item_id_parsed[0], media_type, log)
except Exception:
logger.exception("Exception occurred")