# Cataloguer/scripts/process_logs.py
"""
Process logs derived from social cataloguing site data exports, using various APIs.
"""
import json
import os
import re
import time
import requests
from slugify import slugify
from dotenv import load_dotenv
from add_item import cleanup_result, import_by_id, setup_logger
logger = setup_logger("process_logs")
load_dotenv()
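# API credentials are read from the environment, optionally via a .env file
# loaded by python-dotenv.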
TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")
if "" == TMDB_API_KEY:
logger.warning("TMDB API key not found")
if "" == TVDB_API_KEY:
logger.warning("TVDB API key not found")
def process_log(media_type, log) -> None:
"""Run through a log and call the appropriate API for each item found"""
logger.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
log_item_values = {}
id_key = ""
if "books" == media_type:
id_key = "ol_id"
elif media_type in ["films", "tv-series", "tv-episodes"]:
id_key = "tmdb_id"
elif "games" == media_type:
id_key = "gb_id"
    for i, item in enumerate(log_items):
        if id_key not in item:  # and "skip" not in item:
            log_item_values = {}  # reset per item so one item's values don't leak into the next
            if media_type in ["films", "books"]:
                item_title = item["Title"] if "Title" in item else item["title"]
            elif "tv-episodes" == media_type:
                item_title = item["Episode Title"]
            elif "tv-series" == media_type:
                item_title = item["Show Title"]
            item_author = item.get("Author", item.get("authors", "No Author"))
            logger.info(f"Processing {item_title} ({item_author})…")
            # Rename pre-existing fields
            if "Date Added" in item:
                log_item_values["date_added"] = item["Date Added"]
                del item["Date Added"]
            if "date_added" in item:
                log_item_values["date_added"] = item["date_added"]
            if "Date Started" in item:
                log_item_values["date_started"] = item["Date Started"]
                del item["Date Started"]
            if "date_started" in item:
                log_item_values["date_started"] = item["date_started"]
            if "Date Finished" in item:
                log_item_values["date_finished"] = item["Date Finished"]
                del item["Date Finished"]
            if "Date Read" in item:
                # 'Date Finished' has already been moved into log_item_values above
                if log_item_values.get("date_finished") == item["Date Read"]:
                    del item["Date Read"]
                else:
                    raise Exception(
                        f"'Date Read' != 'Date Finished' for {item_title}"
                    )
            if "date_finished" in item:
                log_item_values["date_finished"] = item["date_finished"]
            if "Read Count" in item:
                log_item_values["read_count"] = item["Read Count"]
                del item["Read Count"]
            if "read_count" in item:
                log_item_values["read_count"] = item["read_count"]
            if "Date Watched" in item:
                log_item_values["date_finished"] = item["Date Watched"]
                del item["Date Watched"]
            if "Rewatch" in item:
                log_item_values["is_repeat"] = item["Rewatch"]
                del item["Rewatch"]
            if "Comments" in item:
                log_item_values["comments"] = item["Comments"]
                del item["Comments"]
            if "Series Title" in item:
                log_item_values["series_title"] = item["Series Title"]
                del item["Series Title"]
            if "Episode Title" in item:
                log_item_values["name"] = item["Episode Title"]
                del item["Episode Title"]
if "Episode Number" in item:
if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None:
season_no, _, episode_no = log_item_values[
"episode_number"
].split("x")
elif (
re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None
):
season_no, _, episode_no = log_item_values[
"episode_number"
].split("E")
elif re.search("E[0-9]+", item["Episode Number"]) is not None:
season_no = None
episode_no = item["episode_number"][1:]
else:
logger.error(
f"Invalid episode number format '{item['Episode Number']}'"
)
return
log_item_values["season_number"] = season_no
log_item_values["episode_number"] = episode_no
del item["Episode Number"]
if "IMDB ID" in item and item["IMDB ID"] != "":
new_log_item = import_by_id(item["IMDB ID"], media_type)
elif "books" == media_type and "wishlist" == log:
new_log_item = import_by_details(item, item_title, media_type)
if new_log_item is None:
ol_work_id = input(
f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author'] if "Author" in item else item["authors"] if "authors" in item else "No Author"}), or 'd' to delete the record: "
)
if 'd' == ol_work_id:
logger.info("Deleting…")
del log_items[i]
continue
ol_work_id = re.search("OL[0-9]+W", ol_work_id)
try:
new_log_item = import_by_id(ol_work_id[0], media_type, log)
except:
new_log_item = item
new_log_item["skip"] = True
logger.info("Skipping…")
elif (
"ISBN13" in item
and item["ISBN13"] != ""
and item["ISBN13"] is not None
):
new_log_item = import_by_id(item["ISBN13"], media_type, log)
elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
new_log_item = import_by_id(item["ISBN13"], media_type, log)
else:
new_log_item = import_by_details(item, item_title, media_type)
            if new_log_item is None:
                if (
                    media_type in ["films", "tv-series", "tv-episodes"]
                    and "imdb_id" not in item
                ):
                    item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
                    if re.search("tt[0-9]+", item["imdb_id"]) is not None:
                        log_items[i] = import_by_id(item["imdb_id"], media_type)
                        with open(
                            f"./data/{media_type}/{log}.json", "w", encoding="utf-8"
                        ) as log_file:
                            json.dump(log_items, log_file, indent=4)
                elif "books" == media_type:
                    if "ISBN" not in item and "ISBN13" not in item:
                        item["ISBN"] = input(f"Enter ISBN for {item_title}: ")
                        if re.search("[0-9-]+", item["ISBN"]) is not None:
                            log_items[i] = import_by_id(item["ISBN"], media_type)
                            with open(
                                f"./data/{media_type}/{log}.json",
                                "w",
                                encoding="utf-8",
                            ) as log_file:
                                json.dump(log_items, log_file, indent=4)
                        else:
                            logger.warning(f"Skipped '{item_title}'")
                            log_items[i]["skip"] = True
                else:
                    logger.warning(f"Skipped {item_title}")
            else:
                log_items[i] = new_log_item
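            # Checkpoint: write the log back to disk every third item so an
            # interrupted run loses at most a couple of lookups.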
            if i % 3 == 0:
                with open(
                    f"./data/{media_type}/{log}.json", "w", encoding="utf-8"
                ) as log_file:
                    json.dump(log_items, log_file, indent=4)
                    logger.info("Saved…")
            if log_items[i] is not None:
                log_items[i] |= log_item_values

    # Drop any records that were marked for deletion above
    log_items = [log_item for log_item in log_items if log_item is not None]
    with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)
    logger.info(f"Finished processing {media_type}/{log}")


def import_by_details(item, item_title, media_type) -> dict | None:
    """Import an item when lacking a unique identifier"""
    if media_type in ["films", "tv-series"]:
        return import_from_tmdb_by_details(item, item_title, media_type)
    if media_type in ["tv-episodes"]:
        return None  # import_from_tvdb_by_details is not yet implemented
    if media_type in ["books"]:
        return import_from_openlibrary_by_details(item, item_title, media_type)
    if media_type in ["games"]:
        return None  # import_from_igdb_by_details is not yet implemented
    return None


def import_from_openlibrary_by_details(item, item_title, media_type) -> dict | None:
    """Retrieve a book from OpenLibrary using a title and author name"""
    logger.info(f"Importing '{item_title}'")
    api_url = (
        "https://openlibrary.org/search.json"
        f"?title={slugify((item['Title'] if 'Title' in item else item['title']).split(':')[0], separator='%20')}"
        f"&author={slugify(item['Author'] if 'Author' in item else item.get('authors', 'No Author'), separator='%20')}"
    )
    # Sending API request
    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)
    elif 429 == response.status_code:
        # Rate-limited: back off briefly, then retry
        time.sleep(2)
        return import_from_openlibrary_by_details(item, item_title, media_type)
    elif 404 == response.status_code:
        logger.error(f"{response.status_code}: Not Found for title '{item_title}'")
        return None
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")
    results = json.loads(response.text)
    logger.info(
        f"Found {results['num_found']} result{'s' if results['num_found'] != 1 else ''}"
    )
    if 0 < results["num_found"]:
        result = results["docs"][0]
        if 1 == results["num_found"]:
            logger.info(f"Selecting OL ID {result['key']}")
            item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
            if item_id_parsed is not None:
                return import_by_id(item_id_parsed[0], "books", "wishlist")
        else:
            if (
                result["title"] == item["Title"].split(":")[0]
                and result["author_name"][0] == item["Author"]
            ):
                logger.info(f"First result ({result['key']}) is a match!")
                item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
                if item_id_parsed is not None:
                    return import_by_id(item_id_parsed[0], "books", "wishlist")
            else:
                print(
                    json.dumps(
                        {k: result.get(k) for k in ("author_name", "title", "first_publish_year")},
                        indent=4,
                    )
                )
                is_correct = input("Is this the correct result? [y/n]: ")
                if "y" == is_correct:
                    logger.info(f"Selecting OL ID {result['key']}")
                    item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
                    if item_id_parsed is not None:
                        return import_by_id(item_id_parsed[0], "books", "wishlist")
    logger.info("Returning nothing…")
    return None


def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
"""Retrieve a film or TV series from TMDB using its title"""
logger.info(f"Processing {item_title}")
api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}"
    # Sending API request
    response = requests.get(
        api_url,
        params={
            "query": item_title,
            "include_adult": True,
            "year": item["Release Year"] if "Release Year" in item else None,
        },
        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
        timeout=15,
    )
    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)
    elif 429 == response.status_code:
        # Rate-limited: back off briefly, then retry
        time.sleep(2)
        return import_from_tmdb_by_details(item, item_title, media_type)
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")
    response_data = json.loads(response.text)["results"]
    if 1 == len(response_data):
        return cleanup_result(response_data[0], media_type)
    if 0 == len(response_data):
        logger.warning(f"Returned no {media_type} for {item_title}")
    elif 1 < len(response_data):
        title_key = "title" if "films" == media_type else "name"
        filtered_response_data = [
            result for result in response_data if result[title_key] == item_title
        ]
        if 1 == len(filtered_response_data):
            return cleanup_result(filtered_response_data[0], media_type)
        # Show the exact-title matches if there are any, otherwise everything
        displayed_results = (
            filtered_response_data if 0 < len(filtered_response_data) else response_data
        )
        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
        print(json.dumps(displayed_results, indent=4))
        idx = input(
            f"\nEnter the index of the result to use [0-{len(displayed_results) - 1}]: "
        )
        if "" != idx:
            try:
                # Index into the list that was actually displayed
                return cleanup_result(displayed_results[int(idx)], media_type)
            except Exception as exc:
                raise Exception("Index invalid") from exc
    item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")
    if "" != item["IMDB ID"]:
        return import_by_id(item["IMDB ID"], media_type)
    logger.warning(f"Skipped {media_type} '{item_title}'")
    return item


def main() -> None:
"""Prompt user to select media type and log to process"""
media_type = ""
while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
try:
if "films" == media_type:
log = ""
while log not in ["log", "wishlist"]:
log = input("Enter log to process [log|wishlist]: ")
elif "books" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
log = input("Enter log to process [log|current|wishlist]: ")
elif "tv-series" == media_type:
log = "log"
elif "tv-series" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
log = input("Enter log to process [log|current|wishlist]: ")
process_log(media_type, log)
except Exception:
logger.exception("Exception occurred")
if __name__ == "__main__":
main()
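# Example session (values are illustrative):
#
#   $ python scripts/process_logs.py
#   Select media type [films|tv-episodes|tv-series|books]: books
#   Enter log to process [log|current|wishlist]: wishlist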