Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 124 additions & 125 deletions ingestion/2-augment_cols.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
# "artists": final derived artists dataframe
# "misc": final derived ids/liners/promos/etc dataframe
# }
import pandas, requests, time, orjson, os
import pandas, requests, time, os
from sqlalchemy import create_engine
import db_utils

MBSEARCH_HEADERS = {
'User-Agent': 'wuvt.vt.edu ROLLED/1',
Expand All @@ -27,38 +29,34 @@

QUERY_SLEEP = 0.5

"""
`cache` here refers to the name of the .json cache used.
"""
def cached_mb_get(endpoint, cache):
    """
    Fetch a MusicBrainz endpoint, answering from the JSON cache when possible.

    The previous body ignored both parameters, read `manifest`, `title` and
    `artist` (none of which exist in this scope) and never returned the
    response, so any call would fail. This version keys the cache on the
    request URL itself.

    endpoint: fully formatted request URL; also used as the cache key.
    cache: path of the .json cache manifest to consult and update.

    Returns the decoded JSON response, either cached or freshly fetched.
    Network and JSON decoding errors from `requests` propagate to the caller.
    """
    manifest = init_manifest(cache)

    cached_response = manifest.get(endpoint)
    if cached_response is not None:
        return cached_response

    response = requests.get(endpoint, headers=MBSEARCH_HEADERS).json()
    # Only real network hits need to be throttled.
    time.sleep(QUERY_SLEEP)

    # Persist the miss so the next run does not re-query the API.
    manifest[endpoint] = response
    with open(cache, "wb") as f:
        f.write(orjson.dumps(manifest, option=orjson.OPT_INDENT_2))

    return response

def init_manifest(path):
    """
    Load the JSON cache manifest stored at `path`.

    When no file exists at `path`, an empty manifest ("{}") is written
    there first, so the subsequent read always succeeds.

    Returns the parsed manifest.
    """
    manifest_missing = not os.path.exists(path)
    if manifest_missing:
        print("creating fresh cache manifest...")
        with open(path, "w") as fresh:
            fresh.write("{}")

    with open(path) as handle:
        raw = handle.read()
    return orjson.loads(raw)


def run(ctx, cfg):
# leaving artists/misc off the proto
res = {
"albums": ctx,
"songs": pandas.DataFrame(columns=["id", "title", "artist", "release_id", "release-group_id"]),
"songs": pandas.DataFrame(columns=[
"id", "title", "artist", "release_id", "release-group_id"]),
"artists": pandas.DataFrame(),
"misc": pandas.DataFrame(),
}

#load cache from db
pgc = cfg["postgres"]
db = create_engine(
f"postgresql://{pgc['username']}:{pgc['password']}@{pgc['host']}:{pgc['port']}/{pgc['database']}"
)
cache = {
"albums": db_utils.postgres_to_df(db, "albums"),
"songs": db_utils.postgres_to_df(db, "songs"),
"artists": db_utils.postgres_to_df(db, "artists"),
"misc": db_utils.postgres_to_df(db, "misc"),
}
skip_cache = {
"albums": cache["albums"] is None,
"songs": cache["songs"] is None,
"artists": cache["artists"] is None,
"misc": cache["misc"] is None,
}

# Create empty release-group_id, release_id, and tracklist cols on the albums table
if "release-group_id" not in res["albums"].columns:
res["albums"]["release-group_id"] = pandas.Series([""]*len(res["albums"]), dtype="string")
Expand All @@ -68,124 +66,121 @@ def run(ctx, cfg):

"""
populating albums with per-row MusicBrainz release-group MBIDs:
- make a query to the search API (cached, /data/xxx_manifest.json)
- make a query to the search API (cached via postgres db)
- rate-limited heavily. will take a while if not cached.
- set if applicable. albums not found should have an mbid of ""
"""
start_time = time.time()

# load in already-cached search responses
manifest = init_manifest(RG_MANIFEST_PATH)
for index, row in res["albums"].iterrows():
for index, row in res["albums"].iterrows():
artist = str(row["artist_name"])
title = str(row["album_title"])

fetched = False

if row["release-group_id"] != "":
print("RG_ID: manual entry for", artist, title, ":",
row["release-group_id"])
continue

cached_response = manifest.get(title+artist)
try:
response = requests.get(
MBSEARCH_RELEASEGROUP_QUERY.format(
album_title=title,
# musicbrainz does not like commas on artist names
artist_name=artist.replace(',', '')
),
headers=MBSEARCH_HEADERS
).json() if cached_response is None else cached_response
if cached_response is None: time.sleep(QUERY_SLEEP)
except:
print("No release-group fetched for:", title, artist)
continue

if "release-groups" in response and len(response["release-groups"]) > 0:
res["albums"].at[index, "release-group_id"] = response["release-groups"][0]["id"]

if cached_response is None:
manifest[title+artist] = {
"release-groups": response["release-groups"]
}
with open(RG_MANIFEST_PATH, "wb") as f:
f.write(orjson.dumps(manifest, option=orjson.OPT_INDENT_2))
if not skip_cache["albums"]:
cached_rgid = cache["albums"].at[index, "release-group_id"]
res["albums"].at[index, "release-group_id"] = cached_rgid

if skip_cache["albums"] or cached_rgid == "":
fetched = True
try:
response = requests.get(
MBSEARCH_RELEASEGROUP_QUERY.format(
album_title=title,
# musicbrainz does not like commas on artist names
artist_name=artist.replace(',', '')
),
headers=MBSEARCH_HEADERS
).json()
time.sleep(QUERY_SLEEP)
except:
print("No release-group fetched for:", title, artist)
continue

if "release-groups" in response and len(response["release-groups"]) > 0:
res["albums"].at[index, "release-group_id"] = response["release-groups"][0]["id"]

print(
"RG_ID:", res["albums"].at[index, "release-group_id"],
time.time() - start_time,
"At:", index, "Left:", len(res["albums"]) - index,
"Fetched?:", "Y" if cached_response is None else "N"
)

start_time = time.time()
"Fetched?:", "Y" if fetched else "N"
)

"""
- attempt to find correct-ish release for each release-group
- prefer physical and US releases when possible
- add to albums dataframe
"""
start_time = time.time()
# load in already-cached search responses
r_manifest = init_manifest(R_MANIFEST_PATH)
for index, row in res["albums"].iterrows():
rg_mbid = str(row["release-group_id"])
label = str(row["label"])
date = str(row["release_year"])
countries = ["US", "XE"]
countries = ["US", "XE"]

fetched = False

if rg_mbid == "":
continue

if row["release_id"] != "":
print("R_ID: manual entry for", rg_mbid, ":", row["release_id"])
continue

cached_response = r_manifest.get(rg_mbid)
try:
response = requests.get(
MBSEARCH_RELEASE.format(mbid=rg_mbid),
headers=MBSEARCH_HEADERS
).json() if cached_response is None else cached_response
if cached_response is None: time.sleep(QUERY_SLEEP)
except:
print("No release fetched for:", rg_mbid)
continue
if not skip_cache["albums"]:
cached_rid = cache["albums"].at[index, "release_id"]
res["albums"].at[index, "release_id"] = cached_rid

if skip_cache["albums"] or cached_rid == "":
fetched = True
try:
response = requests.get(
MBSEARCH_RELEASE.format(mbid=rg_mbid),
headers=MBSEARCH_HEADERS
).json()
time.sleep(QUERY_SLEEP)
except:
print("No release fetched for:", rg_mbid)
continue

# If there is more than one release for the release group,
# filter by date, label, and country.
if "releases" in response and len(response["releases"]) > 0:
releases = response["releases"]

# If there is more than one release for the release group,
# filter by date, label, and country.
if "releases" in response and len(response["releases"]) > 0:
releases = response["releases"]

if len(releases) != 1:
releases = [
rel for rel in releases
if "date" in rel and rel["date"][:4] == date
] or releases

if len(releases) != 1:
releases = [
rel for rel in releases
if "label" in rel and rel["label"] == label
rel for rel in releases
if "date" in rel and rel["date"][:4] == date
] or releases

if len(releases) != 1:
releases = [
rel for rel in releases
if "country" in rel and rel["country"] in countries
rel for rel in releases
if "label" in rel and rel["label"] == label
] or releases

res["albums"].at[index, "release_id"] = releases[0]["id"]
if len(releases) != 1:
releases = [
rel for rel in releases
if "country" in rel and rel["country"] in countries
] or releases

if cached_response is None:
r_manifest[rg_mbid] = {
"releases": response["releases"]
}
with open(R_MANIFEST_PATH, "wb") as f:
f.write(orjson.dumps(r_manifest, option=orjson.OPT_INDENT_2))
res["albums"].at[index, "release_id"] = releases[0]["id"]

print(
"R_ID:", res["albums"].at[index, "release_id"],
time.time() - start_time,
"At:", index, "Left:", len(res["albums"]) - index,
"Fetched?:", "Y" if cached_response is None else "N"
"Fetched?:", "Y" if fetched else "N"
)

# songs
Expand All @@ -199,62 +194,66 @@ def run(ctx, cfg):
start_time = time.time()
count = 0

t_manifest = init_manifest(T_MANIFEST_PATH)
for index, row in res["albums"].iterrows():
rg_mbid = str(row["release_id"])
rg_mbid = str(row["release-group_id"])
r_mbid = str(row["release_id"])

if r_mbid == "":
count += 1
continue
fetched = False

cached_response = t_manifest.get(r_mbid)
try:
response = requests.get(
MBLOOKUP_RELEASE.format(mbid=r_mbid),
headers=MBSEARCH_HEADERS
).json() if cached_response is None else cached_response
if cached_response is None: time.sleep(QUERY_SLEEP)
except:
if r_mbid == "":
count += 1
continue

tracks = []
try:
for track in response["media"][0]["tracks"]:
tracks.append(track["title"])
res["songs"].loc[len(res["songs"])] = {
if not skip_cache["albums"] and not skip_cache["songs"]:
cached_tracklist = cache["albums"].at[index, "tracklist"]
if cached_tracklist != "":
res["songs"] = pandas.concat([
res["songs"],
cache["songs"][
cache["songs"]['release_id'] == r_mbid]
], ignore_index=True)
res["albums"].at[index, "tracklist"] = cached_tracklist

if skip_cache["albums"] or skip_cache["songs"] or cached_tracklist == "":
fetched = True
try:
response = requests.get(
MBLOOKUP_RELEASE.format(mbid=r_mbid),
headers=MBSEARCH_HEADERS
).json()
time.sleep(QUERY_SLEEP)
except:
count += 1
continue

tracks = []
try:
for track in response["media"][0]["tracks"]:
tracks.append(track["title"])
res["songs"].loc[len(res["songs"])] = {
"id": track["id"],
"title": track["title"],
"artist": row["artist_name"],
"release_id": row["release_id"],
"release-group_id": rg_mbid
}
except:
count += 1
continue

res["albums"].at[index, "tracklist"] = " / ".join(tracks)
except:
count += 1
continue

if cached_response is None:
t_manifest[r_mbid] = {
"media": response["media"]
}
with open(T_MANIFEST_PATH, "wb") as f:
f.write(orjson.dumps(t_manifest, option=orjson.OPT_INDENT_2))
res["albums"].at[index, "tracklist"] = " / ".join(tracks)

count += 1
print(
"TRACKLIST:", res["albums"].at[index, "tracklist"],
time.time() - start_time,
"At:", count, "Left:", len(res["albums"]) - count,
"Fetched?:", "Y" if cached_response is None else "N"
"Fetched?:", "Y" if fetched else "N"
)

# artists
res["artists"]["artist_name"] = ctx["artist_name"]

# misc
pass
# TODO misc

return res
4 changes: 2 additions & 2 deletions ingestion/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
FROM docker.io/library/python:3.13-slim

RUN pip install pandas sqlalchemy typesense openpyxl psycopg2-binary xlrd orjson
RUN pip install pandas sqlalchemy typesense openpyxl psycopg2-binary xlrd requests

WORKDIR /ingestion
COPY . /ingestion

CMD sleep 5 && python3 -u run_pipeline.py
CMD sleep 5 && python3 -u run_pipeline.py
19 changes: 19 additions & 0 deletions ingestion/db_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Util function for rolled postgreSQL DB.

import pandas, sqlalchemy

# Import sql table as a pandas dataframe
def postgres_to_df(engine: sqlalchemy.engine.Engine, table_name: str,
                   schema: str = "public") -> pandas.DataFrame | None:
    """
    Read a whole SQL table into a pandas DataFrame.

    engine: SQLAlchemy engine used to inspect the database and connect.
    table_name: name of the table to load.
    schema: schema that holds the table (defaults to "public").

    Returns the table as a DataFrame, or None (after printing a notice)
    when the table does not exist in the given schema.
    """
    table_exists = sqlalchemy.inspect(engine).has_table(table_name, schema=schema)
    if not table_exists:
        print(f"Table '{table_name}' does not exist.")
        return None

    # The connection is closed on leaving the block, after the read completes.
    with engine.connect() as connection:
        return pandas.read_sql_table(table_name, connection, schema=schema)