"""The core of phial."""
import json
import logging
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep
from typing import cast
from slack_sdk.socket_mode import SocketModeClient
from slack_sdk.socket_mode.request import SocketModeRequest
from slack_sdk.socket_mode.response import SocketModeResponse
from slack_sdk.web import WebClient
from phial.commands import help_command
from phial.errors import ArgumentTypeValidationError, ArgumentValidationError
from phial.globals import _command_ctx_stack
from phial.scheduler import Schedule, ScheduledJob, Scheduler
from phial.utils import parse_slack_event, validate_kwargs
from phial.wrappers import ( # fmt: off
Attachment,
Command,
Message,
PhialResponse,
Response,
)
[docs]
class Phial:
"""
The Phial class acts as the main interface to Slack.
It handles registration and execution of user defined commands,
as well as providing a wrapper around :obj:`slack_sdk.SocketModeClient`
to make sending messages to Slack simpler.
"""
#: Default configuration
default_config = {
"prefix": "!",
"registerHelpCommand": True,
"baseHelpText": "All available commands:",
"autoReconnect": True,
"loopDelay": 0.001,
"maxThreads": 4,
}
def __init__(
self,
app_token: str,
bot_token: str,
*,
config: dict = default_config,
) -> None:
self.config = dict(self.default_config)
self.config.update(config)
self.slack_client = SocketModeClient(
app_token=app_token,
web_client=WebClient(token=bot_token),
auto_reconnect_enabled=cast(bool, self.config["autoReconnect"]),
)
self.commands: list[Command] = []
self.middleware_functions: list[Callable[[Message], Message | None]] = []
self.scheduler = Scheduler()
self.fallback_func: Callable[[Message], PhialResponse] | None = None
self.logger = logging.getLogger(__name__)
if not self.logger.hasHandlers(): # pragma: nocover
handler = logging.StreamHandler()
formatter = logging.Formatter(fmt="%(asctime)s [%(name)s] - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.propagate = False
self.logger.setLevel(logging.INFO)
self._register_standard_commands()
[docs]
def add_command(
self,
pattern: str,
func: Callable[..., PhialResponse],
*,
help_text_override: str | None = None,
case_sensitive: bool = False,
hide_from_help_command: bool | None = False,
) -> None:
"""
Register a command with the bot.
This method can be used as a decorator via :meth:`command`
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
:param func: The function to be executed when the command is
invoked
:param case_sensitive: Whether the :code:`pattern` should
respect case sensitivity.
Defaults to False
:param help_text_override: Text that should be used as a
description of the command using
the inbuilt help function.
If not overridden the command's
docstring will be used as the
help text.
Defaults to None
:param hide_from_help_command: A flag to specify whether or not
the inbuilt help command should
hide this command from the list
it generates.
Defaults to False
:raises ValueError: If command with the same pattern is already
registered
.. rubric:: Example
::
def hello():
return "world"
bot.add_command('hello', world)
Is the same as
::
@bot.command('hello')
def hello():
return "world"
"""
pattern = f"{self.config.get('prefix', '')}{pattern}"
# Validate command does not already exist
for existing_command in self.commands:
if pattern == existing_command.pattern_string:
raise ValueError(f"Command {pattern.split('<')[0]} already exists")
# Create and add command
command = Command(
pattern,
func,
help_text_override=help_text_override,
case_sensitive=case_sensitive,
hide_from_help_command=hide_from_help_command,
)
self.commands.append(command)
self.logger.debug(f"Command {pattern} added")
[docs]
def command(
self,
pattern: str,
*,
help_text_override: str | None = None,
case_sensitive: bool = False,
hide_from_help_command: bool | None = False,
) -> Callable:
"""
Register a command with the bot.
This command is a decorator version of :meth:`add_command`
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
:param case_sensitive: Whether the :code:`pattern` should
respect case sensitivity.
Defaults to False
:param help_text_override: Text that should be used as a
description of the command using
the inbuilt help function.
If not overridden the command's
docstring will be used as the
help text.
Defaults to None
:param hide_from_help_command: A flag to specify whether or not
the inbuilt help command should
hide this command from the list
it generates.
Defaults to False
.. rubric:: Example
::
@bot.command('hello')
def hello():
return "world"
@bot.command('caseSensitive', case_sensitive=True)
def case_sensitive():
return "You typed caseSensitive"
"""
def decorator(f: Callable) -> Callable:
self.add_command(
pattern,
f,
case_sensitive=case_sensitive,
help_text_override=help_text_override,
hide_from_help_command=hide_from_help_command,
)
return f
return decorator
[docs]
def alias(self, pattern: str) -> Callable:
"""
Register an alias for a command.
:param pattern: The pattern that a :obj:`Message`'s text must
match for the command to be invoked.
.. rubric:: Example
::
@bot.command('hello')
@bot.alias('goodbye')
def hello():
return "world"
"""
pattern = f"{self.config.get('prefix', '')}{pattern}"
def decorator(f: Callable) -> Callable:
if not hasattr(f, "alias_patterns"):
f.alias_patterns = [] # type: ignore
f.alias_patterns.append(pattern) # type: ignore
return f
return decorator
[docs]
def add_fallback_command(self, func: Callable[[Message], PhialResponse]) -> None:
"""
Add a fallback command.
Registers a 'fallback' function to run when a user tries to execute a
command that doesn't exist.
This method can be used as a decorator via :meth:`fallback_command`
:param func: The function to be executed when the user tries
to execute a command that doesn't exist
.. rubric:: Example
::
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
bot.add_fallback_command(error_handler)
Is the same as
::
@bot.fallback_command()
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
"""
self.fallback_func = func
[docs]
def fallback_command(self) -> Callable:
"""
Add a fallback command.
See :meth:`add_fallback_command` for more information on
fallback commands
.. rubric:: Example
::
@bot.fallback_command()
def error_handler(message: Message) -> str:
return "Oops that command doesn't seem to exist"
"""
def decorator(f: Callable) -> Callable:
self.add_fallback_command(f)
return f
return decorator
[docs]
def add_middleware(self, func: Callable[[Message], Message | None]) -> None:
"""
Add a middleware function to the bot.
Middleware functions get passed every message the bot receives from
slack before the bot process the message itself. Returning :obj:`None`
from a middleware function will prevent the bot from processing it.
This method can be used as a decorator via :meth:`middleware`.
:param middleware_func: The function to be added to the middleware
pipeline
.. rubric :: Example
::
def intercept(message):
return message
bot.add_middleware(intercept)
Is the same as
::
@bot.middleware()
def intercept(message):
return message
"""
self.middleware_functions.append(func)
self.logger.debug(f"Middleware {getattr(func, '__name__', repr(func))} added")
[docs]
def middleware(self) -> Callable:
"""
Add a middleware function to the bot.
See :meth:`add_middleware` for more information about middleware
.. rubric:: Example
::
@bot.middleware()
def intercept(message):
return message
"""
def decorator(f: Callable) -> Callable:
self.add_middleware(f)
return f
return decorator
[docs]
def add_scheduled(self, schedule: Schedule, func: Callable) -> None:
"""
Add a scheduled function to the bot.
This method can be used as a decorator via :meth:`scheduled`.
:param schedule: The schedule used to run the function
:param scheduled_func: The function to be run in accordance to the
schedule
.. rubric:: Example
::
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
bot.add_scheduled(Schedule().every().day(), scheduled_beep)
Is the same as
::
@bot.scheduled(Schedule().every().day())
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
"""
job = ScheduledJob(schedule, func)
self.scheduler.add_job(job)
self.logger.debug(f"Schedule {getattr(func, '__name__', repr(func))} added")
[docs]
def scheduled(self, schedule: Schedule) -> Callable:
"""
Register a scheduled function.
See :meth:`add_scheduled` for more information on scheduled
jobs.
:param schedule: The schedule used to determine when the function
should be run
.. rubric:: Example
::
@bot.scheduled(Schedule().every().day())
def scheduled_beep():
bot.send_message(Response(text="Beep",
channel="channel-id">))
"""
def decorator(f: Callable) -> Callable:
self.add_scheduled(schedule, f)
return f
return decorator
[docs]
def send_message(self, message: Response) -> None:
"""
Send a message to Slack.
:param message: The message to be sent to Slack
"""
if message.ephemeral:
if message.user is None:
raise ValueError("User not provided for ephemeral message")
self.slack_client.web_client.chat_postEphemeral(
channel=message.channel,
text=message.text,
thread_ts=message.original_ts,
attachments=json.dumps(message.attachments),
user=message.user,
as_user=True,
)
else:
self.slack_client.web_client.chat_postMessage(
channel=message.channel,
text=message.text,
thread_ts=message.original_ts,
attachments=json.dumps(message.attachments),
as_user=True,
)
[docs]
def send_reaction(self, response: Response) -> None:
"""
Send a reaction to a Slack Message.
:param response: Response containing the reaction to be
sent to Slack
"""
if response.original_ts is None or response.reaction is None:
raise ValueError(
"Original timestamp and reaction must be provided for reaction",
)
self.slack_client.web_client.reactions_add(
channel=response.channel,
timestamp=response.original_ts,
name=response.reaction,
)
[docs]
def upload_attachment(self, attachment: Attachment) -> None:
"""
Upload a file to Slack.
:param attachment: The attachment to be uploaded to Slack
"""
self.slack_client.web_client.files_upload_v2(
channels=attachment.channel,
filename=attachment.filename,
file=attachment.content, # type: ignore
title=attachment.filename,
)
def _register_standard_commands(self) -> None:
if "registerHelpCommand" in self.config and self.config["registerHelpCommand"]:
# The command function has to be a lambda as we wish to delay
# execution until all commands have been registered.
self.add_command(
"help",
lambda: help_command(self),
help_text_override="List all available commands",
)
def _send_response(self, response: PhialResponse, original_channel: str) -> None:
if response is None:
return # Do nothing if command function returns nothing
if isinstance(response, str):
self.send_message(Response(text=response, channel=original_channel))
elif not isinstance(response, Response) and not isinstance(
response,
Attachment,
):
raise ValueError(
"Only Response or Attachment objects can be "
"returned from command functions",
)
if isinstance(response, Response):
if response.original_ts and response.reaction and response.text:
raise ValueError(
"Response objects with an original timestamp "
"can only have one of the attributes: "
"Reaction, Text",
)
if response.original_ts and response.reaction:
self.send_reaction(response)
elif response.text or response.attachments:
self.send_message(response)
if isinstance(response, Attachment):
self.upload_attachment(response)
def _handle_request(self, client: SocketModeClient, req: SocketModeRequest) -> None:
try:
self._handle_request_internal(client, req)
except Exception as e:
self.logger.error(e)
def _handle_request_internal(
self,
client: SocketModeClient,
req: SocketModeRequest,
) -> None:
# Acknowledge the request so it is not resent
ack_response = SocketModeResponse(envelope_id=req.envelope_id)
client.send_socket_mode_response(ack_response)
if req.type != "events_api":
return
message = parse_slack_event(req.payload)
# Run middleware functions
for func in self.middleware_functions:
if message:
self.logger.debug(f"Ran middleware: {func.__name__} on {message}")
message = func(message)
# If message has been intercepted or is a bot message return early
if not message or message.bot_id:
return
# If message should have a prefix but doesn't return early
if (
"prefix" in self.config
and self.config["prefix"] is not None
and self.config["prefix"] != ""
and isinstance(self.config["prefix"], str)
and not message.text.startswith(self.config["prefix"])
):
return
# If message has not been intercepted continue with standard message
# handling
for command in self.commands:
command_name = command.func.__name__
kwargs = command.pattern_matches(message)
if kwargs is not None:
try:
kwargs = validate_kwargs(command.func, kwargs)
_command_ctx_stack.push(message)
response = command.func(**kwargs)
self._send_response(response, message.channel)
return
except (ArgumentValidationError, ArgumentTypeValidationError) as e:
self._send_response(str(e), message.channel)
return
finally:
self.logger.debug(f"Ran command: {command_name} on {message}")
_command_ctx_stack.pop()
# If we are here then no commands have matched
self.logger.warning(f"Command {message.text} not found")
if self.fallback_func is not None:
try:
_command_ctx_stack.push(message)
response = self.fallback_func(message)
self._send_response(response, message.channel)
finally:
_command_ctx_stack.pop()
def _start(self) -> None: # pragma: no cover
"""
Start the bot.
When called will start the bot listening to messages from Slack
"""
self.slack_client.socket_mode_request_listeners.append(self._handle_request) # type: ignore
self.slack_client.connect()
self.logger.info("Phial connected and running!")
thread_pool_size = int(cast(str, self.config["maxThreads"]))
thread_pool = ThreadPoolExecutor(thread_pool_size)
run_pending_tasks: Future | None = None
while True:
try:
if run_pending_tasks is None or run_pending_tasks.done():
run_pending_tasks = thread_pool.submit(self.scheduler.run_pending)
except Exception as e:
self.logger.error(e)
sleep(cast(float, self.config["loopDelay"])) # Help prevent high CPU usage.
[docs]
def run(self) -> None: # pragma: no cover
"""Run the bot."""
self._start()
# TODO: Implement hot reload
# if self.config["hotReload"]:
# self._start()
# else:
# self._start()