# Script to add a new item to the log from datetime import datetime from dotenv import load_dotenv import json import logging import os import re import requests from urllib.request import urlopen logging.basicConfig(filename='./logs/run.log', encoding='utf-8', level=logging.DEBUG) load_dotenv() TMDB_API_KEY = os.getenv('TMDB_API_KEY') TVDB_API_KEY = os.getenv('TVDB_API_KEY') if "" == TMDB_API_KEY: logging.error("TMDB API key not found") if "" == TVDB_API_KEY: logging.error("TVDB API key not found") def add_item_to_log(item_id, media_type, log): """Add a film, book, TV series or TV episode to a log""" logging.info(f"Processing {item_id}…") item = import_by_id(item_id, media_type) if log in ['log', 'current']: # TODO - review this when moving from one log to another if media_type in ['books', 'tv-series', 'games']: date_started = '' while re.search('[0-9]{4}-[0-9]{2}-[0-9]{2}', date_started) is None: date_started = input("Enter date started [YYYY-MM-DD, t for today]: ") if 't' == date_started: date_started = datetime.today().strftime('%Y-%m-%d') item['date_started'] = date_started if 'log' == log: date_finished = '' while re.search('[0-9]{4}-[0-9]{2}-[0-9]{2}', date_finished) is None: date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ") if 't' == date_finished: date_finished = datetime.today().strftime('%Y-%m-%d') item['date_finished'] = date_finished # TODO - do this automatically is_repeat = '' while is_repeat not in ['y', 'n']: is_repeat = input(f"Is this a repeat entry? [y/n]: ") if 'y' == is_repeat: item['is_repeat'] = True item['added_by_id'] = item_id comments = input("Enter comments (optional): ") if '' != comments: item['comments'] = comments # Validation step correct = '' print(f"{media_type} data to add:\n") print(json.dumps(item, indent=4)) if 'y' != input("\nDoes this look correct? [y]: "): return # Save changes logging.info(f"Adding {media_type} to {log}…") with open(f"./data/{media_type}/{log}.json", "r") as log_file: log_items = json.load(log_file) log_items.insert(0, item) with open(f"./data/{media_type}/{log}.json", "w") as log_file: json.dump(log_items, log_file, indent=4) logging.info(f"Added {media_type} {item_id} to {log}") def import_by_id(import_id, media_type): if media_type in ['films', 'tv-series']: return import_from_imdb_by_id(import_id, media_type) elif media_type in ['tv-episodes']: return #import_from_tvdb_by_id(import_id, media_type) elif media_type in ['books']: return import_from_openlibrary_by_id(import_id, media_type) def import_from_imdb_by_id(imdb_id, media_type): """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" api_url = f"https://api.themoviedb.org/3/find/{imdb_id}" # Sending API request response = requests.get( api_url, params={ 'external_source': 'imdb_id' }, headers={'Authorization': f"Bearer {TMDB_API_KEY}"} ) # Process the response if (200 == response.status_code): logging.info(response.status_code) elif (429 == response.status_code): time.sleep(2) import_from_imdb_by_id(imdb_id, media_type) return else: raise Exception(f"Error {response.status_code}: {response.text}") if ('films' == media_type): results_key = 'movie_results' elif ('tv-episodes' == media_type): results_key = 'TODO' elif ('tv-series' == media_type): results_key = 'tv_results' response_data = json.loads(response.text)[results_key] if 1 == len(response_data): item = response_data[0] elif 0 == len(response_data): raise Exception(f"Returned no results for {imdb_id}") elif 1 < len(response_data): logging.warning(f"Returned more than one {media_type} for ID '{imdb_id}'") print(f"Returned more than one {media_type} for ID '{imdb_id}':\n") print(json.dumps(response_data, indent=4)) idx = input("\nEnter the index of the result to use: ") try: item = response_data[int(idx)] except: raise Exception(f"Index {idx} is invalid") # Modify the returned result to add additional data return cleanup_result(item, media_type) def import_from_openlibrary_by_id(isbn, media_type): """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID""" api_url = f"https://openlibrary.org/isbn/{isbn}" # Sending API request response = requests.get( api_url, headers={'accept': 'application/json'} ) # Process the response if (200 == response.status_code): logging.info(response.status_code) elif (429 == response.status_code): time.sleep(2) import_from_openlibrary_by_id(isbn, media_type) return else: raise Exception(f"Error {reponse.status_code}: {response.text}") item = json.loads(response.text) for key in ['authors', 'works']: if key in item: for i, sub_item in enumerate(item[key]): item[key][i] = import_from_openlibrary_by_ol_key(sub_item['key']) if 'works' in item: if len(item['works']) > 1: raise Exception(f"Multiple works found for {isbn}") else: item['work'] = item['works'][0] del item['works'] # Modify the returned result to add additional data return cleanup_result(item, media_type) def import_from_openlibrary_by_ol_key(key): """Retrieves an item (author or work) from OpenLibrary using an OL key""" _, mode, ol_id = key.split('/') if mode in ['works', 'authors']: api_url = f"https://openlibrary.org{key}" # Sending API request response = requests.get( api_url, headers={'accept': 'application/json'} ) # Process the response if (200 == response.status_code): logging.info(response.status_code) elif (429 == response.status_code): time.sleep(2) import_from_openlibrary_by_ol_key(key) return else: raise Exception(f"Error {reponse.status_code}: {response.text}") item = json.loads(response.text) if 'authors' == mode: author = { 'id': ol_id, 'name': item['name'] } if 'personal_name' in item: if item['name'] != item['personal_name']: author['personal_name'] = item['personal_name'] return author elif 'works' == mode: work = { 'id': ol_id, 'title': item['title'] } for key in ['first_publish_date', 'subjects']: if key in item: work[key] = item[key] return work else: raise Exception(f"Unknown OpenLibrary key '{mode}'") def cleanup_result(item, media_type): """Process a film, TV series, TV episode or book returned by their respecitve APIs by removing unnecessary fields and adding others""" for field_name in [ 'adult', # TMDB 'backdrop_path', # TMDB 'copyright_date', # OpenLibrary 'classifications', # OpenLibrary 'created', # OpenLibrary 'episode_type', # TMDB 'first_sentence', # OpenLibrary 'genre_ids', # TMDB 'identifiers', # OpenLibrary 'media_type', # TMDB 'last_modified', # OpenLibrary 'latest_revision', # OpenLibrary 'lc_classifications', # OpenLibrary 'local_id', # OpenLibrary 'ocaid', # OpenLibrary 'popularity', # TMDB 'production_code', # TMDB 'revision', # OpenLibrary 'runtime', # TMDB 'source_records', # OpenLibrary 'still_path', # TMDB 'type', # OpenLibrary 'video', # TMDB 'vote_average', # TMDB 'vote_count' # TMDB ]: if field_name in item: del item[field_name] if media_type in ['films', 'tv-series']: title_key = 'name' if 'tv-series' == media_type else 'title' if f"original_{title_key}" in item and 'original_language' in item: if item[f"original_{title_key}"] == item[title_key] and item['original_language'] == 'en': del item[f"original_{title_key}"], item['original_language'] if 'books' == media_type: _, _, item['id'] = item['key'].split('/') del item['key'] for key in ['isbn_10', 'isbn_13']: if key in item: if len(item[key]) > 1: raise Exception("Multiple ISBN results") else: item[key] = item[key][0] if 'publish_places' in item: if len(item['publish_places']) > 1: raise Exception("Multiple publish_places") else: item['published_in'] = item['publish_places'][0] del item['publish_places'] if 'languages' in item: item['languages'] = [lang['key'].split('/')[2] for lang in item['languages']] if 'translation_of' in item: if item['translation_of'] == item['work']['title']: del item['translation_of'] else: raise Exception(f"translation_of '{item['translation_of']}' is different to work title '{item['work']['title']}'") if 'translated_from' in item: if len(item['translated_from']) > 1: raise Exception("Multiple translated_from results") else: item['work']['original_language'] = item['translated_from'][0]['key'].split('/')[2] del item['translated_from'] if 'date_added' not in item: item['date_added'] = datetime.today().strftime('%Y-%m-%d') return item def main(): media_type = '' while media_type not in ['films', 'tv-episodes', 'tv-series', 'books']: media_type = input("Select media type [films|tv-episodes|tv-series|books]: ") try: if 'films' == media_type: log = '' while log not in ['log', 'wishlist']: log = input ("Enter log to update [log|wishlist]: ") imdb_id = '' while re.search("tt[0-9]+", imdb_id) is None: imdb_id = input("Enter IMDB ID: ") add_item_to_log(imdb_id, media_type, log) elif 'books' == media_type: log = '' while log not in ['log', 'current', 'wishlist']: log = input ("Enter log to update [log|current|wishlist]: ") isbn = '' while re.search("[0-9]+", isbn) is None: isbn = input("Enter ISBN: ") add_item_to_log(isbn, media_type, log) elif 'tv-episodes' == media_type: imdb_id = '' while re.search("tt[0-9]+", imdb_id) is None: imdb_id = input("Enter IMDB ID: ") add_item_to_log(imdb_id, media_type, 'log') elif 'tv-series' == media_type: log = '' while log not in ['log', 'current', 'wishlist']: log = input ("Enter log to update [log|current|wishlist]: ") imdb_id = '' while re.search("tt[0-9]+", imdb_id) is None: imdb_id = input("Enter IMDB ID: ") add_item_to_log(imdb_id, media_type, log) except Exception as error: logging.error(repr(error)) print(error) if __name__ == "__main__": main()