aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test.py369
1 files changed, 369 insertions, 0 deletions
diff --git a/test.py b/test.py
new file mode 100644
index 0000000..c30cf96
--- /dev/null
+++ b/test.py
@@ -0,0 +1,369 @@
+import os
+import requests
+from requests.adapters import HTTPAdapter
+from urllib3.util import Retry
+from tqdm import tqdm
+import pylrc
+import json
+from PIL import Image
+from multiprocessing import Pool, Manager,Lock, Value
+from mutagen.easyid3 import EasyID3
+from mutagen.id3 import APIC, SYLT, Encoding, ID3
+from mutagen.flac import Picture, FLAC
+from pydub import AudioSegment
+import time
+import datetime
+import sys
+import random
+def make_valid(filename):
+ # Make a filename valid in different OSs
+ f = filename.replace(':', '_')
+ f = f.replace('/', '_')
+ f = f.replace('<', '_')
+ f = f.replace('>', '_')
+ f = f.replace('\'', '_')
+ f = f.replace('\\', '_')
+ f = f.replace('|', '_')
+ f = f.replace('?', '_')
+ f = f.replace('*', '_')
+ return f
+
+
+def lyric_file_to_text(filename):
+ lrc_file = open(filename, 'r', encoding='utf-8')
+ lrc_string = ''.join(lrc_file.readlines())
+ lrc_file.close()
+ subs = pylrc.parse(lrc_string)
+ ret = []
+ for sub in subs:
+ time = int(sub.time * 1000)
+ text = sub.text
+ ret.append((text, time))
+ return ret
+
+def update_downloaded_albums(queue, directory):
+ while 1:
+ album_name = queue.get()
+ try:
+ with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
+ completed_albums = json.load(f)
+ except:
+ completed_albums = []
+ completed_albums.append(album_name)
+ with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f:
+ json.dump(completed_albums, f)
+
+
+def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath):
+ if filetype == '.mp3':
+ file = EasyID3(filename)
+ else:
+ file = FLAC(filename)
+
+ file['album'] = album
+ file['title'] = title
+ file['albumartist'] = ''.join(albumartist)
+ file['artist'] = ''.join(artist)
+ file['tracknumber'] = str(tracknumber + 1)
+ file.save()
+
+ if filetype == '.mp3':
+ file = ID3(filename)
+ file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read()))
+ # Read and add lyrics
+ if (songlyricpath != None):
+ sylt = lyric_file_to_text(songlyricpath)
+ file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)])
+ file.save()
+ else:
+ image = Picture()
+ image.type = 3
+ image.desc = 'Cover'
+ image.mime = 'image/png'
+ with open(albumcover,'rb') as f:
+ image.data = f.read()
+ with Image.open(albumcover) as imagePil:
+ image.width, image.height = imagePil.size
+ image.depth = 24
+ file.add_picture(image)
+ # Read and add lyrics
+ if (songlyricpath != None):
+ musiclrc = open(songlyricpath, 'r', encoding='utf-8').read()
+ file['lyrics'] = musiclrc
+ file.save()
+
+ return
+
+
+
+def download_song(session, directory, name, url, song_counter, lock,file_format):
+ # Set timeout and retry parameters
+ time.sleep(3)
+ timeout = 10
+ retries = 5
+
+ source = session.get(url, stream=True)
+ filename = directory + '/' + make_valid(name)
+ filetype = ''
+
+ if source.headers['content-type'] == 'audio/mpeg':
+ filename += '.mp3'
+ filetype = '.mp3'
+ else:
+ filename += '.wav'
+
+ # Download song with retries and timeout
+ total = int(source.headers.get('content-length', 0))
+ downloaded = 0
+ retry_count = 0
+ while downloaded < total:
+ try:
+ with open(filename, 'ab') as f, tqdm(
+ desc=name,
+ total=total,
+ initial=downloaded,
+ unit='iB',
+ unit_scale=True,
+ unit_divisor=1024,
+ ) as bar:
+ f.seek(downloaded)
+ for data in source.iter_content(chunk_size = 1024):
+ size = f.write(data)
+ downloaded += size
+ bar.update(size)
+ if downloaded >= total:
+ break
+ if retry_count > 0:
+ print(f'Retry successful. Downloading {name}...')
+ retry_count = 0
+ except (requests.exceptions.RequestException, IOError) as e:
+ if retry_count >= retries:
+ raise e
+ else:
+ retry_count += 1
+ print(f"Download of {name} failed. Retrying in 3 seconds ({retry_count}/{retries})", file=sys.stderr)
+ time.sleep(3)
+ source = session.get(url, stream=True, timeout=timeout)
+ total = int(source.headers.get('content-length', 0))
+ downloaded = f.tell()
+
+ if downloaded < total:
+ print(f'Download of {name} was incomplete. Retrying...', file=sys.stderr)
+ os.remove(filename)
+
+ # Increase song counter
+ with lock:
+ song_counter.value += 1
+
+ # If file is .wav then export to .flac
+ if source.headers['content-type'] != 'audio/mpeg':
+ all_filldata, filename, filetype = choice_format(file_format,filename,directory,name)
+
+
+ return all_filldata, filename, filetype
+
+
+# define a function to make a valid file name
+def choice_format(file_format,filename,directory,name):
+ # implementation details here
+
+ # check the input and perform the conversion
+ if file_format == 'flac':
+ # convert to FLAC
+ AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac')
+ os.remove(filename)
+ filename = directory + '/' + make_valid(name) + '.flac'
+ filetype = '.flac'
+ elif file_format == 'mp3':
+ # convert to MP3
+ AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.mp3', format='mp3')
+ os.remove(filename)
+ filename = directory + '/' + make_valid(name) + '.mp3'
+ filetype = '.mp3'
+ elif file_format == 'all':
+ temp_filename = filename
+ # Convert to FLAC
+ flac_filename = os.path.join(directory, make_valid(name) + '.flac')
+ AudioSegment.from_wav(filename).export(flac_filename, format='flac')
+ flac_filetype = '.flac'
+ # Convert to MP3
+ mp3_folder = os.path.join(directory, 'mp3')
+ if not os.path.exists(mp3_folder):
+ os.makedirs(mp3_folder)
+ mp3_filename = os.path.join(mp3_folder, make_valid(name) + '.mp3')
+ AudioSegment.from_wav(temp_filename).export(mp3_filename, format='mp3')
+ os.remove(filename)
+ mp3_filetype = '.mp3'
+ all_filldata = [flac_filename, mp3_filename], [flac_filetype, mp3_filetype]
+ else:
+ print("Invalid file format. Please enter 'flac' or 'mp3'.")
+
+ return all_filldata, filename, filetype
+
+def download_album( args, pass_counter, song_counter, album_counter,lock,file_format):
+ directory = args['directory']
+ session = args['session']
+ queue = args['queue']
+ album_cid = args['cid']
+ album_name = args['name']
+ album_coverUrl = args['coverUrl']
+ album_artistes = args['artistes']
+ album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail'
+
+
+
+ try:
+ with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
+ completed_albums = json.load(f)
+ except:
+ completed_albums = []
+
+ # fix the album name which have space in last word in Windows
+ album_name = album_name.rstrip().split()
+ if len(album_name) > 0 and album_name[-1].endswith(' '):
+ album_name[-1] = album_name[-1][:-1]
+ album_name = ' '.join(album_name)
+
+ if album_name in completed_albums:
+ # If album is completed then skip
+ print(f'Skipping downloaded album {album_name}')
+ with lock:
+ pass_counter.value += 1
+ return
+ try:
+ os.mkdir(directory + album_name)
+ except:
+ pass
+
+ # Download album art
+ with open(directory + album_name + '/cover.jpg', 'w+b') as f:
+ f.write(session.get(album_coverUrl).content)
+
+ # Change album art from .jpg to .png
+ cover = Image.open(directory + album_name + '/cover.jpg')
+ cover.save(directory + album_name + '/cover.png')
+ os.remove(directory + album_name + '/cover.jpg')
+
+
+ songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs']
+ for song_track_number, song in enumerate(songs):
+ # Get song details
+ time.sleep(3) # add 3-second delay
+ song_cid = song['cid']
+ song_name = song['name']
+ song_artists = song['artistes']
+ song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid
+ headers = read_agent()
+ song_detail = session.get(song_url, headers=headers).json()['data']
+ song_lyricUrl = song_detail['lyricUrl']
+ song_sourceUrl = song_detail['sourceUrl']
+
+ # Download lyric
+ if (song_lyricUrl != None):
+ songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc'
+ with open(songlyricpath, 'w+b') as f:
+ f.write(session.get(song_lyricUrl).content)
+ else:
+ songlyricpath = None
+
+ # Download song and fill out metadata
+ all_filldata, filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl,song_counter=song_counter,lock=lock,file_format=file_format)
+ if file_format == 'mp3' or file_format == 'flac':
+ fill_metadata(filename=filename,
+ filetype=filetype,
+ album=album_name,
+ title=song_name,
+ albumartist=album_artistes,
+ artist=song_artists,
+ tracknumber=song_track_number,
+ albumcover=directory + album_name + '/cover.png',
+ songlyricpath=songlyricpath)
+ elif file_format == 'all':
+ for i in range(0, len(all_filldata), 2):
+ fill_metadata(filename=all_filldata[i],
+ filetype=all_filldata[i+1],
+ album=album_name,
+ title=song_name,
+ albumartist=album_artistes,
+ artist=song_artists,
+ tracknumber=song_track_number,
+ albumcover=directory + album_name + '/cover.png',
+ songlyricpath=songlyricpath)
+ else:
+ print("fillmeta error")
+ # Increase album counter
+ with lock:
+ album_counter.value += 1
+ # Mark album as finished
+ queue.put(album_name)
+ return
+
+def read_agent():
+ # Read user agent strings from file
+ with open('user_agent.txt', 'r') as f:
+ user_agent_list = [line.strip() for line in f]
+
+ # Choose a random user agent
+ user_agent = random.choice(user_agent_list)
+
+ # Set headers with Accept and User-Agent
+ headers = {
+ 'Accept': 'application/json',
+ 'User-Agent': user_agent
+ }
+ return headers
+
+
+def main():
+ directory = './MonsterSiren/'
+ session = requests.Session()
+ manager = Manager()
+ queue = manager.Queue()
+ lock = manager.Lock()
+ pass_counter = manager.Value('i', 0)
+ song_counter = manager.Value('i', 0)
+ album_counter = manager.Value('i', 0)
+
+ file_format = input("Enter the file format to convert to (flac/mp3/all): ")
+
+ try:
+ os.mkdir(directory)
+ except:
+ pass
+
+
+ headers = read_agent()
+ # Get all albums
+ albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers=headers).json()['data']
+ for album in albums:
+ album['directory'] = directory
+ album['session'] = session
+ album['queue'] = queue
+
+
+ # Download all albums
+ num_workers = os.cpu_count() - 3 # leave CPU core free
+ with Pool(num_workers) as pool:
+ # with Pool(maxtasksperchild=1) as pool:
+ pool.apply_async(update_downloaded_albums, (queue, directory))
+ results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock, file_format) for album in albums])
+ queue.put('kill')
+
+ pass_total = pass_counter.value
+ song_total = song_counter.value
+ album_total = album_counter.value
+ # Write counter to file
+ with open("Statistics.txt", "a") as f:
+ timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ f.write(f'Finish Time: {timestamp}\n')
+ f.write(f'Total albums skipped: {pass_total}\n')
+ f.write(f"Downloaded {song_total} songs from {album_total} albums.\n")
+ f.write(f"-----------------------------\n")
+ print(f'Total albums skipped: {pass_total}')
+ print(f"Downloaded {song_total} songs from {album_total} albums.")
+ return
+
+
+
+if __name__ == '__main__':
+ main()