# This program downloads artists from a queue file.
# Multiple instances of this program can run simultaneously.
# NOTE(review): the queue file is read and rewritten without locking, so two
# instances popping at the same moment may race - confirm before relying on it.
# Albums whose folder already exists are skipped; singles are skipped as a
# whole when the artist's "Singles" folder already exists.
# Removal of unrecognized folders is currently disabled (see remove_external_albums).
# Use this program with flag --test to see what it will download.

## TODO: actually verify if the download result is correct, stop otherwise.
## TODO: remove .webp files from failed downloads.
## TODO: take album limit as parameter.
##

from ytmusicapi import YTMusic
import sys
import argparse
import os
import subprocess
import time
from subprocess import Popen, PIPE, STDOUT
import shutil

try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    import os
    DEVNULL = open(os.devnull, 'wb')

# Downloader executable, resolved relative to the current working directory.
YT_DLP_BINARY = './yt-dlp_linux'
# Failed album downloads tolerated per artist before a restart is forced.
MAX_DOWNLOAD_FAILURES = 5
# Pause before re-initialising after an error.
RESTART_DELAY_SECONDS = 5


class QueueEmptyException(Exception):
    """Raised when the queue file holds no more artists to process."""

    def __init__(self, message='Queue is empty'):
        # The default keeps a bare ``raise QueueEmptyException`` working.
        self.message = message
        super().__init__(self.message)


class InvalidPathException(Exception):
    """Raised when the output directory required by --update does not exist."""

    def __init__(self, message='Output path does not exist'):
        self.message = message
        super().__init__(self.message)


class DownloadFailedException(Exception):
    """Raised when too many album downloads failed for one artist."""

    def __init__(self, message='Download failed'):
        self.message = message
        super().__init__(self.message)


def _safe_name(title):
    """Return *title* with "/" replaced by "-" so it is a single path component."""
    return title.replace("/", "-")


def _read_queue(path):
    """Return the non-blank, stripped lines of the queue file at *path*."""
    with open(path, 'r') as file:
        return [line.strip() for line in file if line.strip()]


def _write_queue(path, entries):
    """Overwrite the queue file at *path* with one entry per line."""
    with open(path, 'w') as file:
        file.writelines(entry + '\n' for entry in entries)


def _queue_existing_artists(library_dir, queue_path):
    """Write every sub-directory name of *library_dir* to the queue file.

    Raises:
        InvalidPathException: if *library_dir* does not exist.
    """
    if not os.path.exists(library_dir):
        raise InvalidPathException(f'Output path "{library_dir}" does not exist')
    artists = sorted(
        f for f in os.listdir(library_dir)
        if os.path.isdir(os.path.join(library_dir, f))
    )
    print(f'Found {len(artists)} artists to check.')
    _write_queue(queue_path, artists)


def _run_yt_dlp(playlist_id, target_dir):
    """Download a YouTube Music playlist as mp3 files into *target_dir*.

    Output of yt-dlp is discarded. Returns the process exit code; callers
    currently judge success by the presence of *target_dir* instead.
    """
    command = [
        YT_DLP_BINARY,
        f"https://music.youtube.com/playlist?list={playlist_id}",
        '-o', os.path.join(target_dir, '%(title)s.%(ext)s'),
        '-x',
        '--audio-format', 'mp3',
        '--embed-thumbnail',
        '--add-metadata',
        '--no-overwrites',
    ]
    completed = subprocess.run(command, stdin=PIPE, stdout=DEVNULL, stderr=STDOUT)
    return completed.returncode


def remove_external_albums(name, complete_albums):
    """Report whether the artist folder exists; the actual cleanup is disabled.

    Reads the module-level ``output_dir``. *complete_albums* is only needed by
    the disabled cleanup below and is otherwise unused.

    Returns:
        False if ``output_dir/name`` does not exist, True otherwise.
    """
    directory_path = os.path.join(output_dir, name)
    if not os.path.exists(directory_path):
        return False

    # Find .webp files from failed downloads.

    # Find albums that are on disk, but are not in the discography list.
    # Disabled: kept for reference until the deletion is considered safe.
    #album_titles = [album['title'].replace("/", "-") for album in complete_albums]
    #album_titles.append('Singles')
    #filenames = [f for f in os.listdir(directory_path) if os.path.isdir(os.path.join(directory_path, f))]
    #unknown_albums = [elem for elem in filenames if elem not in album_titles]
    #for external_album in unknown_albums:
    #    directory_path = output_dir+'/'+name+'/'+external_album
    #    if os.path.exists(directory_path):
    #        if not args.test:
    #            shutil.rmtree(directory_path)
    #        print(f'Deleted album "{name}/{external_album}"')

    return True


def contains_list(main_list, sub_list):
    """Return True if some element of *main_list* equals *sub_list* itself.

    This is element equality, not a subset test.
    """
    return any(item == sub_list for item in main_list)


def album_is_complete(directory_path, albums, singles):
    """Return True when nothing is left to download for an artist.

    An artist counts as complete when *directory_path* is a directory that
    holds a sub-directory for every album title (with "/" replaced by "-")
    and, if the artist has singles, a "Singles" sub-directory. This mirrors
    the skip rules used by the download loops.
    """
    if not os.path.isdir(directory_path):
        return False
    existing = {
        f for f in os.listdir(directory_path)
        if os.path.isdir(os.path.join(directory_path, f))
    }
    if any(_safe_name(album['title']) not in existing for album in albums):
        return False
    return not singles or 'Singles' in existing


parser = argparse.ArgumentParser(description='Music downloader.')
parser.add_argument('--test', action='store_true', help='Test script without downloading anything.')
parser.add_argument('--update', action='store_true', help='Retrieve artists from output directory and download missing albums and singles. Overwrites queue file.')
parser.add_argument('--out', type=str, default='download', help='Output directory. (default: folder /download/ will be created in active directory.)')
parser.add_argument('--queue', type=str, default='queue.txt', help='Input queue file. .txt file with artists separated by newline. When testing with --test, test_ is prepended to queue file. e.g. test_queue.txt')
args = parser.parse_args()

NeedRestart = True
DidInitialUpdate = False
# Artist currently being processed; survives a restart so it is retried.
ArtistToDownload = None
while NeedRestart:
    print("Initializing")
    ytmusic = YTMusic()
    queue_test_file = 'test_' + args.queue
    queue_file = args.queue
    output_dir = args.out
    do_download = True
    NeedRestart = False

    if args.test:
        do_download = False
        queue_file = queue_test_file

    try:
        # Write out own queue file if updating existing library.
        # Done inside the try so InvalidPathException reaches its handler.
        if args.update and not DidInitialUpdate:
            print('Updating existing library: getting artist list...')
            _queue_existing_artists(output_dir, queue_file)
            DidInitialUpdate = True

        while True:
            failCount = 0
            print('')

            ## Pop item from top of list and write back file.
            lines = _read_queue(queue_file)
            if ArtistToDownload is None:
                # Only an exhausted queue with no pending artist ends the run.
                if not lines:
                    raise QueueEmptyException('Queue is empty')
                ArtistToDownload = lines.pop(0)
                _write_queue(queue_file, lines)

            ## Find artist id by name.
            search_results = ytmusic.search(ArtistToDownload, filter='artists')
            if not search_results:
                print(f"No artists found for {ArtistToDownload}")
                ArtistToDownload = None  # Give up on this entry, move on.
                continue
            artist = search_results[0]
            artist_id_to_download = artist['browseId']
            print(f"Found artist: {artist['artist']}, ID: {artist['browseId']} ({len(lines)} left in queue)")

            artist_result = ytmusic.get_artist(artist_id_to_download)
            artist_name = _safe_name(ArtistToDownload)  # / in title will break path.
            artist_dir = os.path.join(output_dir, artist_name)

            ## Collect albums and singles to download.
            albums = []
            singles = []
            if 'albums' in artist_result:
                albums = artist_result['albums']['results']
                # A browseId means the artist page only showed a preview list.
                if artist_result['albums']['browseId'] is not None:
                    albums = ytmusic.get_artist_albums(artist_result["albums"]["browseId"], artist_result["albums"]["params"], 200, 'Popularity')
            if 'singles' in artist_result:
                singles = artist_result['singles']['results']
                if artist_result['singles']['browseId'] is not None:
                    singles = ytmusic.get_artist_albums(artist_result["singles"]["browseId"], artist_result["singles"]["params"], 9999)

            remove_external_albums(artist_name, albums)

            if album_is_complete(artist_dir, albums, singles):
                print(f'Artist {artist_name} is complete.')
                ArtistToDownload = None  # Continue to next item in queue.
                continue

            # 1. Download albums
            print('Artist "' + artist_name + '" has ' + str(len(albums)) + ' albums...')
            for index, album in enumerate(albums):
                progress = f'({index+1}/{len(albums)})'
                # Retry the same album until its folder appears or the
                # per-artist failure budget is used up.
                while True:
                    albumdata = ytmusic.get_album(album['browseId'])
                    album_name = _safe_name(albumdata['title'])  # / in title will break path.
                    album_dir = os.path.join(artist_dir, album_name)

                    # Skip existing albums.
                    if os.path.isdir(album_dir):
                        print(f'{progress} Skipping existing album "{album_name}"')
                        break

                    print(f'Downloading "{album_name}"', end='\r')

                    if not do_download:
                        print(f'{progress} Skipped download of album {album_name}')
                        break

                    _run_yt_dlp(albumdata['audioPlaylistId'], album_dir)

                    if os.path.isdir(album_dir):
                        print(f'{progress} Downloaded "{album_name}"')
                        break

                    failCount += 1
                    print('Download failed')
                    if failCount >= MAX_DOWNLOAD_FAILURES:
                        raise DownloadFailedException(
                            f'{failCount} failed downloads for artist "{artist_name}"')

            # 2. Download singles
            singles_dir = os.path.join(artist_dir, 'Singles')
            # Decide once, before downloading: the first download creates the
            # folder, which must not cause the remaining singles to be skipped.
            singles_already_present = os.path.isdir(singles_dir)
            print('Artist "' + artist_name + '" has ' + str(len(singles)) + ' singles...')
            for index, single in enumerate(singles):
                progress = f'({index+1}/{len(singles)})'
                singledata = ytmusic.get_album(single['browseId'])
                single_name = _safe_name(singledata['title'])  # / in title will break path.

                # Skip existing singles.
                if singles_already_present:
                    print(f'{progress} Skipping existing single "{single_name}"')
                    continue

                print(f'Downloading "{single_name}"', end='\r')

                if not do_download:
                    print(f'{progress} Skipped download of single {single_name}')
                    continue

                _run_yt_dlp(singledata['audioPlaylistId'], singles_dir)
                # The result of a single download is not verified (see TODO).
                print(f'{progress} Downloaded "{single_name}"')

            ArtistToDownload = None  # Continue to next item in queue.

    except KeyboardInterrupt:
        print('Download cancelled')
        break
    except DownloadFailedException as exc:
        print('Download failed exception thrown:', exc)
        NeedRestart = True
        print('Attempting restart')
        time.sleep(RESTART_DELAY_SECONDS)
    except QueueEmptyException:
        print("Done; Queue is empty")
        break
    except InvalidPathException:
        print("ERROR: output path does not exist")
        break
    except Exception as exc:
        # Best-effort recovery: report the cause, then re-initialise.
        print('Exception cought:', repr(exc))
        NeedRestart = True
        print('Attempting restart')
        time.sleep(RESTART_DELAY_SECONDS)

print('Download finished')