add autodetection of duplicate entries

parent ed355c2440
commit 5d25e3bb74

3 changed files with 206 additions and 90 deletions
package.json

@@ -6,7 +6,7 @@
     "build": "rm -rf ./public/ && snap run hugo --templateMetrics --templateMetricsHints",
     "deploy": "rsync -rP ./public/ ovhvps:~/catalogue/content",
     "add": "python ./scripts/add_item.py",
-    "process": "python ./scripts/process_items.py",
+    "process": "python ./scripts/process_logs.py",
     "lint:json": "jsonlint ./**/*.json -s",
     "lint:json:fix": "npm run lint:json -- -i",
     "lint:py": "pylint --disable=broad-exception-raised --disable=logging-fstring-interpolation ./scripts/*.py"
scripts/add_item.py

@@ -11,19 +11,19 @@ from datetime import datetime
 import requests
 from dotenv import load_dotenv
 
+authors = []
+
 def setup_logger(name="add_item"):
     """Set up the logger for console and file"""
 
-    logging.root.setLevel(logging.NOTSET)
-
     logr = logging.getLogger(name)
 
     c_handler = logging.StreamHandler()
     f_handler = logging.FileHandler("./logs/run.log")
 
+    logging.root.setLevel(logging.INFO)
     c_handler.setLevel(logging.INFO)
-    f_handler.setLevel(logging.ERROR)
+    f_handler.setLevel(logging.WARNING)
 
     c_format = logging.Formatter("%(name)s - %(levelname)s - %(message)s")
     f_format = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
@@ -50,16 +50,73 @@ if "" == TVDB_API_KEY:
     logger.error("TVDB API key not found")
 
 
+def return_if_exists(item_id, media_type, log) -> dict|None:
+    """Returns an item if it exists in the requested log"""
+
+    logger.info(f"Checking for '{item_id}' in '{log}'…")
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+        log_items = json.load(log_file)
+
+    existing_items = [log_item for log_item in log_items if "id" in log_item and log_item['id'] == int(item_id)]
+    if len(existing_items) > 0:
+        logger.info(f"Found item in '{log}'")
+        return existing_items[-1]
+    logger.info(f"'{item_id}' not found in '{log}'")
+
+
+def delete_existing(item_id, media_type, log) -> None:
+    """Deletes an item from a log if it matches the ID"""
+
+    logger.info(f"Deleting '{item_id}' from '{log}'…")
+    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+        log_items = json.load(log_file)
+
+    old_len = len(log_items)
+    log_items = [log_item for log_item in log_items if "id" not in log_item or ("id" in log_item and log_item['id'] != int(item_id))]
+    if len(log_items) < (old_len - 1):
+        raise Exception("More than one deletion made, discarding…")
+
+    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
+        json.dump(log_items, log_file, indent=4)
+    logger.info(f"'{item_id}' deleted from '{log}'")
+
+
+def check_for_existing(item_id, media_type, log) -> dict[dict, str]:
+    """Check for an existing item and move it to the specified log if requested"""
+
+    logger.info(f"Checking for '{item_id}' in logs…")
+
+    # Check in specified log
+    existing_item = return_if_exists(item_id, media_type, log)
+    if existing_item is not None:
+        if "log" == log:
+            existing_item["is_repeat"] = True
+        return existing_item, None
+
+    for log_to_check in [p_log for p_log in ["log", "current", "wishlist"] if p_log != log]:
+        if ("current" == log_to_check and media_type in ["books", "games", "tv-series"]) or ("wishlist" == log_to_check and media_type in ["books", "games", "films", "tv-series"]):
+            existing_item = return_if_exists(item_id, media_type, log_to_check)
+            if existing_item is not None:
+                return existing_item, log_to_check
+
+    return None, None
+
+
 def add_item_to_log(item_id, media_type, log) -> None:
     """Add a film, book, TV series or TV episode to a log"""
 
     logger.info(f"Processing {item_id}…")
 
-    item: dict = import_by_id(item_id, media_type)
+    item, log_to_delete = check_for_existing(item_id, media_type, log)
+
+    if item is None:
+        item = import_by_id(item_id, media_type)
+    if item is None:
+        raise Exception("No item found")
 
     if log in ["log", "current"]:
-        # TODO - review this when moving from one log to another
-        if media_type in ["books", "tv-series", "games"]:
+        if "date_started" not in item and media_type in ["books", "tv-series", "games"]:
             date_started = ""
             while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_started) is None:
                 date_started = input("Enter date started [YYYY-MM-DD, t for today]: ")
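Together these helpers are the autodetection named in the commit title: before importing anything, add_item_to_log now asks the local JSON logs whether the ID is already known. A condensed sketch of the resulting flow (illustrative, not the committed code; names and the ./data/<media_type>/<log>.json layout are taken from the diff above):

# Sketch: how the new helpers cooperate inside add_item_to_log.
def add_or_move(item_id, media_type, log) -> dict:
    # 1. Look in the target log first, then in the other logs
    item, log_to_delete = check_for_existing(item_id, media_type, log)

    # 2. Only hit the remote APIs when no local copy exists
    if item is None:
        item = import_by_id(item_id, media_type)
    if item is None:
        raise Exception("No item found")

    # ... prompts for dates/comments and the write to the target log ...

    # 3. If the item came from another log (e.g. wishlist), drop the
    #    stale copy so each entry lives in exactly one file
    if log_to_delete is not None:
        delete_existing(item_id, media_type, log_to_delete)
    return item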
@@ -67,7 +124,7 @@ def add_item_to_log(item_id, media_type, log) -> None:
                 date_started = datetime.today().strftime("%Y-%m-%d")
             item["date_started"] = date_started
 
-        if "log" == log:
+        if "date_finished" not in item and "log" == log:
             date_finished = ""
             while re.search("[0-9]{4}-[0-9]{2}-[0-9]{2}", date_finished) is None:
                 date_finished = input("Enter date finished [YYYY-MM-DD, t for today]: ")
@@ -75,14 +132,17 @@ def add_item_to_log(item_id, media_type, log) -> None:
                 date_finished = datetime.today().strftime("%Y-%m-%d")
             item["date_finished"] = date_finished
 
-        # TODO - do this automatically
+        if "is_repeat" not in item:
             is_repeat = ""
             while is_repeat not in ["y", "n"]:
                 is_repeat = input("Is this a repeat entry? [y/n]: ")
             if "y" == is_repeat:
                 item["is_repeat"] = True
 
+    if "added_by_id" not in item:
         item["added_by_id"] = item_id
 
+    if "comments" not in item:
         comments = input("Enter comments (optional): ")
         if "" != comments:
             item["comments"] = comments
@@ -106,12 +166,15 @@ def add_item_to_log(item_id, media_type, log) -> None:
     logger.info(f"Added {media_type} {item_id} to {log}")
 
+    if log_to_delete is not None:
+        delete_existing(item_id, media_type, log_to_delete)
+
 
 def import_by_id(import_id, media_type) -> dict:
     """Import from the appropriate API by unique ID"""
 
     if media_type in ["films", "tv-series"]:
-        return import_from_imdb_by_id(import_id, media_type)
+        return import_from_tmdb_by_id(import_id, media_type)
 
     if media_type in ["tv-episodes"]:
         return #import_from_tvdb_by_id(import_id, media_type)
@@ -120,15 +183,15 @@ def import_by_id(import_id, media_type) -> dict:
         return import_from_openlibrary_by_id(import_id, media_type)
 
 
-def import_from_imdb_by_id(imdb_id, media_type) -> dict:
+def import_from_tmdb_by_id(tmdb_id, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
 
-    api_url = f"https://api.themoviedb.org/3/find/{imdb_id}"
+    api_path = "movie" if "films" == media_type else "tv"
+    api_url = f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}"
 
     # Sending API request
     response = requests.get(
         api_url,
-        params={"external_source": "imdb_id"},
         headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
         timeout=15
     )
@@ -139,35 +202,21 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict:
 
     elif 429 == response.status_code:
         time.sleep(2)
-        return import_from_imdb_by_id(imdb_id, media_type)
+        return import_from_tmdb_by_id(tmdb_id, media_type)
 
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
 
-    if "films" == media_type:
-        results_key = "movie_results"
-    elif "tv-episodes" == media_type:
-        results_key = "TODO"
-    elif "tv-series" == media_type:
-        results_key = "tv_results"
-
-    response_data = json.loads(response.text)[results_key]
+    if "tv-episodes" == media_type:
+        raise Exception("TV Episodes are TODO!")
+
+    response_data = json.loads(response.text)
 
     if 1 == len(response_data):
         item = response_data[0]
 
     elif 0 == len(response_data):
-        raise Exception(f"Returned no results for {imdb_id}")
+        raise Exception(f"Returned no results for {tmdb_id}")
 
-    elif 1 < len(response_data):
-        logger.warning(f"Returned more than one {media_type} for ID '{imdb_id}'\n")
-        print(json.dumps(response_data, indent=4))
-        idx = input("\nEnter the index of the result to use: ")
-        try:
-            item = response_data[int(idx)]
-        except Exception as exc:
-            raise Exception(f"Index {idx} is invalid") from exc
 
     # Modify the returned result to add additional data
     return cleanup_result(item, media_type)
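The importer now takes native TMDB IDs instead of IMDB IDs: a direct GET on /3/movie/{id} or /3/tv/{id} returns exactly one object, so the old /find lookup and its index-picking prompt can go. A minimal standalone request against the new endpoint shape (the token and error handling here are illustrative placeholders, not the committed code):

import requests

TMDB_API_KEY = "..."  # placeholder: a TMDB API read access token

def fetch_tmdb(tmdb_id, media_type) -> dict:
    # "films" maps to the /movie path; TV series map to /tv
    api_path = "movie" if "films" == media_type else "tv"
    response = requests.get(
        f"https://api.themoviedb.org/3/{api_path}/{tmdb_id}",
        headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
        timeout=15,
    )
    response.raise_for_status()  # unlike the script, fail fast here
    return response.json()       # a single movie/TV object, not a result list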
@@ -176,6 +225,8 @@ def import_from_imdb_by_id(imdb_id, media_type) -> dict:
 def import_from_openlibrary_by_id(isbn, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB ID"""
 
+    logging.info(f"Importing '{isbn}'…")
+
     api_url = f"https://openlibrary.org/isbn/{isbn}"
 
     # Sending API request
@@ -189,6 +240,9 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
         time.sleep(2)
         return import_from_openlibrary_by_id(isbn, media_type)
 
+    elif 404 == response.status_code:
+        logger.error(f"{response.status_code}: Not Found for ISBN '{isbn}'")
+        return None
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
@@ -199,6 +253,7 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
     for i, sub_item in enumerate(item[key]):
         item[key][i] = import_from_openlibrary_by_ol_key(sub_item["key"])
 
+
     if "works" in item:
         if len(item["works"]) > 1:
             raise Exception(f"Multiple works found for {isbn}")
@@ -206,6 +261,9 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
         item["work"] = item["works"][0]
         del item["works"]
 
+    # Rate limiting
+    time.sleep(1)
+
     # Modify the returned result to add additional data
     return cleanup_result(item, media_type)
@@ -213,9 +271,20 @@ def import_from_openlibrary_by_id(isbn, media_type) -> dict:
 def import_from_openlibrary_by_ol_key(key) -> dict:
     """Retrieves an item (author or work, NOT edition) from OpenLibrary using an OL key"""
 
+    logger.info(f"Retrieving {key}…")
     _, mode, ol_id = key.split("/")
 
+    if "authors" == mode:
+        with open(f"./scripts/caching/authors.json", "r", encoding='utf-8') as authors_cache:
+            cached_authors = json.load(authors_cache)
+
     if mode in ["works", "authors"]:
+        if "authors" == mode:
+            matched_cached_authors = [aut for aut in cached_authors if aut['id'] == ol_id]
+            if len(matched_cached_authors) == 1:
+                logging.info(f"Found cached author '{matched_cached_authors[0]['name']}'")
+                return matched_cached_authors[0]
+
         api_url = f"https://openlibrary.org{key}"
 
         # Sending API request
@@ -227,11 +296,14 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
 
     elif 429 == response.status_code:
         time.sleep(2)
-        return import_from_openlibrary_by_ol_key(key)
+        import_from_openlibrary_by_ol_key(key)
 
     else:
         raise Exception(f"Error {response.status_code}: {response.text}")
 
+    # Rate limiting
+    time.sleep(1)
+
     item = json.loads(response.text)
 
     if "authors" == mode:
@@ -241,6 +313,16 @@ def import_from_openlibrary_by_ol_key(key) -> dict:
         if item["name"] != item["personal_name"]:
             author["personal_name"] = item["personal_name"]
 
+        logger.info(f"Caching author '{author['name']}'…")
+        cached_authors.append(author)
+        with open(
+            f"./scripts/caching/authors.json",
+            "w",
+            encoding='utf-8'
+        ) as authors_cache:
+            json.dump(cached_authors, authors_cache, indent=4)
+        logger.info(f"Author '{author['name']}' cached!")
+
         return author
 
     if "works" == mode:
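The author cache added above is a plain read-through JSON cache: look the OL ID up in ./scripts/caching/authors.json, and only on a miss fetch from OpenLibrary and append the result. The same pattern as a self-contained sketch (the fetch callable and the missing-file handling are illustrative additions; the committed code assumes the cache file already exists):

import json
import os

CACHE_PATH = "./scripts/caching/authors.json"  # path used in the diff

def cached_author(ol_id, fetch):
    # Load the cache, tolerating a missing file
    cache = []
    if os.path.exists(CACHE_PATH):
        with open(CACHE_PATH, "r", encoding="utf-8") as f:
            cache = json.load(f)

    # Cache hit: skip the network round-trip entirely
    for author in cache:
        if author["id"] == ol_id:
            return author

    # Cache miss: fetch, remember, persist
    author = fetch(ol_id)
    cache.append(author)
    with open(CACHE_PATH, "w", encoding="utf-8") as f:
        json.dump(cache, f, indent=4)
    return author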
@@ -266,6 +348,7 @@ def cleanup_result(item, media_type) -> dict:
         "copyright_date", # OpenLibrary
         "classifications", # OpenLibrary
         "created", # OpenLibrary
+        "dewey_decimal_class", # OpenLibary
         "episode_type", # TMDB
         "first_sentence", # OpenLibrary
         "genre_ids", # TMDB
@@ -274,19 +357,27 @@ def cleanup_result(item, media_type) -> dict:
         "last_modified", # OpenLibrary
         "latest_revision", # OpenLibrary
         "lc_classifications", # OpenLibrary
+        "lccn", # OpenLibrary
         "local_id", # OpenLibrary
+        "notes", # OpenLibrary
         "ocaid", # OpenLibrary
         "oclc_numbers", # OpenLibrary
+        "pagination", # OpenLibrary
+        "physical_dimensions", # OpenLibrary
         "popularity", # TMDB
         "production_code", # TMDB
         "revision", # OpenLibrary
         "runtime", # TMDB
         "source_records", # OpenLibrary
         "still_path", # TMDB
+        "table_of_contents", # OpenLibrary
         "type", # OpenLibrary
+        "uri_descriptions", # OpenLibrary
+        "url", # OpenLibrary
         "video", # TMDB
         "vote_average", # TMDB
         "vote_count", # TMDB
+        "weight", # OpenLibrary
     ]:
         if field_name in item:
             del item[field_name]
@@ -308,15 +399,12 @@ def cleanup_result(item, media_type) -> dict:
     for key in ["isbn_10", "isbn_13"]:
         if key in item:
             if len(item[key]) > 1:
-                raise Exception("Multiple ISBN results")
+                logger.warning("Multiple ISBN results")
 
             item[key] = item[key][0]
 
     if "publish_places" in item:
-        if len(item["publish_places"]) > 1:
-            raise Exception("Multiple publish_places")
-
-        item["published_in"] = item["publish_places"][0]
+        item["published_in"] = item["publish_places"]
         del item["publish_places"]
 
     if "languages" in item:
@@ -325,7 +413,7 @@ def cleanup_result(item, media_type) -> dict:
     ]
 
     if "translation_of" in item:
-        if item["translation_of"] == item["work"]["title"]:
+        if item["translation_of"].split(":")[0].lower() == item["work"]["title"].split(":")[0].lower():
             del item["translation_of"]
         else:
             raise Exception(
@@ -356,45 +444,37 @@ def main() -> None:
     media_type = input("Select media type [films|tv-episodes|tv-series|books]: ")
 
     try:
+        item_id = ""
         if "films" == media_type:
             log = ""
             while log not in ["log", "wishlist"]:
                 log = input("Enter log to update [log|wishlist]: ")
 
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
-
-            add_item_to_log(imdb_id, media_type, log)
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TMDB ID: ")
 
         elif "books" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to update [log|current|wishlist]: ")
 
-            isbn = ""
-            while re.search("[0-9]+", isbn) is None:
-                isbn = input("Enter ISBN: ")
-
-            add_item_to_log(isbn, media_type, log)
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter ISBN: ")
 
         elif "tv-episodes" == media_type:
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
-
-            add_item_to_log(imdb_id, media_type, "log")
+            log = "log"
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TVDB ID: ")
 
         elif "tv-series" == media_type:
             log = ""
             while log not in ["log", "current", "wishlist"]:
                 log = input("Enter log to update [log|current|wishlist]: ")
 
-            imdb_id = ""
-            while re.search("tt[0-9]+", imdb_id) is None:
-                imdb_id = input("Enter IMDB ID: ")
+            while re.search("[0-9]+", item_id) is None:
+                item_id = input("Enter TMDB ID: ")
 
-            add_item_to_log(imdb_id, media_type, log)
+        add_item_to_log(item_id, media_type, log)
 
     except Exception:
         logger.exception("Exception occurred")
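main() now collects a single item_id per run (TMDB IDs for films and series, a TVDB ID for episodes, an ISBN for books) and makes one add_item_to_log call at the end, so the duplicate check runs on the same path for every media type. The repeated prompt-until-valid loop could be factored out; a hypothetical helper, not part of the commit:

import re

def prompt_until_match(prompt, pattern="[0-9]+"):
    # Hypothetical helper: the commit repeats this loop inline per branch
    value = ""
    while re.search(pattern, value) is None:
        value = input(prompt)
    return value

# e.g. item_id = prompt_until_match("Enter TMDB ID: ")
#      item_id = prompt_until_match("Enter ISBN: ")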
scripts/process_logs.py

@@ -35,21 +35,38 @@ def process_log(media_type, log) -> None:
 
     for i, item in enumerate(log_items):
         try:
-            if "id" not in item:
-                if "films" == media_type:
+            if "id" not in item and "skip" not in item:
+                if media_type in ["films", "books"]:
                     item_title = item["Title"]
                 elif "tv-episodes" == media_type:
                     item_title = item["Episode Title"]
                 elif "tv-series" == media_type:
                     item_title = item["Show Title"]
 
-                logger.debug(f"Processing {item_title}…")
+                logger.info(f"Processing {item_title}…")
 
                 # Rename pre-existing fields
                 if "Date Added" in item:
                     log_item_values["date_added"] = item["Date Added"]
                     del item["Date Added"]
 
+                if "Date Started" in item:
+                    log_item_values["date_started"] = item["Date Started"]
+                    del item["Date Started"]
+
+                if "Date Finished" in item:
+                    log_item_values["date_finished"] = item["Date Finished"]
+                    del item["Date Finished"]
+                if "Date Read" in item:
+                    if item["Date Finished"] == item["Date Read"]:
+                        del item["Date Read"]
+                    else:
+                        raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}")
+
+                if "Read Count" in item:
+                    log_item_values["read_count"] = item["Read Count"]
+                    del item["Read Count"]
+
                 if "Date Watched" in item:
                     log_item_values["date_finished"] = item["Date Watched"]
                     del item["Date Watched"]
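process_log now recognises the book-export columns (Date Started, Date Finished, Date Read, Read Count) alongside the film-style Date Watched. Every rename follows the same move-and-delete shape, so the mapping could be table-driven; a hypothetical condensation of the committed if-blocks (the Date Read consistency check would stay a special case):

# Hypothetical refactor: the committed code spells these out one if-block each.
FIELD_RENAMES = {
    "Date Added": "date_added",
    "Date Started": "date_started",
    "Date Finished": "date_finished",
    "Date Watched": "date_finished",
    "Read Count": "read_count",
}

def rename_fields(item, log_item_values):
    for old_key, new_key in FIELD_RENAMES.items():
        if old_key in item:
            # dict.pop moves the value and removes the source column in one step
            log_item_values[new_key] = item.pop(old_key)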
@@ -99,10 +116,17 @@ def process_log(media_type, log) -> None:
                 if "IMDB ID" in item and item["IMDB ID"] != "":
                     new_log_item = import_by_id(item["IMDB ID"], media_type)
 
+                elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None:
+                    new_log_item = import_by_id(item["ISBN13"], media_type)
+
+                elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
+                    new_log_item = import_by_id(item["ISBN"], media_type)
+
                 else:
                     new_log_item = import_by_details(item, item_title, media_type)
 
                 if new_log_item is None:
+                    if media_type in ["films", "tv-series", "tv-episodes"] and "imdb_id" not in item:
                         item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
 
                         if re.search("tt[0-9]+", item["imdb_id"]) is not None:
@@ -115,13 +139,31 @@ def process_log(media_type, log) -> None:
                         ) as log_file:
                             json.dump(log_items, log_file, indent=4)
 
+                    elif "books" == media_type:
+                        if "ISBN" not in item and "ISBN13" not in item:
+                            item["ISBN"] = input(f"Enter ISBN for {item_title}: ")
+
+                            if re.search("[0-9-]+", item["ISBN"]) is not None:
+                                log_items[i] = import_by_id(item["ISBN"], media_type)
+
+                                with open(
+                                    f"./data/{media_type}/{log}.json",
+                                    "w",
+                                    encoding='utf-8'
+                                ) as log_file:
+                                    json.dump(log_items, log_file, indent=4)
+
+                            else:
+                                logger.warning(f"Skipped '{item_title}'")
+                                log_items[i]["skip"] = True
+
                     else:
                         logger.warning(f"Skipped {item_title}")
 
                 else:
                     log_items[i] = new_log_item
 
-            if i % 15 == 0:
+            if i % 10 == 0:
                 with open(
                     f"./data/{media_type}/{log}.json",
                     "w",
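For books the import fallback is now: IMDB ID, then ISBN13, then ISBN, then a details search; anything still unresolved is asked for an ISBN once and flagged with "skip": true so later runs leave it alone. The identifier selection amounts to the following (a sketch; first_present is an illustrative helper, while item, media_type, import_by_id and import_by_details are the names from the diff):

def first_present(item, keys):
    # Illustrative helper: first identifier column that is non-empty
    for key in keys:
        value = item.get(key)
        if value is not None and value != "":
            return value
    return None

# Mirrors the fallback order added above
identifier = first_present(item, ["IMDB ID", "ISBN13", "ISBN"])
if identifier is not None:
    new_log_item = import_by_id(identifier, media_type)
else:
    new_log_item = import_by_details(item, item_title, media_type)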
@@ -234,7 +276,7 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
         if "" != item["IMDB ID"]:
             return import_by_id(item["IMDB ID"], media_type)
 
-    logger.warning(f"Skipped {item_title}")
+    logger.warning(f"Skipped {media_type} '{item_title}'")
     return item
@@ -251,19 +293,13 @@ def main() -> None:
         while log not in ["log", "wishlist"]:
             log = input("Enter log to process [log|wishlist]: ")
 
-        process_log(media_type, log)
-
     elif "books" == media_type:
         log = ""
         while log not in ["log", "current", "wishlist"]:
             log = input("Enter log to process [log|current|wishlist]: ")
 
-        # TODO
+    elif "tv-series" == media_type:
+        log = "log"
 
-    elif "tv-episodes" == media_type:
-        process_log(media_type, "log")
-
-    # TODO
-
     elif "tv-series" == media_type:
         log = ""