# This program will download artists from a queue file.
# Multiple instances of this program can run simultaneously.
# Previously downloaded albums and singles are skipped, newly found albums and singles are downloaded.
# Use this program with flag --test to see what it will download.

import argparse
import os
import subprocess
import sys
from subprocess import Popen, PIPE, STDOUT

from ytmusicapi import YTMusic

try:
    from subprocess import DEVNULL  # py3k
except ImportError:
    # Fallback for interpreters whose subprocess module lacks DEVNULL.
    DEVNULL = open(os.devnull, 'wb')

# Command-line interface.
parser = argparse.ArgumentParser(description='Music downloader.')
parser.add_argument('--test', action='store_true', help='Test script without downloading anything.')
parser.add_argument('--update', action='store_true', help='Retrieve artists from output directory and download missing albums and singles. Overwrites queue file.')
parser.add_argument('--out', type=str, default='download', help='Output directory. (default: folder /download/ will be created in active directory.)')
parser.add_argument('--queue', type=str, default='queue.txt', help='Input queue file. .txt file with artists separated by newline. When testing with --test, test_ is prepended to queue file. e.g. test_queue.txt')
args = parser.parse_args()

# Client used for all YouTube Music lookups.
ytmusic = YTMusic()

# Prefix only the file name with "test_", so a queue given with a directory
# (e.g. lists/queue.txt) maps to lists/test_queue.txt instead of the
# non-existent test_lists/queue.txt.
queue_dir, queue_basename = os.path.split(args.queue)
queue_test_file = os.path.join(queue_dir, 'test_' + queue_basename)

# In --test mode the test queue is consumed and nothing is downloaded.
queue_file = queue_test_file if args.test else args.queue
output_dir = args.out
do_download = not args.test

# Write out own queue file if updating existing library.
if args.update:
    # Rebuild the queue from the artist folders already present on disk.
    print('Updating existing library: getting artist list...')
    filenames = sorted(
        f for f in os.listdir(output_dir)
        if os.path.isdir(os.path.join(output_dir, f))
    )
    print(f'Found {len(filenames)} artists to check.')
    with open(queue_file, 'w') as file:
        file.writelines(line + '\n' for line in filenames)


def _list_subdirs(path):
    """Return the names of the immediate subdirectories of *path*."""
    return [
        entry for entry in os.listdir(path)
        if os.path.isdir(os.path.join(path, entry))
    ]


def artist_is_external(name, complete_albums):
    """Tell whether the artist folder holds albums unknown to the discography.

    Args:
        name: Artist name, used as the folder name below ``output_dir``.
        complete_albums: Album dicts for the artist; each must have a
            ``'title'`` key.

    Returns:
        True when the artist folder contains at least one subdirectory that
        is neither a listed album title nor ``'Singles'``; False otherwise,
        including when the artist folder does not exist.
    """
    directory_path = output_dir + '/' + name
    if not os.path.isdir(directory_path):
        return False

    known_titles = {album['title'] for album in complete_albums}
    known_titles.add('Singles')

    # Any folder on disk that is not in the discography list marks the
    # artist as downloaded by some other means.
    return any(entry not in known_titles for entry in _list_subdirs(directory_path))


def _pop_next_artist(path):
    """Remove the first non-blank line of the queue file and return it.

    Returns:
        A ``(artist, remaining)`` tuple, where ``remaining`` is the number of
        artists left in the queue. ``artist`` is None when the queue is empty.
    """
    # NOTE(review): this read-modify-write is not atomic; two instances
    # running at the same moment could pop the same artist. Confirm whether
    # file locking is needed.
    with open(path, 'r') as file:
        queued = [line.strip() for line in file if line.strip()]
    if not queued:
        return None, 0

    artist, remaining = queued[0], queued[1:]
    with open(path, 'w') as file:
        file.writelines(line + '\n' for line in remaining)
    return artist, len(remaining)


def _fetch_releases(artist_result, section):
    """Return the release list for *section* (``'albums'`` or ``'singles'``).

    When the artist page carries a ``browseId`` for the section, the list is
    fetched through ``get_artist_albums``; otherwise the results embedded in
    the artist page are used as-is.
    """
    if section not in artist_result:
        return []
    info = artist_result[section]
    if info['browseId'] is not None:
        return ytmusic.get_artist_albums(info['browseId'], info['params'], 9999)
    return info['results']


def _run_ytdlp(playlist_id, output_template):
    """Run yt-dlp on a YouTube Music playlist, extracting audio as mp3.

    The call blocks until yt-dlp exits. Its output is discarded; callers
    decide success by checking the expected path on disk afterwards.
    """
    command = [
        './yt-dlp_linux',
        f"https://music.youtube.com/playlist?list={playlist_id}",
        '-o', output_template,  # Adjust the path as needed
        '-x',
        '--audio-format', 'mp3',
        '--embed-thumbnail',
        '--add-metadata',
        '--no-overwrites',
    ]
    subprocess.run(command, stdin=DEVNULL, stdout=DEVNULL, stderr=STDOUT)


try:
    while True:
        print('')

        ## Pop item from top of list and write back file.
        artist_to_download, artists_left = _pop_next_artist(queue_file)
        if artist_to_download is None:
            # Queue exhausted: leave the loop instead of failing on an empty list.
            break

        ## Find artist id by name.
        search_results = ytmusic.search(artist_to_download, filter='artists')
        if not search_results:
            print(f"No artists found for {artist_to_download}")
            continue
        artist = search_results[0]  # Only the first hit is used.
        artist_id_to_download = artist['browseId']
        print(f"Found artist: {artist['artist']}, ID: {artist['browseId']} ({artists_left} left in queue)")

        artist_result = ytmusic.get_artist(artist_id_to_download)
        # Folders are named after the queue entry, not the name reported by the API.
        artist_name = artist_to_download
        artist_dir = output_dir + '/' + artist_name

        ## Collect albums and singles to download.
        albums = _fetch_releases(artist_result, 'albums')
        singles = _fetch_releases(artist_result, 'singles')

        if artist_is_external(artist_name, albums):
            print(f'Artist {artist_name} was downloaded externally and has been skipped.')
            continue

        # Check if artist albums are downloaded already by counting folders;
        # the +1 accounts for the 'Singles' folder. Ideally we check single
        # count here too but we have no way to find out how many singles the
        # artist actually has. A missing artist folder just means nothing has
        # been downloaded yet.
        if os.path.isdir(artist_dir):
            existing_album_count = len(_list_subdirs(artist_dir))
            if existing_album_count == len(albums) + 1:
                print(f'Artist {artist_name} is complete.')
                continue

        # 1. Download albums
        print('Artist "' + artist_name + '" has ' + str(len(albums)) + ' albums...')
        for index, album in enumerate(albums):
            albumdata = ytmusic.get_album(album['browseId'])
            album_name = albumdata['title']
            album_dir = artist_dir + '/' + album_name

            # Skip existing albums.
            if os.path.isdir(album_dir):
                print(f'Skipping existing album "{album_name}"')
                continue

            print(f'Downloading "{album_name}"', end='\r')
            if not do_download:
                print(f'Skipped download of album {album_name}')
                continue

            _run_ytdlp(albumdata['audioPlaylistId'], album_dir + '/%(title)s.%(ext)s')

            # The album folder only exists if yt-dlp wrote at least one track.
            if os.path.isdir(album_dir):
                print(f'Downloaded "{album_name} ({index+1}/{len(albums)})"')
            else:
                print('Download failed')
                sys.exit(1)

        # 2. Download singles
        print('Artist "' + artist_name + '" has ' + str(len(singles)) + ' singles...')
        for index, single in enumerate(singles):
            singledata = ytmusic.get_album(single['browseId'])
            single_name = singledata['title']
            single_path = artist_dir + '/Singles/' + single_name + '.mp3'

            # Skip existing singles.
            if os.path.exists(single_path):
                print(f'Skipping existing single "{single_name}"')
                continue

            print(f'Downloading "{single_name}"', end='\r')
            if not do_download:
                print(f'Skipped download of single {single_name}')
                continue

            _run_ytdlp(singledata['audioPlaylistId'], artist_dir + '/Singles/%(title)s.%(ext)s')

            if os.path.exists(single_path):
                print(f'Downloaded "{single_name} ({index+1}/{len(singles)})"')
            else:
                print('Download failed')
                sys.exit(1)

except KeyboardInterrupt:
    print('Download cancelled')

print('Download finished')