diff options
Diffstat (limited to 'test.py')
-rw-r--r-- | test.py | 369 |
1 files changed, 0 insertions, 369 deletions
diff --git a/test.py b/test.py deleted file mode 100644 index c30cf96..0000000 --- a/test.py +++ /dev/null @@ -1,369 +0,0 @@ -import os -import requests -from requests.adapters import HTTPAdapter -from urllib3.util import Retry -from tqdm import tqdm -import pylrc -import json -from PIL import Image -from multiprocessing import Pool, Manager,Lock, Value -from mutagen.easyid3 import EasyID3 -from mutagen.id3 import APIC, SYLT, Encoding, ID3 -from mutagen.flac import Picture, FLAC -from pydub import AudioSegment -import time -import datetime -import sys -import random -def make_valid(filename): - # Make a filename valid in different OSs - f = filename.replace(':', '_') - f = f.replace('/', '_') - f = f.replace('<', '_') - f = f.replace('>', '_') - f = f.replace('\'', '_') - f = f.replace('\\', '_') - f = f.replace('|', '_') - f = f.replace('?', '_') - f = f.replace('*', '_') - return f - - -def lyric_file_to_text(filename): - lrc_file = open(filename, 'r', encoding='utf-8') - lrc_string = ''.join(lrc_file.readlines()) - lrc_file.close() - subs = pylrc.parse(lrc_string) - ret = [] - for sub in subs: - time = int(sub.time * 1000) - text = sub.text - ret.append((text, time)) - return ret - -def update_downloaded_albums(queue, directory): - while 1: - album_name = queue.get() - try: - with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: - completed_albums = json.load(f) - except: - completed_albums = [] - completed_albums.append(album_name) - with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f: - json.dump(completed_albums, f) - - -def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath): - if filetype == '.mp3': - file = EasyID3(filename) - else: - file = FLAC(filename) - - file['album'] = album - file['title'] = title - file['albumartist'] = ''.join(albumartist) - file['artist'] = ''.join(artist) - file['tracknumber'] = str(tracknumber + 1) - file.save() - - if filetype == '.mp3': - file = ID3(filename) - file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read())) - # Read and add lyrics - if (songlyricpath != None): - sylt = lyric_file_to_text(songlyricpath) - file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)]) - file.save() - else: - image = Picture() - image.type = 3 - image.desc = 'Cover' - image.mime = 'image/png' - with open(albumcover,'rb') as f: - image.data = f.read() - with Image.open(albumcover) as imagePil: - image.width, image.height = imagePil.size - image.depth = 24 - file.add_picture(image) - # Read and add lyrics - if (songlyricpath != None): - musiclrc = open(songlyricpath, 'r', encoding='utf-8').read() - file['lyrics'] = musiclrc - file.save() - - return - - - -def download_song(session, directory, name, url, song_counter, lock,file_format): - # Set timeout and retry parameters - time.sleep(3) - timeout = 10 - retries = 5 - - source = session.get(url, stream=True) - filename = directory + '/' + make_valid(name) - filetype = '' - - if source.headers['content-type'] == 'audio/mpeg': - filename += '.mp3' - filetype = '.mp3' - else: - filename += '.wav' - - # Download song with retries and timeout - total = int(source.headers.get('content-length', 0)) - downloaded = 0 - retry_count = 0 - while downloaded < total: - try: - with open(filename, 'ab') as f, tqdm( - desc=name, - total=total, - initial=downloaded, - unit='iB', - unit_scale=True, - unit_divisor=1024, - ) as bar: - f.seek(downloaded) - for data in source.iter_content(chunk_size = 1024): - size = f.write(data) - downloaded += size - bar.update(size) - if downloaded >= total: - break - if retry_count > 0: - print(f'Retry successful. Downloading {name}...') - retry_count = 0 - except (requests.exceptions.RequestException, IOError) as e: - if retry_count >= retries: - raise e - else: - retry_count += 1 - print(f"Download of {name} failed. Retrying in 3 seconds ({retry_count}/{retries})", file=sys.stderr) - time.sleep(3) - source = session.get(url, stream=True, timeout=timeout) - total = int(source.headers.get('content-length', 0)) - downloaded = f.tell() - - if downloaded < total: - print(f'Download of {name} was incomplete. Retrying...', file=sys.stderr) - os.remove(filename) - - # Increase song counter - with lock: - song_counter.value += 1 - - # If file is .wav then export to .flac - if source.headers['content-type'] != 'audio/mpeg': - all_filldata, filename, filetype = choice_format(file_format,filename,directory,name) - - - return all_filldata, filename, filetype - - -# define a function to make a valid file name -def choice_format(file_format,filename,directory,name): - # implementation details here - - # check the input and perform the conversion - if file_format == 'flac': - # convert to FLAC - AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac') - os.remove(filename) - filename = directory + '/' + make_valid(name) + '.flac' - filetype = '.flac' - elif file_format == 'mp3': - # convert to MP3 - AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.mp3', format='mp3') - os.remove(filename) - filename = directory + '/' + make_valid(name) + '.mp3' - filetype = '.mp3' - elif file_format == 'all': - temp_filename = filename - # Convert to FLAC - flac_filename = os.path.join(directory, make_valid(name) + '.flac') - AudioSegment.from_wav(filename).export(flac_filename, format='flac') - flac_filetype = '.flac' - # Convert to MP3 - mp3_folder = os.path.join(directory, 'mp3') - if not os.path.exists(mp3_folder): - os.makedirs(mp3_folder) - mp3_filename = os.path.join(mp3_folder, make_valid(name) + '.mp3') - AudioSegment.from_wav(temp_filename).export(mp3_filename, format='mp3') - os.remove(filename) - mp3_filetype = '.mp3' - all_filldata = [flac_filename, mp3_filename], [flac_filetype, mp3_filetype] - else: - print("Invalid file format. Please enter 'flac' or 'mp3'.") - - return all_filldata, filename, filetype - -def download_album( args, pass_counter, song_counter, album_counter,lock,file_format): - directory = args['directory'] - session = args['session'] - queue = args['queue'] - album_cid = args['cid'] - album_name = args['name'] - album_coverUrl = args['coverUrl'] - album_artistes = args['artistes'] - album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail' - - - - try: - with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: - completed_albums = json.load(f) - except: - completed_albums = [] - - # fix the album name which have space in last word in Windows - album_name = album_name.rstrip().split() - if len(album_name) > 0 and album_name[-1].endswith(' '): - album_name[-1] = album_name[-1][:-1] - album_name = ' '.join(album_name) - - if album_name in completed_albums: - # If album is completed then skip - print(f'Skipping downloaded album {album_name}') - with lock: - pass_counter.value += 1 - return - try: - os.mkdir(directory + album_name) - except: - pass - - # Download album art - with open(directory + album_name + '/cover.jpg', 'w+b') as f: - f.write(session.get(album_coverUrl).content) - - # Change album art from .jpg to .png - cover = Image.open(directory + album_name + '/cover.jpg') - cover.save(directory + album_name + '/cover.png') - os.remove(directory + album_name + '/cover.jpg') - - - songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs'] - for song_track_number, song in enumerate(songs): - # Get song details - time.sleep(3) # add 3-second delay - song_cid = song['cid'] - song_name = song['name'] - song_artists = song['artistes'] - song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid - headers = read_agent() - song_detail = session.get(song_url, headers=headers).json()['data'] - song_lyricUrl = song_detail['lyricUrl'] - song_sourceUrl = song_detail['sourceUrl'] - - # Download lyric - if (song_lyricUrl != None): - songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc' - with open(songlyricpath, 'w+b') as f: - f.write(session.get(song_lyricUrl).content) - else: - songlyricpath = None - - # Download song and fill out metadata - all_filldata, filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl,song_counter=song_counter,lock=lock,file_format=file_format) - if file_format == 'mp3' or file_format == 'flac': - fill_metadata(filename=filename, - filetype=filetype, - album=album_name, - title=song_name, - albumartist=album_artistes, - artist=song_artists, - tracknumber=song_track_number, - albumcover=directory + album_name + '/cover.png', - songlyricpath=songlyricpath) - elif file_format == 'all': - for i in range(0, len(all_filldata), 2): - fill_metadata(filename=all_filldata[i], - filetype=all_filldata[i+1], - album=album_name, - title=song_name, - albumartist=album_artistes, - artist=song_artists, - tracknumber=song_track_number, - albumcover=directory + album_name + '/cover.png', - songlyricpath=songlyricpath) - else: - print("fillmeta error") - # Increase album counter - with lock: - album_counter.value += 1 - # Mark album as finished - queue.put(album_name) - return - -def read_agent(): - # Read user agent strings from file - with open('user_agent.txt', 'r') as f: - user_agent_list = [line.strip() for line in f] - - # Choose a random user agent - user_agent = random.choice(user_agent_list) - - # Set headers with Accept and User-Agent - headers = { - 'Accept': 'application/json', - 'User-Agent': user_agent - } - return headers - - -def main(): - directory = './MonsterSiren/' - session = requests.Session() - manager = Manager() - queue = manager.Queue() - lock = manager.Lock() - pass_counter = manager.Value('i', 0) - song_counter = manager.Value('i', 0) - album_counter = manager.Value('i', 0) - - file_format = input("Enter the file format to convert to (flac/mp3/all): ") - - try: - os.mkdir(directory) - except: - pass - - - headers = read_agent() - # Get all albums - albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers=headers).json()['data'] - for album in albums: - album['directory'] = directory - album['session'] = session - album['queue'] = queue - - - # Download all albums - num_workers = os.cpu_count() - 3 # leave CPU core free - with Pool(num_workers) as pool: - # with Pool(maxtasksperchild=1) as pool: - pool.apply_async(update_downloaded_albums, (queue, directory)) - results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock, file_format) for album in albums]) - queue.put('kill') - - pass_total = pass_counter.value - song_total = song_counter.value - album_total = album_counter.value - # Write counter to file - with open("Statistics.txt", "a") as f: - timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - f.write(f'Finish Time: {timestamp}\n') - f.write(f'Total albums skipped: {pass_total}\n') - f.write(f"Downloaded {song_total} songs from {album_total} albums.\n") - f.write(f"-----------------------------\n") - print(f'Total albums skipped: {pass_total}') - print(f"Downloaded {song_total} songs from {album_total} albums.") - return - - - -if __name__ == '__main__': - main() |