Source code for DiscordUtils.music

import asyncio
from typing import Callable, Iterable, Optional, Tuple, Union

import aiohttp
from discord.ext import commands

try:
    import discord
    import youtube_dl

    has_voice = True
except ImportError:
    has_voice = False

if has_voice:
    youtube_dl.utils.bug_reports_message = lambda: ""
    ydl = youtube_dl.YoutubeDL({
        "format": "bestaudio/best",
        "restrictfilenames": True,
        "noplaylist": True,
        "nocheckcertificate": True,
        "ignoreerrors": True,
        "logtostderr": False,
        "quiet": True,
        "no_warnings": True,
        "source_address": "0.0.0.0",
    })


[docs]class EmptyQueue(Exception): """Cannot skip because queue is empty"""
[docs]class NotConnectedToVoice(Exception): """Cannot create the player because bot is not connected to voice"""
[docs]class NotPlaying(Exception): """Cannot <do something> because nothing is being played"""
[docs]async def ytbettersearch(query) -> str: '''Formats the search string for the YouTube music search''' url = f"https://www.youtube.com/results?search_query={query}" async with aiohttp.ClientSession() as session: async with session.get(url) as resp: html = await resp.text() index = html.find("watch?v") url = "" while True: char = html[index] if char == '"': break url += char index += 1 url = f"https://www.youtube.com/{url}" return url
[docs]class Song: """The requested song data """ __slots__ = [ 'source', 'url', 'title', 'description', 'views', 'duration', 'thumbnail', 'channel', 'channel_url', 'loop', 'name', 'is_looping' ] def __init__( self, source: str, url: str, title: str, description: str, views: int, duration: Union[str,int], thumbnail: str, channel: str, channel_url: str, loop: bool, ): self.source = source self.url = url self.title = title self.description = description self.views = views self.name = title self.duration = duration self.thumbnail = thumbnail self.channel = channel self.channel_url = channel_url self.is_looping = loop
[docs]async def get_video_data(url, search: bool, bettersearch:bool, loop: Optional[asyncio.AbstractEventLoop]) -> Song: """It returns required video data after searching `YouTube` :raises RuntimeError: Is raised when the package is install without the .[voice] parameters :return: The song data in a formatted way :rtype: :class:`Song` """ if not has_voice: raise RuntimeError("DiscordUtils[voice] install needed in order to use voice") if not search and not bettersearch: data = await loop.run_in_executor( None, lambda: ydl.extract_info(url, download=False)) source = data.get("url") url = "https://www.youtube.com/watch?v=" + data.get("id") title = data.get("title") description = data.get("description") views = data.get("view_count") duration = data.get("duration") thumbnail = data.get("thumbnail") channel = data.get("uploader") channel_url = data.get("uploader_url") return Song( source, url, title, description, views, duration, thumbnail, channel, channel_url, False, ) if bettersearch: url = await ytbettersearch(url) data = await loop.run_in_executor(None, lambda: ydl.extract_info(url, download=False)) source = data.get("url") url = "https://www.youtube.com/watch?v=" + data.get("id") title = data.get("title") description = data.get("description") views = data.get("view_count") duration = data.get("duration") thumbnail = data.get("thumbnail") channel = data.get("uploader") channel_url = data.get("uploader_url") return Song( source, url, title, description, views, duration, thumbnail, channel, channel_url, False, ) ytdl = youtube_dl.YoutubeDL({ "format": "bestaudio/best", "restrictfilenames": True, "noplaylist": True, "nocheckcertificate": True, "ignoreerrors": True, "logtostderr": False, "quiet": True, "no_warnings": True, "default_search": "auto", "source_address": "0.0.0.0", }) data = await loop.run_in_executor(None, lambda: ytdl.extract_info(url, download=False)) try: data = data["entries"][0] except (KeyError, TypeError): pass source = data.get("url") url = "https://www.youtube.com/watch?v=" + data.get("id") title = data.get("title") description = data.get("description") views = data.get("view_count") duration = data.get("duration") thumbnail = data.get("thumbnail") channel = data.get("uploader") channel_url = data.get("uploader_url") return Song( source, url, title, description, views, duration, thumbnail, channel, channel_url, False, )
[docs]def check_queue(ctx: commands.Context, opts: dict, music: 'Music', after: Callable, on_play: Callable, loop: Optional[asyncio.AbstractEventLoop]) -> None: """It checks the music queue :param ctx: The commands `context` :type ctx: commands.Context :param opts: A set options for `ffmpeg` :type opts: dict :param music: The master class where the all the players data is stored :type music: Music :param after: The :func:`check_queue` which would be called afterwards :type after: Callable :param on_play: :func:`MusicPlayer.on_play` function :type on_play: MusicPlayer.on_play :param loop: The event loop in which the :class:`~discord.ext.commands.Bot` is running :type loop: Optional[asyncio.AbstractEventLoop] :raises RuntimeError: Is raised when the package is install without the .[voice] parameters """ if not has_voice: raise RuntimeError("DiscordUtils[voice] install needed in order to use voice") try: song = music.queue[ctx.guild.id][0] except IndexError: return if not song.is_looping: try: music.queue[ctx.guild.id].pop(0) except IndexError: return if len(music.queue[ctx.guild.id]) > 0: source = discord.PCMVolumeTransformer( discord.FFmpegPCMAudio(music.queue[ctx.guild.id][0].source, **opts)) ctx.voice_client.play( source, after=lambda error: after(ctx, opts, music, after, on_play,loop), ) song = music.queue[ctx.guild.id][0] if on_play: loop.create_task(on_play(ctx, song)) else: source = discord.PCMVolumeTransformer( discord.FFmpegPCMAudio(music.queue[ctx.guild.id][0].source,**opts)) ctx.voice_client.play( source, after=lambda error: after(ctx, opts, music, after, on_play, loop)) song = music.queue[ctx.guild.id][0] if on_play: loop.create_task(on_play(ctx, song))
[docs]class MusicPlayer: """The class which acts a music controller/player :raises RuntimeError: Is raised when the package is install without the .[voice] parameters :raises NotPlaying: See :func:`skip`, :func:`stop`, :func:`resume`, :func:`pause`, :func:`toggle_song_loop`, :func:`change_volume`, :func:`remove_from_queue` :raises EmptyQueue: See :func:`skip`, :func:`current_queue` """ __slots__ = [ 'ctx', 'voice', 'loop', 'music', 'after_func', 'on_play_func', 'on_queue_func', 'on_skip_func', 'on_stop_func', 'on_pause_func', 'on_resume_func', 'on_loop_toggle_func', 'on_volume_change_func', 'on_remove_from_queue_func', 'ffmpeg_opts' ] def __init__(self, ctx:commands.Context , music: 'Music', **kwargs): if not has_voice: raise RuntimeError("DiscordUtils[voice] install needed in order to use voice") self.ctx = ctx self.voice: Optional[discord.VoiceProtocol] = ctx.voice_client self.loop: Optional[asyncio.AbstractEventLoop] = ctx.bot.loop self.music = music if self.ctx.guild.id not in self.music.queue: self.music.queue[self.ctx.guild.id] = [] self.after_func: Callable = check_queue self.on_play_func: Optional[Callable] = None self.on_queue_func: Optional[Callable] = None self.on_skip_func: Optional[Callable] = None self.on_stop_func: Optional[Callable] = None self.on_pause_func: Optional[Callable] = None self.on_resume_func: Optional[Callable] = None self.on_loop_toggle_func: Optional[Callable] = None self.on_volume_change_func: Optional[Callable] = None self.on_remove_from_queue_func: Optional[Callable] = None ffmpeg_error = kwargs.get("ffmpeg_error_betterfix", kwargs.get("ffmpeg_error_fix")) if ffmpeg_error and "ffmpeg_error_betterfix" in kwargs: self.ffmpeg_opts: dict = { "options": "-vn -loglevel quiet -hide_banner -nostats", "before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 0 -nostdin", } elif ffmpeg_error: self.ffmpeg_opts: dict = { "options": "-vn", "before_options": "-reconnect 1 -reconnect_streamed 1 -reconnect_delay_max 0 -nostdin", } else: self.ffmpeg_opts: dict = {"options": "-vn", "before_options": "-nostdin"}
[docs] def disable(self): '''It disables the `Music Player`''' self.music.players.remove(self)
[docs] def on_queue(self, func: Callable) -> None: """The event when the song is `queued` :param func: :type func: Callable """ self.on_queue_func = func
[docs] def on_play(self, func: Callable) -> None: """The event when the song is `played` :param func: :type func: Callable """ self.on_play_func = func
[docs] def on_skip(self, func: Callable) -> None: """The event when the song is `skipped` :param func: :type func: Callable """ self.on_skip_func = func
[docs] def on_stop(self, func: Callable) -> None: """The event when the player is `stopped` :param func: :type func: Callable """ self.on_stop_func = func
[docs] def on_pause(self, func: Callable) -> None: """The event when the song is `paused` :param func: :type func: Callable """ self.on_pause_func = func
[docs] def on_resume(self, func: Callable) -> None: """The event when the song is `resumed` :param func: :type func: Callable """ self.on_resume_func = func
[docs] def on_loop_toggle(self, func: Callable) -> None: """The event when the `looping` is `enabled` :param func: :type func: Callable """ self.on_loop_toggle_func = func
[docs] def on_volume_change(self, func: Callable) -> None: """The event when the `volume` is `changed` :param func: :type func: Callable """ self.on_volume_change_func = func
[docs] def on_remove_from_queue(self, func: Callable) -> None: """The event when the song is `removed from the queue` :param func: :type func: Callable """ self.on_remove_from_queue_func = func
[docs] async def queue(self, url: str, search: bool = False, bettersearch: bool = False) -> Song: """The song to queue :param url: The `url` of the song provider :type url: str :param search: Song Name, defaults to False :type search: bool, optional :param bettersearch: Search betterly or not, defaults to False :type bettersearch: bool, optional :return: The song with the minimum required data :rtype: Song """ song = await get_video_data(url, search, bettersearch, self.loop) self.music.queue[self.ctx.guild.id].append(song) if self.on_queue_func: await self.on_queue_func(self.ctx, song) return song
[docs] async def play(self) -> Song: """Determines which song to play from the queue :return: See above :rtype: Song """ source = discord.PCMVolumeTransformer( discord.FFmpegPCMAudio(self.music.queue[self.ctx.guild.id][0].source,**self.ffmpeg_opts) ) self.voice.play( source, after=lambda error: self.after_func( self.ctx, self.ffmpeg_opts, self.music, self.after_func, self.on_play_func, self.loop, ), ) song = self.music.queue[self.ctx.guild.id][0] if self.on_play_func: await self.on_play_func(self.ctx, song) return song
[docs] async def skip(self, force: bool = False) -> Union[Tuple[Song, Song], Song]: """Skips the current song which is being played :param force: Force skip or not, defaults to False :type force: bool, optional :raises NotPlaying: When there is no song played then this error is raised :raises EmptyQueue: When the queue is empty :return: It returns (old song, new song) or just (song) depending on the situtation :rtype: Union[Tuple[Song, Song], Song] """ if len(self.music.queue[self.ctx.guild.id]) == 0: raise NotPlaying("Cannot loop because nothing is being played") elif not len(self.music.queue[self.ctx.guild.id]) > 1 and not force: raise EmptyQueue("Cannot skip because queue is empty") old = self.music.queue[self.ctx.guild.id][0] old.is_looping = False if old.is_looping else False self.voice.stop() try: new = self.music.queue[self.ctx.guild.id][0] if self.on_skip_func: await self.on_skip_func(self.ctx, old, new) return (old, new) except IndexError: if self.on_skip_func: await self.on_skip_func(self.ctx, old) return old
[docs] async def stop(self) -> None: """Stops the player :raises NotPlaying: When nothing is played """ try: self.music.queue[self.ctx.guild.id] = [] self.voice.stop() self.music.players.remove(self) except: raise NotPlaying("Cannot loop because nothing is being played") if self.on_stop_func: await self.on_stop_func(self.ctx)
[docs] async def pause(self) -> Song: """Pauses the player :raises NotPlaying: When nothing is played :return: The song on which the pause was initiated :rtype: Song """ try: self.voice.pause() song = self.music.queue[self.ctx.guild.id][0] except: raise NotPlaying("Cannot pause because nothing is being played") if self.on_pause_func: await self.on_pause_func(self.ctx, song) return song
[docs] async def resume(self) -> Song: """Resumes the player :raises NotPlaying: When nothing was played by the player previously :return: The song which will be played :rtype: Song """ try: self.voice.resume() song = self.music.queue[self.ctx.guild.id][0] except: raise NotPlaying("Cannot resume because nothing is being played") if self.on_resume_func: await self.on_resume_func(self.ctx, song) return song
[docs] def current_queue(self) -> Union[Iterable, Song]: """Gives the current queue of songs which is there in the player :raises EmptyQueue: When the song queue is empty :return: _description_ :rtype: Union[Iterable, Song] """ try: return self.music.queue[self.ctx.guild.id] except KeyError: raise EmptyQueue("Queue is empty")
[docs] def now_playing(self) -> Optional[Union[Iterable, Song]]: """Returns the :class:`Song` which is currently being played :return: See above :rtype: Optional[Union[Iterable, Song]] """ try: return self.music.queue[self.ctx.guild.id][0] except: return None
[docs] async def toggle_song_loop(self) -> Optional[Union[Iterable, Song]]: """It toggles on/off the looping :raises NotPlaying: When no song is being played :return: The currently playing song or the looped queue :rtype: Optional[Union[Iterable, Song]] """ try: song = self.music.queue[self.ctx.guild.id][0] except: raise NotPlaying("Cannot loop because nothing is being played") if not song.is_looping: song.is_looping = True else: song.is_looping = False if self.on_loop_toggle_func: await self.on_loop_toggle_func(self.ctx, song) return song
[docs] async def change_volume(self, vol: Union[int, float]) -> Tuple[Song, int]: """Change the song volume of the currently played song :param vol: The amount by the volume needs to increased or decreased :type vol: int :raises NotPlaying: When no song is played :return: (The song which is being played, volume no by which the song's volume was increased or decreased) :rtype: Tuple[Song, int] """ self.voice.source.volume = vol try: song = self.music.queue[self.ctx.guild.id][0] except: raise NotPlaying("Cannot loop because nothing is being played") if self.on_volume_change_func: await self.on_volume_change_func(self.ctx, song, vol) return (song, vol)
[docs] async def remove_from_queue(self, index: int) -> Song: """The utility function to remove :class:`Song` from the queue :param index: The index at which the :class:`Song` is located :type index: int :raises NotPlaying: When nothing is player by the player :return: The song to be removed from the player :rtype: Song """ if index == 0: try: song = self.music.queue[self.ctx.guild.id][0] except: raise NotPlaying("Cannot loop because nothing is being played") await self.skip(force=True) return song song = self.music.queue[self.ctx.guild.id][index] self.music.queue[self.ctx.guild.id].pop(index) if self.on_remove_from_queue_func: await self.on_remove_from_queue_func(self.ctx, song) return song
[docs] def delete(self) -> None: """Removes the song from the queue """ self.music.players.remove(self)
[docs]class Music: """The manager class to initiate and music and manage its player :raises RuntimeError: Is raised when the package is install without the .[voice] parameters :raises NotConnectedToVoice: See :func:`create_player` """ __slots__ = ['queue', 'players'] def __init__(self): if not has_voice: raise RuntimeError("DiscordUtils[voice] install needed in order to use voice") self.queue: dict = {} self.players: list = []
[docs] def create_player(self, ctx: commands.Context, **kwargs) -> MusicPlayer: """It create a music player, using which the music will be played in the `voice channels` :param ctx: The commands `context` :type ctx: commands.Context :raises NotConnectedToVoice: When the client is not connect to any of the voice channel :return: The music player using the user will have the control over its requested songs :rtype: MusicPlayer """ if not ctx.voice_client: raise NotConnectedToVoice( "Cannot create the player because bot is not connected to voice" ) player = MusicPlayer(ctx, self, **kwargs) self.players.append(player) return player
[docs] def get_player(self, **kwargs) -> Optional[MusicPlayer]: """Its gets the `MusicPlayer` of the specified `guild` or `voice channel` :return: See above :rtype: Optional[MusicPlayer] """ guild = kwargs.get("guild_id") channel = kwargs.get("channel_id") for player in self.players: if (guild and channel and player.ctx.guild.id == guild and player.voice.channel.id == channel): return player elif not guild and channel and player.voice.channel.id == channel: return player elif not channel and guild and player.ctx.guild.id == guild: return player return None