"""Some logging utilities that are used for bot logs."""
from __future__ import annotations
import asyncio
import logging
import os
import re
import sys
from typing import Literal
import aiohttp
from colorama import Fore
from .enums import LogFormat, TimeFormat
from .internal.colors import (
DEFAULT_COLOR,
DEFAULT_LOG_COLORS,
get_escape_code,
replace_dc_format,
)
from .internal.dc import discord
# Name of the library's own logger. It is the default target of ``set_log`` and
# ``custom_log`` and is mapped to the "EzCord" webhook username.
DEFAULT_LOG = "ezcord"
# Module-level logger used for this module's own messages (e.g. webhook errors).
log = logging.getLogger(DEFAULT_LOG)
# References to in-flight webhook tasks. ``_DiscordHandler.emit`` adds each task
# here and registers ``discard`` as its done-callback (presumably so a pending
# task is not garbage-collected before it finishes).
background_tasks = set()
# [docs]  (appears to be a Sphinx "view source" link marker left over from extraction; not code)
def custom_log(
    key: str, message: str, *, color: str | None = DEFAULT_COLOR, level: int = logging.INFO
):
    """Emit ``message`` on the default logger under a custom level name.

    Parameters
    ----------
    key:
        The name of the custom log level. It is attached to the record as the
        ``key`` extra.
    message:
        The message to log.
    color:
        The color to use for the log level. Defaults to the library's
        ``DEFAULT_COLOR``. It is converted with ``get_escape_code`` and attached
        to the record as the ``color`` extra.
    level:
        The numeric log level to emit at. Defaults to ``logging.INFO``.
    """
    record_extras = {"key": key, "color": get_escape_code(color)}
    target = logging.getLogger(DEFAULT_LOG)
    target.log(level, message, extra=record_extras)
def _format_log_colors(log_format: str, file: bool, final_colors: dict[int, str]) -> dict[int, str]:
    """Check if the log is sent to a file and format the colors accordingly.

    Parameters
    ----------
    log_format:
        The log format. It may contain ``{color}`` (the color of the current
        level), ``{end}`` (color reset) and other ``{name}`` color placeholders.
    file:
        Whether the output goes to a file. If ``True``, every placeholder is
        replaced with an empty string instead of an escape code.
    final_colors:
        Maps each log level to the color that is inserted for ``{color}``.

    Returns
    -------
    dict[int, str]
        The resolved format string for every level in ``final_colors``.
    """
    if not final_colors:
        return {}

    if "{end}" not in log_format:
        # No explicit end marker: the whole line gets the level color (console)
        # or stays untouched (file).
        if file:
            return {level: log_format for level in final_colors}
        return {
            level: color + log_format + Fore.RESET for level, color in final_colors.items()
        }

    # Every ``{...}`` placeholder except ``{color}``, which differs per level.
    placeholders = [
        found for found in re.findall(r"{.*?}", log_format) if "{color}" not in found
    ]

    # The substitutions below do not depend on the level, so they are applied
    # once instead of once per level.
    if file:
        for placeholder in placeholders:
            log_format = log_format.replace(placeholder, "")
        plain_format = log_format.format(color="", end="")
        return {level: plain_format for level in final_colors}

    for placeholder in placeholders:
        if placeholder == "{end}":
            # one replacement per occurrence found by the regex
            log_format = log_format.replace(placeholder, Fore.RESET, 1)
        else:
            log_format = log_format.replace(
                placeholder, get_escape_code(placeholder.strip("{}"))
            )
    return {
        level: log_format.format(color=color, end=Fore.RESET)
        for level, color in final_colors.items()
    }
def _format_colors(colors: dict[int, str] | str | None = None) -> dict[int, str]:
    """Return a copy of the default level colors with ``colors`` applied on top.

    A single string is converted with ``get_escape_code`` and used for every
    default level. A dictionary overrides (or adds) the given levels, each value
    being converted with ``get_escape_code``. ``None`` converts the defaults
    themselves.
    """
    resolved = DEFAULT_LOG_COLORS.copy()

    if isinstance(colors, str):
        escape_code = get_escape_code(colors)
        resolved.update(dict.fromkeys(resolved, escape_code))
        return resolved

    overrides = resolved if colors is None else colors
    # ``list`` takes a snapshot, so this is safe when ``overrides`` is ``resolved``.
    for level in list(overrides):
        resolved[level] = get_escape_code(overrides[level])
    return resolved
class _ColorFormatter(logging.Formatter):
    """A logging formatter that adds colors to the output. This is used by :func:`set_log`."""

    def __init__(
        self,
        file: bool,
        log_format: str,
        time_format: str,
        dc_codeblocks: bool,
        spacing: int,
        space_after_level: bool,
        colors: dict[int, str] | str | None = None,
        *args,
        **kwargs,
    ):
        """Store the formatting options.

        Parameters
        ----------
        file:
            Whether the formatted output goes to a file.
        log_format:
            The log format, possibly containing color placeholders.
        time_format:
            The date format passed to :class:`logging.Formatter`.
        dc_codeblocks:
            Whether Discord codeblocks are used. Stored only; not read by :meth:`format`.
        spacing:
            The width the level name is padded to.
        space_after_level:
            If ``True``, the padding is appended to the level name, otherwise it is
            put in front of the message.
        colors:
            Color overrides that are passed to ``_format_colors``.
        """
        super().__init__(*args, **kwargs)
        self.file = file
        self.colors = colors
        self.dc_codeblocks = dc_codeblocks
        self.spacing = spacing
        self.space_after_level = space_after_level
        self.LOG_FORMAT = log_format
        self.TIME_FORMAT = time_format

    def format(self, record: logging.LogRecord):
        """Adds colors to log messages and formats them accordingly.

        Can be used with :func:`set_log`.

        The given record is left untouched; all changes are made on a copy. The
        same record is passed to every handler of a logger, so modifying it here
        would add the padding once per handler.

        Parameters
        ----------
        record:
            The log record to format.

        Returns
        -------
        str
            The formatted log line.
        """
        extras = record.__dict__
        has_custom_color = "color" in extras
        colors = extras["color"] if has_custom_color else self.colors

        color_formats = _format_colors(colors)
        if record.levelno not in color_formats:
            # Custom level: use the color set by .custom_log(), or the default
            # color if the level was used through .log() without a color.
            color_formats[record.levelno] = colors if has_custom_color else DEFAULT_COLOR

        color_log_formats = _format_log_colors(self.LOG_FORMAT, self.file, color_formats)
        log_format = color_log_formats.get(record.levelno)
        current_level_color = color_formats.get(record.levelno)

        new_record = logging.makeLogRecord(extras)
        # ``msg`` may be any object (``LogRecord.getMessage`` calls ``str`` on it
        # as well), so convert it before doing string operations.
        new_record.msg = str(new_record.msg)

        spacing_name = extras["key"] if "key" in extras else record.levelname
        spaces = " " * (self.spacing - len(spacing_name))
        if self.space_after_level:
            # add space directly after log level
            spacing_name = spacing_name + spaces
            new_record.levelname = spacing_name
        else:
            # add space in front of log message
            new_record.msg = spaces + new_record.msg

        if "key" in extras and log_format:
            # custom levels show their key instead of the level name
            log_format = log_format.replace("%(levelname)s", spacing_name)

        # color new lines: a format ending with "//" wraps everything after the
        # first line break in "***"
        if isinstance(log_format, str) and log_format.endswith("//"):
            log_format = log_format.replace("//", "")
            first_line, line_break, remainder = new_record.msg.partition("\n")
            if line_break:
                new_record.msg = first_line + "\n***" + remainder + "***"

        remove_stars = self.file or "{end}" not in self.LOG_FORMAT
        new_record.msg = replace_dc_format(new_record.msg, current_level_color, remove_stars)

        formatter = logging.Formatter(log_format, self.TIME_FORMAT)
        return formatter.format(new_record)
class _DiscordHandler(logging.Handler):
    """A logging handler that sends logs to a Discord webhook."""

    def __init__(
        self, webhook_url: str | None, dc_codeblocks: bool, dc_format: str, *args, **kwargs
    ):
        """Store the webhook settings.

        Parameters
        ----------
        webhook_url:
            The webhook URL to send logs to. If it is empty, :meth:`emit` does nothing.
        dc_codeblocks:
            Whether the message is wrapped in an ``ansi`` codeblock.
        dc_format:
            The format that is used for custom log levels if no codeblocks are used.
        """
        super().__init__(*args, **kwargs)
        self.webhook_url = webhook_url
        self.dc_codeblocks = dc_codeblocks
        self.dc_format = dc_format

    def _build_message(self, record: logging.LogRecord) -> str:
        """Return the text that is sent to Discord for ``record``."""
        if self.dc_codeblocks:
            return f"```ansi\n{self.format(record)}```"
        if "key" in record.__dict__:
            # custom levels show their key instead of the level name
            log_format = self.dc_format.replace("%(levelname)s", record.__dict__["key"])
            return logging.Formatter(log_format).format(record)
        return self.format(record)

    def emit(self, record: logging.LogRecord):
        """Format the record without exception information and schedule the webhook request.

        The exception information is only hidden while the message is built and is
        restored afterwards, because the same record is passed on to other handlers.
        Errors are reported through :meth:`logging.Handler.handleError` instead of
        being raised in the code that logged the message.
        """
        if not self.webhook_url:
            return

        hidden_exc_info, hidden_exc_text = record.exc_info, record.exc_text
        # ``_exc_info_hidden`` is still set in case other code reads it.
        record._exc_info_hidden, record.exc_info = record.exc_info, None
        record.exc_text = None
        try:
            msg = self._build_message(record)
            loop = asyncio.get_event_loop()
            task = loop.create_task(_send_discord_log(self.webhook_url, record, msg))
            # keep a reference until the task is done
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)
        except Exception:
            self.handleError(record)
        finally:
            record.exc_info, record.exc_text = hidden_exc_info, hidden_exc_text
async def _send_discord_log(webhook_url: str, record: logging.LogRecord, msg):
    """Send ``msg`` to the webhook at ``webhook_url``.

    The webhook username is ``"<logger name> Log"``, with the default logger
    shown as ``EzCord``. A ``discord.HTTPException`` raised while sending is
    logged as an error; that record carries ``webhook_sent=True``.
    """
    if record.name == DEFAULT_LOG:
        name = "EzCord"
    else:
        name = record.name

    async with aiohttp.ClientSession() as session:
        webhook = discord.Webhook.from_url(webhook_url, session=session)
        try:
            await webhook.send(content=msg, username=f"{name} Log")
        except discord.HTTPException:
            failure_extras = {"webhook_sent": True}
            log.error(
                "Error while sending log message to webhook. Please check if the URL is correct.",
                extra=failure_extras,
            )
def _discord_filter(record):
    """Let a record pass unless it carries a truthy ``webhook_sent`` attribute."""
    already_sent = record.__dict__.get("webhook_sent", False)
    return not already_sent
# [docs]  (appears to be a Sphinx "view source" link marker left over from extraction; not code)
def set_log(
    name: str = DEFAULT_LOG,
    log_level: int = logging.INFO,
    *,
    console: bool = True,
    file: bool | str = False,
    file_mode: str = "w",
    log_format: str | LogFormat = LogFormat.default,
    time_format: str | TimeFormat = TimeFormat.default,
    discord_log_level: int = logging.WARNING,
    webhook_url: str | None = None,
    dc_codeblocks: bool = True,
    level_spacing: int = 8,
    space_mode: Literal["auto", "always", "never"] = "auto",
    colors: dict[int, str] | str | None = None,
) -> logging.Logger:
    """Creates a logger. If the logger already has handlers, it is returned unchanged.

    This should be called before :class:`.Bot` is initialized.

    Parameters
    ----------
    name:
        The name of the logger.
    log_level:
        The log level of the logger and of its console and file handlers.
        Defaults to ``logging.INFO``.
    console:
        Whether to log to the console. Defaults to ``True``.
    file:
        Whether to log to a file. Defaults to ``False``.
        If ``True``, the file is ``logs/<last part of name>.log``.
        You can also pass a path to a log file.
    file_mode:
        The file mode for the log file. Defaults to ``w``.
    log_format:
        The log format. Defaults to :attr:`.LogFormat.default`.
    time_format:
        The time format. Defaults to :attr:`.TimeFormat.default`.
    discord_log_level:
        The log level for discord log messages. Defaults to ``logging.WARNING``.

        .. note::
            For discord log messages, ``webhook_url`` is required.
    webhook_url:
        The discord webhook URL to send logs to. Defaults to ``None``.
    dc_codeblocks:
        Whether to use codeblocks for all Discord log messages. Defaults to ``True``.
    level_spacing:
        The length of the log level. If the log level is ``INFO`` and the spacing is ``8``, the
        log level will be filled with 4 additional spaces, so that all log levels are 8
        characters long.
    space_mode:
        Choose when to add spacing to the log level. Defaults to ``auto``.

        - ``auto``: only if the level name is followed by a space in ``log_format``
        - ``always``: always use ``level_spacing``
        - ``never``: never add spacing
    colors:
        A dictionary of log levels and their corresponding colors. If only one color is given,
        all log levels will be colored with that color.

    Returns
    -------
    :class:`logging.Logger`

    Example
    -------
    .. code-block:: python

        import logging
        import ezcord

        colors = {
            logging.DEBUG: "blue",
            logging.INFO: "red",
        }

        ezcord.set_log(colors=colors)
    """
    logger = logging.getLogger(name)
    if logger.handlers:
        # already configured, do not add the handlers twice
        return logger

    logger.setLevel(log_level)

    handlers: list[logging.FileHandler | logging.StreamHandler] = []
    if isinstance(file, bool) and file:
        # ``exist_ok`` avoids the race between checking for and creating the directory
        os.makedirs("logs", exist_ok=True)
        filename = name.split(".")[-1]
        handlers.append(
            logging.FileHandler(f"logs/{filename}.log", mode=file_mode, encoding="utf-8")
        )
    elif isinstance(file, str):
        handlers.append(logging.FileHandler(file, mode=file_mode, encoding="utf-8"))

    if console:
        handlers.append(logging.StreamHandler(sys.stdout))

    # The padding goes behind the level name only if the format has a space there.
    space_after_level = (
        "%(levelname)s " in log_format or "%(levelname)s{end} " in log_format
    )
    if space_mode == "never" or (space_mode == "auto" and not space_after_level):
        level_spacing = 0

    formatter_options = (
        log_format,
        time_format,
        dc_codeblocks,
        level_spacing,
        space_after_level,
        colors,
    )
    file_color_formatter = _ColorFormatter(True, *formatter_options)
    color_formatter = _ColorFormatter(False, *formatter_options)

    for handler in handlers:
        # FileHandler is a subclass of StreamHandler, so it has to be checked explicitly
        if isinstance(handler, logging.FileHandler):
            handler.setFormatter(file_color_formatter)
        else:
            handler.setFormatter(color_formatter)
        handler.setLevel(log_level)
        logger.addHandler(handler)

    # Discord logs
    dc_format = "**%(levelname)s:** %(message)s"
    discord_handler = _DiscordHandler(webhook_url, dc_codeblocks, dc_format)
    if dc_codeblocks:
        discord_handler.setFormatter(color_formatter)
    else:
        dc_formatter = _ColorFormatter(
            True, dc_format, time_format, dc_codeblocks, level_spacing, space_after_level, colors
        )
        discord_handler.setFormatter(dc_formatter)

    discord_handler.addFilter(_discord_filter)
    discord_handler.setLevel(discord_log_level)
    logger.addHandler(discord_handler)

    return logger