pytest_kafka package

Pytest-kafka public API.

pytest_kafka.make_zookeeper_process(zk_bin: str, zk_port: Optional[int] = None, zk_config_template: str = '\ndataDir={zk_data_dir}\nclientPort={zk_port}\nmaxClientCnxns=0\n', scope: str = 'function') → Callable[..., Tuple[subprocess.Popen, int]]

Make a Zookeeper fixture.

The fixture will spawn a Zookeeper process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.

Return type:

Callable

Parameters:
  • zk_bin (str) – path to Zookeeper launch script (typically to bin/zookeeper-server-start.sh)
  • zk_port (Optional[int]) – Zookeeper port (random free port by default)
  • zk_config_template (str) – Zookeeper config template, must use keys zk_data_dir and zk_port. See pytest_kafka.constants.ZOOKEEPER_CONFIG_TEMPLATE.
  • scope (str) – ‘function’ or ‘session’
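
The placeholder requirement on zk_config_template can be illustrated with a small sketch. It assumes the template is filled in with Python's str.format, which the brace-style keys suggest but the documentation above does not state outright. The data directory, port, and the extra tickTime setting are hypothetical values chosen for illustration; in a real run the fixture supplies the directory and port.

```python
# A custom template: it keeps the two required keys (zk_data_dir, zk_port)
# and adds one extra Zookeeper setting.
CUSTOM_ZK_TEMPLATE = """
dataDir={zk_data_dir}
clientPort={zk_port}
maxClientCnxns=0
tickTime=2000
"""

# Hypothetical values; the fixture would provide a temporary directory
# and a free port instead.
rendered = CUSTOM_ZK_TEMPLATE.format(zk_data_dir="/tmp/zk-data", zk_port=2181)
print(rendered)
```

A template written this way would be passed as the zk_config_template argument.
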
pytest_kafka.make_kafka_server(kafka_bin: str, zookeeper_fixture_name: str, kafka_port: Optional[int] = None, kafka_config_template: str = '\nreserved.broker.max.id=65535\nbroker.id={kafka_port}\nlisteners=PLAINTEXT://:{kafka_port}\nlog.dirs={kafka_log_dir}\nnum.partitions=1\n# The number of threads lowered to 1 - may boost startup time:\nnum.recovery.threads.per.data.dir=1\nnum.network.threads=1\nnum.io.threads=1\nlog.retention.hours=1\nlog.segment.bytes=1073741824\nzookeeper.connect=localhost:{zk_port}\nzookeeper.connection.timeout.ms=6000\noffsets.topic.replication.factor=1\ndefault.replication.factor=1\n', scope: str = 'function') → Callable[..., Tuple[subprocess.Popen, int]]

Make a Kafka fixture.

The fixture will spawn a Kafka process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.

Return type:

Callable

Parameters:
  • zookeeper_fixture_name (str) – the name of the Zookeeper fixture to depend on. The scope given to the Kafka fixture must not be wider than the scope of that Zookeeper fixture.
  • kafka_bin (str) – path to Kafka launch script (typically to bin/kafka-server-start.sh)
  • kafka_port (Optional[int]) – Kafka port (random free port by default)
  • kafka_config_template (str) – Kafka config template, must use keys kafka_log_dir and kafka_port. See pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE.
  • scope (str) – ‘function’ or ‘session’
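
Before passing a custom kafka_config_template, it can help to check that it still contains the required keys. The sketch below does this with the standard library's string.Formatter, assuming the template uses str.format-style placeholders. The helper names template_keys and missing_keys are hypothetical and not part of pytest_kafka. Note that the default template shown in the signature also uses {zk_port}.

```python
from string import Formatter

# Keys the documentation lists as required for the Kafka config template.
REQUIRED_KEYS = {"kafka_log_dir", "kafka_port"}


def template_keys(template: str) -> set:
    """Return the named placeholders found in a format-style template."""
    return {field for _, field, _, _ in Formatter().parse(template) if field}


def missing_keys(template: str) -> set:
    """Return the required keys that the template does not use."""
    return REQUIRED_KEYS - template_keys(template)


CUSTOM_KAFKA_TEMPLATE = """
broker.id={kafka_port}
listeners=PLAINTEXT://:{kafka_port}
log.dirs={kafka_log_dir}
zookeeper.connect=localhost:{zk_port}
"""

print(sorted(template_keys(CUSTOM_KAFKA_TEMPLATE)))
print(sorted(missing_keys("log.dirs={kafka_log_dir}")))
```
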
pytest_kafka.make_kafka_consumer(kafka_fixture_name: str, kafka_topics: Optional[List[str]] = None, seek_to_beginning: bool = False, **consumer_kwargs) → Callable[..., kafka.consumer.group.KafkaConsumer]

Make a Kafka consumer fixture.

Unlike the other fixtures, the scope is always "function".

Return type:

Callable

Parameters:
  • kafka_fixture_name (str) – the name of the Kafka fixture to depend on
  • kafka_topics (Optional[List[str]]) – topics to subscribe to
  • seek_to_beginning (bool) – whether the consumer should consume from the earliest offsets. This solves the race condition between consumer setup and Kafka server + Producer setup, but it requires knowing the topics upfront.
  • consumer_kwargs – keyword arguments to pass to KafkaConsumer. By default, bootstrap_servers points at the server from the passed fixture and consumer_timeout_ms is pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.

It’s recommended to pass both kafka_topics and seek_to_beginning.