From fb68e7a4f523da8575fb900592ff7b63bc95f27e Mon Sep 17 00:00:00 2001 From: 吳俊翰 Date: Fri, 5 May 2023 17:46:39 +0800 Subject: Delete test.py --- test.py | 289 ---------------------------------------------------------------- 1 file changed, 289 deletions(-) delete mode 100644 test.py diff --git a/test.py b/test.py deleted file mode 100644 index ed2f280..0000000 --- a/test.py +++ /dev/null @@ -1,289 +0,0 @@ -import os -import requests -from tqdm import tqdm -import pylrc -import json - -from PIL import Image -from multiprocessing import Pool, Manager,Lock, Value -from mutagen.easyid3 import EasyID3 -from mutagen.id3 import APIC, SYLT, Encoding, ID3 -from mutagen.flac import Picture, FLAC -from pydub import AudioSegment -import time -import datetime - -def make_valid(filename): - # Make a filename valid in different OSs - f = filename.replace(':', '_') - f = f.replace('/', '_') - f = f.replace('<', '_') - f = f.replace('>', '_') - f = f.replace('\'', '_') - f = f.replace('\\', '_') - f = f.replace('|', '_') - f = f.replace('?', '_') - f = f.replace('*', '_') - return f - - -def lyric_file_to_text(filename): - lrc_file = open(filename, 'r', encoding='utf-8') - lrc_string = ''.join(lrc_file.readlines()) - lrc_file.close() - subs = pylrc.parse(lrc_string) - ret = [] - for sub in subs: - time = int(sub.time * 1000) - text = sub.text - ret.append((text, time)) - return ret - -def update_downloaded_albums(queue, directory): - while 1: - album_name = queue.get() - try: - with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: - completed_albums = json.load(f) - except: - completed_albums = [] - completed_albums.append(album_name) - with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f: - json.dump(completed_albums, f) - - -def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath): - if filetype == '.mp3': - file = EasyID3(filename) - else: - file = FLAC(filename) - - file['album'] = album - file['title'] = title - file['albumartist'] = ''.join(albumartist) - file['artist'] = ''.join(artist) - file['tracknumber'] = str(tracknumber + 1) - file.save() - - if filetype == '.mp3': - file = ID3(filename) - file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read())) - # Read and add lyrics - if (songlyricpath != None): - sylt = lyric_file_to_text(songlyricpath) - file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)]) - file.save() - else: - image = Picture() - image.type = 3 - image.desc = 'Cover' - image.mime = 'image/png' - with open(albumcover,'rb') as f: - image.data = f.read() - with Image.open(albumcover) as imagePil: - image.width, image.height = imagePil.size - image.depth = 24 - file.add_picture(image) - # Read and add lyrics - if (songlyricpath != None): - musiclrc = open(songlyricpath, 'r', encoding='utf-8').read() - file['lyrics'] = musiclrc - file.save() - - return - - - -def download_song(session, directory, name, url): - source = session.get(url, stream=True) - filename = directory + '/' + make_valid(name) - filetype = '' - - if source.headers['content-type'] == 'audio/mpeg': - filename += '.mp3' - filetype = '.mp3' - else: - filename += '.wav' - - # Download song - total = int(source.headers.get('content-length', 0)) - downloaded = 0 - retries = 0 - while downloaded < total: - try: - with open(filename, 'ab') as f, tqdm( - desc=name, - total=total, - initial=downloaded, - unit='iB', - unit_scale=True, - unit_divisor=1024, - ) as bar: - # add a re-download feature for songs that weren't downloaded completely. - f.seek(downloaded) - for data in source.iter_content(chunk_size = 1024): - size = f.write(data) - downloaded += size - bar.update(size) - except requests.exceptions.RequestException as e: - if retries >= 5: - raise e - else: - retries += 1 - print(f"Download of {name} failed. Retrying in 5 seconds ({retries}/5)") - time.sleep(5) - source = session.get(url, stream=True) - total = int(source.headers.get('content-length', 0)) - downloaded = f.tell() #returns the current position of the file pointer, used to resume the download from the last successful byte position in case of a connection error or other interruption. - - if downloaded < total: - print(f'Download of {name} was incomplete. Retrying...') - os.remove(filename) - - # If file is .wav then export to .flac - if source.headers['content-type'] != 'audio/mpeg': - AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac') - os.remove(filename) - filename = directory + '/' + make_valid(name) + '.flac' - filetype = '.flac' - - return filename, filetype - - -def download_album( args, pass_counter, song_counter, album_counter, lock): - directory = args['directory'] - session = args['session'] - queue = args['queue'] - album_cid = args['cid'] - album_name = args['name'] - album_coverUrl = args['coverUrl'] - album_artistes = args['artistes'] - album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail' - - - - try: - with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: - completed_albums = json.load(f) - except: - completed_albums = [] - - # fix the album name which have space in last word in Windows - album_name = album_name.rstrip().split() - if len(album_name) > 0 and album_name[-1].endswith(' '): - album_name[-1] = album_name[-1][:-1] - album_name = ' '.join(album_name) - - if album_name in completed_albums: - # If album is completed then skip - print(f'Skipping downloaded album {album_name}') - with lock: - pass_counter.value += 1 - return - try: - os.mkdir(directory + album_name) - except: - pass - - # Download album art - with open(directory + album_name + '/cover.jpg', 'w+b') as f: - f.write(session.get(album_coverUrl).content) - - # Change album art from .jpg to .png - cover = Image.open(directory + album_name + '/cover.jpg') - cover.save(directory + album_name + '/cover.png') - os.remove(directory + album_name + '/cover.jpg') - - - songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs'] - for song_track_number, song in enumerate(songs): - # Get song details - time.sleep(3) # add 5-second delay - song_cid = song['cid'] - song_name = song['name'] - song_artists = song['artistes'] - song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid - song_detail = session.get(song_url, headers={'Accept': 'application/json'}).json()['data'] - song_lyricUrl = song_detail['lyricUrl'] - song_sourceUrl = song_detail['sourceUrl'] - - # Download lyric - if (song_lyricUrl != None): - songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc' - with open(songlyricpath, 'w+b') as f: - f.write(session.get(song_lyricUrl).content) - else: - songlyricpath = None - - # Download song and fill out metadata - filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl) - fill_metadata(filename=filename, - filetype=filetype, - album=album_name, - title=song_name, - albumartist=album_artistes, - artist=song_artists, - tracknumber=song_track_number, - albumcover=directory + album_name + '/cover.png', - songlyricpath=songlyricpath) - # Increase song counter - with lock: - song_counter.value += 1 - - # Increase album counter - with lock: - album_counter.value += 1 - # Mark album as finished - queue.put(album_name) - return - - -def main(): - directory = './MonsterSiren/' - session = requests.Session() - manager = Manager() - queue = manager.Queue() - lock = manager.Lock() - pass_counter = manager.Value('i', 0) - song_counter = manager.Value('i', 0) - album_counter = manager.Value('i', 0) - - try: - os.mkdir(directory) - except: - pass - - # Get all albums - albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers={'Accept': 'application/json'}).json()['data'] - for album in albums: - album['directory'] = directory - album['session'] = session - album['queue'] = queue - - - # Download all albums - num_workers = os.cpu_count() - 3 # leave one CPU core free - with Pool(num_workers) as pool: - # with Pool(maxtasksperchild=1) as pool: - pool.apply_async(update_downloaded_albums, (queue, directory)) - results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock) for album in albums]) - queue.put('kill') - - pass_total = pass_counter.value - song_total = song_counter.value - album_total = album_counter.value - # Write counter to file - with open("counter.txt", "a") as f: - timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") - f.write(f'Finish Time: {timestamp}\n') - f.write(f'Total albums skipped: {pass_total}\n') - f.write(f"Downloaded {song_total} songs from {album_total} albums.\n") - f.write(f"-----------------------------\n") - print(f'Total albums skipped: {pass_total}') - print(f"Downloaded {song_total} songs from {album_total} albums.") - return - - - -if __name__ == '__main__': - main() \ No newline at end of file -- cgit v1.2.3-70-g09d2