progress towards SQL data storage

This commit is contained in:
Ben Goldsworthy 2024-05-05 10:51:22 +01:00
parent e4a2572b89
commit 38e4057ca3
Signed by: Rumperuu
SSH key fingerprint: SHA256:v3uompaUiPqV2w1/AIRWBSQOxr2dntH9Xs/y8fDnUPU
4 changed files with 495 additions and 142 deletions

data/media.db Normal file

Binary file not shown.


@@ -275,6 +275,8 @@ def import_by_id(import_id, media_type, log) -> dict | None:
"".join(re.findall(r"\d+", import_id)), media_type
)
logger.error("Invalid media_type!")
def import_from_tmdb_by_external_id(external_id, media_type) -> dict:
"""Retrieve a film, TV show or TV episode from TMDB using an IMDB or TVDB ID"""

scripts/json_to_sql.py Normal file

@@ -0,0 +1,277 @@
import sqlite3
import json
import traceback

json_keys = {
    "books": {
        "name_key": "title",
        "item": {
            "title",
            "subtitle",
            "edition_name",
            "full_title",
            "ol_id",
            "isbn_10",
            "isbn_13",
            "added_by_id",
            "covers",
            "publish_date",
            "publishers",
            "physical_format",
            "description",
        },
        "genres": "genres",
        "collections": "series",
        "work": "work",
        "creators": "authors",
        "languages": "languages",
        "countries": "publish_country",
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "films": {
        "name_key": "title",
        "original_name_key": "original_title",
        "original_language_key": "original_language",
        "item": {
            "title",
            "imdb_id",
            "tmdb_id",
            "added_by_id",
            "poster_path",
            "release_date",
            "overview",
            "original_title",
            "original_language",
        },
        "collections": {"key": "belongs_to_collection", "fields": {"name"}},
        "languages": {
            "key": "spoken_languages",
            "fields": {"english_name", "iso_639_1"},
        },
        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "tv-series": {
        "name_key": "name",
        "original_name_key": "original_name",
        "original_language_key": "original_language",
        "item": {
            "name",
            "tmdb_id",
            "tvdb_id",
            "added_by_id",
            "poster_url",
            "overview",
            "first_air_date",
            "original_name",
            "original_language",
        },
        "languages": {
            "key": "spoken_languages",
            "fields": {"english_name", "iso_639_1"},
        },
        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "tv-episodes": {
        "name_key": "name",
        "original_name_key": "original_name",
        "original_language_key": "original_language",
        "item": {
            "name",
            "tmdb_id",
            "tvdb_id",
            "added_by_id",
            "overview",
            "air_date",
            "series",
            "episode_number",
            "season_number",
        },
        "languages": {
            "key": "spoken_languages",
            "fields": {"english_name", "iso_639_1"},
        },
        "countries": {"key": "production_countries", "fields": {"name", "iso_3166_1"}},
        "entry": {"date_added", "date_finished", "comments"},
    },
}

sql_columns = {
    "films": {
        "unique_keys": ["tmdb_id"],
        "languages": {
            "keys": ["name", "iso_639_code"],
            "unique_keys": ["iso_639_code"],
            "join_keys": ["film_id", "language_id"],
        },
        "countries": {
            "keys": ["name", "iso_3166_code"],
            "unique_keys": ["iso_3166_code"],
            "join_keys": ["film_id", "country_id"],
        },
    },
    "tv-episodes": {
        "unique_keys": ["tmdb_id", "imdb_id"],
        "languages": {
            "keys": ["name", "iso_639_code"],
            "unique_keys": ["iso_639_code"],
            "join_keys": ["tv_episode_id", "language_id"],
        },
    },
    "tv-series": {
        "unique_keys": ["tmdb_id", "imdb_id"],
        "languages": {
            "keys": ["name", "iso_639_code"],
            "unique_keys": ["iso_639_code"],
            "join_keys": ["tv_series_id", "language_id"],
        },
        "countries": {
            "keys": ["name", "iso_3166_code"],
            "unique_keys": ["iso_3166_code"],
            "join_keys": ["tv_series_id", "country_id"],
        },
    },
}

def insert_from_json(media_type, log):
    media_type_pl = get_media_type_pl(media_type)
    json_path = f"./data/{media_type_pl}/{log}.json"
    db_path = "./data/media.db"

    with open(json_path, "r") as file:
        data = json.load(file)

    print(f"Results: {len(data)}")

    conn = sqlite3.connect(db_path)
    conn.isolation_level = None
    cur = conn.cursor()
    cur.execute("BEGIN")
    try:
        for entry in reversed(data):
            print(f"Importing {entry.get(json_keys[media_type_pl]['name_key'])}")

            # Insert item
            item_entry = {
                key: entry[key]
                for key in entry.keys() & json_keys[media_type_pl]["item"]
            }

            if item_entry.get(json_keys[media_type_pl]["original_name_key"]) is not None:
                item_entry["title"] = item_entry.pop(json_keys[media_type_pl]["name_key"])
                item_entry["title_original"] = item_entry.pop(
                    json_keys[media_type_pl]["original_name_key"]
                )
                item_entry["title_original_language"] = item_entry.pop(
                    json_keys[media_type_pl]["original_language_key"]
                )
            else:
                item_entry["title"] = item_entry.pop(json_keys[media_type_pl]["name_key"])

            keys = ", ".join(item_entry.keys())
            unique_keys = ", ".join(sql_columns[media_type_pl]["unique_keys"])
            question_marks = ", ".join(["?" for _ in item_entry])
            values = tuple(item_entry.values())

            cur.execute(
                f"INSERT INTO '{media_type_pl}' ({keys}) VALUES ({question_marks}) "
                f"ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) "
                f"RETURNING id",
                values + values,
            )
            row = cur.fetchone()
            inserted_id = row[0] if row else None

            # Join tables
            for join_type in ["languages", "countries"]:
                if entry.get(json_keys[media_type_pl][join_type]["key"]) is not None:
                    for join_item in entry.get(json_keys[media_type_pl][join_type]["key"]):
                        print(f"Importing {join_type} {join_item}")
                        values = {
                            key: join_item[key]
                            for key in join_item.keys()
                            & json_keys[media_type_pl][join_type]["fields"]
                        }
                        insert_join(
                            inserted_id,
                            f"{join_type}",
                            f"{media_type_pl}_{join_type}",
                            sql_columns[media_type_pl][join_type]["join_keys"],
                            sql_columns[media_type_pl][join_type]["keys"],
                            values,
                            sql_columns[media_type_pl][join_type]["unique_keys"],
                            cur,
                            conn,
                        )

            # Log entry
            item_log_entry = {
                key: entry[key]
                for key in entry.keys() & json_keys[media_type_pl]["entry"]
            }
            if item_log_entry.get("date_added") is not None:
                item_log_entry["log"] = log
                item_log_entry[f"{media_type}_id"] = inserted_id
                print(f"Importing log entry added {item_log_entry.get('date_added')}")

                keys = ", ".join(item_log_entry.keys())
                question_marks = ", ".join(["?" for _ in item_log_entry])
                values = tuple(item_log_entry.values())

                cur.execute(
                    f"INSERT INTO '{media_type_pl}_log-entries' ({keys}) VALUES ({question_marks})",
                    values,
                )
            else:
                print(f"No log details for {entry.get('name')}!")
    except Exception:
        print(traceback.format_exc())
        cur.execute("ROLLBACK")
    else:
        conn.commit()

    conn.close()

def insert_join(
    media_id,
    table_name,
    join_table_name,
    join_keys,
    data_keys,
    data_values,
    data_unique,
    cur,
    conn,
):
    keys = ", ".join(data_keys)
    unique_keys = ", ".join(data_unique)
    question_marks = ", ".join(["?" for _ in data_keys])
    # data_values is a dict of field values; this assumes they arrive in the
    # same order as data_keys
    values = tuple(data_values.values())

    cur.execute(
        f"INSERT INTO '{table_name}' ({keys}) VALUES ({question_marks}) "
        f"ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) "
        f"RETURNING id",
        values + values,
    )
    row = cur.fetchone()
    data_id = row[0] if row else None

    if data_id is not None:
        keys = ", ".join(join_keys)
        print(f"Matching item ID {media_id} to data ID {data_id}")
        cur.execute(
            f"INSERT INTO '{join_table_name}' ({keys}) VALUES (?, ?) "
            f"ON CONFLICT({keys}) DO NOTHING",
            (media_id, data_id),
        )

def get_media_type_pl(media_type):
    if media_type in ["tv-series"]:
        return media_type
    else:
        return media_type + "s"

# insert_from_json("tv-series", "log")
# insert_from_json("tv-series", "wishlist")
# insert_from_json("tv-series", "current")
insert_from_json("film", "log")
# insert_from_json("film", "wishlist")


@@ -7,6 +7,7 @@ import os
import re
import time
import requests
from slugify import slugify
from dotenv import load_dotenv
from add_item import cleanup_result, import_by_id, setup_logger
@@ -28,7 +29,7 @@ def process_log(media_type, log) -> None:
logger.info(f"Processing {media_type}/{log}")
with open(f"./data/{media_type}/{log}.json", "r", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "r", encoding="utf-8") as log_file:
log_items = json.load(log_file)
log_item_values = {}
@@ -42,171 +43,187 @@ def process_log(media_type, log) -> None:
id_key = "gb_id"
for i, item in enumerate(log_items):
try:
if id_key not in item and "skip" not in item:
if media_type in ["films", "books"]:
item_title = item["Title"]
elif "tv-episodes" == media_type:
item_title = item["Episode Title"]
elif "tv-series" == media_type:
item_title = item["Show Title"]
if id_key not in item:  # and "skip" not in item:
if media_type in ["films", "books"]:
item_title = item["Title"]
elif "tv-episodes" == media_type:
item_title = item["Episode Title"]
elif "tv-series" == media_type:
item_title = item["Show Title"]
logger.info(f"Processing {item_title}")
logger.info(f"Processing {item_title} ({item['Author']})")
# Rename pre-existing fields
if "Date Added" in item:
log_item_values["date_added"] = item["Date Added"]
del item["Date Added"]
# Rename pre-existing fields
if "Date Added" in item:
log_item_values["date_added"] = item["Date Added"]
del item["Date Added"]
if "date_added" in item:
log_item_values["date_added"] = item["date_added"]
if "date_added" in item:
log_item_values["date_added"] = item["date_added"]
if "Date Started" in item:
log_item_values["date_started"] = item["Date Started"]
del item["Date Started"]
if "Date Started" in item:
log_item_values["date_started"] = item["Date Started"]
del item["Date Started"]
if "date_started" in item:
log_item_values["date_started"] = item["date_started"]
if "date_started" in item:
log_item_values["date_started"] = item["date_started"]
if "Date Finished" in item:
log_item_values["date_finished"] = item["Date Finished"]
del item["Date Finished"]
if "Date Read" in item:
if item["Date Finished"] == item["Date Read"]:
del item["Date Read"]
else:
raise Exception(f"'Date Read' != 'Date Finished' for {item['Title']}")
if "date_finished" in item:
log_item_values["date_finished"] = item["date_finished"]
if "Read Count" in item:
log_item_values["read_count"] = item["Read Count"]
del item["Read Count"]
if "read_count" in item:
log_item_values["read_count"] = item["read_count"]
if "Date Watched" in item:
log_item_values["date_finished"] = item["Date Watched"]
del item["Date Watched"]
if "Rewatch" in item:
log_item_values["is_repeat"] = item["Rewatch"]
del item["Rewatch"]
if "Comments" in item:
log_item_values["comments"] = item["Comments"]
del item["Comments"]
if "Series Title" in item:
log_item_values["series_title"] = item["Series Title"]
del item["Series Title"]
if "Episode Title" in item:
log_item_values["name"] = item["Episode Title"]
del item["Episode Title"]
if "Episode Number" in item:
if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None:
season_no, _, episode_no = log_item_values[
"episode_number"
].split("x")
elif (
re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None
):
season_no, _, episode_no = log_item_values[
"episode_number"
].split("E")
elif re.search("E[0-9]+", item["Episode Number"]) is not None:
season_no = None
episode_no = item["episode_number"][1:]
if "Date Finished" in item:
log_item_values["date_finished"] = item["Date Finished"]
del item["Date Finished"]
if "Date Read" in item:
if item["Date Finished"] == item["Date Read"]:
del item["Date Read"]
else:
logger.error(
f"Invalid episode number format '{item['Episode Number']}'"
raise Exception(
f"'Date Read' != 'Date Finished' for {item['Title']}"
)
return
log_item_values["season_number"] = season_no
log_item_values["episode_number"] = episode_no
del item["Episode Number"]
if "date_finished" in item:
log_item_values["date_finished"] = item["date_finished"]
if "IMDB ID" in item and item["IMDB ID"] != "":
new_log_item = import_by_id(item["IMDB ID"], media_type)
if "Read Count" in item:
log_item_values["read_count"] = item["Read Count"]
del item["Read Count"]
elif "books" == media_type and "wishlist" == log:
ol_work_id = re.search("OL[0-9]+W", input(f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}): "))
try:
new_log_item = import_by_id(ol_work_id[0], media_type, log)
except:
new_log_item = item
item["skip"] = True
logger.info("Skipping…")
if "read_count" in item:
log_item_values["read_count"] = item["read_count"]
elif "ISBN13" in item and item["ISBN13"] != "" and item["ISBN13"] is not None:
new_log_item = import_by_id(item["ISBN13"], media_type, log)
if "Date Watched" in item:
log_item_values["date_finished"] = item["Date Watched"]
del item["Date Watched"]
elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
new_log_item = import_by_id(item["ISBN13"], media_type, log)
if "Rewatch" in item:
log_item_values["is_repeat"] = item["Rewatch"]
del item["Rewatch"]
if "Comments" in item:
log_item_values["comments"] = item["Comments"]
del item["Comments"]
if "Series Title" in item:
log_item_values["series_title"] = item["Series Title"]
del item["Series Title"]
if "Episode Title" in item:
log_item_values["name"] = item["Episode Title"]
del item["Episode Title"]
if "Episode Number" in item:
if re.search("[0-9]+x[0-9]+", item["Episode Number"]) is not None:
season_no, _, episode_no = log_item_values[
"episode_number"
].split("x")
elif (
re.search("S[0-9]+E[0-9]+", item["Episode Number"]) is not None
):
season_no, _, episode_no = log_item_values[
"episode_number"
].split("E")
elif re.search("E[0-9]+", item["Episode Number"]) is not None:
season_no = None
episode_no = item["episode_number"][1:]
else:
new_log_item = import_by_details(item, item_title, media_type)
logger.error(
f"Invalid episode number format '{item['Episode Number']}'"
)
return
log_item_values["season_number"] = season_no
log_item_values["episode_number"] = episode_no
del item["Episode Number"]
if "IMDB ID" in item and item["IMDB ID"] != "":
new_log_item = import_by_id(item["IMDB ID"], media_type)
elif "books" == media_type and "wishlist" == log:
new_log_item = import_by_details(item, item_title, media_type)
if new_log_item is None:
if media_type in ["films", "tv-series", "tv-episodes"] and "imdb_id" not in item:
item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
ol_work_id = input(
f"Enter OpenLibrary Work ID for '{item_title}' ({item['Author']}), or 'd' to delete the record: "
)
if re.search("tt[0-9]+", item["imdb_id"]) is not None:
log_items[i] = import_by_id(item["imdb_id"], media_type)
if 'd' == ol_work_id:
logger.info("Deleting…")
del log_items[i]
continue
ol_work_id = re.search("OL[0-9]+W", ol_work_id)
try:
new_log_item = import_by_id(ol_work_id[0], media_type, log)
except:
new_log_item = item
new_log_item["skip"] = True
logger.info("Skipping…")
elif (
"ISBN13" in item
and item["ISBN13"] != ""
and item["ISBN13"] is not None
):
new_log_item = import_by_id(item["ISBN13"], media_type, log)
elif "ISBN" in item and item["ISBN"] != "" and item["ISBN"] is not None:
new_log_item = import_by_id(item["ISBN13"], media_type, log)
else:
new_log_item = import_by_details(item, item_title, media_type)
if new_log_item is None:
if (
media_type in ["films", "tv-series", "tv-episodes"]
and "imdb_id" not in item
):
item["imdb_id"] = input(f"Enter IMDB ID for {item_title}: ")
if re.search("tt[0-9]+", item["imdb_id"]) is not None:
log_items[i] = import_by_id(item["imdb_id"], media_type)
with open(
f"./data/{media_type}/{log}.json", "w", encoding="utf-8"
) as log_file:
json.dump(log_items, log_file, indent=4)
elif "books" == media_type:
if "ISBN" not in item and "ISBN13" not in item:
item["ISBN"] = input(f"Enter ISBN for {item_title}: ")
if re.search("[0-9-]+", item["ISBN"]) is not None:
log_items[i] = import_by_id(item["ISBN"], media_type)
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
encoding="utf-8",
) as log_file:
json.dump(log_items, log_file, indent=4)
elif "books" == media_type:
if "ISBN" not in item and "ISBN13" not in item:
item["ISBN"] = input(f"Enter ISBN for {item_title}: ")
if re.search("[0-9-]+", item["ISBN"]) is not None:
log_items[i] = import_by_id(item["ISBN"], media_type)
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
json.dump(log_items, log_file, indent=4)
else:
logger.warning(f"Skipped '{item_title}'")
log_items[i]["skip"] = True
else:
logger.warning(f"Skipped {item_title}")
logger.warning(f"Skipped '{item_title}'")
log_items[i]["skip"] = True
else:
log_items[i] = new_log_item
logger.warning(f"Skipped {item_title}")
if i % 3 == 0:
with open(
f"./data/{media_type}/{log}.json",
"w",
encoding='utf-8'
) as log_file:
json.dump(log_items, log_file, indent=4)
else:
log_items[i] = new_log_item
if log_items[i] is not None:
log_items[i] |= log_item_values
if i % 3 == 0:
with open(
f"./data/{media_type}/{log}.json", "w", encoding="utf-8"
) as log_file:
json.dump(log_items, log_file, indent=4)
logger.info("Saved…")
except KeyError:
print(json.dumps(item, indent=4))
if log_items[i] is not None:
log_items[i] |= log_item_values
with open(f"./data/{media_type}/{log}.json", "w", encoding='utf-8') as log_file:
with open(f"./data/{media_type}/{log}.json", "w", encoding="utf-8") as log_file:
json.dump(log_items, log_file, indent=4)
logger.info(f"Finished processing {media_type}/{log}")
@@ -222,12 +239,69 @@ def import_by_details(item, item_title, media_type) -> dict:
return # import_from_tvdb_by_details(item, item_title, media_type)
if media_type in ["books"]:
return # import_from_openlibrary_by_details(item, item_title, media_type)
return import_from_openlibrary_by_details(item, item_title, media_type)
if media_type in ["games"]:
return # import_from_igdb_by_details(item, item_title, media_type)
def import_from_openlibrary_by_details(item, item_title, media_type) -> dict | None:
    """Retrieve a book from OpenLibrary using a title and author name"""
    logger.info(f"Importing '{item_title}'")

    api_url = f"https://openlibrary.org/search.json?title={slugify(item['Title'].split(':')[0], separator='%20')}&author={slugify(item['Author'], separator='%20')}"

    # Sending API request
    response = requests.get(api_url, headers={"accept": "application/json"}, timeout=15)

    # Process the response
    if 200 == response.status_code:
        logger.debug(response.status_code)
    elif 429 == response.status_code:
        time.sleep(2)
        return import_from_openlibrary_by_details(item, item_title, media_type)
    elif 404 == response.status_code:
        logger.error(f"{response.status_code}: Not Found for title '{item_title}'")
        return None
    else:
        raise Exception(f"Error {response.status_code}: {response.text}")

    results = json.loads(response.text)
    logger.info(
        f"Found {results['num_found']} result{'s' if results['num_found'] != 1 else ''}"
    )

    if 0 < results["num_found"]:
        result = results["docs"][0]
        if 1 == results["num_found"]:
            logger.info(f"Selecting OL ID {result['key']}")
            item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
            if item_id_parsed is not None:
                return import_by_id(item_id_parsed[0], "books", "wishlist")
        else:
            if result["title"] == item["Title"].split(":")[0] and result["author_name"][0] == item["Author"]:
                logger.info(f"First result ({result['key']}) is a match!")
                item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
                if item_id_parsed is not None:
                    return import_by_id(item_id_parsed[0], "books", "wishlist")
            else:
                print(
                    json.dumps(
                        {k: result.get(k, None) for k in ("author_name", "title", "first_publish_year")},
                        indent=4,
                    )
                )
                is_correct = input("Is this the correct result? [y/n]: ")
                if "y" == is_correct:
                    logger.info(f"Selecting OL ID {result['key']}")
                    item_id_parsed = re.search("(OL|tt)?[0-9]+[WMA]?", result["key"])
                    if item_id_parsed is not None:
                        return import_by_id(item_id_parsed[0], "books", "wishlist")

    logger.info("Returning nothing…")
    return None
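
For reference, the parsing above expects a search.json payload roughly shaped like the illustrative (made-up) example below; the OpenLibrary work ID is then pulled out of docs[0]["key"] with the same regular expression the function uses:

import re

example_results = {  # illustrative shape only, not real API output
    "num_found": 1,
    "docs": [
        {
            "key": "/works/OL45883W",
            "title": "The Hobbit",
            "author_name": ["J. R. R. Tolkien"],
            "first_publish_year": 1937,
        }
    ],
}

match = re.search("(OL|tt)?[0-9]+[WMA]?", example_results["docs"][0]["key"])
print(match[0])  # -> "OL45883W", which import_by_id() can then resolve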
def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
"""Retrieve a film or TV series from TMDB using its title"""
@@ -244,7 +318,7 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
"year": item["Release Year"] if "Release Year" in item else None,
},
headers={"Authorization": f"Bearer {TMDB_API_KEY}"},
timeout=15
timeout=15,
)
# Process the response
@@ -281,17 +355,17 @@ def import_from_tmdb_by_details(item, item_title, media_type) -> dict:
logger.warning(f"Returned more than one {media_type} for '{item_title}':\n")
print(
json.dumps(
filtered_response_data
if len(filtered_response_data) > 0
else response_data,
(
filtered_response_data
if len(filtered_response_data) > 0
else response_data
),
indent=4,
)
)
last_index = len(filtered_response_data if frd_len > 0 else response_data) - 1
idx = input(
f"\nEnter the index of the result to use [0-{last_index}]: "
)
idx = input(f"\nEnter the index of the result to use [0-{last_index}]: ")
if "" != idx:
try: