Compare commits

..

No commits in common. "29592be6ceb95cb17e941612b3a0f1619a0369ef" and "cc6e4de409640e969e31a9c86034e0254a6c3e4b" have entirely different histories.

7 changed files with 2241 additions and 7576 deletions

1
.gitignore vendored
View file

@ -6,4 +6,3 @@ node_modules/
public/
logs/
.env
catalogue_venv/

View file

@ -1,39 +1,4 @@
[
{
"id": 2371,
"origin_country": [
"GB"
],
"overview": "Bodies is an award-winning British television medical drama produced by Hat Trick Productions for the BBC. Created by Jed Mercurio, the series began in 2004 and is based on his book Bodies. In December 2009, The Times ranked Bodies in 9th place in its list of \"Shows of the Decade\". The Guardian has ranked the series among \"The Greatest Television Dramas of All-Time\".",
"poster_path": "/7sDRMbZC5zvbE4UMjm9cw7KSj9u.jpg",
"first_air_date": "2004-06-23",
"name": "Bodies",
"date_added": "2024-01-17"
},
{
"id": 80307,
"origin_country": [
"GB"
],
"overview": "A troubled war veteran is assigned to protect a controversial politician who may be the target of a terror plot.",
"poster_path": "/5DUJTrHTRLHLCKWriPhdusQogAv.jpg",
"first_air_date": "2018-08-26",
"name": "Bodyguard",
"date_added": "2024-01-17"
},
{
"id": 890,
"origin_country": [
"JP"
],
"original_language": "ja",
"original_name": "\u65b0\u4e16\u7d00\u30a8\u30f4\u30a1\u30f3\u30b2\u30ea\u30aa\u30f3",
"overview": "At the turn of the century, the Angels returned to Earth, seeking to wipe out humanity in an apocalyptic fury. Devastated, mankind's last remnants moved underground to wait for the day when the Angels would come back to finish the job. Fifteen years later, that day has come... but this time, humanity is ready to fight back with terrifying bio-mechanical weapons known as the Evangelions. Watch as Shinji, Rei, Asuka and the rest of the mysterious shadow agency Nerv battle to save earth from total annihilation.",
"poster_path": "/y2ah9t0navXyIvoHg1uIbIHO3tt.jpg",
"first_air_date": "1995-10-04",
"name": "Neon Genesis Evangelion",
"date_added": "2024-01-17"
},
{
"id": 2316,
"name": "The Office",

File diff suppressed because it is too large Load diff

View file

@ -6,8 +6,7 @@
"build": "rm -rf ./public/ && snap run hugo --templateMetrics --templateMetricsHints",
"deploy": "rsync -rP ./public/ ovhvps:~/catalogue/content",
"lint:json": "jsonlint ./**/*.json -s",
"lint:json:fix": "npm run lint:json -- -i",
"lint:py": "pylint --disable=broad-exception-raised --disable=logging-fstring-interpolation ./scripts/*.py"
"lint:json:fix": "npm run lint:json -- -i"
},
"devDependencies": {
"jsonlint": "^1.6.3"

View file

@ -1,13 +0,0 @@
astroid==3.0.2
black==23.12.1
click==8.1.7
dill==0.3.7
isort==5.13.2
mccabe==0.7.0
mypy-extensions==1.0.0
packaging==23.2
pathspec==0.12.1
platformdirs==4.1.0
pylint==3.0.3
python-dotenv==1.0.0
tomlkit==0.12.3

View file

@ -1,126 +1,114 @@
"""
Add a new item to a media catalogue, using various APIs.
"""
# Script to add a new item to the log
from datetime import datetime
from dotenv import load_dotenv
import json
import logging
import os
import re
import time
from datetime import datetime
import requests
from dotenv import load_dotenv
def setup_logger(name="add_item"):
"""Set up the logger for console and file"""
from urllib.request import urlopen
def setup_logger(name = __name__):
logging.root.setLevel(logging.NOTSET)
logr = logging.getLogger(name)
logger = logging.getLogger(name)
c_handler = logging.StreamHandler()
f_handler = logging.FileHandler("./logs/run.log")
f_handler = logging.FileHandler('./logs/run.log')
c_handler.setLevel(logging.INFO)
f_handler.setLevel(logging.ERROR)
c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
c_format = logging.Formatter('%(name)s - %(levelname)s - %(message)s')
f_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
c_handler.setFormatter(c_format)
f_handler.setFormatter(f_format)
logr.addHandler(c_handler)
logr.addHandler(f_handler)
logger.addHandler(c_handler)
logger.addHandler(f_handler)
return logr
return logger
logger = setup_logger()
load_dotenv()
TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")
TMDB_API_KEY = os.getenv('TMDB_API_KEY')
TVDB_API_KEY = os.getenv('TVDB_API_KEY')
if "" == TMDB_API_KEY:
logger.error("TMDB API key not found")
if "" == TVDB_API_KEY:
logger.error("TVDB API key not found")
if "" == TMDB_API_KEY: logger.error("TMDB API key not found")
if "" == TVDB_API_KEY: logger.error("TVDB API key not found")
def add_item_to_log(item_id, media_type, log) -> None:
def add_item_to_log(item_id, media_type, log):
"""Add a film, book, TV series or TV episode to a log"""
logger.info(f"Processing {item_id}")
item: dict = import_by_id(item_id, media_type)
item = import_by_id(item_id, media_type)
if log in ['log', 'current']:
if log in ["log", "current"]:
# TODO - review this when moving from one log to another
if media_type in ["books", "tv-series", "games"]:
date_started = ""
while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_started) is None:
if media_type in ['books', 'tv-series', 'games']:
date_started = ''
while re.search('[0-9]{4}-[0-9]{2}-[0-9]{2}', date_started) is None:
date_started = input("Enter date started [YYYY-MM-DD, t for today]: ")
if "t" == date_started:
date_started = datetime.today().strftime("%Y-%m-%d")
item["date_started"] = date_started
if 't' == date_started: date_started = datetime.today().strftime('%Y-%m-%d')
item['date_started'] = date_started
if "log" == log:
date_finished = ""
while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_finished) is None:
if 'log' == log:
date_finished = ''
while re.search('[0-9]{4}-[0-9]{2}-[0-9]{2}', date_finished) is None:
date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ")
if "t" == date_finished:
date_finished = datetime.today().strftime("%Y-%m-%d")
item["date_finished"] = date_finished
if 't' == date_finished: date_finished = datetime.today().strftime('%Y-%m-%d')
item['date_finished'] = date_finished
# TODO - do this automatically
is_repeat = ""
while is_repeat not in ["y", "n"]:
is_repeat = input("Is this a repeat entry? [y/n]: ")
if "y" == is_repeat:
item["is_repeat"] = True
item["added_by_id"] = item_id
is_repeat = ''
while is_repeat not in ['y', 'n']:
is_repeat = input(f"Is this a repeat entry? [y/n]: ")
if 'y' == is_repeat: item['is_repeat'] = True
item['added_by_id'] = item_id
comments = input("Enter comments (optional): ")
if "" != comments:
item["comments"] = comments
if '' != comments: item['comments'] = comments
# Validation step
correct = ''
print(f"{media_type} data to add:\n")
print(json.dumps(item, indent=4))
if "y" != input("\nDoes this look correct? [y]: "):
return
if 'y' != input("\nDoes this look correct? [y]: "): return
# Save changes
logger.info(f"Adding {media_type} to {log}")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
log_items = json.load(log_file)
log_items.insert(0, item)
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Added {media_type} {item_id} to {log}")
def import_by_id(import_id, media_type) -> dict:
"""Import from the appropriate API by unique ID"""
if media_type in ["films", "tv-series"]:
def import_by_id(import_id, media_type):
if media_type in ['films', 'tv-series']:
return import_from_imdb_by_id(import_id, media_type)
if media_type in ["tv-episodes"]:
elif media_type in ['tv-episodes']:
return #import_from_tvdb_by_id(import_id, media_type)
if media_type in ["books"]:
elif media_type in ['books']:
return import_from_openlibrary_by_id(import_id, media_type)
def import_from_imdb_by_id(imdb_id, media_type) -> dict:
def import_from_imdb_by_id(imdb_id, media_type):
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"
@ -128,28 +116,27 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict:
# Sending API request
response = requests.get(
api_url,
params={"external_source": "imdb_id"},
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
params={
'external_source': 'imdb_id'
},
headers={'Authorization': f"Bearer {TMDB_API_KEY}"}
)
# Process the response
if 200 == response.status_code:
logger.debug(response.status_code)
if (200 == response.status_code):
logger.info(response.status_code)
elif 429 == response.status_code:
elif (429 == response.status_code):
time.sleep(2)
return import_from_imdb_by_id(imdb_id, media_type)
import_from_imdb_by_id(imdb_id, media_type)
return
else:
raise Exception(f"Error {response.status_code}: {response.text}")
if "films" == media_type:
results_key = "movie_results"
elif "tv-episodes" == media_type:
results_key = "TODO"
elif "tv-series" == media_type:
results_key = "tv_results"
if ('films' == media_type): results_key = 'movie_results'
elif ('tv-episodes' == media_type): results_key = 'TODO'
elif ('tv-series' == media_type): results_key = 'tv_results'
response_data = json.loads(response.text)[results_key]
@ -160,95 +147,109 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict:
raise Exception(f"Returned no results for {imdb_id}")
elif 1 < len(response_data):
logger.warning(f"Returned more than one {media_type} for ID '{imdb_id}'\n")
logger.warning(f"Returned more than one {media_type} for ID '{imdb_id}'")
print(f"Returned more than one {media_type} for ID '{imdb_id}':\n")
print(json.dumps(response_data, indent=4))
idx = input("\nEnter the index of the result to use: ")
try:
item = response_data[int(idx)]
except Exception as exc:
raise Exception(f"Index {idx} is invalid") from exc
except:
raise Exception(f"Index {idx} is invalid")
# Modify the returned result to add additional data
return cleanup_result(item, media_type)
def import_from_openlibrary_by_id(isbn, media_type) -> dict:
def import_from_openlibrary_by_id(isbn, media_type):
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
api_url = f"https://openlibrary.org/isbn/{isbn}"
# Sending API request
response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
response = requests.get(
api_url,
headers={'accept': 'application/json'}
)
# Process the response
if 200 == response.status_code:
logger.debug(response.status_code)
if (200 == response.status_code):
logger.info(response.status_code)
elif 429 == response.status_code:
elif (429 == response.status_code):
time.sleep(2)
return import_from_openlibrary_by_id(isbn, media_type)
import_from_openlibrary_by_id(isbn, media_type)
return
else:
raise Exception(f"Error {response.status_code}: {response.text}")
item = json.loads(response.text)
for key in ["authors", "works"]:
for key in ['authors', 'works']:
if key in item:
for i, sub_item in enumerate(item[key]):
item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"])
item[key][i] = import_from_openlibrary_by_ol_key(sub_item['key'])
if "works" in item:
if len(item["works"]) > 1:
if 'works' in item:
if len(item['works']) > 1:
raise Exception(f"Multiple works found for {isbn}")
item["work"] = item["works"][0]
del item["works"]
else:
item['work'] = item['works'][0]
del item['works']
# Modify the returned result to add additional data
return cleanup_result(item, media_type)
def import_from_openlibrary_by_ol_key(key) -> dict:
"""Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key"""
def import_from_openlibrary_by_ol_key(key):
"""Retrieves an item (author or work) from OpenLibrary using an OL key"""
_, mode, ol_id = key.split("/")
_, mode, ol_id = key.split('/')
if mode in ["works", "authors"]:
if mode in ['works', 'authors']:
api_url = f"https://openlibrary.org{key}"
# Sending API request
response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
response = requests.get(
api_url,
headers={'accept': 'application/json'}
)
# Process the response
if 200 == response.status_code:
logger.debug(response.status_code)
if (200 == response.status_code):
logger.info(response.status_code)
elif 429 == response.status_code:
elif (429 == response.status_code):
time.sleep(2)
return import_from_openlibrary_by_ol_key(key)
import_from_openlibrary_by_ol_key(key)
return
else:
raise Exception(f"Error {response.status_code}: {response.text}")
item = json.loads(response.text)
if "authors" == mode:
author = {"id": ol_id, "name": item["name"]}
if 'authors' == mode:
author = {
'id': ol_id,
'name': item['name']
}
if "personal_name" in item:
if item["name"] != item["personal_name"]:
author["personal_name"] = item["personal_name"]
if 'personal_name' in item:
if item['name'] != item['personal_name']: author['personal_name'] = item['personal_name']
return author
if "works" == mode:
work = {"id": ol_id, "title": item["title"]}
elif 'works' == mode:
work = {
'id': ol_id,
'title': item['title']
}
for result_key in ["first_publish_date", "subjects"]:
if result_key in item:
work[result_key] = item[result_key]
for key in ['first_publish_date', 'subjects']:
if key in item: work[key] = item[key]
return work
@ -256,148 +257,138 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
raise Exception(f"Unknown OpenLibrary key '{mode}'")
def cleanup_result(item, media_type) -> dict:
"""Process a film, TV series, TV episode or book returned by their
respective APIs by removing unnecessary fields and adding others"""
def cleanup_result(item, media_type):
"""Process a film, TV series, TV episode or book returned by their respecitve APIs by removing unnecessary fields and adding others"""
for field_name in [
"adult", # TMDB
"backdrop_path", # TMDB
"copyright_date", # OpenLibrary
"classifications", # OpenLibrary
"created", # OpenLibrary
"episode_type", # TMDB
"first_sentence", # OpenLibrary
"genre_ids", # TMDB
"identifiers", # OpenLibrary
"media_type", # TMDB
"last_modified", # OpenLibrary
"latest_revision", # OpenLibrary
"lc_classifications", # OpenLibrary
"local_id", # OpenLibrary
"ocaid", # OpenLibrary
"oclc_numbers", # OpenLibrary
"popularity", # TMDB
"production_code", # TMDB
"revision", # OpenLibrary
"runtime", # TMDB
"source_records", # OpenLibrary
"still_path", # TMDB
"type", # OpenLibrary
"video", # TMDB
"vote_average", # TMDB
"vote_count", # TMDB
'adult', # TMDB
'backdrop_path', # TMDB
'copyright_date', # OpenLibrary
'classifications', # OpenLibrary
'created', # OpenLibrary
'episode_type', # TMDB
'first_sentence', # OpenLibrary
'genre_ids', # TMDB
'identifiers', # OpenLibrary
'media_type', # TMDB
'last_modified', # OpenLibrary
'latest_revision', # OpenLibrary
'lc_classifications', # OpenLibrary
'local_id', # OpenLibrary
'ocaid', # OpenLibrary
'oclc_numbers', # OpenLibrary
'popularity', # TMDB
'production_code', # TMDB
'revision', # OpenLibrary
'runtime', # TMDB
'source_records', # OpenLibrary
'still_path', # TMDB
'type', # OpenLibrary
'video', # TMDB
'vote_average', # TMDB
'vote_count' # TMDB
]:
if field_name in item:
del item[field_name]
if field_name in item: del item[field_name]
if media_type in ["films", "tv-series"]:
title_key = "name" if "tv-series" == media_type else "title"
if media_type in ['films', 'tv-series']:
title_key = 'name' if 'tv-series' == media_type else 'title'
if f"original_{title_key}" in item and "original_language" in item:
if (
item[f"original_{title_key}"] == item[title_key]
and item["original_language"] == "en"
):
del item[f"original_{title_key}"], item["original_language"]
if f"original_{title_key}" in item and 'original_language' in item:
if item[f"original_{title_key}"] == item[title_key] and item['original_language'] == 'en':
del item[f"original_{title_key}"], item['original_language']
if "books" == media_type:
_, _, item["id"] = item["key"].split("/")
del item["key"]
if 'books' == media_type:
_, _, item['id'] = item['key'].split('/')
del item['key']
for key in ["isbn_10", "isbn_13"]:
for key in ['isbn_10', 'isbn_13']:
if key in item:
if len(item[key]) > 1:
raise Exception("Multiple ISBN results")
else:
item[key] = item[key][0]
if "publish_places" in item:
if len(item["publish_places"]) > 1:
if 'publish_places' in item:
if len(item['publish_places']) > 1:
raise Exception("Multiple publish_places")
item["published_in"] = item["publish_places"][0]
del item["publish_places"]
if "languages" in item:
item["languages"] = [
lang["key"].split("/")[2] for lang in item["languages"]
]
if "translation_of" in item:
if item["translation_of"] == item["work"]["title"]:
del item["translation_of"]
else:
raise Exception(
f"translation_of '{item['translation_of']}' \
is different to work title '{item['work']['title']}'"
)
item['published_in'] = item['publish_places'][0]
del item['publish_places']
if "translated_from" in item:
if len(item["translated_from"]) > 1:
if 'languages' in item:
item['languages'] = [lang['key'].split('/')[2] for lang in item['languages']]
if 'translation_of' in item:
if item['translation_of'] == item['work']['title']:
del item['translation_of']
else:
raise Exception(f"translation_of '{item['translation_of']}' is different to work title '{item['work']['title']}'")
if 'translated_from' in item:
if len(item['translated_from']) > 1:
raise Exception("Multiple translated_from results")
item["work"]["original_language"] = item["translated_from"][0][
"key"
].split("/")[2]
del item["translated_from"]
else:
item['work']['original_language'] = item['translated_from'][0]['key'].split('/')[2]
del item['translated_from']
if "date_added" not in item:
item["date_added"] = datetime.today().strftime("%Y-%m-%d")
if 'date_added' not in item: item['date_added'] = datetime.today().strftime('%Y-%m-%d')
return item
def main() -> None:
"""Prompt user to select media type and log to process"""
media_type = ""
while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
def main():
media_type = ''
while media_type not in ['films', 'tv-episodes', 'tv-series', 'books']:
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
try:
if "films" == media_type:
log = ""
while log not in ["log", "wishlist"]:
if 'films' == media_type:
log = ''
while log not in ['log', 'wishlist']:
log = input ("Enter log to update [log|wishlist]: ")
imdb_id = ""
imdb_id = ''
while re.search("tt[0-9]+", imdb_id) is None:
imdb_id = input("Enter IMDB ID: ")
add_item_to_log(imdb_id, media_type, log)
elif "books" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
elif 'books' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to update [log|current|wishlist]: ")
isbn = ""
isbn = ''
while re.search("[0-9]+", isbn) is None:
isbn = input("Enter ISBN: ")
add_item_to_log(isbn, media_type, log)
elif "tv-episodes" == media_type:
imdb_id = ""
elif 'tv-episodes' == media_type:
imdb_id = ''
while re.search("tt[0-9]+", imdb_id) is None:
imdb_id = input("Enter IMDB ID: ")
add_item_to_log(imdb_id, media_type, "log")
add_item_to_log(imdb_id, media_type, 'log')
elif "tv-series" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
elif 'tv-series' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to update [log|current|wishlist]: ")
imdb_id = ""
imdb_id = ''
while re.search("tt[0-9]+", imdb_id) is None:
imdb_id = input("Enter IMDB ID: ")
add_item_to_log(imdb_id, media_type, log)
except Exception:
except Exception as error:
logger.exception("Exception occurred")
print(error)
if __name__ == "__main__":

View file

@ -1,163 +1,130 @@
"""
Process logs derived from social cataloguing site data exports, using various APIs.
"""
from dotenv import load_dotenv
import json
import os
import re
import time
import requests
from dotenv import load_dotenv
import time
from urllib.request import urlopen
from add_item import cleanup_result, import_by_id, setup_logger
logger = setup_logger("process_logs")
logger = setup_logger(__name__)
load_dotenv()
TMDB_API_KEY = os.getenv("TMDB_API_KEY")
TVDB_API_KEY = os.getenv("TVDB_API_KEY")
TMDB_API_KEY = os.getenv('TMDB_API_KEY')
TVDB_API_KEY = os.getenv('TVDB_API_KEY')
if "" == TMDB_API_KEY:
logger.warning("TMDB API key not found")
if "" == TVDB_API_KEY:
logger.warning("TVDB API key not found")
if "" == TMDB_API_KEY: logger.warning("TMDB API key not found")
if "" == TVDB_API_KEY: logger.warning("TVDB API key not found")
def process_log(media_type, log) -> None:
def process_log(media_type, log):
"""Run through a log and call the appropriate API for each item found"""
logger.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r") as log_file:
log_items = json.load(log_file)
log_item_values = {}
for i, item in enumerate(log_items):
try:
if "id" not in item:
if "films" == media_type:
item_title = item["Title"]
elif "tv-episodes" == media_type:
item_title = item["Episode Title"]
elif "tv-series" == media_type:
item_title = item["Show Title"]
if 'id' not in item:
if 'films' == media_type: item_title = item['Title']
elif 'tv-episodes' == media_type: item_title = item['Episode Title']
elif 'tv-series' == media_type: item_title = item['Show Title']
logger.debug(f"Processing {item_title}")
# Rename pre-existing fields
if "Date Added" in item:
log_item_values["date_added"] = item["Date Added"]
del item["Date Added"]
if 'Date Added' in item:
log_item_values['date_added'] = item['Date Added']
del item['Date Added']
if "Date Watched" in item:
log_item_values["date_finished"] = item["Date Watched"]
del item["Date Watched"]
if 'Date Watched' in item:
log_item_values['date_finished'] = item['Date Watched']
del item['Date Watched']
if "Rewatch" in item:
log_item_values["is_repeat"] = item["Rewatch"]
del item["Rewatch"]
if 'Rewatch' in item:
log_item_values['is_repeat'] = item['Rewatch']
del item['Rewatch']
if "Comments" in item:
log_item_values["comments"] = item["Comments"]
del item["Comments"]
if 'Comments' in item:
log_item_values['comments'] = item['Comments']
del item['Comments']
if "Series Title" in item:
log_item_values["series_title"] = item["Series Title"]
del item["Series Title"]
if 'Series Title' in item:
log_item_values['series_title'] = item['Series Title']
del item['Series Title']
if "Episode Title" in item:
log_item_values["name"] = item["Episode Title"]
del item["Episode Title"]
if 'Episode Title' in item:
log_item_values['name'] = item['Episode Title']
del item['Episode Title']
if "Episode Number" in item:
if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None:
season_no, _, episode_no = log_item_values[
"episode_number"
].split("x")
if 'Episode Number' in item:
if re.search("[0-9]+x[0-9]+", item['Episode Number']) is not None:
season_no, _, episode_no = log_item_values['episode_number'].split("x")
elif (
re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None
):
season_no, _, episode_no = log_item_values[
"episode_number"
].split("E")
elif re.search("S[0-9]+E[0-9]+", item['Episode Number']) is not None:
season_no, _, episode_no = log_item_values['episode_number'].split("E")
elif re.search("E[0-9]+", item["Episode Number"]) is not None:
elif re.search("E[0-9]+", item['Episode Number']) is not None:
season_no = None
episode_no = item["episode_number"][1:]
episode_no = item['episode_number'][1:]
else:
logger.error(
f"Invalid episode number format '{item['Episode Number']}'"
)
logger.error(f"Invalid episode number format '{item['Episode Number']}'")
return
log_item_values["season_number"] = season_no
log_item_values["episode_number"] = episode_no
del item["Episode Number"]
log_item_values['season_number'] = season_no
log_item_values['episode_number'] = episode_no
del item['Episode Number']
if "IMDB ID" in item and item["IMDB ID"] != "":
new_log_item = import_by_id(item["IMDB ID"], media_type)
if 'IMDB ID' in item:
log_items[i] = import_by_id(item['IMDB ID'], media_type)
else:
new_log_item = import_by_details(item, item_title, media_type)
log_items[i] = import_by_details(item, item_title, media_type)
if new_log_item is None:
item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
if log_items[i] is None:
item['imdb_id'] = input(f"Enter IMDB ID for {item_title}: ")
if re.search("tt[0-9]+", item["imdb_id"]) is not None:
log_items[i] = import_by_id(item["imdb_id"], media_type)
if re.search("tt[0-9]+", item['imdb_id']) is not None:
log_items[i] = import_by_id(item['imdb_id'], media_type)
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
else:
logger.warning(f"Skipped {item_title}")
else:
log_items[i] = new_log_item
if i % 15 == 0:
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
json.dump(log_items, log_file, indent=4)
if log_items[i] is not None:
log_items[i] |= log_item_values
if log_items[i] is not None: log_items[i] |= log_item_values
except KeyError:
print(json.dumps(item, indent=4))
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "w") as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Finished processing {media_type}/{log}")
def import_by_details(item, item_title, media_type) -> dict:
def import_by_details(item, item_title, media_type):
"""Import an item when lacking a unique identifier"""
if media_type in ["films", "tv-series"]:
if media_type in ['films', 'tv-series']:
return import_from_tmdb_by_details(item, item_title, media_type)
if media_type in ["tv-episodes"]:
elif media_type in ['tv-episodes']:
return #import_from_tvdb_by_details(item, item_title, media_type)
if media_type in ["books"]:
elif media_type in ['books']:
return #import_from_openlibrary_by_details(item, item_title, media_type)
if media_type in ["games"]:
elif media_type in ['games']:
return #import_from_igdb_by_details(item, item_title, media_type)
def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
def import_from_tmdb_by_details(item, item_title, media_type):
"""Retrieve a film or TV series from TMDB using its title"""
logger.info(f"Processing {item_title}")
@ -168,112 +135,96 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
response = requests.get(
api_url,
params={
"query": item_title,
"include_adult": True,
"year": item["Release Year"] if "Release Year" in item else None,
'query': item_title,
'include_adult': True,
'year': item['Release Year'] if 'Release Year' in item else None
},
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
headers={'Authorization': f"Bearer {TMDB_API_KEY}"}
)
# Process the response
if 200 == response.status_code:
logger.debug(response.status_code)
elif 429 == response.status_code:
if (200 == response.status_code):
logger.info(response.status_code)
elif (429 == response.status_code):
time.sleep(2)
return import_from_tmdb_by_details(item, item_title, media_type)
import_from_tmdb_by_details(item)
else:
logger.error(response.text)
response_data = json.loads(response.text)["results"]
response_data = json.loads(response.text)['results']
if 1 == len(response_data):
return cleanup_result(response_data[0], media_type)
if 0 == len(response_data):
elif 0 == len(response_data):
logger.warning(f"Returned no {media_type} for {item_title}")
elif 1 < len(response_data):
if "films" == media_type:
title_key = "title"
elif "tv-series" == media_type:
title_key = "name"
if 'films' == media_type: title_key = 'title'
elif 'tv-series' == media_type: title_key = 'name'
filtered_response_data = [
result for result in response_data if result[title_key] == item_title
]
frd_len = len(filtered_response_data)
response_data = [result for result in response_data if result[title_key] == item_title]
if 1 == frd_len:
if 1 == len(response_data):
return cleanup_result(response_data[0], media_type)
else:
logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
print(
json.dumps(
filtered_response_data
if len(filtered_response_data) > 0
else response_data,
indent=4,
)
)
last_index = len(filtered_response_data if frd_len > 0 else response_data) - 1
idx = input(
f"\nEnter the index of the result to use [0-{last_index}]: "
)
print(json.dumps(response_data, indent=4))
idx = input("\nEnter the index of the result to use: ")
if "" != idx:
try:
return cleanup_result(response_data[int(idx)], media_type)
except Exception as exc:
raise Exception("Index invalid") from exc
except:
logger.error("Index invalid!")
print("Index invalid!")
item["IMDB ID"] = input(f"Enter IMDB ID for {item_title}: ")
if "" != item["IMDB ID"]:
return import_by_id(item["IMDB ID"], media_type)
item['IMDB ID'] = input(f"Enter IMDB ID for {item_title}: ")
if '' != item['IMDB ID']:
return import_by_id(item['IMDB ID'], media_type)
else:
logger.warning(f"Skipped {item_title}")
return item
def main() -> None:
"""Prompt user to select media type and log to process"""
media_type = ""
while media_type not in ["films", "tv-episodes", "tv-series", "books"]:
def main():
media_type = ''
while media_type not in ['films', 'tv-episodes', 'tv-series', 'books']:
media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
try:
if "films" == media_type:
log = ""
while log not in ["log", "wishlist"]:
if 'films' == media_type:
log = ''
while log not in ['log', 'wishlist']:
log = input ("Enter log to process [log|wishlist]:")
process_log(media_type, log)
elif "books" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
elif 'books' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to process [log|current|wishlist]:")
# TODO
elif "tv-episodes" == media_type:
process_log(media_type, "log")
elif 'tv-episodes' == media_type:
process_log(media_type, 'log')
# TODO
elif "tv-series" == media_type:
log = ""
while log not in ["log", "current", "wishlist"]:
elif 'tv-series' == media_type:
log = ''
while log not in ['log', 'current', 'wishlist']:
log = input ("Enter log to process [log|current|wishlist]:")
process_log(media_type, log)
except Exception:
except Exception as error:
logger.exception("Exception occurred")
print(error)
if __name__ == "__main__":