Cataloguer/scripts/process_logs.py
2024-01-17 23:02:56 +01:00

281 lines
9.2 KiB
Python

"""
Process logs derived from social cataloguing site data exports, using various APIs.
"""
import json
import os
import re
import time
import requests
from dotenv import load_dotenv
from add_item import cleanup_result, import_by_id, setup_logger
logger = setup_logger("process_logs")

# Load environment variables via python-dotenv before the API keys are read.
load_dotenv()

TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")

# os.getenv() returns None (not "") for an unset variable, so test truthiness
# to warn for both a missing and an empty key.
if not TMDB_API_KEY:
    logger.warning("TMDB API key not found")

if not TVDB_API_KEY:
    logger.warning("TVDB API key not found")
def process_log(media_type, log) -> None:
    """Run through a log and call the appropriate API for each item found.

    Reads ``./data/{media_type}/{log}.json``, replaces every entry that has no
    ``id`` key with the result of an import lookup, merges the renamed export
    fields into that result, and writes the list back to the same file
    (after a manual IMDB ID import, on every 15th index, and once at the end).

    Args:
        media_type: Directory name under ``./data``. It also selects which key
            of an item holds its title (``films``, ``tv-episodes`` or
            ``tv-series``).
        log: File stem of the log to process, e.g. ``log`` or ``wishlist``.

    Raises:
        ValueError: If an unprocessed item is found and ``media_type`` has no
            known title key.

    Note:
        An unrecognised "Episode Number" logs an error and returns at once,
        skipping the final write. A ``KeyError`` raised while handling an item
        prints that item and moves on to the next one.
    """
    log_path = f"./data/{media_type}/{log}.json"
    logger.info(f"Processing {media_type}/{log}")

    with open(log_path, "r", encoding="utf-8") as log_file:
        log_items = json.load(log_file)

    for i, item in enumerate(log_items):
        try:
            # Entries that already carry an "id" are left untouched.
            if "id" in item:
                continue

            title_key = _TITLE_KEYS.get(media_type)
            if title_key is None:
                raise ValueError(f"Unsupported media type '{media_type}'")

            item_title = item[title_key]
            logger.debug(f"Processing {item_title}")

            # Rename pre-existing fields. The dict is rebuilt for every item so
            # values from one item can never leak into the next.
            log_item_values = {}
            for old_key, new_key in _FIELD_RENAMES.items():
                if old_key in item:
                    log_item_values[new_key] = item.pop(old_key)

            if "Episode Number" in item:
                parsed = _parse_episode_number(item["Episode Number"])
                if parsed is None:
                    logger.error(
                        f"Invalid episode number format '{item['Episode Number']}'"
                    )
                    return
                season_no, episode_no = parsed
                log_item_values["season_number"] = season_no
                log_item_values["episode_number"] = episode_no
                del item["Episode Number"]

            if "IMDB ID" in item and item["IMDB ID"] != "":
                new_log_item = import_by_id(item["IMDB ID"], media_type)
            else:
                new_log_item = import_by_details(item, item_title, media_type)

            # A manually resolved item is saved immediately; otherwise progress
            # is checkpointed on every 15th index.
            save_now = i % 15 == 0

            if new_log_item is None:
                item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
                if re.search("tt[0-9]+", item["imdb_id"]) is not None:
                    log_items[i] = import_by_id(item["imdb_id"], media_type)
                    save_now = True
                else:
                    logger.warning(f"Skipped {item_title}")
            else:
                log_items[i] = new_log_item

            # Merge before saving so checkpoints include the renamed fields.
            if log_items[i] is not None:
                log_items[i] |= log_item_values

            if save_now:
                _save_log(log_path, log_items)

        except KeyError:
            print(json.dumps(item, indent=4))

    _save_log(log_path, log_items)
    logger.info(f"Finished processing {media_type}/{log}")


# Key holding an item's title in the export, per media type.
_TITLE_KEYS = {
    "films": "Title",
    "tv-episodes": "Episode Title",
    "tv-series": "Show Title",
}

# Export field name -> field name used in the processed log.
_FIELD_RENAMES = {
    "Date Added": "date_added",
    "Date Watched": "date_finished",
    "Rewatch": "is_repeat",
    "Comments": "comments",
    "Series Title": "series_title",
    "Episode Title": "name",
}


def _parse_episode_number(raw):
    """Return ``(season, episode)`` strings for an episode label, or None.

    Recognises ``1x02``, ``S01E02`` and ``E02``; the season is None for the
    last form. None is returned when no form matches.
    """
    match = re.search(r"([0-9]+)x([0-9]+)", raw)
    if match is None:
        match = re.search(r"S([0-9]+)E([0-9]+)", raw)
    if match is not None:
        return match.group(1), match.group(2)

    match = re.search(r"E([0-9]+)", raw)
    if match is not None:
        return None, match.group(1)

    return None


def _save_log(log_path, log_items) -> None:
    """Write the log items to ``log_path`` as indented JSON."""
    with open(log_path, "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)
def import_by_details(item, item_title, media_type) -> dict:
    """Look up an item that has no unique identifier, using its details.

    Films and TV series are searched on TMDB by title. Every other media
    type currently yields None.
    """
    tmdb_media_types = ("films", "tv-series")

    if media_type not in tmdb_media_types:
        # Placeholders exist for TVDB ("tv-episodes"), OpenLibrary ("books")
        # and IGDB ("games") lookups, but none is implemented yet.
        return None

    return import_from_tmdb_by_details(item, item_title, media_type)
def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
    """Retrieve a film or TV series from TMDB using its title.

    Searches the TMDB movie endpoint for ``films`` and the TV endpoint for
    anything else. A single hit, or a single hit whose title equals
    ``item_title`` exactly, is used directly. With several candidates the
    user is shown the list and asked for an index. If nothing is chosen the
    user is asked for an IMDB ID instead.

    Args:
        item: The log entry being processed. Its optional "Release Year" is
            sent as the ``year`` search parameter, and its "IMDB ID" key is
            overwritten if the user is prompted for one.
        item_title: Title to search for.
        media_type: ``films`` or ``tv-series``.

    Returns:
        The output of ``cleanup_result`` for the chosen search result, the
        output of ``import_by_id`` if an IMDB ID was entered, or ``item``
        itself if the user skipped both prompts.

    Raises:
        Exception: "Index invalid" if the entered index cannot be used.
    """
    logger.info(f"Processing {item_title}")

    is_film = "films" == media_type
    api_url = f"https://api.themoviedb.org/3/search/{'movie' if is_film else 'tv'}"

    # Retry in a loop while rate-limited, so a long run of 429 responses
    # cannot exhaust the recursion limit.
    while True:
        response = requests.get(
            api_url,
            params={
                "query": item_title,
                "include_adult": True,
                "year": item.get("Release Year"),
            },
            headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
            timeout=15,
        )
        if 429 != response.status_code:
            break
        time.sleep(2)

    if 200 == response.status_code:
        logger.debug(response.status_code)
    else:
        logger.error(response.text)

    # A body without "results" raises KeyError here, as it did before.
    response_data = json.loads(response.text)["results"]

    if 1 == len(response_data):
        return cleanup_result(response_data[0], media_type)

    if not response_data:
        logger.warning(f"Returned no {media_type} for {item_title}")

    else:
        # Same film/TV split as the endpoint choice above.
        title_key = "title" if is_film else "name"
        exact_matches = [
            result for result in response_data if result.get(title_key) == item_title
        ]

        if 1 == len(exact_matches):
            # Use the matching entry itself, not the first raw search result.
            return cleanup_result(exact_matches[0], media_type)

        # The list shown to the user is the one the entered index applies to.
        candidates = exact_matches if exact_matches else response_data

        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
        print(json.dumps(candidates, indent=4))

        last_index = len(candidates) - 1
        idx = input(
            f"\nEnter the index of the result to use [0-{last_index}]: "
        )

        if "" != idx:
            try:
                return cleanup_result(candidates[int(idx)], media_type)
            except Exception as exc:
                raise Exception("Index invalid") from exc

    # No result was chosen automatically or by index: fall back to a manual ID.
    item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")

    if "" != item["IMDB ID"]:
        return import_by_id(item["IMDB ID"], media_type)

    logger.warning(f"Skipped {item_title}")
    return item
def main() -> None:
    """Ask which media type and log to work on, then process that log.

    The media type prompt repeats until a listed value is entered. Films,
    books and TV series then prompt for a log name; TV episodes always use
    the log named "log". Any exception raised after the media type has been
    chosen is logged rather than propagated.
    """
    media_types = ["films", "tv-episodes", "tv-series", "books"]

    # Media types whose log is chosen interactively: (allowed logs, prompt).
    log_prompts = {
        "films": (
            ["log", "wishlist"],
            "Enter log to process [log|wishlist]: ",
        ),
        "books": (
            ["log", "current", "wishlist"],
            "Enter log to process [log|current|wishlist]: ",
        ),
        "tv-series": (
            ["log", "current", "wishlist"],
            "Enter log to process [log|current|wishlist]: ",
        ),
    }

    media_type = ""
    while media_type not in media_types:
        media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")

    try:
        if media_type in log_prompts:
            allowed_logs, prompt = log_prompts[media_type]

            log = ""
            while log not in allowed_logs:
                log = input(prompt)

            # TODO: books are not processed yet; the chosen log is unused.
            if "books" != media_type:
                process_log(media_type, log)

        else:
            # Only "tv-episodes" reaches this branch.
            process_log(media_type, "log")
            # TODO

    except Exception:
        logger.exception("Exception occurred")


if __name__ == "__main__":
    main()