Cataloguer/scripts/process_logs.py
2024-01-14 15:00:07 +01:00

183 lines
6.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dotenv import load_dotenv
import json
import logging
import os
import requests
import time
from urllib.request import urlopen
from scripts.add_item import cleanup_result, import_by_id
def process_log(media_type, log):
logging.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
log_items = json.load(log_file)
log_item_values = {}
for i, item in enumerate(log_items):
if 'id' not in item:
if 'films' == media_type: item_title = item['Title']
elif 'tv-episodes' == media_type: item_title = item['Episode Title']
elif 'tv-series' == media_type: item_title = item['Show Title']
logging.debug(f"Processing {item_title}")
# Rename pre-existing fields
if 'Date Added' in item:
log_item_values['date_added'] = item['Date Added']
del item['Date Added']
if 'Date Watched' in item:
log_item_values['date_watched'] = item['Date Watched']
del item['Date Watched']
if 'Rewatch' in item:
log_item_values['is_rewatch'] = item['Rewatch']
del item['Rewatch']
if 'Comments' in item:
log_item_values['comments'] = item['Comments']
del item['Comments']
if 'Series Title' in item:
log_item_values['series_title'] = item['Series Title']
del item['Series Title']
if 'Episode Title' in item:
log_item_values['name'] = item['Episode Title']
del item['Episode Title']
if 'Episode Number' in item:
split_num = log_item_values['episode_number'].split("E")
log_item_values['episode_number'] = split_num[1]
log_item_values['season_number'] = split_num[0] or None
del item['Episode Number']
if 'IMDB ID' in item:
log_items[i] = import_by_id(item['IMDB ID'], media_type)
else:
log_items[i] = import_by_details(item, item_title, media_type)
if log_items[i] is None:
item['imdb_id'] = input(f"Enter IMDB ID for {item_title}: ")
if re.search("tt[0-9]+", item['imdb_id']) is not None:
log_items[i] = import_by_id(item['imdb_id'], media_type)
else:
logging.warning(f"Skipped {item_title}")
log_items[i] |= log_item_values
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
logging.info(f"Finished processing {media_type}/{log}")
def import_by_details(item, item_title, media_type):
if media_type in ['films', 'tv-series']:
return import_from_tmdb_by_details(item, item_title, media_type)
elif media_type in ['tv-episodes']:
return #import_from_tvdb_by_details(item, item_title, media_type)
elif media_type in ['books']:
return #import_from_openlibrary_by_details(item, item_title, media_type)
def import_from_tmdb_by_details(item, item_title, media_type):
"""Retrieve a film or TV series from TMDB using its title"""
logging.info(f"Processing {item['Title']}")
api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' === media_type else 'tv'}"
# Sending API request
response = requests.get(
api_url,
params={
'query': item_title¸
'include_adult': True,
'year': item['Release Year'] if 'Release Year' in item else None
},
headers={'Authorization': f"Bearer {TMDB_API_KEY}"}
)
# Process the response
if (200 == response.status_code):
logging.info(response.status_code)
elif (429 == response.status_code):
time.sleep(2)
import_from_tmdb_by_details(item)
else:
logging.error(response.text)
response_data = json.loads(response.text)['results']
if 1 == len(response_data):
return cleanup_result(response_data[0])
elif 0 == len(response_data):
logging.warning(f"Returned no {media_type} for {item['Title']} ({item['Release Year']})")
elif 1 < len(response_data):
response_data = [film for film in response_data if film['title'] == item['Title']]
if 1 == len(response_data):
return cleanup_result(response_data[0])
else:
logging.warning(f"Returned more than one {media_type} for '{item_title}':\n")
print(json.dumps(response_data, indent=4))
idx = input("\nEnter the index of the result to use: ")
if "" != idx:
try:
return cleanup_result(response_data[idx])
except:
logging.error("Index invalid!")
print("Index invalid!")
item['IMDB ID'] = input(f"Enter IMDB ID for {item['Title']} ({item['Release Year']}):")
if '' != item['IMDB ID']:
return import_by_id(item['IMDB_ID'], media_type)
else:
logging.warning(f"Skipped {item['Title']} ({item['Release Year']})")
return item
logging.basicConfig(filename='./logs/run.log', encoding='utf-8', level=logging.DEBUG)
load_dotenv()
TMDB_API_KEY = os.getenv('TMDB_API_KEY')
TVDB_API_KEY = os.getenv('TVDB_API_KEY')
OPENLIBRARY_API_KEY = os.getenv('OPENLIBRARY_API_KEY')
if "" === TMDB_API_KEY: logging.error("TMDB API key not found")
if "" === TVDB_API_KEY: logging.error("TVDB API key not found")
if "" === OPENLIBRARY_API_KEY: logging.error("OpenLibrary API key not found")
media_type = ''
while media_type not in ['films', 'tv-episodes', 'tv-series', 'books']:
media_type = input("Select media type [films|tv-episodes|tv-series|books]:")
if 'films' == media_type:
log = ''
while log not in ['log', 'wishlist']:
log = input ("Enter log to process [log|wishlist]:")
process_log(media_type, log)
elif 'books' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to update [log|current|wishlist]:")
elif 'tv-episodes' == media_type:
process_log(media_type, 'log')
elif 'tv-series' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to update [log|current|wishlist]:")
process_log(media_type, log)