# Cataloguer/scripts/json_to_sql.py
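"""Import Cataloguer JSON data files into the SQLite media database.

For a given media type and log name, this reads ./data/<media-type>/<log>.json,
upserts each entry into ./data/media.db, links its languages and countries via
join tables, and records a log entry. Field mappings exist for books, films,
tv-series and tv-episodes, but sql_columns below has no "books" entry yet, so
only films and TV can currently be imported.
"""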
import sqlite3
import json
import traceback
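
# JSON fields to read for each media type. "item" fields become columns of the
# media table (with name/original-name fields remapped to title columns),
# "entry" fields populate the log-entries table, and "languages"/"countries"
# describe rows destined for lookup and join tables.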
json_keys = {
    "books": {
        "name_key": "title",
        "item": {
            "title",
            "subtitle",
            "edition_name",
            "full_title",
            "ol_id",
            "isbn_10",
            "isbn_13",
            "added_by_id",
            "covers",
            "publish_date",
            "publishers",
            "physical_format",
            "description",
        },
        "genres": "genres",
        "collections": "series",
        "work": "work",
        "creators": "authors",
        "languages": "languages",
        "countries": "publish_country",
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "films": {
        "name_key": "title",
        "original_name_key": "original_title",
        "original_language_key": "original_language",
        "item": {
            "title",
            "imdb_id",
            "tmdb_id",
            "added_by_id",
            "poster_path",
            "release_date",
            "overview",
            "original_title",
            "original_language",
        },
        # "fields" are ordered lists so values line up positionally with the
        # matching sql_columns "keys" (e.g. english_name -> name,
        # iso_639_1 -> iso_639_code).
        "collections": {"key": "belongs_to_collection", "fields": ["name"]},
        "languages": {
            "key": "spoken_languages",
            "fields": ["english_name", "iso_639_1"],
        },
        "countries": {"key": "production_countries", "fields": ["name", "iso_3166_1"]},
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "tv-series": {
        "name_key": "name",
        "original_name_key": "original_name",
        "original_language_key": "original_language",
        "item": {
            "name",
            "tmdb_id",
            "tvdb_id",
            "added_by_id",
            "poster_url",
            "overview",
            "first_air_date",
            "original_name",
            "original_language",
        },
        "languages": {
            "key": "spoken_languages",
            "fields": ["english_name", "iso_639_1"],
        },
        "countries": {"key": "production_countries", "fields": ["name", "iso_3166_1"]},
        "entry": {"date_added", "date_started", "date_finished", "comments"},
    },
    "tv-episodes": {
        "name_key": "name",
        "original_name_key": "original_name",
        "original_language_key": "original_language",
        "item": {
            "name",
            "tmdb_id",
            "tvdb_id",
            "added_by_id",
            "overview",
            "air_date",
            "series",
            "episode_number",
            "season_number",
        },
        "languages": {
            "key": "spoken_languages",
            "fields": ["english_name", "iso_639_1"],
        },
        "countries": {"key": "production_countries", "fields": ["name", "iso_3166_1"]},
        "entry": {"date_added", "date_finished", "comments"},
    },
}
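
# For illustration, a film entry in the JSON is expected to look roughly like
# this (hypothetical values):
#
#   {
#       "title": "Example Film", "tmdb_id": 123, "release_date": "1999-12-31",
#       "spoken_languages": [{"english_name": "English", "iso_639_1": "en"}],
#       "production_countries": [{"name": "United States", "iso_3166_1": "US"}],
#       "date_added": "2024-01-01", "comments": ""
#   }

# SQLite column names per media type: the conflict targets for item upserts
# plus, for each lookup table, its columns, unique keys, and the join-table
# columns. Note there is no "books" entry yet.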
sql_columns = {
    "films": {
        "unique_keys": ["tmdb_id"],
        "languages": {
            "keys": ["name", "iso_639_code"],
            "unique_keys": ["iso_639_code"],
            "join_keys": ["film_id", "language_id"],
        },
        "countries": {
            "keys": ["name", "iso_3166_code"],
            "unique_keys": ["iso_3166_code"],
            "join_keys": ["film_id", "country_id"],
        },
    },
    "tv-episodes": {
        "unique_keys": ["tmdb_id", "imdb_id"],
        "languages": {
            "keys": ["name", "iso_639_code"],
            "unique_keys": ["iso_639_code"],
            "join_keys": ["tv_episode_id", "language_id"],
        },
    },
"tv-series": {
"unique_keys": ["tmdb_id", "imdb_id"],
"languages": {
"keys": ["name", "iso_639_code"],
"unique_keys": ["iso_639_code"],
"join_keys": ["tv_episode_id", "language_id"],
},
"countries": {
"keys": ["name", "iso_3166_code"],
"unique_keys": ["iso_3166_code"],
"join_keys": ["tv_series_id", "country_id"],
}
}
}

def insert_from_json(media_type, log):
    """Import all entries of `log` (e.g. 'log', 'wishlist') for `media_type`."""

    media_type_pl = get_media_type_pl(media_type)
    json_path = f"./data/{media_type_pl}/{log}.json"
    db_path = "./data/media.db"

    with open(json_path, "r") as file:
        data = json.load(file)

    print(f"Results: {len(data)}")

    conn = sqlite3.connect(db_path)
    conn.isolation_level = None  # no implicit transactions; we manage one explicitly
    cur = conn.cursor()
    cur.execute("BEGIN")

    try:
        for entry in reversed(data):
            name_key = json_keys[media_type_pl]["name_key"]
            print(f"Importing {entry.get(name_key)}")

            # Insert the item itself, remapping name fields to title columns.
            item_entry = {
                key: entry[key]
                for key in entry.keys() & json_keys[media_type_pl]["item"]
            }
            original_name_key = json_keys[media_type_pl].get("original_name_key")
            item_entry["title"] = item_entry.pop(name_key)
            if original_name_key and item_entry.get(original_name_key) is not None:
                item_entry["title_original"] = item_entry.pop(original_name_key)
                item_entry["title_original_language"] = item_entry.pop(
                    json_keys[media_type_pl]["original_language_key"], None
                )

            keys = ", ".join(item_entry.keys())
            unique_keys = ", ".join(sql_columns[media_type_pl]["unique_keys"])
            question_marks = ", ".join(["?" for _ in item_entry])
            values = tuple(item_entry.values())

            cur.execute(
                f"INSERT INTO '{media_type_pl}' ({keys}) VALUES ({question_marks}) "
                f"ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) "
                f"RETURNING id",
                values + values,
            )
            row = cur.fetchone()
            inserted_id = row[0] if row else None

            # Join tables: upsert each language/country and link it to the item.
            for join_type in ["languages", "countries"]:
                join_config = json_keys[media_type_pl].get(join_type)
                if (
                    isinstance(join_config, dict)
                    and join_type in sql_columns[media_type_pl]
                    and entry.get(join_config["key"]) is not None
                ):
                    for join_item in entry[join_config["key"]]:
                        print(f"Importing {join_type} {join_item}")
                        # Ordered to line up with sql_columns[...]["keys"].
                        join_values = [
                            join_item.get(field) for field in join_config["fields"]
                        ]
                        insert_join(
                            inserted_id,
                            join_type,
                            f"{media_type_pl}_{join_type}",
                            sql_columns[media_type_pl][join_type]["join_keys"],
                            sql_columns[media_type_pl][join_type]["keys"],
                            join_values,
                            sql_columns[media_type_pl][join_type]["unique_keys"],
                            cur,
                        )

            # Log entry for this item.
            item_log_entry = {
                key: entry[key]
                for key in entry.keys() & json_keys[media_type_pl]["entry"]
            }
            if item_log_entry.get("date_added") is not None:
                item_log_entry["log"] = log
                item_log_entry[f"{media_type}_id"] = inserted_id
                print(f"Importing log entry added {item_log_entry.get('date_added')}")

                keys = ", ".join(item_log_entry.keys())
                question_marks = ", ".join(["?" for _ in item_log_entry])
                values = tuple(item_log_entry.values())

                cur.execute(
                    f"INSERT INTO '{media_type_pl}_log-entries' ({keys}) "
                    f"VALUES ({question_marks})",
                    values,
                )
            else:
                print(f"No log details for {entry.get(name_key)}!")
    except Exception:
        print(traceback.format_exc())
        cur.execute("ROLLBACK")
    else:
        conn.commit()

    conn.close()

def insert_join(
    media_id,
    table_name,
    join_table_name,
    join_keys,
    data_keys,
    data_values,
    data_unique,
    cur,
):
    """Upsert a lookup row (e.g. a language) and link it to a media item."""

    keys = ", ".join(data_keys)
    unique_keys = ", ".join(data_unique)
    question_marks = ", ".join(["?" for _ in data_keys])
    values = tuple(data_values)

    cur.execute(
        f"INSERT INTO '{table_name}' ({keys}) VALUES ({question_marks}) "
        f"ON CONFLICT({unique_keys}) DO UPDATE SET ({keys}) = ({question_marks}) "
        f"RETURNING id",
        values + values,
    )
    row = cur.fetchone()
    data_id = row[0] if row else None

    if data_id is not None:
        keys = ", ".join(join_keys)
        join_marks = ", ".join(["?" for _ in join_keys])
        print(f"Matching item ID {media_id} to data ID {data_id}")
        cur.execute(
            f"INSERT INTO '{join_table_name}' ({keys}) VALUES ({join_marks}) "
            f"ON CONFLICT({keys}) DO NOTHING",
            (media_id, data_id),
        )

def get_media_type_pl(media_type):
    """Pluralise a media type for table and directory names ('film' -> 'films')."""
    if media_type in ["tv-series"]:
        return media_type
    return media_type + "s"

if __name__ == "__main__":
    # insert_from_json('tv-series', 'log')
    # insert_from_json('tv-series', 'wishlist')
    # insert_from_json('tv-series', 'current')
    # insert_from_json('film', 'wishlist')
    insert_from_json('film', 'log')
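
# Quick sanity check after an import, from the shell (table names as used in
# the INSERT statements above):
#   sqlite3 ./data/media.db "SELECT COUNT(*) FROM films"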