diff options
-rw-r--r-- | test.py | 369 |
1 files changed, 369 insertions, 0 deletions
@@ -0,0 +1,369 @@ +import os +import requests +from requests.adapters import HTTPAdapter +from urllib3.util import Retry +from tqdm import tqdm +import pylrc +import json +from PIL import Image +from multiprocessing import Pool, Manager,Lock, Value +from mutagen.easyid3 import EasyID3 +from mutagen.id3 import APIC, SYLT, Encoding, ID3 +from mutagen.flac import Picture, FLAC +from pydub import AudioSegment +import time +import datetime +import sys +import random +def make_valid(filename): + # Make a filename valid in different OSs + f = filename.replace(':', '_') + f = f.replace('/', '_') + f = f.replace('<', '_') + f = f.replace('>', '_') + f = f.replace('\'', '_') + f = f.replace('\\', '_') + f = f.replace('|', '_') + f = f.replace('?', '_') + f = f.replace('*', '_') + return f + + +def lyric_file_to_text(filename): + lrc_file = open(filename, 'r', encoding='utf-8') + lrc_string = ''.join(lrc_file.readlines()) + lrc_file.close() + subs = pylrc.parse(lrc_string) + ret = [] + for sub in subs: + time = int(sub.time * 1000) + text = sub.text + ret.append((text, time)) + return ret + +def update_downloaded_albums(queue, directory): + while 1: + album_name = queue.get() + try: + with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: + completed_albums = json.load(f) + except: + completed_albums = [] + completed_albums.append(album_name) + with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f: + json.dump(completed_albums, f) + + +def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath): + if filetype == '.mp3': + file = EasyID3(filename) + else: + file = FLAC(filename) + + file['album'] = album + file['title'] = title + file['albumartist'] = ''.join(albumartist) + file['artist'] = ''.join(artist) + file['tracknumber'] = str(tracknumber + 1) + file.save() + + if filetype == '.mp3': + file = ID3(filename) + file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read())) + # Read and add lyrics + if (songlyricpath != None): + sylt = lyric_file_to_text(songlyricpath) + file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)]) + file.save() + else: + image = Picture() + image.type = 3 + image.desc = 'Cover' + image.mime = 'image/png' + with open(albumcover,'rb') as f: + image.data = f.read() + with Image.open(albumcover) as imagePil: + image.width, image.height = imagePil.size + image.depth = 24 + file.add_picture(image) + # Read and add lyrics + if (songlyricpath != None): + musiclrc = open(songlyricpath, 'r', encoding='utf-8').read() + file['lyrics'] = musiclrc + file.save() + + return + + + +def download_song(session, directory, name, url, song_counter, lock,file_format): + # Set timeout and retry parameters + time.sleep(3) + timeout = 10 + retries = 5 + + source = session.get(url, stream=True) + filename = directory + '/' + make_valid(name) + filetype = '' + + if source.headers['content-type'] == 'audio/mpeg': + filename += '.mp3' + filetype = '.mp3' + else: + filename += '.wav' + + # Download song with retries and timeout + total = int(source.headers.get('content-length', 0)) + downloaded = 0 + retry_count = 0 + while downloaded < total: + try: + with open(filename, 'ab') as f, tqdm( + desc=name, + total=total, + initial=downloaded, + unit='iB', + unit_scale=True, + unit_divisor=1024, + ) as bar: + f.seek(downloaded) + for data in source.iter_content(chunk_size = 1024): + size = f.write(data) + downloaded += size + bar.update(size) + if downloaded >= total: + break + if retry_count > 0: + print(f'Retry successful. Downloading {name}...') + retry_count = 0 + except (requests.exceptions.RequestException, IOError) as e: + if retry_count >= retries: + raise e + else: + retry_count += 1 + print(f"Download of {name} failed. Retrying in 3 seconds ({retry_count}/{retries})", file=sys.stderr) + time.sleep(3) + source = session.get(url, stream=True, timeout=timeout) + total = int(source.headers.get('content-length', 0)) + downloaded = f.tell() + + if downloaded < total: + print(f'Download of {name} was incomplete. Retrying...', file=sys.stderr) + os.remove(filename) + + # Increase song counter + with lock: + song_counter.value += 1 + + # If file is .wav then export to .flac + if source.headers['content-type'] != 'audio/mpeg': + all_filldata, filename, filetype = choice_format(file_format,filename,directory,name) + + + return all_filldata, filename, filetype + + +# define a function to make a valid file name +def choice_format(file_format,filename,directory,name): + # implementation details here + + # check the input and perform the conversion + if file_format == 'flac': + # convert to FLAC + AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac') + os.remove(filename) + filename = directory + '/' + make_valid(name) + '.flac' + filetype = '.flac' + elif file_format == 'mp3': + # convert to MP3 + AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.mp3', format='mp3') + os.remove(filename) + filename = directory + '/' + make_valid(name) + '.mp3' + filetype = '.mp3' + elif file_format == 'all': + temp_filename = filename + # Convert to FLAC + flac_filename = os.path.join(directory, make_valid(name) + '.flac') + AudioSegment.from_wav(filename).export(flac_filename, format='flac') + flac_filetype = '.flac' + # Convert to MP3 + mp3_folder = os.path.join(directory, 'mp3') + if not os.path.exists(mp3_folder): + os.makedirs(mp3_folder) + mp3_filename = os.path.join(mp3_folder, make_valid(name) + '.mp3') + AudioSegment.from_wav(temp_filename).export(mp3_filename, format='mp3') + os.remove(filename) + mp3_filetype = '.mp3' + all_filldata = [flac_filename, mp3_filename], [flac_filetype, mp3_filetype] + else: + print("Invalid file format. Please enter 'flac' or 'mp3'.") + + return all_filldata, filename, filetype + +def download_album( args, pass_counter, song_counter, album_counter,lock,file_format): + directory = args['directory'] + session = args['session'] + queue = args['queue'] + album_cid = args['cid'] + album_name = args['name'] + album_coverUrl = args['coverUrl'] + album_artistes = args['artistes'] + album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail' + + + + try: + with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: + completed_albums = json.load(f) + except: + completed_albums = [] + + # fix the album name which have space in last word in Windows + album_name = album_name.rstrip().split() + if len(album_name) > 0 and album_name[-1].endswith(' '): + album_name[-1] = album_name[-1][:-1] + album_name = ' '.join(album_name) + + if album_name in completed_albums: + # If album is completed then skip + print(f'Skipping downloaded album {album_name}') + with lock: + pass_counter.value += 1 + return + try: + os.mkdir(directory + album_name) + except: + pass + + # Download album art + with open(directory + album_name + '/cover.jpg', 'w+b') as f: + f.write(session.get(album_coverUrl).content) + + # Change album art from .jpg to .png + cover = Image.open(directory + album_name + '/cover.jpg') + cover.save(directory + album_name + '/cover.png') + os.remove(directory + album_name + '/cover.jpg') + + + songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs'] + for song_track_number, song in enumerate(songs): + # Get song details + time.sleep(3) # add 3-second delay + song_cid = song['cid'] + song_name = song['name'] + song_artists = song['artistes'] + song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid + headers = read_agent() + song_detail = session.get(song_url, headers=headers).json()['data'] + song_lyricUrl = song_detail['lyricUrl'] + song_sourceUrl = song_detail['sourceUrl'] + + # Download lyric + if (song_lyricUrl != None): + songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc' + with open(songlyricpath, 'w+b') as f: + f.write(session.get(song_lyricUrl).content) + else: + songlyricpath = None + + # Download song and fill out metadata + all_filldata, filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl,song_counter=song_counter,lock=lock,file_format=file_format) + if file_format == 'mp3' or file_format == 'flac': + fill_metadata(filename=filename, + filetype=filetype, + album=album_name, + title=song_name, + albumartist=album_artistes, + artist=song_artists, + tracknumber=song_track_number, + albumcover=directory + album_name + '/cover.png', + songlyricpath=songlyricpath) + elif file_format == 'all': + for i in range(0, len(all_filldata), 2): + fill_metadata(filename=all_filldata[i], + filetype=all_filldata[i+1], + album=album_name, + title=song_name, + albumartist=album_artistes, + artist=song_artists, + tracknumber=song_track_number, + albumcover=directory + album_name + '/cover.png', + songlyricpath=songlyricpath) + else: + print("fillmeta error") + # Increase album counter + with lock: + album_counter.value += 1 + # Mark album as finished + queue.put(album_name) + return + +def read_agent(): + # Read user agent strings from file + with open('user_agent.txt', 'r') as f: + user_agent_list = [line.strip() for line in f] + + # Choose a random user agent + user_agent = random.choice(user_agent_list) + + # Set headers with Accept and User-Agent + headers = { + 'Accept': 'application/json', + 'User-Agent': user_agent + } + return headers + + +def main(): + directory = './MonsterSiren/' + session = requests.Session() + manager = Manager() + queue = manager.Queue() + lock = manager.Lock() + pass_counter = manager.Value('i', 0) + song_counter = manager.Value('i', 0) + album_counter = manager.Value('i', 0) + + file_format = input("Enter the file format to convert to (flac/mp3/all): ") + + try: + os.mkdir(directory) + except: + pass + + + headers = read_agent() + # Get all albums + albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers=headers).json()['data'] + for album in albums: + album['directory'] = directory + album['session'] = session + album['queue'] = queue + + + # Download all albums + num_workers = os.cpu_count() - 3 # leave CPU core free + with Pool(num_workers) as pool: + # with Pool(maxtasksperchild=1) as pool: + pool.apply_async(update_downloaded_albums, (queue, directory)) + results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock, file_format) for album in albums]) + queue.put('kill') + + pass_total = pass_counter.value + song_total = song_counter.value + album_total = album_counter.value + # Write counter to file + with open("Statistics.txt", "a") as f: + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f.write(f'Finish Time: {timestamp}\n') + f.write(f'Total albums skipped: {pass_total}\n') + f.write(f"Downloaded {song_total} songs from {album_total} albums.\n") + f.write(f"-----------------------------\n") + print(f'Total albums skipped: {pass_total}') + print(f"Downloaded {song_total} songs from {album_total} albums.") + return + + + +if __name__ == '__main__': + main() |