Skip to content

Commit

Permalink
Migrate from Unittest to pytest (#1620)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwidman authored and dpkp committed Nov 10, 2018
1 parent cd47701 commit bb5bc1f
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 73 deletions.
2 changes: 0 additions & 2 deletions test/conftest.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

import inspect

import pytest

from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
Expand Down
16 changes: 9 additions & 7 deletions test/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from mock import MagicMock, patch
from . import unittest
import pytest

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.errors import (
Expand All @@ -11,17 +12,13 @@
FetchResponsePayload, OffsetAndMessage, OffsetFetchResponsePayload)


class TestKafkaConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])

class TestKafkaConsumer:
def test_session_timeout_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0,9), group_id='foo', session_timeout_ms=60000, request_timeout_ms=40000)

def test_fetch_max_wait_larger_than_request_timeout_raises(self):
with self.assertRaises(KafkaConfigurationError):
with pytest.raises(KafkaConfigurationError):
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)

def test_subscription_copy(self):
Expand All @@ -43,7 +40,12 @@ def test_partition_list(self):
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member


class TestSimpleConsumer(unittest.TestCase):
def test_non_integer_partitions(self):
with self.assertRaises(AssertionError):
SimpleConsumer(MagicMock(), 'group', 'topic', partitions=['0'])

def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
Expand Down
20 changes: 2 additions & 18 deletions test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import pytest
from kafka.vendor import six

from kafka import SimpleClient
from kafka.conn import ConnectionStates
from kafka.consumer.group import KafkaConsumer
from kafka.coordinator.base import MemberState, Generation
Expand All @@ -20,25 +19,10 @@ def get_connect_str(kafka_broker):
return kafka_broker.host + ':' + str(kafka_broker.port)


@pytest.fixture
def simple_client(kafka_broker):
return SimpleClient(get_connect_str(kafka_broker))


@pytest.fixture
def topic(simple_client):
topic = random_string(5)
simple_client.ensure_topic_exists(topic)
return topic


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_consumer(kafka_broker, version):

def test_consumer(kafka_broker, topic, version):
# The `topic` fixture is included because
# 0.8.2 brokers need a topic to function well
if version >= (0, 8, 2) and version < (0, 9):
topic(simple_client(kafka_broker))

consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
consumer.poll(500)
assert len(consumer._client._conns) > 0
Expand Down
19 changes: 10 additions & 9 deletions test/test_consumer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,21 @@

from test.conftest import version
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
from test.testutil import (
KafkaIntegrationTestCase, kafka_versions, Timer,
send_messages
)
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer


@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
def test_kafka_consumer(simple_client, topic, kafka_consumer_factory):
"""Test KafkaConsumer
"""
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
"""Test KafkaConsumer"""
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')

send_messages(simple_client, topic, 0, range(0, 100))
send_messages(simple_client, topic, 1, range(100, 200))
# TODO replace this with a `send_messages()` pytest fixture
# as we will likely need this elsewhere
for i in range(0, 100):
kafka_producer.send(topic, partition=0, value=str(i).encode())
for i in range(100, 200):
kafka_producer.send(topic, partition=1, value=str(i).encode())
kafka_producer.flush()

cnt = 0
messages = {0: set(), 1: set()}
Expand Down
23 changes: 10 additions & 13 deletions test/test_package.py
Original file line number Diff line number Diff line change
@@ -1,28 +1,25 @@
from . import unittest


class TestPackage(unittest.TestCase):
class TestPackage:
def test_top_level_namespace(self):
import kafka as kafka1
self.assertEqual(kafka1.KafkaConsumer.__name__, "KafkaConsumer")
self.assertEqual(kafka1.consumer.__name__, "kafka.consumer")
self.assertEqual(kafka1.codec.__name__, "kafka.codec")
assert kafka1.KafkaConsumer.__name__ == "KafkaConsumer"
assert kafka1.consumer.__name__ == "kafka.consumer"
assert kafka1.codec.__name__ == "kafka.codec"

def test_submodule_namespace(self):
import kafka.client as client1
self.assertEqual(client1.__name__, "kafka.client")
assert client1.__name__ == "kafka.client"

from kafka import client as client2
self.assertEqual(client2.__name__, "kafka.client")
assert client2.__name__ == "kafka.client"

from kafka.client import SimpleClient as SimpleClient1
self.assertEqual(SimpleClient1.__name__, "SimpleClient")
assert SimpleClient1.__name__ == "SimpleClient"

from kafka.codec import gzip_encode as gzip_encode1
self.assertEqual(gzip_encode1.__name__, "gzip_encode")
assert gzip_encode1.__name__ == "gzip_encode"

from kafka import SimpleClient as SimpleClient2
self.assertEqual(SimpleClient2.__name__, "SimpleClient")
assert SimpleClient2.__name__ == "SimpleClient"

from kafka.codec import snappy_encode
self.assertEqual(snappy_encode.__name__, "snappy_encode")
assert snappy_encode.__name__ == "snappy_encode"
26 changes: 3 additions & 23 deletions test/testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@
import functools
import operator
import os
import socket
import time
import uuid

import pytest
from . import unittest

from kafka import SimpleClient, create_message
from kafka import SimpleClient
from kafka.errors import (
LeaderNotAvailableError, KafkaTimeoutError, InvalidTopicError,
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
FailedPayloadsError
)
from kafka.structs import OffsetRequestPayload, ProduceRequestPayload
from kafka.structs import OffsetRequestPayload
from test.fixtures import random_string, version_str_to_list, version as kafka_version #pylint: disable=wrong-import-order


Expand Down Expand Up @@ -67,26 +66,6 @@ def wrapper(func, *args, **kwargs):
return real_kafka_versions


_MESSAGES = {}
def msg(message):
"""Format, encode and deduplicate a message
"""
global _MESSAGES #pylint: disable=global-statement
if message not in _MESSAGES:
_MESSAGES[message] = '%s-%s' % (message, str(uuid.uuid4()))

return _MESSAGES[message].encode('utf-8')

def send_messages(client, topic, partition, messages):
"""Send messages to a topic's partition
"""
messages = [create_message(msg(str(m))) for m in messages]
produce = ProduceRequestPayload(topic, partition, messages=messages)
resp, = client.send_produce_request([produce])
assert resp.error == 0

return [x.value for x in messages]

def current_offset(client, topic, partition, kafka_broker=None):
"""Get the current offset of a topic's partition
"""
Expand All @@ -101,6 +80,7 @@ def current_offset(client, topic, partition, kafka_broker=None):
else:
return offsets.offsets[0]


class KafkaIntegrationTestCase(unittest.TestCase):
create_client = True
topic = None
Expand Down
1 change: 0 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ deps =
xxhash
crc32c
py26: unittest2
decorator
commands =
py.test {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka --cov-config=.covrc}
setenv =
Expand Down

0 comments on commit bb5bc1f

Please sign in to comment.