diff --git a/data/media.db b/data/media.db
new file mode 100644
index 0000000..c361dbf
Binary files /dev/null and b/data/media.db differ
diff --git a/scripts/add_item.py b/scripts/add_item.py
index 92c2bd3..387128e 100644
--- a/scripts/add_item.py
+++ b/scripts/add_item.py
@@ -275,6 +275,8 @@ def import_by_id(import_id, media_type, log) -> dict | None:
             "".join(re.findall(r"\d+", import_id)), media_type
         )
 
+    logger.error("Invalid media_type!")
+
 
 def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
     """Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID"""
diff --git a/scripts/json_to_sql.py b/scripts/json_to_sql.py
new file mode 100644
index 0000000..824aa8a
--- /dev/null
+++ b/scripts/json_to_sql.py
@@ -0,0 +1,277 @@
+import sqlite3
+import json
+import traceback
+
+json_keys = {
+    "books": {
+        "name_key": "title",
+        "item": {
+            "title",
+            "subtitle",
+            "edition_name",
+            "full_title",
+            "ol_id",
+            "isbn_10",
+            "isbn_13",
+            "added_by_id",
+            "covers",
+            "publish_date",
+            "publishers",
+            "physical_format",
+            "description",
+        },
+        "genres": "genres",
+        "collections": "series",
+        "work": "work",
+        "creators": "authors",
+        "languages": "languages",
+        "countries": "publish_country",
+        "entry": {"date_added", "date_started", "date_finished", "comments"},
+    },
+    "films": {
+        "name_key": "title",
+        "original_name_key": "original_title",
+        "original_language_key": "original_language",
+        "item": {
+            "title",
+            "imdb_id",
+            "tmdb_id",
+            "added_by_id",
+            "poster_path",
+            "release_date",
+            "overview",
+            "original_title",
+            "original_language",
+        },
+        "collections": {"key": "belongs_to_collection", "fields": {"name"}},
+        "languages": {
+            "key": "spoken_languages",
+            "fields": {"english_name", "iso_639_1"},
+        },
+        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
+        "entry": {"date_added", "date_started", "date_finished", "comments"},
+    },
+    "tv-series": {
+        "name_key": "name",
+        "original_name_key": "original_name",
+        "original_language_key": "original_language",
+        "item": {
+            "name",
+            "tmdb_id",
+            "tvdb_id",
+            "added_by_id",
+            "poster_url",
+            "overview",
+            "first_air_date",
+            "original_name",
+            "original_language",
+        },
+        "languages": {
+            "key": "spoken_languages",
+            "fields": {"english_name", "iso_639_1"},
+        },
+        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
+        "entry": {"date_added", "date_started", "date_finished", "comments"},
+    },
+    "tv-episodes": {
+        "name_key": "name",
+        "original_name_key": "original_name",
+        "original_language_key": "original_language",
+        "item": {
+            "name",
+            "tmdb_id",
+            "tvdb_id",
+            "added_by_id",
+            "overview",
+            "air_date",
+            "series",
+            "episode_number",
+            "season_number",
+        },
+        "languages": {
+            "key": "spoken_languages",
+            "fields": {"english_name", "iso_639_1"},
+        },
+        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
+        "entry": {"date_added", "date_finished", "comments"},
+    },
+}
+
+sql_columns = {
+    "films": {
+        "unique_keys": ["tmdb_id"],
+        "languages": {
+            "keys": ["name", "iso_639_code"],
+            "unique_keys": ["iso_639_code"],
+            "join_keys": ["film_id", "language_id"],
+        },
+        "countries": {
+            "keys": ["name", "iso_3166_code"],
+            "unique_keys": ["iso_3166_code"],
+            "join_keys": ["film_id", "country_id"],
+        },
+    },
+    "tv-episodes": {
+        "unique_keys": ["tmdb_id", "imdb_id"],
+        "languages": {
+            "keys": ["name", "iso_639_code"],
+            "unique_keys": ["iso_639_code"],
+            "join_keys": ["tv_episode_id", "language_id"],
+        },
+    },
+    "tv-series": {
"unique_keys": ["tmdb_id", "imdb_id"], + "languages": { + "keys": ["name", "iso_639_code"], + "unique_keys": ["iso_639_code"], + "join_keys": ["tv_episode_id", "language_id"], + }, + "countries": { + "keys": ["name", "iso_3166_code"], + "unique_keys": ["iso_3166_code"], + "join_keys": ["tv_series_id", "country_id"], + } + } +} + + +def insert_from_json(media_type, log): + media_type_pl = get_media_type_pl(media_type) + json_path = f"./data/{media_type_pl}/{log}.json" + db_path = "./data/media.db" + + with open(json_path, "r") as file: + data = json.load(file) + print(f"Results: {len(data)}") + + conn = sqlite3.connect(db_path) + conn.isolation_level = None + + cur = conn.cursor() + cur.execute('BEGIN') + + try: + for entry in reversed(data): + print(f"Importing {entry.get( json_keys[media_type_pl]['name_key'] )}…") + + # Insert item + item_entry = { + key: entry[key] for key in entry.keys() & json_keys[media_type_pl]["item"] + } + + if item_entry.get(json_keys[media_type_pl]['original_name_key']) is not None: + item_entry["title"] = item_entry.pop( json_keys[media_type_pl]['name_key'] ) + item_entry["title_original"] = item_entry.pop(json_keys[media_type_pl]['original_name_key'] ) + item_entry["title_original_language"] = item_entry.pop(json_keys[media_type_pl]['original_language_key'] ) + + else: + item_entry["title"] = item_entry.pop( json_keys[media_type_pl]['name_key'] ) + + keys = ", ".join(item_entry.keys()) + unique_keys = ", ".join(sql_columns[media_type_pl]['unique_keys']) + question_marks = ", ".join(["?" for _ in item_entry]) + values = tuple(item_entry.values()) + + cur.execute( + f"INSERT INTO '{media_type_pl}' ({keys}) VALUES ({question_marks}) ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) RETURNING id", + values + values, + ) + row = cur.fetchone() + (inserted_id,) = row if row else None + + # Join tables + for join_type in ["languages", "countries"]: + if entry.get( json_keys[media_type_pl][join_type]["key"] ) is not None: + for join_item in entry.get( json_keys[media_type_pl][join_type]["key"] ): + print(f"Importing {join_type} {join_item}…") + values = { + key: join_item[key] + for key in join_item.keys() + & json_keys[media_type_pl][join_type]["fields"] + } + + + insert_join( + inserted_id, + f"{join_type}", + f"{media_type_pl}_{join_type}", + sql_columns[media_type_pl][join_type]["join_keys"], + sql_columns[media_type_pl][join_type]["keys"], + values, + sql_columns[media_type_pl][join_type]["unique_keys"], + cur, + conn + ) + + # Log Entry + item_log_entry = { + key: entry[key] for key in entry.keys() & json_keys[media_type_pl]["entry"] + } + if item_log_entry.get("date_added") is not None: + item_log_entry["log"] = log + item_log_entry[f"{media_type}_id"] = inserted_id + print(f"Importing log entry added {item_log_entry.get('date_added')}…") + keys = ", ".join(item_log_entry.keys()) + question_marks = ", ".join(["?" for _ in item_log_entry]) + values = tuple(item_log_entry.values()) + + cur.execute( + f"INSERT INTO '{media_type_pl}_log-entries' ({keys}) VALUES ({question_marks})", + values, + ) + else: + print(f"No log details for {entry.get('name')}!") + + except Exception: + print(traceback.format_exc()) + cur.execute('ROLLBACK') + + else: + conn.commit() + + conn.close() + + +def insert_join( + media_id, + table_name, + join_table_name, + join_keys, + data_keys, + data_values, + data_unique, + cur, + conn, +): + keys = ", ".join(data_keys) + unique_keys = ", ".join(data_unique) + question_marks = ", ".join(["?" 
+    question_marks = ", ".join(["?" for _ in data_keys])
+    values = tuple(data_values)
+
+    # Upsert the language/country row and fetch its id
+    cur.execute(
+        f"INSERT INTO '{table_name}' ({keys}) VALUES ({question_marks}) ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) RETURNING id",
+        values + values,
+    )
+    row = cur.fetchone()
+    data_id = row[0] if row else None
+
+    if data_id is not None:
+        keys = ", ".join(join_keys)
+
+        print(f"Matching item ID {media_id} to data ID {data_id}…")
+        cur.execute(
+            f"INSERT INTO '{join_table_name}' ({keys}) VALUES ({media_id}, {data_id}) ON CONFLICT({keys}) DO NOTHING"
+        )
+
+
+def get_media_type_pl(media_type):
+    if media_type in ["tv-series"]:
+        return media_type
+    else:
+        return media_type + "s"
+
+
+# insert_from_json('./data/tv-series/log.json', './data/media.db', 'tv-series', 'log')
+# insert_from_json("./data/tv-series/wishlist.json", "./data/media.db", "tv-series", "wishlist")
+# insert_from_json("./data/tv-series/current.json", "./data/media.db", "tv-series", "current")
+
+insert_from_json("film", "log")
+# insert_from_json("./data/films/wishlist.json", "./data/media.db", "films", "wishlist")
diff --git a/scripts/process_logs.py b/scripts/process_logs.py
index 69110ab..8a3e081 100644
--- a/scripts/process_logs.py
+++ b/scripts/process_logs.py
@@ -7,6 +7,7 @@ import os
 import re
 import time
 import requests
+from slugify import slugify
 from dotenv import load_dotenv
 from add_item import cleanup_result, import_by_id, setup_logger
@@ -28,7 +29,7 @@ def process_log(media_type, log) -> None:
 
     logger.info(f"Processing {media_type}/{log}…")
 
-    with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
         log_items = json.load(log_file)
 
     log_item_values = {}
@@ -42,171 +43,187 @@ def process_log(media_type, log) -> None:
         id_key = "gb_id"
 
     for i, item in enumerate(log_items):
-        try:
-            if id_key not in item and "skip" not in item:
-                if media_type in ["films", "books"]:
-                    item_title = item["Title"]
-                elif "tv-episodes" == media_type:
-                    item_title = item["Episode Title"]
-                elif "tv-series" == media_type:
-                    item_title = item["Show Title"]
+        if id_key not in item:  # and "skip" not in item:
+            if media_type in ["films", "books"]:
+                item_title = item["Title"]
+            elif "tv-episodes" == media_type:
+                item_title = item["Episode Title"]
+            elif "tv-series" == media_type:
+                item_title = item["Show Title"]
 
-                logger.info(f"Processing {item_title}…")
+            logger.info(f"Processing {item_title} ({item['Author']})…")
 
-                # Rename pre-existing fields
-                if "Date Added" in item:
-                    log_item_values["date_added"] = item["Date Added"]
-                    del item["Date Added"]
+            # Rename pre-existing fields
+            if "Date Added" in item:
+                log_item_values["date_added"] = item["Date Added"]
+                del item["Date Added"]
 
-                if "date_added" in item:
-                    log_item_values["date_added"] = item["date_added"]
+            if "date_added" in item:
+                log_item_values["date_added"] = item["date_added"]
 
-                if "Date Started" in item:
-                    log_item_values["date_started"] = item["Date Started"]
-                    del item["Date Started"]
+            if "Date Started" in item:
+                log_item_values["date_started"] = item["Date Started"]
+                del item["Date Started"]
 
-                if "date_started" in item:
-                    log_item_values["date_started"] = item["date_started"]
+            if "date_started" in item:
+                log_item_values["date_started"] = item["date_started"]
 
-                if "Date Finished" in item:
-                    log_item_values["date_finished"] = item["Date Finished"]
-                    del item["Date Finished"]
-                    if "Date Read" in item:
-                        if item["Date Finished"] == item["Date Read"]:
-                            del item["Date Read"]
-                        else:
raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}") - - if "date_finished" in item: - log_item_values["date_finished"] = item["date_finished"] - - if "Read Count" in item: - log_item_values["read_count"] = item["Read Count"] - del item["Read Count"] - - if "read_count" in item: - log_item_values["read_count"] = item["read_count"] - - if "Date Watched" in item: - log_item_values["date_finished"] = item["Date Watched"] - del item["Date Watched"] - - if "Rewatch" in item: - log_item_values["is_repeat"] = item["Rewatch"] - del item["Rewatch"] - - if "Comments" in item: - log_item_values["comments"] = item["Comments"] - del item["Comments"] - - if "Series Title" in item: - log_item_values["series_title"] = item["Series Title"] - del item["Series Title"] - - if "Episode Title" in item: - log_item_values["name"] = item["Episode Title"] - del item["Episode Title"] - - if "Episode Number" in item: - if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None: - season_no, _, episode_no = log_item_values[ - "episode_number" - ].split("x") - - elif ( - re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None - ): - season_no, _, episode_no = log_item_values[ - "episode_number" - ].split("E") - - elif re.search("E[0-9]+", item["Episode Number"]) is not None: - season_no = None - episode_no = item["episode_number"][1:] + if "Date Finished" in item: + log_item_values["date_finished"] = item["Date Finished"] + del item["Date Finished"] + if "Date Read" in item: + if item["Date Finished"] == item["Date Read"]: + del item["Date Read"] else: - logger.error( - f"Invalid episode number format '{item['Episode Number']}'" + raise Exception( + f"'Date Read' != 'Date Finished' for {item['Title']}" ) - return - log_item_values["season_number"] = season_no - log_item_values["episode_number"] = episode_no - del item["Episode Number"] + if "date_finished" in item: + log_item_values["date_finished"] = item["date_finished"] - if "IMDB ID" in item and item["IMDB ID"] != "": - new_log_item = import_by_id(item["IMDB ID"], media_type) + if "Read Count" in item: + log_item_values["read_count"] = item["Read Count"] + del item["Read Count"] - elif "books" == media_type and "wishlist" == log: - ol_work_id = re.search("OL[0-9]+W", input(f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}): ")) - try: - new_log_item = import_by_id(ol_work_id[0], media_type, log) - except: - new_log_item = item - item["skip"] = True - logger.info("Skipping…") + if "read_count" in item: + log_item_values["read_count"] = item["read_count"] - elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None: - new_log_item = import_by_id(item["ISBN13"], media_type, log) + if "Date Watched" in item: + log_item_values["date_finished"] = item["Date Watched"] + del item["Date Watched"] - elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None: - new_log_item = import_by_id(item["ISBN13"], media_type, log) + if "Rewatch" in item: + log_item_values["is_repeat"] = item["Rewatch"] + del item["Rewatch"] + if "Comments" in item: + log_item_values["comments"] = item["Comments"] + del item["Comments"] + + if "Series Title" in item: + log_item_values["series_title"] = item["Series Title"] + del item["Series Title"] + + if "Episode Title" in item: + log_item_values["name"] = item["Episode Title"] + del item["Episode Title"] + + if "Episode Number" in item: + if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None: + season_no, _, episode_no = log_item_values[ + "episode_number" + 
].split("x") + + elif ( + re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None + ): + season_no, _, episode_no = log_item_values[ + "episode_number" + ].split("E") + + elif re.search("E[0-9]+", item["Episode Number"]) is not None: + season_no = None + episode_no = item["episode_number"][1:] else: - new_log_item = import_by_details(item, item_title, media_type) + logger.error( + f"Invalid episode number format '{item['Episode Number']}'" + ) + return + + log_item_values["season_number"] = season_no + log_item_values["episode_number"] = episode_no + del item["Episode Number"] + + if "IMDB ID" in item and item["IMDB ID"] != "": + new_log_item = import_by_id(item["IMDB ID"], media_type) + + elif "books" == media_type and "wishlist" == log: + new_log_item = import_by_details(item, item_title, media_type) if new_log_item is None: - if media_type in ["films", "tv-series", "tv-episodes"] and "imdb_id" not in item: - item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ") + ol_work_id = input( + f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}), or 'd' to delete the record: " + ) - if re.search("tt[0-9]+", item["imdb_id"]) is not None: - log_items[i] = import_by_id(item["imdb_id"], media_type) + if 'd' == ol_work_id: + logger.info("Deleting…") + del log_items[i] + continue + + ol_work_id = re.search("OL[0-9]+W", ol_work_id) + + try: + new_log_item = import_by_id(ol_work_id[0], media_type, log) + + except: + new_log_item = item + new_log_item["skip"] = True + logger.info("Skipping…") + + elif ( + "ISBN13" in item + and item["ISBN13"] != "" + and item["ISBN13"] is not None + ): + new_log_item = import_by_id(item["ISBN13"], media_type, log) + + elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None: + new_log_item = import_by_id(item["ISBN13"], media_type, log) + + else: + new_log_item = import_by_details(item, item_title, media_type) + + if new_log_item is None: + if ( + media_type in ["films", "tv-series", "tv-episodes"] + and "imdb_id" not in item + ): + item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ") + + if re.search("tt[0-9]+", item["imdb_id"]) is not None: + log_items[i] = import_by_id(item["imdb_id"], media_type) + + with open( + f"./data/{media_type}/{log}.json", "w", encoding="utf-8" + ) as log_file: + json.dump(log_items, log_file, indent=4) + + elif "books" == media_type: + if "ISBN" not in item and "ISBN13" not in item: + item["ISBN"] = input(f"Enter ISBN for {item_title}: ") + + if re.search("[0-9-]+", item["ISBN"]) is not None: + log_items[i] = import_by_id(item["ISBN"], media_type) with open( f"./data/{media_type}/{log}.json", "w", - encoding='utf-8' + encoding="utf-8", ) as log_file: json.dump(log_items, log_file, indent=4) - elif "books" == media_type: - if "ISBN" not in item and "ISBN13" not in item: - item["ISBN"] = input(f"Enter ISBN for {item_title}: ") - - if re.search("[0-9-]+", item["ISBN"]) is not None: - log_items[i] = import_by_id(item["ISBN"], media_type) - - with open( - f"./data/{media_type}/{log}.json", - "w", - encoding='utf-8' - ) as log_file: - json.dump(log_items, log_file, indent=4) - - else: - logger.warning(f"Skipped '{item_title}'") - log_items[i]["skip"] = True - else: - logger.warning(f"Skipped {item_title}") + logger.warning(f"Skipped '{item_title}'") + log_items[i]["skip"] = True else: - log_items[i] = new_log_item + logger.warning(f"Skipped {item_title}") - if i % 3 == 0: - with open( - f"./data/{media_type}/{log}.json", - "w", - encoding='utf-8' - ) as log_file: - json.dump(log_items, log_file, 
-                        json.dump(log_items, log_file, indent=4)
+        else:
+            log_items[i] = new_log_item
 
-                if log_items[i] is not None:
-                    log_items[i] |= log_item_values
+        if i % 3 == 0:
+            with open(
+                f"./data/{media_type}/{log}.json", "w", encoding="utf-8"
+            ) as log_file:
+                json.dump(log_items, log_file, indent=4)
+                logger.info("Saved…")
 
-        except KeyError:
-            print(json.dumps(item, indent=4))
+        if log_items[i] is not None:
+            log_items[i] |= log_item_values
 
-    with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
+    with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
         json.dump(log_items, log_file, indent=4)
 
     logger.info(f"Finished processing {media_type}/{log}")
@@ -222,12 +239,69 @@ def import_by_details(item, item_title, media_type) -> dict:
         return # import_from_tvdb_by_details(item, item_title, media_type)
 
     if media_type in ["books"]:
-        return # import_from_openlibrary_by_details(item, item_title, media_type)
+        return import_from_openlibrary_by_details(item, item_title, media_type)
 
     if media_type in ["games"]:
         return # import_from_igdb_by_details(item, item_title, media_type)
 
 
+def import_from_openlibrary_by_details(item, item_title, media_type) -> dict | None:
+    """Retrieve a book from OpenLibrary using a title and author name"""
+
+    logger.info(f"Importing '{item_title}'…")
+
+    api_url = f"https://openlibrary.org/search.json?title={slugify(item['Title'].split(':')[0], separator='%20')}&author={slugify(item['Author'], separator='%20')}"
+
+    # Sending API request
+    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)
+
+    # Process the response
+    if 200 == response.status_code:
+        logger.debug(response.status_code)
+
+    elif 429 == response.status_code:
+        time.sleep(2)
+        return import_from_openlibrary_by_details(item, item_title, media_type)
+
+    elif 404 == response.status_code:
+        logger.error(f"{response.status_code}: Not Found for title '{item_title}'")
+        return None
+
+    else:
+        raise Exception(f"Error {response.status_code}: {response.text}")
+
+    results = json.loads(response.text)
+
+    logger.info(f"Found {results['num_found']} result{'s' if results['num_found'] != 1 else ''}…")
+
+    if 0 < results["num_found"]:
+        result = results['docs'][0]
+        if 1 == results["num_found"]:
+            logger.info(f"Selecting OL ID {result['key']}…")
+            item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result['key'])
+            if item_id_parsed is not None:
+                return import_by_id(item_id_parsed[0], "books", "wishlist")
+
+        else:
+            if result['title'] == item['Title'].split(':')[0] and result['author_name'][0] == item['Author']:
+                logger.info(f"First result ({result['key']}) is a match!")
+                item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result['key'])
+                if item_id_parsed is not None:
+                    return import_by_id(item_id_parsed[0], "books", "wishlist")
+
+            else:
+                print(json.dumps({k: result.get(k, None) for k in ('author', 'title', 'first_publish_year')}, indent=4))
[y/n]: ") + if "y" == is_correct: + logger.info(f"Selecting OL ID {result['key']}…") + item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result['key']) + if item_id_parsed is not None: + return import_by_id(item_id_parsed[0], "books", "wishlist") + + logger.info(f"Returning nothing…") + return None + + def import_from_tmdb_by_details(item, item_title, media_type) -> dict: """Retrieve a film or TV series from TMDB using its title""" @@ -244,7 +318,7 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict: "year": item["Release Year"] if "Release Year" in item else None, }, headers={"Authorization": f"Bearer {TMDB_API_KEY}"}, - timeout=15 + timeout=15, ) # Process the response @@ -281,17 +355,17 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict: logger.warning(f"Returned more than one {media_type} for '{item_title}':\n") print( json.dumps( - filtered_response_data - if len(filtered_response_data) > 0 - else response_data, + ( + filtered_response_data + if len(filtered_response_data) > 0 + else response_data + ), indent=4, ) ) last_index = len(filtered_response_data if frd_len > 0 else response_data) - 1 - idx = input( - f"\nEnter the index of the result to use [0-{last_index}]: " - ) + idx = input(f"\nEnter the index of the result to use [0-{last_index}]: ") if "" != idx: try: