How to use Python logging QueueHandler with dictConfig

Rob Blackbourn
7 min readJan 2, 2019

--

I’m at the point of thinking how my new python code will go into production, and a key problem I’ve been ignoring is logging.

Edit: There have been a number of comments on this article discussing issues and “gotchas” which are extremely informative. Editing this post would invalidate the comments, so I would urge you to treat the comments as part of this post.

[Update] I’ve put the code for the final version on GitHub.

[Update] Support was added to Python 3.12. This is covered in a new article here.

As we’ve decided to use Docker containers, log files are problematic. Ideally I would like a centralised logging facility using something like elasticsearch to provide a single monitoring view, and alerting with a messaging platform like Slack. Writing the handlers for these is straightforward as they are simple JSON HTTP posts, but coupling my program to network delays and outages is a problem.

Decoupling

Fortunately python already has a solution to decouple the generation of log messages from the handlers with the classes logging.handler.QueueHandler and logging.handlers.QueueListener. The documentation of these classes is clear, so I’ll just give a brief example below.

import queue
import logging
from logging.handlers import QueueHandler, QueueListener
log_queue = queue.Queue(-1)
queue_handler = QueueHandler(log_queue)

logger = logging.getLogger()
logger.addHandler(queue_handler)

console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(threadName)s: %(message)s')
console_handler.setFormatter(formatter)

file_handler = logging.FileHandler("queue_example.log")
file_handler.setFormatter(formatter)

listener = QueueListener(log_queue, console_handler, file_handler)
listener.start()

logger.warning('Look out!')

listener.stop()

So far so good. The QueueHandler and QueueListener share the queue.Queue. The QueueHandler simply adds the log messages to the shared queue, while the QueueListener sits on another thread taking them off the queue and forwarding them to the destination handlers.

Configuration

The second problem I need to solve is taking the configuration of the logging out of my program into a log file. Out of the box Python comes with two solutions to this: logging.config.dictConfig and logging.config.fileConfig. The dictConfig solution looks more elegant to my eye, particularly when used with yaml (via the PyYaml package).

The following configuration file looks fairly self explanatory to my eye:

version: 1
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class: logging.FileHandler
filename: 'my_script2.log'
formatter: simple
loggers:
examples.my_script1:
level: DEBUG
handlers: [console]
propagate: false
examples.my_script2:
level: WARNING
handlers: [console, file]
propagate: false
root:
level: INFO
handlers: [console]

We have a root fallback handler, and specify loggers for the packages examples.my_script1 and examples.myscript2 with specific handlers, levels, etc. There is one non-obvious part of this file: ext://sys.stdout. Intuitively we would rightly expect this to refer to an external python attribute sys.stdout.

To try these we need to create a couple of modules to generate some logging:

# examples/my_script1.pyimport logging

logger = logging.getLogger(__name__)


def do_stuff1():
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

try:
x = 1 / 0
except:
logger.exception('This is an exception')

And:

# examples/myscript2.pyimport logging

logger = logging.getLogger(__name__)


def do_stuff2():
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')

try:
x = 1 / 0
except:
logger.exception('This is an exception')

We can try this out with the following code (requires pip install PyYaml):

import logging
import logging.config
import yaml
from examples.my_script1 import do_stuff1
from examples.my_script2 import do_stuff2

LOGGING_CONFIG = """
version: 1
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
loggers:
examples.my_script1:
level: DEBUG
handlers: [console]
propagate: false
examples.my_script2:
level: WARNING
handlers: [console]
propagate: false
root:
level: INFO
handlers: [console]
"""

logging_config = yaml.load(LOGGING_CONFIG)

logging.config.dictConfig(logging_config)

other_logger = logging.getLogger("foo")

do_stuff1()
do_stuff2()

other_logger.debug("A different debug message")
other_logger.info("A different info message")
other_logger.error("A different error message")

This generates the expected output:

/home/rblackbourn/log-client/examples/config_example1.py
2019-01-02 09:58:21,908 - examples.my_script1 - DEBUG - This is a debug message
2019-01-02 09:58:21,908 - examples.my_script1 - INFO - This is an info message
2019-01-02 09:58:21,908 - examples.my_script1 - WARNING - This is a warning message
2019-01-02 09:58:21,908 - examples.my_script1 - ERROR - This is an error message
2019-01-02 09:58:21,908 - examples.my_script1 - CRITICAL - This is a critical message
2019-01-02 09:58:21,908 - examples.my_script1 - ERROR - This is an exception
Traceback (most recent call last):
File "/home/rblackbourn/log-client/examples/my_script1.py", line 14, in do_stuff1
x = 1 / 0
ZeroDivisionError: division by zero
2019-01-02 09:58:21,909 - examples.my_script2 - WARNING - This is a warning message
2019-01-02 09:58:21,909 - examples.my_script2 - ERROR - This is an error message
2019-01-02 09:58:21,909 - examples.my_script2 - CRITICAL - This is a critical message
2019-01-02 09:58:21,909 - examples.my_script2 - ERROR - This is an exception
Traceback (most recent call last):
File "/home/rblackbourn/log-client/examples/my_script2.py", line 14, in do_stuff2
x = 1 / 0
ZeroDivisionError: division by zero
2019-01-02 09:58:21,909 - foo - INFO - A different info message
2019-01-02 09:58:21,909 - foo - ERROR - A different error message
Process finished with exit code 0

To use a queue handler we want to come up with a configuration file that looks something like the file below, however there are a number of problems with this.

# Invalid!!!version: 1
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class: logging.FileHandler
filename: 'config_example2.log'
formatter: simple
queue:
class: logging.handlers.QueueHandler
handlers: [console, file]
loggers:
examples.my_script1:
level: DEBUG
handlers: [queue]
propagate: false
examples.my_script2:
level: WARNING
handlers: [queue]
propagate: false
root:
level: WARN
handlers: [console]
  • The constructor for QueueHandler is missing the queue that it shares with QueueListener.
  • The handlers.queue.handlers refers to console and file, but the dictConfig configurator doesn’t know these are the names of the handlers.
  • The QueueHandler doesn’t take the list of handlers anyway. That’s the job of the QueueListener.
  • We need to start and stop the QueueListener, but that isn’t trivially available, and what if we aren’t using any queues in our log config anyway?

What a mess. We have two problems to solve:
1. We need a new handler which bundles QueueHandler and QueueListener which provides a shared queue and handles the starting and stopping.
2. We need to work out how to pass the downstream handlers into the constructor of our new handler.

The second problem can be solved by using a configuration keyword cfg:// provided by dictConfig. An example config file using this is given below:

version: 1
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class: logging.FileHandler
filename: 'config_example2.log'
formatter: simple
queue_listener:
class: examples.QueueListenerHandler
handlers:
- cfg://handlers.console
- cfg://handlers.file
loggers:
examples.my_script1:
level: DEBUG
handlers:
- queue_listener
propagate: false
examples.my_script2:
level: WARNING
handlers:
- queue_listener
propagate: false
root:
level: WARN
handlers:
- console

We’ve called our new (and as yet unwritten) queue handler class QueueListenerHandler and referred to the console handler as cfg://handlers.console. This is fine, but we’ve had to switch to the hyphen prefixed list syntax as PyYaml complains about colons in square bracket lists.

The new handler class will look something the code below. Note the auto_run argument which automatically starts the listener, and stops (which also flushes) it when the program exits.

# !!! INVALID !!!
# examples/QueueListenerHandler.py
from logging.handlers import QueueHandler, QueueListener
from queue import Queue
from atexit import register


class QueueListenerHandler(QueueHandler):

def __init__(self, handlers, respect_handler_level=False, auto_run=True, queue=Queue(-1)):
super().__init__(queue)
self._listener = QueueListener(
self.queue,
*handlers,
respect_handler_level=respect_handler_level)
if auto_run:
self.start()
register(self.stop)


def start(self):
self._listener.start()


def stop(self):
self._listener.stop()


def emit(self, record):
return super().emit(record)

While it looks like we’ve solved the problem, when we run it it fails, complaining that the string "cfg://handles.console" isn’t a handler. This wasn’t supposed to happen! However, stepping into QueueListenerHandler.__init__ we can see that the handlers argument has been passed as a logging.config.ConvertingList. The following code resolves this:

# examples/QueueListenerHandler.pyfrom logging.config import ConvertingList, ConvertingDict, valid_ident
from logging.handlers import QueueHandler, QueueListener
from queue import Queue
from atexit import register


def _resolve_handlers(l):
if not isinstance(l, ConvertingList):
return l

# Indexing the list performs the evaluation.
return [l[i] for i in range(len(l))]


class QueueListenerHandler(QueueHandler):

def __init__(self, handlers, respect_handler_level=False, auto_run=True, queue=Queue(-1)):
super().__init__(queue)
handlers = _resolve_handlers(handlers)
self._listener = QueueListener(
self.queue,
*handlers,
respect_handler_level=respect_handler_level)
if auto_run:
self.start()
register(self.stop)


def start(self):
self._listener.start()


def stop(self):
self._listener.stop()


def emit(self, record):
return super().emit(record)

Now we have a working, configurable queue handler, and all is right in the world.

Undocumented Configuration Possibilities

If you are (rightly) put off by the “Undocumented” part of this section you may want to stop reading now.

There is one more step I’d like to do with this handler. I want to have control over the queue.Queue used by the QueueListenerHandler; in particular to cap it’s size. It turns out you can put anything that’s syntactically consistent in the config.

version: 1
objects:
queue:
class: queue.Queue
maxsize: 1000
formatters:
simple:
format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: simple
stream: ext://sys.stdout
file:
class: logging.FileHandler
filename: 'config_example2.log'
formatter: simple
queue_listener:
class: examples.QueueListenerHandler
handlers:
- cfg://handlers.console
- cfg://handlers.file
queue: cfg://objects.queue
loggers:
examples.my_script1:
level: DEBUG
handlers:
- queue_listener
propagate: false
examples.my_script2:
level: WARNING
handlers:
- queue_listener
propagate: false
root:
level: WARN
handlers:
- console

Note the new section objects (which could be called any non schema keyword) and the reference to it cfg://objects.queue. This gets passed by dictConfig as a ConfigurationDict, so we need some more code to resolve it.

The final code looks as follows:

from logging.config import ConvertingList, ConvertingDict, valid_ident
from logging.handlers import QueueHandler, QueueListener
from queue import Queue
import atexit


def _resolve_handlers(l):
if not isinstance(l, ConvertingList):
return l

# Indexing the list performs the evaluation.
return [l[i] for i in range(len(l))]


def _resolve_queue(q):
if not isinstance(q, ConvertingDict):
return q
if '__resolved_value__' in q:
return q['__resolved_value__']

cname = q.pop('class')
klass = q.configurator.resolve(cname)
props = q.pop('.', None)
kwargs = {k: q[k] for k in q if valid_ident(k)}
result = klass(**kwargs)
if props:
for name, value in props.items():
setattr(result, name, value)

q['__resolved_value__'] = result
return result


class QueueListenerHandler(QueueHandler):

def __init__(self, handlers, respect_handler_level=False, auto_run=True, queue=Queue(-1)):
queue = _resolve_queue(queue)
super().__init__(queue)
handlers = _resolve_handlers(handlers)
self._listener = QueueListener(
self.queue,
*handlers,
respect_handler_level=respect_handler_level)
if auto_run:
self.start()
atexit.register(self.stop)


def start(self):
self._listener.start()


def stop(self):
self._listener.stop()


def emit(self, record):
return super().emit(record)

Good luck with your coding.

--

--

Responses (8)