Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement methods to convert a Struct object to a pythonic object #1951

Merged
merged 5 commits into from
Feb 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion kafka/protocol/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import abc

from kafka.protocol.struct import Struct
from kafka.protocol.types import Int16, Int32, String, Schema
from kafka.protocol.types import Int16, Int32, String, Schema, Array


class RequestHeader(Struct):
Expand Down Expand Up @@ -47,6 +47,9 @@ def expect_response(self):
"""Override this method if an api request does not always generate a response"""
return True

def to_object(self):
return _to_object(self.SCHEMA, self)


class Response(Struct):
__metaclass__ = abc.ABCMeta
Expand All @@ -65,3 +68,30 @@ def API_VERSION(self):
def SCHEMA(self):
"""An instance of Schema() representing the response structure"""
pass

def to_object(self):
return _to_object(self.SCHEMA, self)


def _to_object(schema, data):
obj = {}
for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
if isinstance(data, Struct):
val = data.get_item(name)
else:
val = data[idx]

if isinstance(_type, Schema):
obj[name] = _to_object(_type, val)
elif isinstance(_type, Array):
if isinstance(_type.array_of, (Array, Schema)):
obj[name] = [
_to_object(_type.array_of, x)
for x in val
]
else:
obj[name] = val
else:
obj[name] = val

return obj
6 changes: 6 additions & 0 deletions kafka/protocol/struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ def __init__(self, *args, **kwargs):
# causes instances to "leak" to garbage
self.encode = WeakMethod(self._encode_self)


@classmethod
def encode(cls, item): # pylint: disable=E0202
bits = []
Expand All @@ -48,6 +49,11 @@ def decode(cls, data):
data = BytesIO(data)
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])

def get_item(self, name):
if name not in self.SCHEMA.names:
raise KeyError("%s is not in the schema" % name)
return self.__dict__[name]

def __repr__(self):
key_vals = []
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):
Expand Down
236 changes: 236 additions & 0 deletions test/test_object_conversion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from kafka.protocol.admin import Request
from kafka.protocol.admin import Response
from kafka.protocol.types import Schema
from kafka.protocol.types import Array
from kafka.protocol.types import Int16
from kafka.protocol.types import String

import pytest

@pytest.mark.parametrize('superclass', (Request, Response))
class TestObjectConversion:
def test_get_item(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myobject', Int16))

tc = TestClass(myobject=0)
assert tc.get_item('myobject') == 0
with pytest.raises(KeyError):
tc.get_item('does-not-exist')

def test_with_empty_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema()

tc = TestClass()
tc.encode()
assert tc.to_object() == {}

def test_with_basic_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myobject', Int16))

tc = TestClass(myobject=0)
tc.encode()
assert tc.to_object() == {'myobject': 0}

def test_with_basic_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(Int16)))

tc = TestClass(myarray=[1,2,3])
tc.encode()
assert tc.to_object()['myarray'] == [1, 2, 3]

def test_with_complex_array_schema(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))))

tc = TestClass(
myarray=[[10, 'hello']]
)
tc.encode()
obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'

def test_with_array_and_other(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subobject', Int16),
('othersubobject', String('utf-8')))),
('notarray', Int16))

tc = TestClass(
myarray=[[10, 'hello']],
notarray=42
)

obj = tc.to_object()
assert len(obj['myarray']) == 1
assert obj['myarray'][0]['subobject'] == 10
assert obj['myarray'][0]['othersubobject'] == 'hello'
assert obj['notarray'] == 42

def test_with_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(Int16)),
('otherobject', Int16))))

tc = TestClass(
myarray=[
[[1, 2], 2],
[[2, 3], 4],
]
)
print(tc.encode())


obj = tc.to_object()
assert len(obj['myarray']) == 2
assert obj['myarray'][0]['subarray'] == [1, 2]
assert obj['myarray'][0]['otherobject'] == 2
assert obj['myarray'][1]['subarray'] == [2, 3]
assert obj['myarray'][1]['otherobject'] == 4

def test_with_complex_nested_array(self, superclass):
class TestClass(superclass):
API_KEY = 0
API_VERSION = 0
RESPONSE_TYPE = None # To satisfy the Request ABC
SCHEMA = Schema(
('myarray', Array(
('subarray', Array(
('innertest', String('utf-8')),
('otherinnertest', String('utf-8')))),
('othersubarray', Array(Int16)))),
('notarray', String('utf-8')))

tc = TestClass(
myarray=[
[[['hello', 'hello'], ['hello again', 'hello again']], [0]],
[[['hello', 'hello again']], [1]],
],
notarray='notarray'
)
tc.encode()

obj = tc.to_object()

assert obj['notarray'] == 'notarray'
myarray = obj['myarray']
assert len(myarray) == 2

assert myarray[0]['othersubarray'] == [0]
assert len(myarray[0]['subarray']) == 2
assert myarray[0]['subarray'][0]['innertest'] == 'hello'
assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello'
assert myarray[0]['subarray'][1]['innertest'] == 'hello again'
assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again'

assert myarray[1]['othersubarray'] == [1]
assert len(myarray[1]['subarray']) == 1
assert myarray[1]['subarray'][0]['innertest'] == 'hello'
assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again'

def test_with_metadata_response():
from kafka.protocol.metadata import MetadataResponse_v5
tc = MetadataResponse_v5(
throttle_time_ms=0,
brokers=[
[0, 'testhost0', 9092, 'testrack0'],
[1, 'testhost1', 9092, 'testrack1'],
],
cluster_id='abcd',
controller_id=0,
topics=[
[0, 'testtopic1', False, [
[0, 0, 0, [0, 1], [0, 1], []],
[0, 1, 1, [1, 0], [1, 0], []],
],
], [0, 'other-test-topic', True, [
[0, 0, 0, [0, 1], [0, 1], []],
]
]]
)
tc.encode() # Make sure this object encodes successfully


obj = tc.to_object()

assert obj['throttle_time_ms'] == 0

assert len(obj['brokers']) == 2
assert obj['brokers'][0]['node_id'] == 0
assert obj['brokers'][0]['host'] == 'testhost0'
assert obj['brokers'][0]['port'] == 9092
assert obj['brokers'][0]['rack'] == 'testrack0'
assert obj['brokers'][1]['node_id'] == 1
assert obj['brokers'][1]['host'] == 'testhost1'
assert obj['brokers'][1]['port'] == 9092
assert obj['brokers'][1]['rack'] == 'testrack1'

assert obj['cluster_id'] == 'abcd'
assert obj['controller_id'] == 0

assert len(obj['topics']) == 2
assert obj['topics'][0]['error_code'] == 0
assert obj['topics'][0]['topic'] == 'testtopic1'
assert obj['topics'][0]['is_internal'] == False
assert len(obj['topics'][0]['partitions']) == 2
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
assert obj['topics'][0]['partitions'][0]['partition'] == 0
assert obj['topics'][0]['partitions'][0]['leader'] == 0
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
assert obj['topics'][0]['partitions'][1]['partition'] == 1
assert obj['topics'][0]['partitions'][1]['leader'] == 1
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []

assert obj['topics'][1]['error_code'] == 0
assert obj['topics'][1]['topic'] == 'other-test-topic'
assert obj['topics'][1]['is_internal'] == True
assert len(obj['topics'][1]['partitions']) == 1
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
assert obj['topics'][1]['partitions'][0]['partition'] == 0
assert obj['topics'][1]['partitions'][0]['leader'] == 0
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []

tc.encode()