pytest_kafka package
Pytest-kafka public API.
pytest_kafka.make_zookeeper_process(zk_bin: str, zk_port: Optional[int] = None, zk_config_template: str = '\ndataDir={zk_data_dir}\nclientPort={zk_port}\nmaxClientCnxns=0\n', teardown_fn: Callable[subprocess.Popen, Any] = <function terminate>, scope: str = 'function') → Callable[..., Tuple[subprocess.Popen, int]]
Make a Zookeeper fixture.
The fixture will spawn a Zookeeper process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
Return type: Callable
Parameters:
- zk_bin (str) – path to the Zookeeper launch script (typically bin/zookeeper-server-start.sh)
- zk_port – Zookeeper port (random free port by default)
- zk_config_template (str) – Zookeeper config template, must use keys zk_data_dir and zk_port. See pytest_kafka.constants.ZOOKEEPER_CONFIG_TEMPLATE.
- teardown_fn (Callable) – function to tear down Zookeeper (terminate() by default)
- scope (str) – ‘function’ or ‘session’
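As a rough illustration of what a custom zk_config_template has to provide, the sketch below fills a template with the two required keys and picks a free port by binding to port 0. The helpers pick_free_port and render_zk_config are hypothetical names, and filling the template with str.format is an assumption based on the {…} placeholders in the default; pytest_kafka's own implementation may differ.

```python
import socket
import tempfile

# Same keys as the default template shown in the signature, plus one extra
# Zookeeper setting (tickTime) to show what a customised template might add.
ZK_CONFIG_TEMPLATE = """
dataDir={zk_data_dir}
clientPort={zk_port}
maxClientCnxns=0
tickTime=2000
"""


def pick_free_port() -> int:
    """Hypothetical helper: ask the OS for a currently unused TCP port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))  # port 0 lets the OS choose a free port
        return sock.getsockname()[1]


def render_zk_config(template: str, zk_data_dir: str, zk_port: int) -> str:
    """Hypothetical helper: substitute the two required keys (assumes str.format)."""
    return template.format(zk_data_dir=zk_data_dir, zk_port=zk_port)


zk_port = pick_free_port()
with tempfile.TemporaryDirectory(prefix="zk-data-") as zk_data_dir:
    config_text = render_zk_config(ZK_CONFIG_TEMPLATE, zk_data_dir, zk_port)
```

If str.format-style substitution is indeed what is used, a template that omits either key is still rendered without error, which is one reason to keep both zk_data_dir and zk_port in any custom template.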
pytest_kafka.make_kafka_server(kafka_bin: str, zookeeper_fixture_name: str, kafka_port: Optional[int] = None, kafka_config_template: str = '\nreserved.broker.max.id=65535\nbroker.id={kafka_port}\nlisteners=PLAINTEXT://:{kafka_port}\nlog.dirs={kafka_log_dir}\nnum.partitions=1\n# The number of threads lowered to 1 - may boost startup time:\nnum.recovery.threads.per.data.dir=1\nnum.network.threads=1\nnum.io.threads=1\nlog.retention.hours=1\nlog.segment.bytes=1073741824\nzookeeper.connect=localhost:{zk_port}\nzookeeper.connection.timeout.ms=6000\noffsets.topic.replication.factor=1\ndefault.replication.factor=1\n', teardown_fn: Callable[subprocess.Popen, Any] = <function terminate>, scope: str = 'function') → Callable[..., Tuple[subprocess.Popen, int]]
Make a Kafka fixture.
The fixture will spawn a Kafka process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
Return type: Callable
Parameters:
- zookeeper_fixture_name (str) – the name of the Zookeeper fixture to depend on. The scope must not be wider than this fixture’s scope.
- kafka_bin (str) – path to the Kafka launch script (typically bin/kafka-server-start.sh)
- kafka_port – Kafka port (random free port by default)
- kafka_config_template (str) – Kafka config template, must use keys kafka_log_dir and kafka_port. See pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE.
- teardown_fn (Callable) – function to tear down Kafka (terminate() by default)
- scope (str) – ‘function’ or ‘session’
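Because the process is started in its own process group, a custom teardown_fn can signal the whole group rather than a single PID. The following is a minimal, POSIX-only sketch of such a function; terminate_group is a hypothetical name, and it is not the library's terminate(). A short-lived Python child stands in for the broker so the example runs without Kafka installed.

```python
import os
import signal
import subprocess
import sys


def terminate_group(process: subprocess.Popen, timeout: float = 10.0) -> None:
    """Hypothetical teardown_fn: SIGTERM the process group, SIGKILL on timeout."""
    if process.poll() is not None:
        return  # the process has already exited
    pgid = os.getpgid(process.pid)
    os.killpg(pgid, signal.SIGTERM)
    try:
        process.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        # The group ignored SIGTERM for too long; force it down.
        os.killpg(pgid, signal.SIGKILL)
        process.wait()


# Stand-in for a broker: a child started in a new session, and therefore in
# its own process group, so killpg cannot reach the test runner itself.
child = subprocess.Popen(
    [sys.executable, "-c", "import time; time.sleep(60)"],
    start_new_session=True,
)
terminate_group(child)
exit_code = child.returncode
```

A function with this shape accepts the process handle as its only required argument, which matches the teardown_fn type shown in the signature.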
pytest_kafka.make_kafka_consumer(kafka_fixture_name: str, kafka_topics: Optional[List[str]] = None, seek_to_beginning: bool = False, **consumer_kwargs) → Callable[..., kafka.consumer.group.KafkaConsumer]
Make a Kafka consumer fixture.
Unlike the other fixtures, the scope is always "function".
Return type: Callable
Parameters:
- kafka_fixture_name (str) – the name of the Kafka fixture to depend on
- kafka_topics – topics to subscribe to
- seek_to_beginning (bool) – whether the consumer should consume from the earliest offsets. This solves the race condition between consumer setup and Kafka server + producer setup, but requires knowing the topics up front.
- consumer_kwargs – what to pass to KafkaConsumer. By default bootstrap_servers will get the server from the passed fixture and consumer_timeout_ms will be pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.
It’s recommended to pass both kafka_topics and seek_to_beginning.
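The defaulting described for consumer_kwargs can be pictured as filling in only the keys the caller did not supply. The sketch below is an illustration, not the library's code: build_consumer_kwargs is a hypothetical helper, the localhost host name is an assumption, and the timeout value is a placeholder for pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS, whose real value may differ.

```python
from typing import Any, Dict

# Placeholder only: the real constant lives in pytest_kafka.constants.
ASSUMED_DEFAULT_CONSUMER_TIMEOUT_MS = 500


def build_consumer_kwargs(kafka_port: int, **consumer_kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: apply defaults without overriding explicit values."""
    used = dict(consumer_kwargs)
    # Assumes the broker from the Kafka fixture listens on localhost.
    used.setdefault("bootstrap_servers", f"localhost:{kafka_port}")
    used.setdefault("consumer_timeout_ms", ASSUMED_DEFAULT_CONSUMER_TIMEOUT_MS)
    return used


defaults_only = build_consumer_kwargs(9092)
overridden = build_consumer_kwargs(9092, consumer_timeout_ms=2000, group_id="tests")
```

Under these assumptions, anything passed explicitly (here consumer_timeout_ms and group_id) reaches KafkaConsumer unchanged, and only the missing keys receive defaults.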