diff options
author | 吳俊翰 | 2023-05-04 17:45:58 +0800 |
---|---|---|
committer | GitHub | 2023-05-04 17:45:58 +0800 |
commit | c600671b4235e57db7e2aa95fc99b4f5cbd46cec (patch) | |
tree | 2e0c7c907263f89dd4a6610e51eed4df701ffe2f /main.py | |
parent | 4e1e3ea8bdb25de41671c3c600070d88b7dd146b (diff) |
Add files via upload
Diffstat (limited to 'main.py')
-rw-r--r-- | main.py | 239 |
1 files changed, 239 insertions, 0 deletions
@@ -0,0 +1,239 @@ +import os +import requests +from tqdm import tqdm +import pylrc +import json + +from PIL import Image +from multiprocessing import Pool, Manager +from mutagen.easyid3 import EasyID3 +from mutagen.id3 import APIC, SYLT, Encoding, ID3 +from mutagen.flac import Picture, FLAC +from pydub import AudioSegment +import time + +def make_valid(filename): + # Make a filename valid in different OSs + f = filename.replace(':', '_') + f = f.replace('/', '_') + f = f.replace('<', '_') + f = f.replace('>', '_') + f = f.replace('\'', '_') + f = f.replace('\\', '_') + f = f.replace('|', '_') + f = f.replace('?', '_') + f = f.replace('*', '_') + return f + + +def lyric_file_to_text(filename): + lrc_file = open(filename, 'r', encoding='utf-8') + lrc_string = ''.join(lrc_file.readlines()) + lrc_file.close() + subs = pylrc.parse(lrc_string) + ret = [] + for sub in subs: + time = int(sub.time * 1000) + text = sub.text + ret.append((text, time)) + return ret + +def update_downloaded_albums(queue, directory): + while 1: + album_name = queue.get() + try: + with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: + completed_albums = json.load(f) + except: + completed_albums = [] + completed_albums.append(album_name) + with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f: + json.dump(completed_albums, f) + + +def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath): + if filetype == '.mp3': + file = EasyID3(filename) + else: + file = FLAC(filename) + + file['album'] = album + file['title'] = title + file['albumartist'] = ''.join(albumartist) + file['artist'] = ''.join(artist) + file['tracknumber'] = str(tracknumber + 1) + file.save() + + if filetype == '.mp3': + file = ID3(filename) + file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read())) + # Read and add lyrics + if (songlyricpath != None): + sylt = lyric_file_to_text(songlyricpath) + file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)]) + file.save() + else: + image = Picture() + image.type = 3 + image.desc = 'Cover' + image.mime = 'image/png' + with open(albumcover,'rb') as f: + image.data = f.read() + with Image.open(albumcover) as imagePil: + image.width, image.height = imagePil.size + image.depth = 24 + file.add_picture(image) + # Read and add lyrics + if (songlyricpath != None): + musiclrc = open(songlyricpath, 'r', encoding='utf-8').read() + file['lyrics'] = musiclrc + file.save() + + return + + +def download_song(session, directory, name, url): + source = session.get(url, stream=True) + filename = directory + '/' + make_valid(name) + filetype = '' + + if source.headers['content-type'] == 'audio/mpeg': + filename += '.mp3' + filetype = '.mp3' + else: + filename += '.wav' + + # Download song + total = int(source.headers.get('content-length', 0)) + with open(filename, 'w+b') as f, tqdm( + desc=name, + total=total, + unit='iB', + unit_scale=True, + unit_divisor=1024, + ) as bar: + for data in source.iter_content(chunk_size = 1024): + size = f.write(data) + bar.update(size) + + # If file is .wav then export to .flac + if source.headers['content-type'] != 'audio/mpeg': + AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac') + os.remove(filename) + filename = directory + '/' + make_valid(name) + '.flac' + filetype = '.flac' + + return filename, filetype + + +def download_album( args): + directory = args['directory'] + session = args['session'] + queue = args['queue'] + album_cid = args['cid'] + album_name = args['name'] + album_coverUrl = args['coverUrl'] + album_artistes = args['artistes'] + album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail' + + + + try: + with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f: + completed_albums = json.load(f) + except: + completed_albums = [] + + # fix the album name which have space in last word in Windows + album_name = album_name.rstrip().split() + if len(album_name) > 0 and album_name[-1].endswith(' '): + album_name[-1] = album_name[-1][:-1] + album_name = ' '.join(album_name) + + if album_name in completed_albums: + # If album is completed then skip + print(f'Skipping downloaded album {album_name}') + return + try: + os.mkdir(directory + album_name) + except: + pass + + # Download album art + with open(directory + album_name + '/cover.jpg', 'w+b') as f: + f.write(session.get(album_coverUrl).content) + + # Change album art from .jpg to .png + cover = Image.open(directory + album_name + '/cover.jpg') + cover.save(directory + album_name + '/cover.png') + os.remove(directory + album_name + '/cover.jpg') + + + songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs'] + for song_track_number, song in enumerate(songs): + # Get song details + song_cid = song['cid'] + song_name = song['name'] + song_artists = song['artistes'] + song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid + song_detail = session.get(song_url, headers={'Accept': 'application/json'}).json()['data'] + song_lyricUrl = song_detail['lyricUrl'] + song_sourceUrl = song_detail['sourceUrl'] + + # Download lyric + if (song_lyricUrl != None): + songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc' + with open(songlyricpath, 'w+b') as f: + f.write(session.get(song_lyricUrl).content) + else: + songlyricpath = None + + # Download song and fill out metadata + filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl) + fill_metadata(filename=filename, + filetype=filetype, + album=album_name, + title=song_name, + albumartist=album_artistes, + artist=song_artists, + tracknumber=song_track_number, + albumcover=directory + album_name + '/cover.png', + songlyricpath=songlyricpath) + + # Mark album as finished + queue.put(album_name) + return + + +def main(): + directory = './MonsterSiren/' + session = requests.Session() + manager = Manager() + queue = manager.Queue() + + try: + os.mkdir(directory) + except: + pass + + + # Get all albums + albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers={'Accept': 'application/json'}).json()['data'] + for album in albums: + album['directory'] = directory + album['session'] = session + album['queue'] = queue + + + with Pool(maxtasksperchild=1) as pool: + pool.apply_async(update_downloaded_albums, (queue, directory)) + print(len(albums)) + pool.map(download_album, albums) + queue.put('kill') + + return + + + +if __name__ == '__main__': + main()
\ No newline at end of file |