pytest_kafka package
Pytest-kafka public API.
- pytest_kafka.make_kafka_consumer(kafka_fixture_name: str, kafka_topics: List[str] | None = None, seek_to_beginning: bool = False, scope: str = 'function', **consumer_kwargs) -> Callable[[...], KafkaConsumer]
Make a Kafka consumer fixture.
- Parameters:
kafka_fixture_name – the name of the Kafka fixture to depend on
kafka_topics – topics to subscribe to
seek_to_beginning – whether the consumer should consume from the earliest offsets. This avoids the race condition between consumer setup and Kafka server + producer setup, but requires knowing the topics upfront.
consumer_kwargs – what to pass to KafkaConsumer. By default bootstrap_servers will get the server from the passed fixture and consumer_timeout_ms will be pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.
It’s recommended to pass both kafka_topics and seek_to_beginning.
- pytest_kafka.make_kafka_server(kafka_bin: str, zookeeper_fixture_name: str, kafka_port: int | None = None, kafka_config_template: str = 'reserved.broker.max.id=65535\nbroker.id={kafka_port}\nlisteners=PLAINTEXT://:{kafka_port}\nlog.dirs={kafka_log_dir}\nnum.partitions=1\n# The number of threads lowered to 1 - may boost startup time:\nnum.recovery.threads.per.data.dir=1\nnum.network.threads=1\nnum.io.threads=1\nlog.retention.hours=1\nlog.segment.bytes=1073741824\nzookeeper.connect=localhost:{zk_port}\nzookeeper.connection.timeout.ms=6000\noffsets.topic.replication.factor=1\ndefault.replication.factor=1\n', teardown_fn: Callable[[Popen], Any] = <function terminate>, scope: str = 'function', timeout: int = 15) -> Callable[[...], Tuple[Popen, int]]
Make a Kafka fixture.
The fixture will spawn a Kafka process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
- Parameters:
zookeeper_fixture_name – the name of the Zookeeper fixture to depend on. The scope must not be wider than this fixture’s scope.
kafka_bin – path to Kafka launch script (typically to bin/kafka-server-start.sh)
kafka_port – Kafka port (random free port by default)
kafka_config_template – Kafka config template, must use keys kafka_log_dir and kafka_port. See pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE.
teardown_fn – function to tear down Kafka (terminate() by default)
scope – ‘function’ or ‘session’
timeout – how long to wait for Kafka to come online, in seconds
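A custom kafka_config_template can be checked before it is handed to the factory. The sketch below assumes the placeholders are filled with str.format-style substitution, which the {name} syntax of the default template suggests but this page does not state. The helper placeholder_names, the CUSTOM template, and the port and directory values are illustrative and not part of pytest_kafka.

```python
from string import Formatter


def placeholder_names(template: str) -> set:
    """Return the {name} placeholders that appear in a config template."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


# Hypothetical custom template, trimmed from the default shown in the signature.
CUSTOM = (
    "broker.id={kafka_port}\n"
    "listeners=PLAINTEXT://:{kafka_port}\n"
    "log.dirs={kafka_log_dir}\n"
    "zookeeper.connect=localhost:{zk_port}\n"
    "auto.create.topics.enable=true\n"
)

# The documentation requires these two keys to be present in the template.
missing = {"kafka_log_dir", "kafka_port"} - placeholder_names(CUSTOM)

# Assumption: substitution is equivalent to str.format with these names.
rendered = CUSTOM.format(
    kafka_port=9092, kafka_log_dir="/tmp/kafka-logs", zk_port=2181
)
print(rendered)
```

An empty missing set means both required keys are present. The default template also refers to zk_port, so a custom template will normally need it as well to reach the Zookeeper fixture.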
- pytest_kafka.make_zookeeper_process(zk_bin: str, zk_port: int | None = None, zk_config_template: str = 'dataDir={zk_data_dir}\nclientPort={zk_port}\nmaxClientCnxns=0\nadmin.enableServer=false\n', teardown_fn: ~typing.Callable[[~subprocess.Popen], ~typing.Any] = <function terminate>, scope: str = 'function') Callable[[...], Tuple[Popen, int]][source]
Make a Zookeeper fixture.
The fixture will spawn a Zookeeper process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
- Parameters:
zk_bin – path to Zookeeper launch script (typically to bin/zookeeper-server-start.sh)
zk_port – Zookeeper port (random free port by default)
zk_config_template – Zookeeper config template, must use keys zk_data_dir and zk_port. See pytest_kafka.constants.ZOOKEEPER_CONFIG_TEMPLATE.
teardown_fn – function to tear down Zookeeper (terminate() by default)
scope – ‘function’ or ‘session’
- pytest_kafka.terminate(proc: Popen, signal_fn: Callable[[Popen], Any] = <function Popen.terminate>, wait_timeout: float = 5) -> None
Kill the process with the desired method (SIGTERM by default) and wait for it.
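The signal-then-wait behaviour, and how signal_fn can be swapped to build a different teardown_fn, can be illustrated with the standard library alone. In this sketch terminate_sketch is a stand-in that mirrors the documented parameters; it is not the library's implementation, and the sleeping child process is a hypothetical substitute for a Kafka or Zookeeper process.

```python
import sys
from functools import partial
from subprocess import Popen


def terminate_sketch(proc, signal_fn=Popen.terminate, wait_timeout=5):
    """Stand-in mirroring the documented parameters of pytest_kafka.terminate."""
    signal_fn(proc)                  # deliver the signal (SIGTERM by default on POSIX)
    proc.wait(timeout=wait_timeout)  # raises subprocess.TimeoutExpired if still alive


# A teardown that uses Popen.kill instead of the default. With the real
# library, the same partial() pattern would wrap pytest_kafka.terminate.
kill_teardown = partial(terminate_sketch, signal_fn=Popen.kill)

# Hypothetical long-running child standing in for a server process.
child = Popen([sys.executable, "-c", "import time; time.sleep(60)"])
kill_teardown(child)
exited = child.poll() is not None
print("child exited:", exited)
```

A callable built this way has the Callable[[Popen], Any] shape that the teardown_fn parameters of make_kafka_server and make_zookeeper_process expect. The sketch signals only the single child; because the fixtures start their processes in a new process group, the library's own function may do more than this.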