pytest_kafka package
Pytest-kafka public API.
pytest_kafka.make_zookeeper_process(zk_bin, zk_port=None, zk_config_template='\ndataDir={zk_data_dir}\nclientPort={zk_port}\nmaxClientCnxns=0\n', teardown_fn=<function terminate>, scope='function')
Make a Zookeeper fixture.
The fixture will spawn a Zookeeper process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
- Parameters
  - zk_bin (str) – path to the Zookeeper launch script (typically bin/zookeeper-server-start.sh)
  - zk_port (Optional[int]) – Zookeeper port (a random free port by default)
  - zk_config_template (str) – Zookeeper config template; must use the keys zk_data_dir and zk_port. See pytest_kafka.constants.ZOOKEEPER_CONFIG_TEMPLATE.
  - teardown_fn (Callable[[Popen], Any]) – function to tear down Zookeeper (terminate() by default)
  - scope (str) – ‘function’ or ‘session’
- Return type
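The teardown_fn parameter accepts any callable that takes the Popen handle of the spawned process. The following is a minimal sketch of a custom teardown, assuming a graceful stop followed by a forced kill is wanted. The name terminate_then_kill and its timeout argument are hypothetical and not part of pytest_kafka.

```python
import subprocess
import sys
from typing import Any


def terminate_then_kill(proc: subprocess.Popen, timeout: float = 10.0) -> Any:
    """Hypothetical teardown: ask the process to stop, force-kill it if it lingers."""
    proc.terminate()  # polite request first (SIGTERM on POSIX)
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()  # escalate when the process ignores the request
        proc.wait()
    return proc.returncode


if __name__ == "__main__":
    # Stand-in for a real server: a child Python process that just sleeps.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    terminate_then_kill(child)
    print(child.poll() is not None)
```

Because the fixture starts the process in a new process group, a real teardown may need to signal the whole group. This sketch only signals the direct child.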
pytest_kafka.make_kafka_server(kafka_bin, zookeeper_fixture_name, kafka_port=None, kafka_config_template='\nreserved.broker.max.id=65535\nbroker.id={kafka_port}\nlisteners=PLAINTEXT://:{kafka_port}\nlog.dirs={kafka_log_dir}\nnum.partitions=1\n# The number of threads lowered to 1 - may boost startup time:\nnum.recovery.threads.per.data.dir=1\nnum.network.threads=1\nnum.io.threads=1\nlog.retention.hours=1\nlog.segment.bytes=1073741824\nzookeeper.connect=localhost:{zk_port}\nzookeeper.connection.timeout.ms=6000\noffsets.topic.replication.factor=1\ndefault.replication.factor=1\n', teardown_fn=<function terminate>, scope='function')
Make a Kafka fixture.
The fixture will spawn a Kafka process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.
- Parameters
  - zookeeper_fixture_name (str) – the name of the Zookeeper fixture to depend on. The scope must not be wider than this fixture’s scope.
  - kafka_bin (str) – path to the Kafka launch script (typically bin/kafka-server-start.sh)
  - kafka_port (Optional[int]) – Kafka port (a random free port by default)
  - kafka_config_template (str) – Kafka config template; must use the keys kafka_log_dir and kafka_port. See pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE.
  - teardown_fn (Callable[[Popen], Any]) – function to tear down Kafka (terminate() by default)
  - scope (str) – ‘function’ or ‘session’
- Return type
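A custom kafka_config_template must use the keys kafka_log_dir and kafka_port. The sketch below checks a template for those keys before it is handed to the fixture factory. It assumes the placeholders follow str.format syntax, as the default template suggests. The helper missing_template_keys and the shortened template are hypothetical, not part of pytest_kafka.

```python
from string import Formatter
from typing import Iterable, Set


def missing_template_keys(template: str, required: Iterable[str]) -> Set[str]:
    """Return the required placeholder names that the template does not use."""
    # Formatter().parse yields (literal, field_name, format_spec, conversion);
    # field_name is None for trailing literal text, so those entries are skipped.
    used = {field for _, field, _, _ in Formatter().parse(template) if field}
    return set(required) - used


# A shortened custom template in the same style as the default one.
custom_template = """
broker.id={kafka_port}
listeners=PLAINTEXT://:{kafka_port}
log.dirs={kafka_log_dir}
zookeeper.connect=localhost:{zk_port}
"""

# An empty set means every required key is present.
print(missing_template_keys(custom_template, ["kafka_log_dir", "kafka_port"]))
```

The same check would apply to a Zookeeper template with the keys zk_data_dir and zk_port.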
pytest_kafka.make_kafka_consumer(kafka_fixture_name, kafka_topics=None, seek_to_beginning=False, scope='function', **consumer_kwargs)
Make a Kafka consumer fixture.
- Parameters
  - kafka_fixture_name (str) – the name of the Kafka fixture to depend on
  - seek_to_beginning (bool) – whether the consumer should consume from the earliest offsets. This solves the race condition between consumer setup and Kafka server + producer setup, but requires knowing the topics up front.
  - consumer_kwargs – what to pass to KafkaConsumer. By default, bootstrap_servers will get the server from the passed fixture and consumer_timeout_ms will be pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.

  It’s recommended to pass both kafka_topics and seek_to_beginning.
- Return type
  Callable[..., KafkaConsumer]
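The defaulting of consumer_kwargs described above can be pictured as a merge in which caller-supplied values win. This is only a sketch of that behaviour, not the library's implementation. The helper effective_consumer_kwargs, the localhost host name, and the timeout value are assumptions made for illustration.

```python
from typing import Any, Dict

# Placeholder value for illustration only; the real default lives in
# pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS and may differ.
ASSUMED_TIMEOUT_MS = 500


def effective_consumer_kwargs(kafka_port: int, **consumer_kwargs: Any) -> Dict[str, Any]:
    """Hypothetical helper: fill in defaults without overriding caller-supplied values."""
    merged = dict(consumer_kwargs)
    # Assumes the broker from the Kafka fixture listens on localhost.
    merged.setdefault("bootstrap_servers", f"localhost:{kafka_port}")
    merged.setdefault("consumer_timeout_ms", ASSUMED_TIMEOUT_MS)
    return merged


# Only defaults are applied here.
print(effective_consumer_kwargs(9092))
# An explicit consumer_timeout_ms is kept, and extra keys pass through unchanged.
print(effective_consumer_kwargs(9092, consumer_timeout_ms=2000, group_id="tests"))
```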