pytest_kafka package

Pytest-kafka public API.

pytest_kafka.make_zookeeper_process(zk_bin, zk_port=None, zk_config_template='dataDir={zk_data_dir}\nclientPort={zk_port}\nmaxClientCnxns=0\nadmin.enableServer=false\n', teardown_fn=<function terminate>, scope='function')

Make a Zookeeper fixture.

The fixture will spawn a Zookeeper process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.

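Parameters:
  • zk_bin (str) – path to the Zookeeper launch script

  • zk_port (Optional[int]) – Zookeeper port (random free port by default)

  • zk_config_template (str) – Zookeeper config template; the default uses the keys zk_data_dir and zk_port

  • teardown_fn (Callable[[Popen], Any]) – function to tear down Zookeeper (terminate() by default)

  • scope (str) – ‘function’ or ‘session’

Return type: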

Callable[…, Tuple[Popen, int]]
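
The default zk_config_template in the signature above contains two placeholders, zk_data_dir and zk_port. The following is a minimal sketch of how such a template could be filled in, assuming plain str.format substitution. The helper render_zk_config, the data directory and the port are hypothetical values chosen for illustration and are not part of pytest_kafka.

```python
# Default template text, copied from the signature above.
ZK_CONFIG_TEMPLATE = (
    "dataDir={zk_data_dir}\n"
    "clientPort={zk_port}\n"
    "maxClientCnxns=0\n"
    "admin.enableServer=false\n"
)


def render_zk_config(template: str, zk_data_dir: str, zk_port: int) -> str:
    """Hypothetical helper: substitute both placeholders via str.format."""
    return template.format(zk_data_dir=zk_data_dir, zk_port=zk_port)


# Illustrative values only; the fixture picks its own temp dir and port.
rendered = render_zk_config(ZK_CONFIG_TEMPLATE, "/tmp/zk-data", 2181)
print(rendered)
```

A custom template passed as zk_config_template should keep both placeholders so that the data directory and port chosen by the fixture end up in the generated config.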

pytest_kafka.make_kafka_server(kafka_bin, zookeeper_fixture_name, kafka_port=None, kafka_config_template='reserved.broker.max.id=65535\nbroker.id={kafka_port}\nlisteners=PLAINTEXT://:{kafka_port}\nlog.dirs={kafka_log_dir}\nnum.partitions=1\n# The number of threads lowered to 1 - may boost startup time:\nnum.recovery.threads.per.data.dir=1\nnum.network.threads=1\nnum.io.threads=1\nlog.retention.hours=1\nlog.segment.bytes=1073741824\nzookeeper.connect=localhost:{zk_port}\nzookeeper.connection.timeout.ms=6000\noffsets.topic.replication.factor=1\ndefault.replication.factor=1\n', teardown_fn=<function terminate>, scope='function', timeout=15)

Make a Kafka fixture.

The fixture will spawn a Kafka process in a new process group and return its process handle and port number. Data will be stored in a Pytest-provided temporary directory.

Parameters:
  • zookeeper_fixture_name (str) – the name of the Zookeeper fixture to depend on. The Kafka fixture’s scope must not be wider than the scope of that Zookeeper fixture.

  • kafka_bin (str) – path to the Kafka launch script (typically bin/kafka-server-start.sh)

  • kafka_port (Optional[int]) – Kafka port (random free port by default)

  • kafka_config_template (str) – Kafka config template, must use keys kafka_log_dir and kafka_port. See pytest_kafka.constants.KAFKA_SERVER_CONFIG_TEMPLATE.

  • teardown_fn (Callable[[Popen], Any]) – function to tear down Kafka (terminate() by default)

  • scope (str) – ‘function’ or ‘session’

  • timeout (int) – how long to wait for Kafka to come online, in seconds

Return type:

Callable[…, Tuple[Popen, int]]
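
Since a custom kafka_config_template must use the keys kafka_log_dir and kafka_port, it can help to check a template before handing it to the fixture factory. The sketch below does that with the standard library only. The names REQUIRED_KEYS, placeholder_names and missing_keys are hypothetical helpers, and the extra auto.create.topics.enable line is just an example of a broker setting one might add.

```python
from string import Formatter

# Keys the documentation above says a Kafka config template must use.
REQUIRED_KEYS = {"kafka_log_dir", "kafka_port"}

# A shortened variant of the default template with one extra setting.
CUSTOM_KAFKA_TEMPLATE = (
    "reserved.broker.max.id=65535\n"
    "broker.id={kafka_port}\n"
    "listeners=PLAINTEXT://:{kafka_port}\n"
    "log.dirs={kafka_log_dir}\n"
    "zookeeper.connect=localhost:{zk_port}\n"
    "offsets.topic.replication.factor=1\n"
    "auto.create.topics.enable=true\n"
)


def placeholder_names(template: str) -> set:
    """Collect every {name} placeholder that appears in the template."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


def missing_keys(template: str) -> set:
    """Return the required keys that the template does not reference."""
    return REQUIRED_KEYS - placeholder_names(template)


missing = missing_keys(CUSTOM_KAFKA_TEMPLATE)
print(missing or "template references all required keys")
```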

pytest_kafka.make_kafka_consumer(kafka_fixture_name, kafka_topics=None, seek_to_beginning=False, scope='function', **consumer_kwargs)

Make a Kafka consumer fixture.

Parameters:
  • kafka_fixture_name (str) – the name of the Kafka fixture to depend on

  • kafka_topics (Optional[List[str]]) – topics to subscribe to

  • seek_to_beginning (bool) – whether the consumer should consume from the earliest offsets. This solves the race condition between consumer setup and Kafka server + Producer setup, but requires knowing the topics upfront.

  • consumer_kwargs – what to pass to KafkaConsumer. By default bootstrap_servers will get the server from the passed fixture and consumer_timeout_ms will be pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.

It’s recommended to pass both kafka_topics and seek_to_beginning.

Return type:

Callable[…, KafkaConsumer]
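
The defaulting described for consumer_kwargs can be pictured as follows. This is only a sketch of the documented behaviour, not the library's code: effective_consumer_kwargs is a hypothetical helper, the localhost host name is an assumption, and ASSUMED_CONSUMER_TIMEOUT_MS is a placeholder standing in for pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS, whose actual value is not given here.

```python
# Placeholder only; the real default is defined in
# pytest_kafka.constants.DEFAULT_CONSUMER_TIMEOUT_MS.
ASSUMED_CONSUMER_TIMEOUT_MS = 500


def effective_consumer_kwargs(kafka_port: int, **consumer_kwargs) -> dict:
    """Hypothetical helper: fill in defaults without overriding the caller."""
    merged = dict(consumer_kwargs)
    # Assumption: the broker started by the Kafka fixture listens on localhost.
    merged.setdefault("bootstrap_servers", f"localhost:{kafka_port}")
    merged.setdefault("consumer_timeout_ms", ASSUMED_CONSUMER_TIMEOUT_MS)
    return merged


defaults_only = effective_consumer_kwargs(9092)
overridden = effective_consumer_kwargs(9092, consumer_timeout_ms=2000)
print(defaults_only)
print(overridden)
```

Values passed explicitly take precedence, so a test that needs a longer poll window can pass consumer_timeout_ms itself and still get bootstrap_servers from the fixture.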

pytest_kafka.terminate(proc, signal_fn=<function Popen.terminate>, wait_timeout=5)

Kill the process with the desired method (SIGTERM by default) and wait for it.

Return type:

None
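
Because signal_fn is a parameter, a different stop method can be bound with functools.partial and the result used as a teardown_fn. The sketch below uses a stand-in, terminate_sketch, with the same parameters as the signature above so that it runs with the standard library alone. It is not the library's implementation, and the fallback to kill() after a timeout is an assumption made for this example.

```python
import subprocess
import sys
from functools import partial
from subprocess import Popen


def terminate_sketch(proc, signal_fn=Popen.terminate, wait_timeout=5):
    """Stand-in: signal the process, then wait for it to exit."""
    signal_fn(proc)
    try:
        proc.wait(timeout=wait_timeout)
    except subprocess.TimeoutExpired:
        # Assumption for this sketch: force-kill if the wait times out.
        proc.kill()
        proc.wait()


# Bind a harsher stop method (SIGKILL on POSIX) for use as a teardown function.
hard_kill = partial(terminate_sketch, signal_fn=Popen.kill)

# Demonstrate on a throwaway child process that would otherwise sleep.
proc = Popen([sys.executable, "-c", "import time; time.sleep(60)"])
hard_kill(proc)
exit_code = proc.returncode
print("child exited:", exit_code is not None)
```

With the real function, the equivalent would be partial(terminate, signal_fn=Popen.kill) passed as teardown_fn to make_zookeeper_process or make_kafka_server.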

Submodules