Cataloguer/scripts/process_logs.py

231 lines
7.9 KiB
Python

from dotenv import load_dotenv
import json
import os
import re
import requests
import time
from urllib.request import urlopen
from add_item import cleanup_result, import_by_id, setup_logger
logger = setup_logger(__name__)

# Pull API credentials from a local .env file into the process environment
load_dotenv()

TMDB_API_KEY = os.getenv('TMDB_API_KEY')
TVDB_API_KEY = os.getenv('TVDB_API_KEY')

# os.getenv() returns None (not "") when a variable is unset, so test for
# falsiness to catch both the missing and the empty case.
if not TMDB_API_KEY:
    logger.warning("TMDB API key not found")
if not TVDB_API_KEY:
    logger.warning("TVDB API key not found")
# Key holding an item's display title, per media type
_TITLE_KEYS = {
    'films': 'Title',
    'tv-episodes': 'Episode Title',
    'tv-series': 'Show Title',
}

# Field name used in the raw log -> field name used on imported items
_RENAMED_FIELDS = {
    'Date Added': 'date_added',
    'Date Watched': 'date_finished',
    'Rewatch': 'is_repeat',
    'Comments': 'comments',
    'Series Title': 'series_title',
    'Episode Title': 'name',
}


def _parse_episode_number(raw):
    """Split an episode code into ``(season, episode)`` digit strings.

    Recognises ``<season>x<episode>``, ``S<season>E<episode>`` and
    ``E<episode>`` (season is ``None`` for the last form). Returns ``None``
    when the value matches none of these.
    """
    raw = str(raw)
    match = (re.search(r"([0-9]+)x([0-9]+)", raw)
             or re.search(r"S([0-9]+)E([0-9]+)", raw))
    if match is not None:
        return match.group(1), match.group(2)

    match = re.search(r"E([0-9]+)", raw)
    if match is not None:
        return None, match.group(1)

    return None


def _save_log(log_path, log_items):
    """Write the (partially) processed log back to disk as indented JSON."""
    with open(log_path, "w", encoding="utf-8") as log_file:
        json.dump(log_items, log_file, indent=4)


def process_log(media_type, log):
    """Run through a log and call the appropriate API for each item found

    Reads ``./data/<media_type>/<log>.json``, and for every entry that has no
    ``id`` key yet tries to import it (by IMDB ID when the entry has one,
    otherwise by its details, finally by asking the user for an IMDB ID).
    Fields listed in ``_RENAMED_FIELDS`` and the parsed episode number are
    carried over onto the imported entry. The file is rewritten at the end,
    and also right after every manually entered ID so that work is not lost.

    Entries that cannot be imported (lookup failed and the user skipped, or
    the episode number is malformed) are left in the log unchanged.

    Args:
        media_type: One of the keys of ``_TITLE_KEYS``.
        log: Base name of the log file, without the ``.json`` extension.

    Raises:
        ValueError: If an entry needs importing and ``media_type`` has no
            known title field.
    """
    logger.info(f"Processing {media_type}/{log}")

    log_path = f"./data/{media_type}/{log}.json"
    title_key = _TITLE_KEYS.get(media_type)

    with open(log_path, "r", encoding="utf-8") as log_file:
        log_items = json.load(log_file)

    for i, item in enumerate(log_items):
        if 'id' in item:
            continue  # already imported on a previous run

        if title_key is None:
            raise ValueError(f"Unsupported media type '{media_type}'")

        try:
            item_title = item[title_key]
            logger.debug(f"Processing {item_title}")

            # Work on a copy so that the original entry survives untouched
            # if the import fails, and start from an empty dict so values
            # from the previous entry cannot leak into this one.
            remaining = dict(item)
            renamed = {}
            for old_key, new_key in _RENAMED_FIELDS.items():
                if old_key in remaining:
                    renamed[new_key] = remaining.pop(old_key)

            if 'Episode Number' in remaining:
                parsed = _parse_episode_number(remaining.pop('Episode Number'))
                if parsed is None:
                    logger.error(f"Invalid episode number format '{item['Episode Number']}'")
                    continue  # skip this entry only; keep processing the rest
                renamed['season_number'], renamed['episode_number'] = parsed

            if 'IMDB ID' in remaining:
                imported = import_by_id(remaining['IMDB ID'], media_type)
            else:
                imported = import_by_details(remaining, item_title, media_type)

            entered_manually = False
            if imported is None:
                imdb_id = re.search(
                    r"tt[0-9]+",
                    input(f"Enter IMDB ID for {item_title}: ")
                )
                if imdb_id is not None:
                    imported = import_by_id(imdb_id.group(0), media_type)
                    entered_manually = True

            if imported is None:
                # Leave the original entry in place rather than storing None
                logger.warning(f"Skipped {item_title}")
                continue

            # NOTE(review): assumes the importers return a dict -- confirm
            imported |= renamed
            log_items[i] = imported

            if entered_manually:
                _save_log(log_path, log_items)

        except KeyError:
            # Best effort: show the offending entry and carry on with the rest
            logger.exception("Missing key while processing item")
            print(json.dumps(item, indent=4))

    _save_log(log_path, log_items)

    logger.info(f"Finished processing {media_type}/{log}")
def import_by_details(item, item_title, media_type):
    """Import an item when lacking a unique identifier

    Films and TV series are delegated to the TMDB title search. Every other
    media type yields ``None``: the details-based importers for TV episodes
    (TVDB), books (OpenLibrary) and games (IGDB) are not wired up yet.
    """
    if media_type == 'films' or media_type == 'tv-series':
        return import_from_tmdb_by_details(item, item_title, media_type)

    # 'tv-episodes', 'books', 'games' and anything unrecognised: no importer
    return None
# How many times a TMDB search is attempted when the API answers 429
_TMDB_MAX_ATTEMPTS = 5


def import_from_tmdb_by_details(item, item_title, media_type):
    """Retrieve a film or TV series from TMDB using its title

    Searches TMDB for ``item_title`` (narrowed by ``item['Release Year']``
    when present). A single hit, or a single exact-title hit, is used
    directly; otherwise the candidates are printed and the user picks an
    index. If that yields nothing the user is asked for an IMDB ID instead.

    Args:
        item: The log entry; ``'Release Year'`` is read and ``'IMDB ID'`` is
            set from user input when the search is inconclusive.
        item_title: Title to search for.
        media_type: ``'films'`` searches movies; anything else searches TV.

    Returns:
        The cleaned-up TMDB result, the result of ``import_by_id`` for a
        manually entered ID, ``item`` itself if the user skips, or ``None``
        if the TMDB request failed.
    """
    logger.info(f"Processing {item_title}")

    api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}"

    # Sending API request, backing off and retrying while rate-limited
    for attempts_left in range(_TMDB_MAX_ATTEMPTS - 1, -1, -1):
        response = requests.get(
            api_url,
            params={
                'query': item_title,
                'include_adult': True,
                'year': item.get('Release Year')
            },
            headers={'Authorization': f"Bearer {TMDB_API_KEY}"}
        )
        if 429 != response.status_code or 0 == attempts_left:
            break
        time.sleep(2)

    # Process the response
    if 200 != response.status_code:
        # Error payloads carry no 'results'; report and let the caller decide
        logger.error(response.text)
        return None
    logger.info(response.status_code)

    results = response.json().get('results', [])

    if 1 == len(results):
        return cleanup_result(results[0], media_type)

    if not results:
        logger.warning(f"Returned no {media_type} for {item_title}")

    else:
        # Movies expose their title as 'title', TV series as 'name'
        title_key = 'title' if 'films' == media_type else 'name'
        exact_matches = [
            result for result in results
            if result.get(title_key) == item_title
        ]

        if 1 == len(exact_matches):
            return cleanup_result(exact_matches[0], media_type)

        # With no exact match, offer every hit instead of an empty list
        candidates = exact_matches or results

        logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
        print(json.dumps(candidates, indent=4))
        idx = input("\nEnter the index of the result to use: ")

        if "" != idx:
            try:
                chosen = candidates[int(idx)]
            except (ValueError, IndexError):
                logger.error("Index invalid!")
                print("Index invalid!")
            else:
                return cleanup_result(chosen, media_type)

    # Nothing usable from the search: fall back to a manually entered ID
    item['IMDB ID'] = input(f"Enter IMDB ID for {item_title}: ")

    if '' != item['IMDB ID']:
        return import_by_id(item['IMDB ID'], media_type)

    logger.warning(f"Skipped {item_title}")
    return item
def main():
    """Ask which media type and log to process, then process it.

    Keeps prompting until a known media type is entered. TV episodes only
    have a single log, so no second question is asked for them. Books are
    prompted for a log but not processed yet. Any exception raised while
    choosing or processing the log is logged and printed instead of
    propagating.
    """
    # Prompt shown when choosing a log, and the answers it accepts
    log_prompts = {
        'films': (
            "Enter log to process [log|wishlist]:",
            ('log', 'wishlist'),
        ),
        'books': (
            "Enter log to process [log|current|wishlist]:",
            ('log', 'current', 'wishlist'),
        ),
        'tv-series': (
            "Enter log to process [log|current|wishlist]:",
            ('log', 'current', 'wishlist'),
        ),
    }

    media_type = ''
    while media_type not in ('films', 'tv-episodes', 'tv-series', 'books'):
        media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")

    try:
        if media_type == 'tv-episodes':
            process_log(media_type, 'log')
            # TODO
            return

        prompt, accepted = log_prompts[media_type]
        chosen_log = ''
        while chosen_log not in accepted:
            chosen_log = input(prompt)

        if media_type != 'books':
            process_log(media_type, chosen_log)
        # TODO: books are selected but not processed yet

    except Exception as error:
        logger.exception("Exception occurred")
        print(error)


if __name__ == "__main__":
    main()