Cataloguer/scripts/process_logs.py

193 lines
6.8 KiB
Python
Raw Normal View History

2024-01-14 14:00:07 +00:00
from dotenv import load_dotenv
import json
import logging
import os
2024-01-14 15:11:01 +00:00
import re
2024-01-14 14:00:07 +00:00
import requests
import time
from urllib.request import urlopen
2024-01-14 15:11:01 +00:00
from add_item import cleanup_result, import_by_id
2024-01-14 14:00:07 +00:00
2024-01-14 15:11:01 +00:00
logging.basicConfig(filename='./logs/run.log', encoding='utf-8', level=logging.DEBUG)
load_dotenv()
TMDB_API_KEY = os.getenv('TMDB_API_KEY')
TVDB_API_KEY = os.getenv('TVDB_API_KEY')
OPENLIBRARY_API_KEY = os.getenv('OPENLIBRARY_API_KEY')
if "" == TMDB_API_KEY: logging.error("TMDB API key not found")
if "" == TVDB_API_KEY: logging.error("TVDB API key not found")
if "" == OPENLIBRARY_API_KEY: logging.error("OpenLibrary API key not found")
2024-01-14 14:00:07 +00:00
def process_log(media_type, log):
logging.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
log_items = json.load(log_file)
log_item_values = {}
for i, item in enumerate(log_items):
2024-01-14 15:11:01 +00:00
try:
if 'id' not in item:
if 'films' == media_type: item_title = item['Title']
elif 'tv-episodes' == media_type: item_title = item['Episode Title']
elif 'tv-series' == media_type: item_title = item['Show Title']
logging.debug(f"Processing {item_title}")
# Rename pre-existing fields
if 'Date Added' in item:
log_item_values['date_added'] = item['Date Added']
del item['Date Added']
if 'Date Watched' in item:
log_item_values['date_watched'] = item['Date Watched']
del item['Date Watched']
if 'Rewatch' in item:
log_item_values['is_rewatch'] = item['Rewatch']
del item['Rewatch']
if 'Comments' in item:
log_item_values['comments'] = item['Comments']
del item['Comments']
if 'Series Title' in item:
log_item_values['series_title'] = item['Series Title']
del item['Series Title']
if 'Episode Title' in item:
log_item_values['name'] = item['Episode Title']
del item['Episode Title']
if 'Episode Number' in item:
split_num = log_item_values['episode_number'].split("E")
log_item_values['episode_number'] = split_num[1]
log_item_values['season_number'] = split_num[0] or None
del item['Episode Number']
if 'IMDB ID' in item:
log_items[i] = import_by_id(item['IMDB ID'], media_type)
2024-01-14 14:00:07 +00:00
else:
2024-01-14 15:11:01 +00:00
log_items[i] = import_by_details(item, item_title, media_type)
if log_items[i] is None:
item['imdb_id'] = input(f"Enter IMDB ID for {item_title}: ")
if re.search("tt[0-9]+", item['imdb_id']) is not None:
log_items[i] = import_by_id(item['imdb_id'], media_type)
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
else:
logging.warning(f"Skipped {item_title}")
2024-01-14 14:00:07 +00:00
2024-01-14 15:11:01 +00:00
if log_items[i] is not None: log_items[i] |= log_item_values
except KeyError:
print(json.dumps(item, indent=4))
2024-01-14 14:00:07 +00:00
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
logging.info(f"Finished processing {media_type}/{log}")
def import_by_details(item, item_title, media_type):
if media_type in ['films', 'tv-series']:
return import_from_tmdb_by_details(item, item_title, media_type)
elif media_type in ['tv-episodes']:
return #import_from_tvdb_by_details(item, item_title, media_type)
elif media_type in ['books']:
return #import_from_openlibrary_by_details(item, item_title, media_type)
def import_from_tmdb_by_details(item, item_title, media_type):
"""Retrieve a film or TV series from TMDB using its title"""
2024-01-14 15:11:01 +00:00
logging.info(f"Processing {item_title}")
2024-01-14 14:00:07 +00:00
2024-01-14 15:11:01 +00:00
api_url = f"https://api.themoviedb.org/3/search/{'movie' if 'films' == media_type else 'tv'}"
2024-01-14 14:00:07 +00:00
# Sending API request
response = requests.get(
api_url,
params={
2024-01-14 15:11:01 +00:00
'query': item_title,
2024-01-14 14:00:07 +00:00
'include_adult': True,
'year': item['Release Year'] if 'Release Year' in item else None
},
headers={'Authorization': f"Bearer {TMDB_API_KEY}"}
)
# Process the response
if (200 == response.status_code):
logging.info(response.status_code)
elif (429 == response.status_code):
time.sleep(2)
import_from_tmdb_by_details(item)
else:
logging.error(response.text)
response_data = json.loads(response.text)['results']
if 1 == len(response_data):
return cleanup_result(response_data[0])
elif 0 == len(response_data):
2024-01-14 15:11:01 +00:00
logging.warning(f"Returned no {media_type} for {item_title}")
2024-01-14 14:00:07 +00:00
elif 1 < len(response_data):
2024-01-14 15:11:01 +00:00
if 'films' == media_type: title_key = 'title'
elif 'tv-series' == media_type: title_key = 'name'
response_data = [result for result in response_data if result[title_key] == item_title]
2024-01-14 14:00:07 +00:00
if 1 == len(response_data):
return cleanup_result(response_data[0])
else:
logging.warning(f"Returned more than one {media_type} for '{item_title}':\n")
print(json.dumps(response_data, indent=4))
idx = input("\nEnter the index of the result to use: ")
if "" != idx:
try:
2024-01-14 15:11:01 +00:00
return cleanup_result(response_data[int(idx)])
2024-01-14 14:00:07 +00:00
except:
logging.error("Index invalid!")
print("Index invalid!")
2024-01-14 15:11:01 +00:00
item['IMDB ID'] = input(f"Enter IMDB ID for {item_title}: ")
2024-01-14 14:00:07 +00:00
if '' != item['IMDB ID']:
2024-01-14 15:11:01 +00:00
return import_by_id(item['IMDB ID'], media_type)
2024-01-14 14:00:07 +00:00
else:
2024-01-14 15:11:01 +00:00
logging.warning(f"Skipped {item_title}")
2024-01-14 14:00:07 +00:00
return item
media_type = ''
while media_type not in ['films', 'tv-episodes', 'tv-series', 'books']:
2024-01-14 15:11:01 +00:00
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
2024-01-14 14:00:07 +00:00
if 'films' == media_type:
log = ''
while log not in ['log', 'wishlist']:
log = input ("Enter log to process [log|wishlist]:")
process_log(media_type, log)
elif 'books' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
2024-01-14 15:11:01 +00:00
log = input ("Enter log to process [log|current|wishlist]:")
2024-01-14 14:00:07 +00:00
elif 'tv-episodes' == media_type:
process_log(media_type, 'log')
elif 'tv-series' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
2024-01-14 15:11:01 +00:00
log = input ("Enter log to process [log|current|wishlist]:")
2024-01-14 14:00:07 +00:00
process_log(media_type, log)