aboutsummaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rw-r--r--test.py289
1 files changed, 0 insertions, 289 deletions
diff --git a/test.py b/test.py
deleted file mode 100644
index ed2f280..0000000
--- a/test.py
+++ /dev/null
@@ -1,289 +0,0 @@
-import os
-import requests
-from tqdm import tqdm
-import pylrc
-import json
-
-from PIL import Image
-from multiprocessing import Pool, Manager,Lock, Value
-from mutagen.easyid3 import EasyID3
-from mutagen.id3 import APIC, SYLT, Encoding, ID3
-from mutagen.flac import Picture, FLAC
-from pydub import AudioSegment
-import time
-import datetime
-
-def make_valid(filename):
- # Make a filename valid in different OSs
- f = filename.replace(':', '_')
- f = f.replace('/', '_')
- f = f.replace('<', '_')
- f = f.replace('>', '_')
- f = f.replace('\'', '_')
- f = f.replace('\\', '_')
- f = f.replace('|', '_')
- f = f.replace('?', '_')
- f = f.replace('*', '_')
- return f
-
-
-def lyric_file_to_text(filename):
- lrc_file = open(filename, 'r', encoding='utf-8')
- lrc_string = ''.join(lrc_file.readlines())
- lrc_file.close()
- subs = pylrc.parse(lrc_string)
- ret = []
- for sub in subs:
- time = int(sub.time * 1000)
- text = sub.text
- ret.append((text, time))
- return ret
-
-def update_downloaded_albums(queue, directory):
- while 1:
- album_name = queue.get()
- try:
- with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
- completed_albums = json.load(f)
- except:
- completed_albums = []
- completed_albums.append(album_name)
- with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f:
- json.dump(completed_albums, f)
-
-
-def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath):
- if filetype == '.mp3':
- file = EasyID3(filename)
- else:
- file = FLAC(filename)
-
- file['album'] = album
- file['title'] = title
- file['albumartist'] = ''.join(albumartist)
- file['artist'] = ''.join(artist)
- file['tracknumber'] = str(tracknumber + 1)
- file.save()
-
- if filetype == '.mp3':
- file = ID3(filename)
- file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read()))
- # Read and add lyrics
- if (songlyricpath != None):
- sylt = lyric_file_to_text(songlyricpath)
- file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)])
- file.save()
- else:
- image = Picture()
- image.type = 3
- image.desc = 'Cover'
- image.mime = 'image/png'
- with open(albumcover,'rb') as f:
- image.data = f.read()
- with Image.open(albumcover) as imagePil:
- image.width, image.height = imagePil.size
- image.depth = 24
- file.add_picture(image)
- # Read and add lyrics
- if (songlyricpath != None):
- musiclrc = open(songlyricpath, 'r', encoding='utf-8').read()
- file['lyrics'] = musiclrc
- file.save()
-
- return
-
-
-
-def download_song(session, directory, name, url):
- source = session.get(url, stream=True)
- filename = directory + '/' + make_valid(name)
- filetype = ''
-
- if source.headers['content-type'] == 'audio/mpeg':
- filename += '.mp3'
- filetype = '.mp3'
- else:
- filename += '.wav'
-
- # Download song
- total = int(source.headers.get('content-length', 0))
- downloaded = 0
- retries = 0
- while downloaded < total:
- try:
- with open(filename, 'ab') as f, tqdm(
- desc=name,
- total=total,
- initial=downloaded,
- unit='iB',
- unit_scale=True,
- unit_divisor=1024,
- ) as bar:
- # add a re-download feature for songs that weren't downloaded completely.
- f.seek(downloaded)
- for data in source.iter_content(chunk_size = 1024):
- size = f.write(data)
- downloaded += size
- bar.update(size)
- except requests.exceptions.RequestException as e:
- if retries >= 5:
- raise e
- else:
- retries += 1
- print(f"Download of {name} failed. Retrying in 5 seconds ({retries}/5)")
- time.sleep(5)
- source = session.get(url, stream=True)
- total = int(source.headers.get('content-length', 0))
- downloaded = f.tell() #returns the current position of the file pointer, used to resume the download from the last successful byte position in case of a connection error or other interruption.
-
- if downloaded < total:
- print(f'Download of {name} was incomplete. Retrying...')
- os.remove(filename)
-
- # If file is .wav then export to .flac
- if source.headers['content-type'] != 'audio/mpeg':
- AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac')
- os.remove(filename)
- filename = directory + '/' + make_valid(name) + '.flac'
- filetype = '.flac'
-
- return filename, filetype
-
-
-def download_album( args, pass_counter, song_counter, album_counter, lock):
- directory = args['directory']
- session = args['session']
- queue = args['queue']
- album_cid = args['cid']
- album_name = args['name']
- album_coverUrl = args['coverUrl']
- album_artistes = args['artistes']
- album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail'
-
-
-
- try:
- with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
- completed_albums = json.load(f)
- except:
- completed_albums = []
-
- # fix the album name which have space in last word in Windows
- album_name = album_name.rstrip().split()
- if len(album_name) > 0 and album_name[-1].endswith(' '):
- album_name[-1] = album_name[-1][:-1]
- album_name = ' '.join(album_name)
-
- if album_name in completed_albums:
- # If album is completed then skip
- print(f'Skipping downloaded album {album_name}')
- with lock:
- pass_counter.value += 1
- return
- try:
- os.mkdir(directory + album_name)
- except:
- pass
-
- # Download album art
- with open(directory + album_name + '/cover.jpg', 'w+b') as f:
- f.write(session.get(album_coverUrl).content)
-
- # Change album art from .jpg to .png
- cover = Image.open(directory + album_name + '/cover.jpg')
- cover.save(directory + album_name + '/cover.png')
- os.remove(directory + album_name + '/cover.jpg')
-
-
- songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs']
- for song_track_number, song in enumerate(songs):
- # Get song details
- time.sleep(3) # add 5-second delay
- song_cid = song['cid']
- song_name = song['name']
- song_artists = song['artistes']
- song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid
- song_detail = session.get(song_url, headers={'Accept': 'application/json'}).json()['data']
- song_lyricUrl = song_detail['lyricUrl']
- song_sourceUrl = song_detail['sourceUrl']
-
- # Download lyric
- if (song_lyricUrl != None):
- songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc'
- with open(songlyricpath, 'w+b') as f:
- f.write(session.get(song_lyricUrl).content)
- else:
- songlyricpath = None
-
- # Download song and fill out metadata
- filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl)
- fill_metadata(filename=filename,
- filetype=filetype,
- album=album_name,
- title=song_name,
- albumartist=album_artistes,
- artist=song_artists,
- tracknumber=song_track_number,
- albumcover=directory + album_name + '/cover.png',
- songlyricpath=songlyricpath)
- # Increase song counter
- with lock:
- song_counter.value += 1
-
- # Increase album counter
- with lock:
- album_counter.value += 1
- # Mark album as finished
- queue.put(album_name)
- return
-
-
-def main():
- directory = './MonsterSiren/'
- session = requests.Session()
- manager = Manager()
- queue = manager.Queue()
- lock = manager.Lock()
- pass_counter = manager.Value('i', 0)
- song_counter = manager.Value('i', 0)
- album_counter = manager.Value('i', 0)
-
- try:
- os.mkdir(directory)
- except:
- pass
-
- # Get all albums
- albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers={'Accept': 'application/json'}).json()['data']
- for album in albums:
- album['directory'] = directory
- album['session'] = session
- album['queue'] = queue
-
-
- # Download all albums
- num_workers = os.cpu_count() - 3 # leave one CPU core free
- with Pool(num_workers) as pool:
- # with Pool(maxtasksperchild=1) as pool:
- pool.apply_async(update_downloaded_albums, (queue, directory))
- results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock) for album in albums])
- queue.put('kill')
-
- pass_total = pass_counter.value
- song_total = song_counter.value
- album_total = album_counter.value
- # Write counter to file
- with open("counter.txt", "a") as f:
- timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- f.write(f'Finish Time: {timestamp}\n')
- f.write(f'Total albums skipped: {pass_total}\n')
- f.write(f"Downloaded {song_total} songs from {album_total} albums.\n")
- f.write(f"-----------------------------\n")
- print(f'Total albums skipped: {pass_total}')
- print(f"Downloaded {song_total} songs from {album_total} albums.")
- return
-
-
-
-if __name__ == '__main__':
- main() \ No newline at end of file