Source code for pytest_kafka._factories

"""Kafka fixtures for Pytest."""
import os
import signal
import logging
from pathlib import Path
from time import time, sleep
from typing import List, Callable, Optional, Tuple, Any, TYPE_CHECKING
from subprocess import Popen, TimeoutExpired
try:
    from kafka import KafkaProducer, KafkaConsumer  # type: ignore
    from kafka.errors import NoBrokersAvailable  # type: ignore
except ImportError as e:
    raise ImportError(
        "Kafka depenency is no longer provided with the project. "
        "Depend on pytest-kafka[kafka-python] or pytest-kafka[kafka-python-ng]."
    ) from e

import pytest  # type: ignore
import port_for  # type: ignore
from pytest_kafka.constants import (
    KAFKA_SERVER_CONFIG_TEMPLATE, ZOOKEEPER_CONFIG_TEMPLATE, DEFAULT_CONSUMER_TIMEOUT_MS,
    DEFAULT_TERMINATION_WAIT_TIMEOUT_SEC,
)
if TYPE_CHECKING:
    # Don't break anything else than typechecking if pytest changes.
    from _pytest.fixtures import SubRequest  # type: ignore  # noqa


def _wait_until(cond: Callable[[], bool], timeout: float = 15, interval: float = 0.1):
    """
    Poll ``cond`` until it returns ``True`` or ``timeout`` seconds have passed.

    The condition is always evaluated at least once, even with a zero timeout, and it is
    evaluated one last time after the final sleep, so a condition that becomes true right at
    the deadline is not missed.

    :param cond: callable to poll; only the exact value ``True`` counts as success
    :param timeout: maximum time to keep polling, in seconds
    :param interval: pause between two consecutive polls, in seconds
    :raises AssertionError: if the condition did not become ``True`` in time
    """
    # Imported locally so the module-level imports stay untouched. A monotonic clock keeps the
    # deadline valid even if the wall clock is adjusted while we are polling.
    from time import monotonic

    deadline = monotonic() + timeout
    while True:
        if cond() is True:
            return
        remaining = deadline - monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline.
        sleep(min(interval, remaining))

    raise AssertionError("Condition not true in {} seconds".format(timeout))


def _write_config(template_string: str, destination: Path, **template_vars) -> None:
    """
    Fill in a config template and store the result in a file.

    :param template_string: ``str.format``-style template text
    :param destination: path of the file to write (created if missing, overwritten otherwise)
    :param template_vars: values substituted for the named placeholders of the template
    """
    destination.write_text(template_string.format(**template_vars))


def terminate(
    proc: Popen,
    signal_fn: Callable[[Popen], Any] = Popen.terminate,
    wait_timeout: float = DEFAULT_TERMINATION_WAIT_TIMEOUT_SEC,
) -> None:
    """
    Stop a process and block until it has exited.

    ``signal_fn`` is applied to the process first (``Popen.terminate``, i.e. SIGTERM, by
    default). If the process has not exited within ``wait_timeout`` seconds, SIGKILL is sent to
    the process group whose id equals the PID of the process.

    :param proc: the process to stop
    :param signal_fn: callable that asks the process to stop
    :param wait_timeout: how long to wait for a graceful exit, in seconds
    """
    signal_fn(proc)
    try:
        proc.wait(timeout=wait_timeout)
    except TimeoutExpired:
        # Graceful shutdown took too long; escalate below.
        pass
    else:
        return

    try:
        # The PID is used as the process group id; the fixtures in this module start their
        # processes with ``start_new_session=True``, which makes that true for them.
        os.killpg(proc.pid, signal.SIGKILL)
    except ProcessLookupError:
        # The process group is already gone, so there is nothing left to kill.
        pass
    # It is not possible to inherit any potential orphaned grandchildren (unless we use the new
    # Linux-specific PR_SET_CHILD_SUBREAPER) so we can only wait for the main process.
    # We don't expect Kafka/ZK to have child processes anyway. Even if they did, init would clean
    # up orphaned processes - unless we're running in a Docker container without an init.
    proc.wait()
def _get_tmpdir_fixture_name(scope: str) -> str:
    """
    Pick the name of the temporary-directory fixture matching a fixture scope.

    :param scope: scope of the fixture that needs a temporary directory
    :return: ``'session_tmpdir_path'`` for the ``'session'`` scope, ``'tmpdir_path'`` for any
        other value
    """
    return 'session_tmpdir_path' if scope == 'session' else 'tmpdir_path'
def make_zookeeper_process(
    zk_bin: str,
    zk_port: Optional[int] = None,
    zk_config_template: str = ZOOKEEPER_CONFIG_TEMPLATE,
    teardown_fn: Callable[[Popen], Any] = terminate,
    scope: str = 'function',
) -> Callable[..., Tuple[Popen, int]]:
    """
    Build a Pytest fixture that runs a Zookeeper instance.

    The fixture starts Zookeeper in a new session (and therefore a new process group) and
    returns a ``(process handle, port)`` tuple. The rendered config file and the data directory
    live in a temporary directory obtained from another fixture.

    :param zk_bin: path to Zookeeper launch script (typically to bin/zookeeper-server-start.sh)
    :param zk_port: Zookeeper port (random free port by default)
    :param zk_config_template: Zookeeper config template, must use keys ``zk_data_dir`` and
        ``zk_port``. See :py:const:`pytest_kafka.constants.ZOOKEEPER_CONFIG_TEMPLATE`.
    :param teardown_fn: function to tear down Zookeeper (:py:func:`terminate` by default)
    :param scope: 'function' or 'session'
    """
    @pytest.fixture(scope=scope)  # type: ignore
    def zookeeper_process(request: 'SubRequest') -> Tuple[Popen, int]:
        """Start Zookeeper and return its process handle together with the port it uses."""
        port = zk_port if zk_port is not None else port_for.select_random()

        tmpdir_fixture = _get_tmpdir_fixture_name(scope)
        base_dir = request.getfixturevalue(tmpdir_fixture) / 'zookeeper-{}'.format(port)
        data_dir = base_dir / 'data'
        data_dir.mkdir(parents=True)

        config_path = base_dir / 'zookeeper.properties'
        _write_config(zk_config_template, config_path, zk_data_dir=data_dir, zk_port=port)

        # A new session lets the teardown address the process by its own process group.
        process = Popen([zk_bin, str(config_path)], start_new_session=True)

        def stop_zookeeper() -> None:
            teardown_fn(process)

        request.addfinalizer(stop_zookeeper)

        # Kafka will wait for Zookeeper, so there is no need to poll it here.
        # If you use the Zookeeper fixture alone, I'm sorry.
        return process, port

    return zookeeper_process
def make_kafka_server(
    kafka_bin: str,
    zookeeper_fixture_name: str,
    kafka_port: Optional[int] = None,
    kafka_config_template: str = KAFKA_SERVER_CONFIG_TEMPLATE,
    teardown_fn: Callable[[Popen], Any] = terminate,
    scope: str = 'function',
    timeout: int = 15
) -> Callable[..., Tuple[Popen, int]]:
    """
    Make a Kafka fixture.

    The fixture will spawn a Kafka process in a new process group and return its process handle
    and port number. Data will be stored in a Pytest-provided temporary directory. The fixture
    only returns once a producer was able to connect to the broker.

    :param kafka_bin: path to Kafka launch script (typically to bin/kafka-server-start.sh)
    :param zookeeper_fixture_name: the name of the Zookeeper fixture to depend on. The scope must
        not be wider than this fixture's scope.
    :param kafka_port: Kafka port (random free port by default)
    :param kafka_config_template: Kafka config template, must use keys ``kafka_log_dir`` and
        ``kafka_port``; ``zk_port`` is supplied to the template as well.
        See :py:const:`pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE`.
    :param teardown_fn: function to tear down Kafka (:py:func:`terminate` by default)
    :param scope: 'function' or 'session'
    :param timeout: How long to wait for kafka to come online in seconds
    """
    @pytest.fixture(scope=scope)  # type: ignore
    def kafka_server(request: 'SubRequest') -> Tuple[Popen, int]:
        """Configure and start a Kafka server, returning its process handle and port."""
        _, zk_port = request.getfixturevalue(zookeeper_fixture_name)
        used_kafka_port = port_for.select_random() if kafka_port is None else kafka_port
        tempdir_path = request.getfixturevalue(_get_tmpdir_fixture_name(scope))

        kafka_dir = tempdir_path / 'kafka-server-{}'.format(used_kafka_port)
        kafka_log_dir = kafka_dir / 'logs'
        kafka_log_dir.mkdir(parents=True)

        kafka_config_path = kafka_dir / 'kafka-server.properties'
        _write_config(
            kafka_config_template, kafka_config_path,
            zk_port=zk_port,
            kafka_port=used_kafka_port,
            kafka_log_dir=kafka_log_dir
        )

        kafka_proc = Popen(
            [kafka_bin, str(kafka_config_path)],
            start_new_session=True,
        )
        # Registered before polling so the process is torn down even if startup times out.
        request.addfinalizer(lambda: teardown_fn(kafka_proc))

        def kafka_started() -> bool:
            """Return True once a producer can connect; fail fast if the process has died."""
            # Raised explicitly instead of via ``assert`` so the check survives ``python -O``;
            # otherwise a dead broker would only surface as a generic timeout.
            if kafka_proc.poll() is not None:
                raise AssertionError('Kafka process must not terminate')
            try:
                producer = KafkaProducer(bootstrap_servers='localhost:{}'.format(used_kafka_port))
                producer.close()
            except NoBrokersAvailable:
                return False
            return True

        # Silence kafka errors when polling.
        kafka_logger = logging.getLogger('kafka.producer.kafka')
        prev_propagate = kafka_logger.propagate
        try:
            kafka_logger.propagate = False
            _wait_until(kafka_started, timeout=timeout)
        finally:
            kafka_logger.propagate = prev_propagate

        return kafka_proc, used_kafka_port

    return kafka_server
def make_kafka_consumer(
    kafka_fixture_name: str,
    kafka_topics: Optional[List[str]] = None,
    seek_to_beginning: bool = False,
    scope: str = 'function',
    **consumer_kwargs
) -> Callable[..., KafkaConsumer]:
    """
    Make a Kafka consumer fixture.

    The consumer created by the fixture is closed when the fixture is torn down.

    :param kafka_fixture_name: the name of the Kafka fixture to depend on
    :param kafka_topics: topics to subscribe to
    :param seek_to_beginning: whether the consumer should consume from the earliest offsets.
        Solves the race condition between consumer setup and Kafka server + Producer setup but
        requires to know the topics upfront.
    :param scope: scope of the created fixture
    :param consumer_kwargs: what to pass to KafkaConsumer. By default ``bootstrap_servers`` will
        get the server from the passed fixture and ``consumer_timeout_ms`` will be
        :py:const:`pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS`.

    It's recommended to pass both ``kafka_topics`` and ``seek_to_beginning``.
    """
    if kafka_topics is None:
        kafka_topics = []

    @pytest.fixture(scope=scope)  # type: ignore
    def kafka_consumer(request: 'SubRequest') -> KafkaConsumer:
        """
        Get a connected Kafka consumer.

        The consumer uses a timeout, so ``list(consumer)`` can be used. With
        ``seek_to_beginning`` it starts consuming from the beginning of its assigned partitions.
        """
        _, kafka_port = request.getfixturevalue(kafka_fixture_name)

        used_consumer_kwargs = consumer_kwargs.copy()
        used_consumer_kwargs.setdefault('consumer_timeout_ms', DEFAULT_CONSUMER_TIMEOUT_MS)
        used_consumer_kwargs.setdefault('bootstrap_servers', 'localhost:{}'.format(kafka_port))

        consumer = KafkaConsumer(
            *kafka_topics,
            **used_consumer_kwargs,
        )
        # Release the consumer's network resources at teardown, including when the remaining
        # setup below fails.
        request.addfinalizer(consumer.close)

        if seek_to_beginning:
            # Raised explicitly instead of via ``assert`` so the check survives ``python -O``.
            if not kafka_topics:
                raise AssertionError(
                    'In order to be able to seek to beginning, we must have some partitions '
                    'assigned for which we need to subscribe to topics.')

            def partitions_assigned() -> bool:
                # Polling drives the consumer so that partitions can get assigned.
                consumer.poll(timeout_ms=20)
                return len(consumer.assignment()) > 0

            _wait_until(partitions_assigned)

            consumer.seek_to_beginning()
        return consumer

    return kafka_consumer