Source code for

"""The core of phial."""
import json
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep
from typing import Callable, Dict, List, Optional

from slackclient import SlackClient  # type: ignore

from phial.commands import help_command
from phial.errors import ArgumentTypeValidationError
from phial.globals import _command_ctx_stack
from phial.scheduler import Schedule, ScheduledJob, Scheduler
from phial.types import PhialResponse
from phial.utils import parse_slack_output, validate_kwargs
from phial.wrappers import Attachment, Command, Message, Response

from ._reloader import run_with_reloader

[docs]class Phial: """ The Phial class acts as the main interface to Slack. It handles registraion and execution of user defined commands, as well as providing a wrapper around :obj:`slackclient.SlackClient` to make sending messages to Slack simpler. """ #: Default configuration default_config = { 'prefix': "!", 'registerHelpCommand': True, 'baseHelpText': "All available commands:", 'autoReconnect': True, 'loopDelay': 0.001, 'hotReload': False, 'maxThreads': 4, } def __init__(self, token: str, config: Dict = default_config) -> None: self.slack_client = SlackClient(token) self.commands: List[Command] = [] self.config: Dict = dict(self.default_config) self.config.update(config) self.middleware_functions: List[Callable[[Message], Optional[Message]]] = [] self.scheduler = Scheduler() self.fallback_func: Optional[Callable[[Message], PhialResponse]] = None self.logger = logging.getLogger(__name__) if not self.logger.hasHandlers(): # pragma: nocover handler = logging.StreamHandler() formatter = logging.Formatter( fmt="%(asctime)s [%(name)s] - %(message)s") handler.setFormatter(formatter) self.logger.addHandler(handler) self.logger.propagate = False self.logger.setLevel(logging.INFO) self._register_standard_commands()
[docs] def add_command(self, pattern: str, func: Callable[..., PhialResponse], case_sensitive: bool = False, help_text_override: Optional[str] = None, hide_from_help_command: Optional[bool] = False) -> None: """ Registers a command with the bot. This method can be used as a decorator via :meth:`command` :param pattern: The pattern that a :obj:`Message`'s text must match for the command to be invoked. :param func: The fucntion to be executed when the command is invoked :param case_sensitive: Whether the :code:`pattern` should respect case sensitivity. Defaults to False :param help_text_override: Text that should be used as a description of the command using the inbuilt help function. If not overriden the command's docstring will be used as the help text. Defaults to None :param hide_from_help_command: A flag to specify whether or not the inbuilt help command should hide this command from the list it generates. Defaults to False :raises ValueError: If command with the same pattern is already registered .. rubric:: Example :: def hello(): return "world" bot.add_command('hello', world) Is the same as :: @bot.command('hello') def hello(): return "world" """ pattern = "{0}{1}".format(self.config["prefix"] if "prefix" in self.config else "", pattern) # Validate command does not already exist for existing_command in self.commands: if pattern == existing_command.pattern_string: raise ValueError('Command {0} already exists' .format(pattern.split("<")[0])) # Create and add command command = Command(pattern, func, case_sensitive, help_text_override=help_text_override, hide_from_help_command=hide_from_help_command) self.commands.append(command) self.logger.debug("Command {0} added" .format(pattern))
[docs] def command(self, pattern: str, case_sensitive: bool = False, help_text_override: Optional[str] = None, hide_from_help_command: Optional[bool] = False) -> Callable: """ Registers a command with the bot. This command is a decorator version of :meth:`add_command` :param pattern: The pattern that a :obj:`Message`'s text must match for the command to be invoked. :param case_sensitive: Whether the :code:`pattern` should respect case sensitivity. Defaults to False :param help_text_override: Text that should be used as a description of the command using the inbuilt help function. If not overriden the command's docstring will be used as the help text. Defaults to None :param hide_from_help_command: A flag to specify whether or not the inbuilt help command should hide this command from the list it generates. Defaults to False .. rubric:: Example :: @bot.command('hello') def hello(): return "world" @bot.command('caseSensitive', case_sensitive=True) def case_sensitive(): return "You typed caseSensitive" """ def decorator(f: Callable) -> Callable: self.add_command(pattern, f, case_sensitive, help_text_override=help_text_override, hide_from_help_command=hide_from_help_command) return f return decorator
[docs] def alias(self, pattern: str) -> Callable: """ A decorator that is used to register an alias for a command. :param pattern: The pattern that a :obj:`Message`'s text must match for the command to be invoked. .. rubric:: Example :: @bot.command('hello') @bot.alias('goodbye') def hello(): return "world" """ pattern = "{0}{1}".format(self.config["prefix"] if "prefix" in self.config else "", pattern) def decorator(f: Callable) -> Callable: if not hasattr(f, 'alias_patterns'): f.alias_patterns = [] # type: ignore f.alias_patterns.append(pattern) # type: ignore return f return decorator
[docs] def add_fallback_command(self, func: Callable[[Message], PhialResponse]) -> None: """ Add a fallback command. Registers a 'fallback' function to run when a user tries to execute a command that doesn't exist. This method can be used as a decorator via :meth:`fallback_command` :param func: The function to be executed when the user tries to execute a command that doesn't exist .. rubric:: Example :: def error_handler(message: Message) -> str: return "Oops that command doesn't seem to exist" bot.add_fallback_command(error_handler) Is the same as :: @bot.fallback_command() def error_handler(message: Message) -> str: return "Oops that command doesn't seem to exist" """ self.fallback_func = func
[docs] def fallback_command(self) -> Callable: """ A decorator to add a fallback command. See :meth:`add_fallback_command` for more information on fallback commands .. rubric:: Example :: @bot.fallback_command() def error_handler(message: Message) -> str: return "Oops that command doesn't seem to exist" """ def decorator(f: Callable) -> Callable: self.add_fallback_command(f) return f return decorator
[docs] def add_middleware(self, func: Callable[[Message], Optional[Message]]) -> None: """ Adds a middleware function to the bot. Middleware functions get passed every message the bot recieves from slack before the bot process the message itself. Returning :obj:`None` from a middleware function will prevent the bot from processing it. This method can be used as a decorator via :meth:`middleware`. :param middleware_func: The function to be added to the middleware pipeline .. rubric :: Example :: def intercept(messaage): return message bot.add_middleware(intercept) Is the same as :: @bot.middleware() def intercept(messaage): return message """ self.middleware_functions.append(func) self.logger.debug("Middleware {0} added" .format(getattr(func, '__name__', repr(func))))
[docs] def middleware(self) -> Callable: """ A decorator to add a middleware function to the bot. See :meth:`add_middleware` for more information about middleware .. rubric:: Example :: @bot.middleware() def intercept(messaage): return message """ def decorator(f: Callable) -> Callable: self.add_middleware(f) return f return decorator
[docs] def add_scheduled(self, schedule: Schedule, func: Callable) -> None: """ Adds a scheduled function to the bot. This method can be used as a decorator via :meth:`scheduled`. :param schedule: The schedule used to run the function :param scheduled_func: The function to be run in accordance to the schedule .. rubric:: Example :: def scheduled_beep(): bot.send_message(Response(text="Beep", channel="channel-id">)) bot.add_scheduled(Schedule().every().day(), scheduled_beep) Is the same as :: @bot.scheduled(Schedule().every().day()) def scheduled_beep(): bot.send_message(Response(text="Beep", channel="channel-id">)) """ job = ScheduledJob(schedule, func) self.scheduler.add_job(job) self.logger.debug("Schedule {0} added" .format(getattr(func, '__name__', repr(func))))
[docs] def scheduled(self, schedule: Schedule) -> Callable: """ A decorator that is used to register a scheduled function. See :meth:`add_scheduled` for more information on scheduled jobs. :param schedule: The schedule used to determine when the function should be run .. rubric:: Example :: @bot.scheduled(Schedule().every().day()) def scheduled_beep(): bot.send_message(Response(text="Beep", channel="channel-id">)) """ def decorator(f: Callable) -> Callable: self.add_scheduled(schedule, f) return f return decorator
[docs] def send_message(self, message: Response) -> None: """ Sends a message to Slack. :param message: The message to be sent to Slack """ api_method = ('chat.postEphemeral' if message.ephemeral else 'chat.postMessage') if message.original_ts: self.slack_client.api_call(api_method,, text=message.text, thread_ts=message.original_ts, attachments=json.dumps( message.attachments), user=message.user, as_user=True) else: self.slack_client.api_call(api_method,, text=message.text, attachments=json.dumps( message.attachments), user=message.user, as_user=True)
[docs] def send_reaction(self, response: Response) -> None: """ Sends a reaction to a Slack Message. :param response: Response containing the reaction to be sent to Slack """ self.slack_client.api_call("reactions.add",, timestamp=response.original_ts, name=response.reaction, as_user=True)
[docs] def upload_attachment(self, attachment: Attachment) -> None: """ Upload a file to Slack. :param attachment: The attachment to be uploaded to Slack """ self.slack_client.api_call('files.upload',, filename=attachment.filename, file=attachment.content)
def _register_standard_commands(self) -> None: if ("registerHelpCommand" in self.config and self. config['registerHelpCommand']): # The command function has to be a lambda as we wish to delay # execution until all commands have been registered. self.add_command("help", lambda: help_command(self), help_text_override="List all available commmands") def _send_response(self, response: PhialResponse, original_channel: str) -> None: if response is None: return # Do nothing if command function returns nothing if isinstance(response, str): self.send_message(Response(text=response, channel=original_channel)) elif not isinstance(response, Response) and not \ isinstance(response, Attachment): raise ValueError('Only Response or Attachment objects can be ' 'returned from command functions') if isinstance(response, Response): if response.original_ts and response.reaction and response.text: raise ValueError('Response objects with an original timestamp ' 'can only have one of the attributes: ' 'Reaction, Text') if response.original_ts and response.reaction: self.send_reaction(response) elif response.text or response.attachments: self.send_message(response) if isinstance(response, Attachment): self.upload_attachment(response) def _handle_message(self, message: Optional[Message]) -> None: if not message: return # Run middleware functions for func in self.middleware_functions: if message: self.logger.debug("Ran middleware: {0} on" " {1}".format(func.__name__, message)) message = func(message) # If message has been intercepted or is a bot message return early if not message or message.bot_id: return # If message should have a prefix but doesn't return early if "prefix" in self.config and self.config["prefix"] is not None \ and not message.text.startswith(self.config["prefix"]): return # If message has not been intercepted continue with standard message # handling for command in self.commands: command_name = command.func.__name__ kwargs = command.pattern_matches(message) if kwargs is not None: try: kwargs = validate_kwargs(command.func, kwargs) _command_ctx_stack.push(message) response = command.func(**kwargs) self._send_response(response, return except ArgumentTypeValidationError as e: self._send_response(str(e), return finally: self.logger.debug("Ran command: {0} on" " {1}".format(command_name, message)) _command_ctx_stack.pop() # If we are here then no commands have matched self.logger.warning("Command {0} not found".format(message.text)) if self.fallback_func is not None: try: _command_ctx_stack.push(message) response = self.fallback_func(message) self._send_response(response, finally: _command_ctx_stack.pop() def _start(self) -> None: # pragma: no cover """ Starts the bot. When called will start the bot listening to messages from Slack """ auto_reconnect = self.config['autoReconnect'] if not self.slack_client.rtm_connect(auto_reconnect=auto_reconnect, with_team_state=False): raise ValueError("Connection failed. Invalid Token or bot ID")"Phial connected and running!") thread_pool_size = self.config['maxThreads'] thread_pool = ThreadPoolExecutor(thread_pool_size) run_pending_tasks: Optional[Future] = None while True: try: message = parse_slack_output(self.slack_client.rtm_read()) thread_pool.submit(self._handle_message, message) if run_pending_tasks is None or run_pending_tasks.done(): run_pending_tasks = thread_pool.submit(self.scheduler.run_pending) except Exception as e: self.logger.error(e) sleep(self.config['loopDelay']) # Help prevent high CPU usage.
[docs] def run(self) -> None: # pragma: no cover """Run the bot.""" if self.config['hotReload']: run_with_reloader(self._start) else: self._start()