123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- #!/usr/bin/python3.8
- import sys
- import getopt
- import json
- import pymongo
- from bson.objectid import ObjectId
- import youtube_dl
- import ffmpy
- import os
- import eyed3
- from urllib import request
- import unicodedata
- import re
- from PIL import Image
- import PIL
- from alive_progress import alive_bar
- class bcolors:
- HEADER = "\033[95m"
- OKBLUE = "\033[94m"
- OKCYAN = "\033[96m"
- OKGREEN = "\033[92m"
- WARNING = "\033[93m"
- FAIL = "\033[91m"
- ENDC = "\033[0m"
- BOLD = "\033[1m"
- UNDERLINE = "\033[4m"
- def keys_exists(element, *keys):
- if not isinstance(element, dict):
- raise AttributeError("keys_exists() expects dict as first argument.")
- if len(keys) == 0:
- raise AttributeError("keys_exists() expects at least two arguments, one given.")
- _element = element
- for key in keys:
- try:
- _element = _element[key]
- except:
- return False
- return True
- def usage(e=False):
- print(f"""{bcolors.HEADER}Usage{bcolors.ENDC}
- -h,--help | Help
- -p playlistId,--playlist-id=playlistId | Musare Playlist ID
- -P playlistFile,--playlist-file=playlistFile | Musare Playlist JSON file
- -o outputPath,--output=outputPath | Output directory
- -f outputFormat,--format=outputFormat | Format of download, audio or video
- -i downloadImages,--images=downloadImages | Whether to download images, true or false
- --max-songs=maxSongs | Maximum number of songs to download
- --mongo-host=mongoHost | MongoDB Host
- --mongo-port=mongoPort | MongoDB Port
- --mongo-username=mongoUsername | MongoDB Username
- --mongo-password=mongoPassword | MongoDB Password
- --mongo-database=mongoDatabase | MongoDB Database
- """)
- if e:
- sys.exit(2)
- def slugify(value):
- value = unicodedata.normalize("NFKD", str(value)).encode("ascii", "ignore").decode("ascii")
- value = re.sub(r"[^(?!+)\w\s-]", "", value)
- return re.sub(r"[\s]+", "_", value).strip("-_")
- mongoSettings = {}
- playlistId = None
- playlistFile = None
- outputDir = os.path.join(os.getcwd(), "")
- outputFormat = None
- configFile = f"{outputDir}config.json"
- downloadImages = True
- maxSongs = 0
- os.chdir(outputDir)
- try:
- opts, args = getopt.getopt(
- sys.argv[1:],
- "p:P:o:f:i:c:h",
- [
- "playlist-id=",
- "playlist-file=",
- "output=",
- "format=",
- "images=",
- "config=",
- "help",
- "max-songs=",
- "mongo-host=",
- "mongo-port=",
- "mongo-username=",
- "mongo-password=",
- "mongo-database="
- ]
- )
- except getopt.GetoptError:
- usage(True)
- try:
- for opt, arg in opts:
- if opt in ("-c", "--config"):
- configFile = arg
- if not os.path.exists(configFile):
- sys.exit(f"{bcolors.FAIL}Error: Config file does not exist{bcolors.ENDC}")
- if os.path.exists(configFile):
- if os.path.isdir(configFile):
- sys.exit(f"{bcolors.FAIL}Error: Config file is a directory{bcolors.ENDC}")
- with open(configFile, "r") as configJson:
- config = json.load(configJson)
- for var in ["playlistId", "playlistFile", "outputPath", "outputFile", "downloadImages", "maxSongs"]:
- if keys_exists(config, var):
- globals()[var] = config[var]
- for param in ["host", "port", "username", "password", "database"]:
- if not keys_exists(mongoSettings, param) and keys_exists(config, "mongo", param):
- mongoSettings[param] = config["mongo"][param]
- if not keys_exists(mongoSettings, "host"):
- mongoSettings["host"] = "localhost"
- if not keys_exists(mongoSettings, "port"):
- mongoSettings["port"] = 27017
- if not keys_exists(mongoSettings, "username") or not keys_exists(mongoSettings, "password") or not keys_exists(mongoSettings, "database"):
- raise ValueError
- except ValueError:
- print(f"{bcolors.FAIL}Error: Mongo username, password and database required{bcolors.ENDC}")
- usage(True)
- except:
- sys.exit(f"{bcolors.FAIL}Error loading config.json{bcolors.ENDC}")
- for opt, arg in opts:
- if opt in ("-h", "--help"):
- usage(True)
- elif opt in ("-p", "--playlist-id"):
- playlistId = arg
- elif opt in ("-P", "--playlist-file"):
- playlistFile = arg
- elif opt in ("-o", "--output"):
- outputDir = arg
- elif opt in ("-f", "--format"):
- outputFormat = arg
- elif opt in ("-i", "--images"):
- downloadImages = arg
- elif opt in ("-c", "--config"):
- pass
- elif opt == "--max-songs":
- if arg.isdigit() == False:
- sys.exit(f"{bcolors.FAIL}Error: Invalid max-songs, must be int{bcolors.ENDC}")
- maxSongs = arg
- elif opt == "--mongo-host":
- mongoSettings["host"] = arg
- elif opt == "--mongo-port":
- mongoSettings["port"] = arg
- elif opt == "--mongo-username":
- mongoSettings["username"] = arg
- elif opt == "--mongo-password":
- mongoSettings["password"] = arg
- elif opt == "--mongo-database":
- mongoSettings["database"] = arg
- else:
- usage(True)
- if not playlistId and not playlistFile:
- print(f"{bcolors.FAIL}Error: Playlist ID or Playlist File need to be specified{bcolors.ENDC}")
- usage(True)
- if playlistId and playlistFile:
- print(f"{bcolors.FAIL}Error: Playlist ID and Playlist File can not be used at the same time{bcolors.ENDC}")
- usage(True)
- if playlistId and len(playlistId) != 24:
- sys.exit(f"{bcolors.FAIL}Error: Invalid Musare Playlist ID{bcolors.ENDC}")
- if playlistFile:
- if not os.path.exists(playlistFile):
- sys.exit(f"{bcolors.FAIL}Error: Musare Playlist File does not exist{bcolors.ENDC}")
- if os.path.isdir(playlistFile):
- sys.exit(f"{bcolors.FAIL}Error: Musare Playlist File is a directory{bcolors.ENDC}")
- if os.path.exists(outputDir):
- if not os.path.isdir(outputDir):
- sys.exit(f"{bcolors.FAIL}Error: Output path is not a directory{bcolors.ENDC}")
- outputDir = os.path.join(outputDir, "")
- else:
- sys.exit(f"{bcolors.FAIL}Error: Output directory does not exist{bcolors.ENDC}")
- if outputFormat:
- outputFormat = outputFormat.lower()
- if outputFormat != "audio" and outputFormat != "video":
- sys.exit(f"{bcolors.FAIL}Error: Invalid format, audio or video only{bcolors.ENDC}")
- else:
- outputFormat = "audio"
- if str(downloadImages).lower() == "true":
- downloadImages = True
- elif str(downloadImages).lower() == "false":
- downloadImages = False
- else:
- sys.exit(f"{bcolors.FAIL}Error: Invalid images, must be True or False{bcolors.ENDC}")
- maxSongs = int(maxSongs)
- if playlistId and not playlistFile:
- try:
- mongo = pymongo.MongoClient(
- host=mongoSettings["host"],
- port=int(mongoSettings["port"]),
- username=mongoSettings["username"],
- password=mongoSettings["password"],
- authSource=mongoSettings["database"]
- )
- mydb = mongo[mongoSettings["database"]]
- songs = []
- for playlist in mydb["playlists"].find({ "_id": ObjectId(playlistId) }, { "songs._id" }):
- for song in playlist["songs"]:
- songs.append(song["_id"])
- songsCount = mydb["songs"].count_documents({ "_id": { "$in": songs } })
- songs = mydb["songs"].find({ "_id": { "$in": songs } })
- except:
- sys.exit(f"{bcolors.FAIL}Error: Could not load songs from Mongo{bcolors.ENDC}")
- elif not playlistId and playlistFile:
- try:
- with open(playlistFile, "r") as playlistJson:
- songs = json.load(playlistJson)
- songs = songs["playlist"]["songs"]
- songsCount = len(songs)
- except:
- sys.exit(f"{bcolors.FAIL}Error: Could not load songs from playlist JSON file{bcolors.ENDC}")
- if not os.access(outputDir, os.W_OK):
- sys.exit(f"{bcolors.FAIL}Error: Unable to write to output directory{bcolors.ENDC}")
- if os.getcwd() != outputDir:
- os.chdir(outputDir)
- if downloadImages and not os.path.exists("images"):
- os.makedirs("images")
- i = 0
- completeSongs = []
- failedSongs = []
- if maxSongs > 0 and songsCount > maxSongs:
- songsCount = maxSongs
- with alive_bar(songsCount, title=f"{bcolors.BOLD}{bcolors.OKCYAN}musare-dl{bcolors.ENDC}") as bar:
- for song in songs:
- i = i + 1
- if maxSongs != 0 and i > maxSongs:
- i = i - 1
- break
- try:
- bar.text(f"{bcolors.OKBLUE}Downloading ({song['_id']}) {','.join(song['artists'])} - {song['title']}..{bcolors.ENDC}")
- ydl_opts = {
- "outtmpl": f"{song['_id']}.tmp",
- "quiet": True,
- "no_warnings": True
- }
- if outputFormat == "audio":
- outputExtension = "mp3"
- ydl_opts["format"] = "bestaudio[ext=m4a]"
- elif outputFormat == "video":
- outputExtension = "mp4"
- ydl_opts["format"] = "best[height<=1080]/bestaudio"
- ffmpegOpts = ["-hide_banner", "-loglevel", "error"]
- if keys_exists(song, "skipDuration") and keys_exists(song, "duration"):
- ffmpegOpts.append("-ss")
- ffmpegOpts.append(str(song["skipDuration"]))
- ffmpegOpts.append("-t")
- ffmpegOpts.append(str(song["duration"]))
- with youtube_dl.YoutubeDL(ydl_opts) as ydl:
- ydl.download([f"https://www.youtube.com/watch?v={song['youtubeId']}"])
- bar.text(f"{bcolors.OKBLUE}Converting ({song['_id']}) {','.join(song['artists'])} - {song['title']}..{bcolors.ENDC}")
- ff = ffmpy.FFmpeg(
- inputs={ f"{song['_id']}.tmp": None },
- outputs={ f"{song['_id']}.{outputExtension}": ffmpegOpts }
- )
- ff.run()
- os.remove(f"{song['_id']}.tmp")
- if outputFormat == "audio":
- track = eyed3.load(f"{song['_id']}.{outputExtension}")
- track.tag.artist = ";".join(song["artists"])
- track.tag.title = str(song["title"])
- if keys_exists(song, "discogs", "album", "title"):
- track.tag.album = str(song["discogs"]["album"]["title"])
- fileName = slugify(f"{'+'.join(song['artists'])}-{song['title']}-{song['_id']}")
- bar.text(f"{bcolors.OKBLUE}Downloading Images for ({song['_id']}) {','.join(song['artists'])} - {song['title']}..{bcolors.ENDC}")
- if downloadImages:
- try:
- imgRequest = request.Request(
- song["thumbnail"],
- headers={
- "User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/93.0.4577.82 Safari/537.36"
- }
- )
- img = Image.open(request.urlopen(imgRequest))
- img.save(f"images/{fileName}.jpg", "JPEG")
- if outputFormat == "audio":
- thumb = img.resize((32, 32), Image.Resampling.LANCZOS)
- thumb.save(f"images/{fileName}.thumb.jpg", "JPEG")
- imgData = open(f"images/{fileName}.jpg", "rb").read()
- thumbData = open(f"images/{fileName}.thumb.jpg", "rb").read()
- track.tag.images.set(1, thumbData, "image/jpeg", u"icon")
- track.tag.images.set(3, imgData, "image/jpeg", u"cover")
- except:
- print(f"{bcolors.FAIL}Error downloading album art for ({song['_id']}) {','.join(song['artists'])} - {song['title']}, skipping.{bcolors.ENDC}")
- if outputFormat == "audio":
- track.tag.save()
- os.rename(f"{song['_id']}.{outputExtension}", f"{fileName}.{outputExtension}")
- completeSongs.append(str(song["_id"]))
- print(f"{bcolors.OKGREEN}Downloaded ({song['_id']}) {','.join(song['artists'])} - {song['title']}{bcolors.ENDC}")
- except KeyboardInterrupt:
- print(f"{bcolors.FAIL}Cancelled downloads{bcolors.ENDC}")
- break
- except:
- failedSongs.append(str(song["_id"]))
- print(f"{bcolors.FAIL}Error downloading ({song['_id']}) {','.join(song['artists'])} - {song['title']}, skipping.{bcolors.ENDC}")
- bar()
- if len(failedSongs) > 0:
- print(f"\n{bcolors.FAIL}Failed Songs ({len(failedSongs)}): {', '.join(failedSongs)}{bcolors.ENDC}")
|