aboutsummaryrefslogtreecommitdiff
path: root/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'test.py')
-rw-r--r--test.py369
1 files changed, 0 insertions, 369 deletions
diff --git a/test.py b/test.py
deleted file mode 100644
index c30cf96..0000000
--- a/test.py
+++ /dev/null
@@ -1,369 +0,0 @@
-import os
-import requests
-from requests.adapters import HTTPAdapter
-from urllib3.util import Retry
-from tqdm import tqdm
-import pylrc
-import json
-from PIL import Image
-from multiprocessing import Pool, Manager,Lock, Value
-from mutagen.easyid3 import EasyID3
-from mutagen.id3 import APIC, SYLT, Encoding, ID3
-from mutagen.flac import Picture, FLAC
-from pydub import AudioSegment
-import time
-import datetime
-import sys
-import random
-def make_valid(filename):
- # Make a filename valid in different OSs
- f = filename.replace(':', '_')
- f = f.replace('/', '_')
- f = f.replace('<', '_')
- f = f.replace('>', '_')
- f = f.replace('\'', '_')
- f = f.replace('\\', '_')
- f = f.replace('|', '_')
- f = f.replace('?', '_')
- f = f.replace('*', '_')
- return f
-
-
-def lyric_file_to_text(filename):
- lrc_file = open(filename, 'r', encoding='utf-8')
- lrc_string = ''.join(lrc_file.readlines())
- lrc_file.close()
- subs = pylrc.parse(lrc_string)
- ret = []
- for sub in subs:
- time = int(sub.time * 1000)
- text = sub.text
- ret.append((text, time))
- return ret
-
-def update_downloaded_albums(queue, directory):
- while 1:
- album_name = queue.get()
- try:
- with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
- completed_albums = json.load(f)
- except:
- completed_albums = []
- completed_albums.append(album_name)
- with open(directory + 'completed_albums.json', 'w+', encoding='utf8') as f:
- json.dump(completed_albums, f)
-
-
-def fill_metadata(filename, filetype, album, title, albumartist, artist, tracknumber, albumcover, songlyricpath):
- if filetype == '.mp3':
- file = EasyID3(filename)
- else:
- file = FLAC(filename)
-
- file['album'] = album
- file['title'] = title
- file['albumartist'] = ''.join(albumartist)
- file['artist'] = ''.join(artist)
- file['tracknumber'] = str(tracknumber + 1)
- file.save()
-
- if filetype == '.mp3':
- file = ID3(filename)
- file.add(APIC(mime='image/png',type=3,desc='Cover',data=open(albumcover,'rb').read()))
- # Read and add lyrics
- if (songlyricpath != None):
- sylt = lyric_file_to_text(songlyricpath)
- file.setall('SYLT', [SYLT(encoding=Encoding.UTF8, lang='eng', format=2, type=1, text=sylt)])
- file.save()
- else:
- image = Picture()
- image.type = 3
- image.desc = 'Cover'
- image.mime = 'image/png'
- with open(albumcover,'rb') as f:
- image.data = f.read()
- with Image.open(albumcover) as imagePil:
- image.width, image.height = imagePil.size
- image.depth = 24
- file.add_picture(image)
- # Read and add lyrics
- if (songlyricpath != None):
- musiclrc = open(songlyricpath, 'r', encoding='utf-8').read()
- file['lyrics'] = musiclrc
- file.save()
-
- return
-
-
-
-def download_song(session, directory, name, url, song_counter, lock,file_format):
- # Set timeout and retry parameters
- time.sleep(3)
- timeout = 10
- retries = 5
-
- source = session.get(url, stream=True)
- filename = directory + '/' + make_valid(name)
- filetype = ''
-
- if source.headers['content-type'] == 'audio/mpeg':
- filename += '.mp3'
- filetype = '.mp3'
- else:
- filename += '.wav'
-
- # Download song with retries and timeout
- total = int(source.headers.get('content-length', 0))
- downloaded = 0
- retry_count = 0
- while downloaded < total:
- try:
- with open(filename, 'ab') as f, tqdm(
- desc=name,
- total=total,
- initial=downloaded,
- unit='iB',
- unit_scale=True,
- unit_divisor=1024,
- ) as bar:
- f.seek(downloaded)
- for data in source.iter_content(chunk_size = 1024):
- size = f.write(data)
- downloaded += size
- bar.update(size)
- if downloaded >= total:
- break
- if retry_count > 0:
- print(f'Retry successful. Downloading {name}...')
- retry_count = 0
- except (requests.exceptions.RequestException, IOError) as e:
- if retry_count >= retries:
- raise e
- else:
- retry_count += 1
- print(f"Download of {name} failed. Retrying in 3 seconds ({retry_count}/{retries})", file=sys.stderr)
- time.sleep(3)
- source = session.get(url, stream=True, timeout=timeout)
- total = int(source.headers.get('content-length', 0))
- downloaded = f.tell()
-
- if downloaded < total:
- print(f'Download of {name} was incomplete. Retrying...', file=sys.stderr)
- os.remove(filename)
-
- # Increase song counter
- with lock:
- song_counter.value += 1
-
- # If file is .wav then export to .flac
- if source.headers['content-type'] != 'audio/mpeg':
- all_filldata, filename, filetype = choice_format(file_format,filename,directory,name)
-
-
- return all_filldata, filename, filetype
-
-
-# define a function to make a valid file name
-def choice_format(file_format,filename,directory,name):
- # implementation details here
-
- # check the input and perform the conversion
- if file_format == 'flac':
- # convert to FLAC
- AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.flac', format='flac')
- os.remove(filename)
- filename = directory + '/' + make_valid(name) + '.flac'
- filetype = '.flac'
- elif file_format == 'mp3':
- # convert to MP3
- AudioSegment.from_wav(filename).export(directory + '/' + make_valid(name) + '.mp3', format='mp3')
- os.remove(filename)
- filename = directory + '/' + make_valid(name) + '.mp3'
- filetype = '.mp3'
- elif file_format == 'all':
- temp_filename = filename
- # Convert to FLAC
- flac_filename = os.path.join(directory, make_valid(name) + '.flac')
- AudioSegment.from_wav(filename).export(flac_filename, format='flac')
- flac_filetype = '.flac'
- # Convert to MP3
- mp3_folder = os.path.join(directory, 'mp3')
- if not os.path.exists(mp3_folder):
- os.makedirs(mp3_folder)
- mp3_filename = os.path.join(mp3_folder, make_valid(name) + '.mp3')
- AudioSegment.from_wav(temp_filename).export(mp3_filename, format='mp3')
- os.remove(filename)
- mp3_filetype = '.mp3'
- all_filldata = [flac_filename, mp3_filename], [flac_filetype, mp3_filetype]
- else:
- print("Invalid file format. Please enter 'flac' or 'mp3'.")
-
- return all_filldata, filename, filetype
-
-def download_album( args, pass_counter, song_counter, album_counter,lock,file_format):
- directory = args['directory']
- session = args['session']
- queue = args['queue']
- album_cid = args['cid']
- album_name = args['name']
- album_coverUrl = args['coverUrl']
- album_artistes = args['artistes']
- album_url = 'https://monster-siren.hypergryph.com/api/album/' + album_cid + '/detail'
-
-
-
- try:
- with open(directory + 'completed_albums.json', 'r', encoding='utf8') as f:
- completed_albums = json.load(f)
- except:
- completed_albums = []
-
- # fix the album name which have space in last word in Windows
- album_name = album_name.rstrip().split()
- if len(album_name) > 0 and album_name[-1].endswith(' '):
- album_name[-1] = album_name[-1][:-1]
- album_name = ' '.join(album_name)
-
- if album_name in completed_albums:
- # If album is completed then skip
- print(f'Skipping downloaded album {album_name}')
- with lock:
- pass_counter.value += 1
- return
- try:
- os.mkdir(directory + album_name)
- except:
- pass
-
- # Download album art
- with open(directory + album_name + '/cover.jpg', 'w+b') as f:
- f.write(session.get(album_coverUrl).content)
-
- # Change album art from .jpg to .png
- cover = Image.open(directory + album_name + '/cover.jpg')
- cover.save(directory + album_name + '/cover.png')
- os.remove(directory + album_name + '/cover.jpg')
-
-
- songs = session.get(album_url, headers={'Accept': 'application/json'}).json()['data']['songs']
- for song_track_number, song in enumerate(songs):
- # Get song details
- time.sleep(3) # add 3-second delay
- song_cid = song['cid']
- song_name = song['name']
- song_artists = song['artistes']
- song_url = 'https://monster-siren.hypergryph.com/api/song/' + song_cid
- headers = read_agent()
- song_detail = session.get(song_url, headers=headers).json()['data']
- song_lyricUrl = song_detail['lyricUrl']
- song_sourceUrl = song_detail['sourceUrl']
-
- # Download lyric
- if (song_lyricUrl != None):
- songlyricpath = directory + album_name + '/' + make_valid(song_name) + '.lrc'
- with open(songlyricpath, 'w+b') as f:
- f.write(session.get(song_lyricUrl).content)
- else:
- songlyricpath = None
-
- # Download song and fill out metadata
- all_filldata, filename, filetype = download_song(session=session, directory=directory + album_name, name=song_name, url=song_sourceUrl,song_counter=song_counter,lock=lock,file_format=file_format)
- if file_format == 'mp3' or file_format == 'flac':
- fill_metadata(filename=filename,
- filetype=filetype,
- album=album_name,
- title=song_name,
- albumartist=album_artistes,
- artist=song_artists,
- tracknumber=song_track_number,
- albumcover=directory + album_name + '/cover.png',
- songlyricpath=songlyricpath)
- elif file_format == 'all':
- for i in range(0, len(all_filldata), 2):
- fill_metadata(filename=all_filldata[i],
- filetype=all_filldata[i+1],
- album=album_name,
- title=song_name,
- albumartist=album_artistes,
- artist=song_artists,
- tracknumber=song_track_number,
- albumcover=directory + album_name + '/cover.png',
- songlyricpath=songlyricpath)
- else:
- print("fillmeta error")
- # Increase album counter
- with lock:
- album_counter.value += 1
- # Mark album as finished
- queue.put(album_name)
- return
-
-def read_agent():
- # Read user agent strings from file
- with open('user_agent.txt', 'r') as f:
- user_agent_list = [line.strip() for line in f]
-
- # Choose a random user agent
- user_agent = random.choice(user_agent_list)
-
- # Set headers with Accept and User-Agent
- headers = {
- 'Accept': 'application/json',
- 'User-Agent': user_agent
- }
- return headers
-
-
-def main():
- directory = './MonsterSiren/'
- session = requests.Session()
- manager = Manager()
- queue = manager.Queue()
- lock = manager.Lock()
- pass_counter = manager.Value('i', 0)
- song_counter = manager.Value('i', 0)
- album_counter = manager.Value('i', 0)
-
- file_format = input("Enter the file format to convert to (flac/mp3/all): ")
-
- try:
- os.mkdir(directory)
- except:
- pass
-
-
- headers = read_agent()
- # Get all albums
- albums = session.get('https://monster-siren.hypergryph.com/api/albums', headers=headers).json()['data']
- for album in albums:
- album['directory'] = directory
- album['session'] = session
- album['queue'] = queue
-
-
- # Download all albums
- num_workers = os.cpu_count() - 3 # leave CPU core free
- with Pool(num_workers) as pool:
- # with Pool(maxtasksperchild=1) as pool:
- pool.apply_async(update_downloaded_albums, (queue, directory))
- results = pool.starmap(download_album, [(album, pass_counter, song_counter, album_counter, lock, file_format) for album in albums])
- queue.put('kill')
-
- pass_total = pass_counter.value
- song_total = song_counter.value
- album_total = album_counter.value
- # Write counter to file
- with open("Statistics.txt", "a") as f:
- timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- f.write(f'Finish Time: {timestamp}\n')
- f.write(f'Total albums skipped: {pass_total}\n')
- f.write(f"Downloaded {song_total} songs from {album_total} albums.\n")
- f.write(f"-----------------------------\n")
- print(f'Total albums skipped: {pass_total}')
- print(f"Downloaded {song_total} songs from {album_total} albums.")
- return
-
-
-
-if __name__ == '__main__':
- main()