fix script

This commit is contained in:
Ben Goldsworthy 2024-02-03 23:25:14 +00:00
parent 1c6cde3f4c
commit b4d64ccd34

View file

@@ -13,6 +13,7 @@ from dotenv import load_dotenv
authors = []
def setup_logger(name="add_item"):
"""Set up the logger for console and file"""
@@ -54,10 +55,14 @@ def return_if_exists(item_id, media_type, log) -> dict|None:
"""Returns an item if it exists in the requested log"""
logger.info(f"Checking for '{item_id}' in '{log}'")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
existing_items = [log_item for log_item in log_items if "id" in log_item and log_item['id'] == int(item_id)]
existing_items = [
log_item
for log_item in log_items
if "id" in log_item and log_item["id"] == int(item_id)
]
if len(existing_items) > 0:
logger.info(f"Found item in '{log}'")
return existing_items[-1]
@@ -68,15 +73,19 @@ def delete_existing(item_id, media_type, log) -> None:
"""Deletes an item from a log if it matches the ID"""
logger.info(f"Deleting '{item_id}' from '{log}'")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
old_len = len(log_items)
log_items = [log_item for log_item in log_items if "id" not in log_item or ("id" in log_item and log_item['id'] != int(item_id))]
log_items = [
log_item
for log_item in log_items
if "id" not in log_item or ("id" in log_item and log_item["id"] != int(item_id))
]
if len(log_items) < (old_len - 1):
raise Exception("More than one deletion made, discarding…")
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"'{item_id}' deleted from '{log}'")
@@ -94,8 +103,15 @@ def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
existing_item["is_repeat"] = True
return existing_item, None
for log_to_check in [p_log for p_log in ["log", "current", "wishlist"] if p_log != log]:
if ("current" == log_to_check and media_type in ["books", "games", "tv-series"]) or ("wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"]):
for log_to_check in [
p_log for p_log in ["log", "current", "wishlist"] if p_log != log
]:
if (
"current" == log_to_check and media_type in ["books", "games", "tv-series"]
) or (
"wishlist" == log_to_check
and media_type in ["books", "games", "films", "tv-series"]
):
existing_item = return_if_exists(item_id, media_type, log_to_check)
if existing_item is not None:
return existing_item, log_to_check
@@ -156,12 +172,12 @@ def add_item_to_log(item_id, media_type, log) -> None:
# Save changes
logger.info(f"Adding {media_type} to {log}")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
log_items.insert(0, item)
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Added {media_type} {item_id} to {log}")
@@ -180,7 +196,9 @@ def import_by_id(import_id, media_type) -> dict:
return # import_from_tvdb_by_id(import_id, media_type)
if media_type in ["books"]:
return import_from_openlibrary_by_id(import_id, media_type)
return import_from_openlibrary_by_id(
"".join(re.findall(r"\d+", import_id)), media_type
)
def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
@@ -191,9 +209,7 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
# Sending API request
response = requests.get(
api_url,
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
api_url, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, timeout=15
)
# Process the response
@@ -212,14 +228,8 @@ def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
response_data = json.loads(response.text)
if 1 == len(response_data):
item = response_data[0]
elif 0 == len(response_data):
raise Exception(f"Returned no results for {tmdb_id}")
# Modify the returned result to add additional data
return cleanup_result(item, media_type)
return cleanup_result(response_data, media_type)
def import_from_openlibrary_by_id(isbn, media_type) -> dict:
@@ -253,10 +263,12 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
for i, sub_item in enumerate(item[key]):
item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"])
if "works" in item:
if len(item["works"]) > 1:
raise Exception(f"Multiple works found for {isbn}")
print(f"Multiple works found for {isbn}:")
print(item["works"])
idx = input(f"Select ID to use [0-{len(item['works'])-1}]: ")
item["works"][0] = item["works"][int(idx)]
item["work"] = item["works"][0]
del item["works"]
@@ -275,20 +287,28 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
_, mode, ol_id = key.split("/")
if "authors" == mode:
with open(f"./scripts/caching/authors.json", "r", encoding='utf-8') as authors_cache:
with open(
f"./scripts/caching/authors.json", "r", encoding="utf-8"
) as authors_cache:
cached_authors = json.load(authors_cache)
if mode in ["works", "authors"]:
if "authors" == mode:
matched_cached_authors = [aut for aut in cached_authors if aut['id'] == ol_id]
matched_cached_authors = [
aut for aut in cached_authors if aut["id"] == ol_id
]
if len(matched_cached_authors) == 1:
logging.info(f"Found cached author '{matched_cached_authors[0]['name']}'")
logging.info(
f"Found cached author '{matched_cached_authors[0]['name']}'"
)
return matched_cached_authors[0]
api_url = f"https://openlibrary.org{key}"
# Sending API request
response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
response = requests.get(
api_url, headers={"accept": "application/json"}, timeout=15
)
# Process the response
if 200 == response.status_code:
@@ -316,9 +336,7 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
logger.info(f"Caching author '{author['name']}'")
cached_authors.append(author)
with open(
f"./scripts/caching/authors.json",
"w",
encoding='utf-8'
f"./scripts/caching/authors.json", "w", encoding="utf-8"
) as authors_cache:
json.dump(cached_authors, authors_cache, indent=4)
logger.info(f"Author '{author['name']}' cached!")
@@ -345,6 +363,7 @@ def cleanup_result(item, media_type) -> dict:
for field_name in [
"adult", # TMDB
"backdrop_path", # TMDB
"budget", # TMDB
"copyright_date", # OpenLibrary
"classifications", # OpenLibrary
"created", # OpenLibrary
@@ -352,6 +371,7 @@ def cleanup_result(item, media_type) -> dict:
"episode_type", # TMDB
"first_sentence", # OpenLibrary
"genre_ids", # TMDB
"homepage", # TMDB
"identifiers", # OpenLibrary
"media_type", # TMDB
"last_modified", # OpenLibrary
@@ -366,11 +386,15 @@ def cleanup_result(item, media_type) -> dict:
"physical_dimensions", # OpenLibrary
"popularity", # TMDB
"production_code", # TMDB
"production_companies", # TMDB
"revenue", # TMDB
"revision", # OpenLibrary
"runtime", # TMDB
"source_records", # OpenLibrary
"status", # TMDB
"still_path", # TMDB
"table_of_contents", # OpenLibrary
"tagline", # TMDB
"type", # OpenLibrary
"uri_descriptions", # OpenLibrary
"url", # OpenLibrary
@@ -413,21 +437,28 @@ def cleanup_result(item, media_type) -> dict:
]
if "translation_of" in item:
if item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower():
del item["translation_of"]
else:
if not (
item["translation_of"].split(":")[0].lower()
== item["work"]["title"].split(":")[0].lower()
):
logger.warn(
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
)
if 'y' != input("Accept change? [y|n]: "):
raise Exception(
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
)
del item["translation_of"]
if "translated_from" in item:
if len(item["translated_from"]) > 1:
raise Exception("Multiple translated_from results")
item["work"]["original_language"] = item["translated_from"][0][
"key"
].split("/")[2]
item["work"]["original_language"] = item["translated_from"][0]["key"].split(
"/"
)[2]
del item["translated_from"]
if "date_added" not in item:
@@ -459,7 +490,7 @@ def main() -> None:
log = input("Enter log to update [log|current|wishlist]: ")
while re.search("[0-9]+", item_id) is None:
item_id = input("Enter ISBN: ")
item_id = "".join(re.findall(r"\d+", input("Enter ISBN: ")))
elif "tv-episodes" == media_type:
log = "log"