Cataloguer/scripts/process_film_logs.py

130 lines
4.1 KiB
Python

import json
import logging
import requests
import time
from urllib.request import urlopen
def process_items(items):
    """Enrich each logged film with TMDB data and write the list back to disk.

    Every entry of ``items`` is replaced in place by the record returned from
    ``populate_from_id`` (when the entry has an 'IMDB ID') or
    ``populate_from_details`` (otherwise), merged with the viewing details
    taken from the original entry. The resulting list is then dumped as JSON
    to ``../data/films/log.json``.

    Args:
        items: List of log entry dicts; each must contain 'Date Added' and
            'Date Watched', and may contain 'Rewatch', 'Comments' and
            'IMDB ID'.
    """
    logging.info("Processing items…")

    # (source key in the log entry, key used in the stored record)
    optional_fields = (('Rewatch', 'is_rewatch'), ('Comments', 'comments'))

    for index, entry in enumerate(items):
        # Capture the viewing details first: the lookup below may return a
        # brand-new record that no longer carries them.
        viewing = {
            'date_added': entry['Date Added'],
            'date_watched': entry['Date Watched'],
        }
        for source_key, target_key in optional_fields:
            if source_key in entry:
                viewing[target_key] = entry[source_key]

        # Prefer the exact IMDB ID lookup; fall back to a title/year search.
        lookup = populate_from_id if 'IMDB ID' in entry else populate_from_details
        items[index] = lookup(entry)
        items[index] |= viewing

    with open("../data/films/log.json", "w") as output_file:
        json.dump(items, output_file, indent=4)
    logging.info("Finished processing items")
def populate_from_details(item):
    """Look a film up on TMDB by title and release year.

    Args:
        item: Log entry dict; must contain 'Title' and 'Release Year'.

    Returns:
        The TMDB record passed through ``cleanup_film`` when the search
        resolves to exactly one film. Otherwise ``item`` is returned
        unchanged: the request failed, nothing matched, or more than one
        film still matched after filtering on an exact title.
    """
    logging.info(f"Processing {item['Title']}")
    api_url = "https://api.themoviedb.org/3/search/movie"

    # Sending API request
    # NOTE(review): the bearer token is hard-coded in source; consider moving
    # it to configuration or an environment variable.
    response = requests.get(
        api_url,
        params={
            'query': item['Title'],
            'include_adult': True,
            'year': item['Release Year']
        },
        headers={'Authorization': 'Bearer eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiI1NWQ2ZjY3YzJlOTQwMDI1NTFmN2VkNmEyZWVjM2E3NyIsInN1YiI6IjUxNWMyNzkxMTljMjk1MTQ0ZDAzZDM0NCIsInNjb3BlcyI6WyJhcGlfcmVhZCJdLCJ2ZXJzaW9uIjoxfQ.92eNKubJ_CORCIIlta30P9Qjg_Q9gPRFDTfG4gyz9kY'}
    )

    # Process the response
    if response.status_code == 429:
        # Rate limited: wait, then hand back the result of the retry instead
        # of falling through and parsing the 429 body.
        time.sleep(2)
        return populate_from_details(item)
    if response.status_code != 200:
        # An error body has no 'results' key, so stop here and keep the
        # entry as it was.
        logging.error(response.text)
        return item
    logging.info(response.status_code)

    matches = json.loads(response.text)['results']
    if len(matches) > 1:
        # Several candidates: keep only those whose title matches exactly.
        matches = [film for film in matches if film['title'] == item['Title']]
        if len(matches) > 1:
            logging.warning(f"Returned more than one film for {item['Title']}")
            return item

    if matches:
        return cleanup_film(matches[0])

    logging.warning(f"Returning no results for {item['Title']}")
    return item
def populate_from_id(item):
    """Look a film up on TMDB by its IMDB ID.

    Args:
        item: Log entry dict; must contain 'IMDB ID' and 'Title' (the title
            is only used in log messages).

    Returns:
        The TMDB record passed through ``cleanup_film`` when the lookup
        yields exactly one movie. Otherwise ``item`` is returned unchanged:
        the request failed, nothing matched, or several movies matched.
    """
    logging.info(f"Processing ID {item['IMDB ID']} ({item['Title']})…")
    api_url = f"https://api.themoviedb.org/3/find/{item['IMDB ID']}"

    # Sending API request
    # NOTE(review): the bearer token is hard-coded in source; consider moving
    # it to configuration or an environment variable.
    response = requests.get(
        api_url,
        params={
            'external_source': 'imdb_id'
        },
        headers={'Authorization': 'Bearer eyJhbGciOiJIUzI1NiJ9.eyJhdWQiOiI1NWQ2ZjY3YzJlOTQwMDI1NTFmN2VkNmEyZWVjM2E3NyIsInN1YiI6IjUxNWMyNzkxMTljMjk1MTQ0ZDAzZDM0NCIsInNjb3BlcyI6WyJhcGlfcmVhZCJdLCJ2ZXJzaW9uIjoxfQ.92eNKubJ_CORCIIlta30P9Qjg_Q9gPRFDTfG4gyz9kY'}
    )

    # Process the response
    if response.status_code == 429:
        # Rate limited: wait, then hand back the result of the retry instead
        # of falling through and parsing the 429 body.
        time.sleep(2)
        return populate_from_id(item)
    if response.status_code != 200:
        # An error body has no 'movie_results' key, so stop here and keep the
        # entry as it was.
        logging.error(response.text)
        return item
    logging.info(response.status_code)

    matches = json.loads(response.text)['movie_results']
    if len(matches) > 1:
        logging.warning(f"Returned more than one film for ID {item['IMDB ID']}")
        return item

    if matches:
        return cleanup_film(matches[0])

    logging.warning(f"Returning no results for {item['Title']}")
    return item
def cleanup_film(film):
    """Strip unwanted TMDB fields from a film record and expand its poster URL.

    The dict is modified in place and also returned.

    Args:
        film: A single film record (dict) as returned by the TMDB API.

    Returns:
        The same dict with the listing-only fields removed, the original
        title/language dropped when they add nothing (English film whose
        original title equals its title), and 'poster_path' turned into a
        full URL when a poster path is present.
    """
    # Fields not kept in the stored log. Not every response carries all of
    # them ('media_type' in particular), so a missing key is not an error.
    unwanted_keys = (
        'adult', 'backdrop_path', 'genre_ids', 'popularity', 'video',
        'vote_average', 'vote_count', 'media_type',
    )
    for key in unwanted_keys:
        film.pop(key, None)

    if film.get('original_title') == film.get('title') and film.get('original_language') == 'en':
        film.pop('original_title', None)
        film.pop('original_language', None)

    # Only build a URL when there is a path; a missing or None poster would
    # otherwise be stored as a URL ending in "/None".
    poster_path = film.get('poster_path')
    if poster_path:
        film['poster_path'] = f"https://www.themoviedb.org/t/p/original/{poster_path}"
    return film
# Send all log output (DEBUG and above) to run.log in the working directory.
logging.basicConfig(level=logging.DEBUG, filename='run.log', encoding='utf-8')

# Load the existing film log; process_items() rewrites this same file.
with open("../data/films/log.json", "r") as films_log:
    films = json.loads(films_log.read())

process_items(films)