"""The core of phial."""
import json
import logging
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep
from typing import Callable, Dict, List, Optional
from slackclient import SlackClient # type: ignore
from phial.commands import help_command
from phial.errors import ArgumentTypeValidationError
from phial.globals import _command_ctx_stack
from phial.scheduler import Schedule, ScheduledJob, Scheduler
from phial.types import PhialResponse
from phial.utils import parse_slack_output, validate_kwargs
from phial.wrappers import Attachment, Command, Message, Response
from ._reloader import run_with_reloader
[docs]class Phial:
"""
The Phial class acts as the main interface to Slack.
It handles registraion and execution of user defined commands,
as well as providing a wrapper around :obj:`slackclient.SlackClient`
to make sending messages to Slack simpler.
"""
#: Default configuration
default_config = {
'prefix': "!",
'registerHelpCommand': True,
'baseHelpText': "All available commands:",
'autoReconnect': True,
'loopDelay': 0.001,
'hotReload': False,
'maxThreads': 4,
}
def __init__(self,
token: str,
config: Dict = default_config) -> None:
self.slack_client = SlackClient(token)
self.commands: List[Command] = []
self.config: Dict = dict(self.default_config)
self.config.update(config)
self.middleware_functions: List[Callable[[Message], Optional[Message]]] = []
self.scheduler = Scheduler()
self.fallback_func: Optional[Callable[[Message], PhialResponse]] = None
self.logger = logging.getLogger(__name__)
if not self.logger.hasHandlers(): # pragma: nocover
handler = logging.StreamHandler()
formatter = logging.Formatter(
fmt="%(asctime)s [%(name)s] - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.propagate = False
self.logger.setLevel(logging.INFO)
self._register_standard_commands()
[docs] def add_command(self,
pattern: str,
func: Callable[..., PhialResponse],
case_sensitive: bool = False,
help_text_override: Optional[str] = None,
hide_from_help_command: Optional[bool] = False) -> None:
"""
Registers a command with the bot.
This method can be used as a decorator via :meth:`command`
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
:param func: The fucntion to be executed when the command is
invoked
:param case_sensitive: Whether the :code:`pattern` should
respect case sensitivity.
Defaults to False
:param help_text_override: Text that should be used as a
description of the command using
the inbuilt help function.
If not overriden the command's
docstring will be used as the
help text.
Defaults to None
:param hide_from_help_command: A flag to specify whether or not
the inbuilt help command should
hide this command from the list
it generates.
Defaults to False
:raises ValueError: If command with the same pattern is already
registered
.. rubric:: Example
::
def hello():
return "world"
bot.add_command('hello', world)
Is the same as
::
@bot.command('hello')
def hello():
return "world"
"""
pattern = "{0}{1}".format(self.config["prefix"] if "prefix"
in self.config else "", pattern)
# Validate command does not already exist
for existing_command in self.commands:
if pattern == existing_command.pattern_string:
raise ValueError('Command {0} already exists'
.format(pattern.split("<")[0]))
# Create and add command
command = Command(pattern,
func,
case_sensitive,
help_text_override=help_text_override,
hide_from_help_command=hide_from_help_command)
self.commands.append(command)
self.logger.debug("Command {0} added"
.format(pattern))
[docs] def command(self,
pattern: str,
case_sensitive: bool = False,
help_text_override: Optional[str] = None,
hide_from_help_command: Optional[bool] = False) -> Callable:
"""
Registers a command with the bot.
This command is a decorator version of :meth:`add_command`
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
:param case_sensitive: Whether the :code:`pattern` should
respect case sensitivity.
Defaults to False
:param help_text_override: Text that should be used as a
description of the command using
the inbuilt help function.
If not overriden the command's
docstring will be used as the
help text.
Defaults to None
:param hide_from_help_command: A flag to specify whether or not
the inbuilt help command should
hide this command from the list
it generates.
Defaults to False
.. rubric:: Example
::
@bot.command('hello')
def hello():
return "world"
@bot.command('caseSensitive', case_sensitive=True)
def case_sensitive():
return "You typed caseSensitive"
"""
def decorator(f: Callable) -> Callable:
self.add_command(pattern, f,
case_sensitive,
help_text_override=help_text_override,
hide_from_help_command=hide_from_help_command)
return f
return decorator
[docs] def alias(self, pattern: str) -> Callable:
"""
A decorator that is used to register an alias for a command.
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
.. rubric:: Example
::
@bot.command('hello')
@bot.alias('goodbye')
def hello():
return "world"
"""
pattern = "{0}{1}".format(self.config["prefix"] if "prefix"
in self.config else "", pattern)
def decorator(f: Callable) -> Callable:
if not hasattr(f, 'alias_patterns'):
f.alias_patterns = [] # type: ignore
f.alias_patterns.append(pattern) # type: ignore
return f
return decorator
[docs] def add_fallback_command(self,
func: Callable[[Message], PhialResponse]) -> None:
"""
Add a fallback command.
Registers a 'fallback' function to run when a user tries to execute a
command that doesn't exist.
This method can be used as a decorator via :meth:`fallback_command`
:param func: The function to be executed when the user tries
to execute a command that doesn't exist
.. rubric:: Example
::
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
bot.add_fallback_command(error_handler)
Is the same as
::
@bot.fallback_command()
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
"""
self.fallback_func = func
[docs] def fallback_command(self) -> Callable:
"""
A decorator to add a fallback command.
See :meth:`add_fallback_command` for more information on
fallback commands
.. rubric:: Example
::
@bot.fallback_command()
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
"""
def decorator(f: Callable) -> Callable:
self.add_fallback_command(f)
return f
return decorator
[docs] def add_middleware(self,
func: Callable[[Message], Optional[Message]]) -> None:
"""
Adds a middleware function to the bot.
Middleware functions get passed every message the bot recieves from
slack before the bot process the message itself. Returning :obj:`None`
from a middleware function will prevent the bot from processing it.
This method can be used as a decorator via :meth:`middleware`.
:param middleware_func: The function to be added to the middleware
pipeline
.. rubric :: Example
::
def intercept(messaage):
return message
bot.add_middleware(intercept)
Is the same as
::
@bot.middleware()
def intercept(messaage):
return message
"""
self.middleware_functions.append(func)
self.logger.debug("Middleware {0} added"
.format(getattr(func, '__name__', repr(func))))
[docs] def middleware(self) -> Callable:
"""
A decorator to add a middleware function to the bot.
See :meth:`add_middleware` for more information about middleware
.. rubric:: Example
::
@bot.middleware()
def intercept(messaage):
return message
"""
def decorator(f: Callable) -> Callable:
self.add_middleware(f)
return f
return decorator
[docs] def add_scheduled(self,
schedule: Schedule,
func: Callable) -> None:
"""
Adds a scheduled function to the bot.
This method can be used as a decorator via :meth:`scheduled`.
:param schedule: The schedule used to run the function
:param scheduled_func: The function to be run in accordance to the
schedule
.. rubric:: Example
::
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
bot.add_scheduled(Schedule().every().day(), scheduled_beep)
Is the same as
::
@bot.scheduled(Schedule().every().day())
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
"""
job = ScheduledJob(schedule, func)
self.scheduler.add_job(job)
self.logger.debug("Schedule {0} added"
.format(getattr(func, '__name__', repr(func))))
[docs] def scheduled(self, schedule: Schedule) -> Callable:
"""
A decorator that is used to register a scheduled function.
See :meth:`add_scheduled` for more information on scheduled
jobs.
:param schedule: The schedule used to determine when the function
should be run
.. rubric:: Example
::
@bot.scheduled(Schedule().every().day())
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
"""
def decorator(f: Callable) -> Callable:
self.add_scheduled(schedule, f)
return f
return decorator
[docs] def send_message(self, message: Response) -> None:
"""
Sends a message to Slack.
:param message: The message to be sent to Slack
"""
api_method = ('chat.postEphemeral' if message.ephemeral
else 'chat.postMessage')
if message.original_ts:
self.slack_client.api_call(api_method,
channel=message.channel,
text=message.text,
thread_ts=message.original_ts,
attachments=json.dumps(
message.attachments),
user=message.user,
as_user=True)
else:
self.slack_client.api_call(api_method,
channel=message.channel,
text=message.text,
attachments=json.dumps(
message.attachments),
user=message.user,
as_user=True)
[docs] def send_reaction(self, response: Response) -> None:
"""
Sends a reaction to a Slack Message.
:param response: Response containing the reaction to be
sent to Slack
"""
self.slack_client.api_call("reactions.add",
channel=response.channel,
timestamp=response.original_ts,
name=response.reaction,
as_user=True)
[docs] def upload_attachment(self, attachment: Attachment) -> None:
"""
Upload a file to Slack.
:param attachment: The attachment to be uploaded to Slack
"""
self.slack_client.api_call('files.upload',
channels=attachment.channel,
filename=attachment.filename,
file=attachment.content)
def _register_standard_commands(self) -> None:
if ("registerHelpCommand" in self.config and self.
config['registerHelpCommand']):
# The command function has to be a lambda as we wish to delay
# execution until all commands have been registered.
self.add_command("help", lambda: help_command(self),
help_text_override="List all available commmands")
def _send_response(self,
response: PhialResponse,
original_channel: str) -> None:
if response is None:
return # Do nothing if command function returns nothing
if isinstance(response, str):
self.send_message(Response(text=response,
channel=original_channel))
elif not isinstance(response, Response) and not \
isinstance(response, Attachment):
raise ValueError('Only Response or Attachment objects can be '
'returned from command functions')
if isinstance(response, Response):
if response.original_ts and response.reaction and response.text:
raise ValueError('Response objects with an original timestamp '
'can only have one of the attributes: '
'Reaction, Text')
if response.original_ts and response.reaction:
self.send_reaction(response)
elif response.text or response.attachments:
self.send_message(response)
if isinstance(response, Attachment):
self.upload_attachment(response)
def _handle_message(self, message: Optional[Message]) -> None:
if not message:
return
# Run middleware functions
for func in self.middleware_functions:
if message:
self.logger.debug("Ran middleware: {0} on"
" {1}".format(func.__name__, message))
message = func(message)
# If message has been intercepted or is a bot message return early
if not message or message.bot_id:
return
# If message should have a prefix but doesn't return early
if "prefix" in self.config and self.config["prefix"] is not None \
and not message.text.startswith(self.config["prefix"]):
return
# If message has not been intercepted continue with standard message
# handling
for command in self.commands:
command_name = command.func.__name__
kwargs = command.pattern_matches(message)
if kwargs is not None:
try:
kwargs = validate_kwargs(command.func, kwargs)
_command_ctx_stack.push(message)
response = command.func(**kwargs)
self._send_response(response, message.channel)
return
except ArgumentTypeValidationError as e:
self._send_response(str(e), message.channel)
return
finally:
self.logger.debug("Ran command: {0} on"
" {1}".format(command_name, message))
_command_ctx_stack.pop()
# If we are here then no commands have matched
self.logger.warning("Command {0} not found".format(message.text))
if self.fallback_func is not None:
try:
_command_ctx_stack.push(message)
response = self.fallback_func(message)
self._send_response(response, message.channel)
finally:
_command_ctx_stack.pop()
def _start(self) -> None: # pragma: no cover
"""
Starts the bot.
When called will start the bot listening to messages from Slack
"""
auto_reconnect = self.config['autoReconnect']
if not self.slack_client.rtm_connect(auto_reconnect=auto_reconnect,
with_team_state=False):
raise ValueError("Connection failed. Invalid Token or bot ID")
self.logger.info("Phial connected and running!")
thread_pool_size = self.config['maxThreads']
thread_pool = ThreadPoolExecutor(thread_pool_size)
run_pending_tasks: Optional[Future] = None
while True:
try:
message = parse_slack_output(self.slack_client.rtm_read())
thread_pool.submit(self._handle_message, message)
if run_pending_tasks is None or run_pending_tasks.done():
run_pending_tasks = thread_pool.submit(self.scheduler.run_pending)
except Exception as e:
self.logger.error(e)
sleep(self.config['loopDelay']) # Help prevent high CPU usage.
[docs] def run(self) -> None: # pragma: no cover
"""Run the bot."""
if self.config['hotReload']:
run_with_reloader(self._start)
else:
self._start()