
Can't add a consumer to an existing group with a StickyPartitionAssignor #2153

Closed
joein opened this issue Oct 30, 2020 · 3 comments

joein commented Oct 30, 2020

It is not possible to add a new consumer to an existing group that uses StickyPartitionAssignor, because dict_itemiterator does not define a __len__ method.

Kafka version 1.0.1 (https://archive.apache.org/dist/kafka/1.0.1/kafka_2.11-1.0.1.tgz)
Python 3.8
kafka-python 2.0.2

Suppose we have first_consumer.py:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["first_topic"]

consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)

while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

And second_consumer.py, which is the same except that it subscribes to second_topic:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.sticky.sticky_assignor import StickyPartitionAssignor

addr = "127.0.0.1:9092"
group_id = "some_group_id"
topic = ["second_topic"]

consumer = KafkaConsumer(
    *topic,
    partition_assignment_strategy=(StickyPartitionAssignor,),
    bootstrap_servers=addr,
    group_id=group_id,
)

while True:
    consumer.poll(timeout_ms=4000, max_records=1)
    print(consumer.assignment())

We start first_consumer.py, wait for it to receive its assignment, and then start second_consumer.py.
When the second consumer connects, a JoinGroup request is sent (_send_join_group_request), which in turn calls assignor.metadata(). Inside assignor.metadata(), the assignor tries to encode its user_data, which eventually reaches this array encoder in kafka/protocol/types.py:

return b''.join(
    [Int32.encode(len(items))] +
    [self.array_of.encode(item) for item in items]
)
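For context, the encoder above writes a 4-byte element count before the elements themselves, so it has to call len(items) before it can encode anything. The sketch below reproduces that length-prefixed layout in isolation; encode_int32 and encode_array are hypothetical helper names for illustration, not kafka-python's own Int32/Array classes, and the only assumption about the wire format is big-endian int32 counts and values.

```python
import struct
from typing import Callable, Sequence


def encode_int32(value: int) -> bytes:
    # Big-endian signed 32-bit integer (hypothetical helper, mirrors an INT32 field).
    return struct.pack(">i", value)


def encode_array(items: Sequence[int], encode_item: Callable[[int], bytes]) -> bytes:
    # Length-prefixed array: element count first, then each encoded element.
    # len(items) is evaluated up front, so items must support len().
    return b"".join([encode_int32(len(items))] + [encode_item(item) for item in items])


encoded = encode_array([0, 1, 2], encode_int32)
print(encoded.hex())  # prints 00000003000000000000000100000002
```

Passing a list works because lists define __len__; any object without it fails at the len() call before a single element is written.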

However, items is a dict_itemiterator (the object six.iteritems() returns on Python 3), and that type does not define __len__. This causes the following exception:

  File "first_consumer.py", line 12, in consume
    records = consumer.poll(timeout_ms=4000, max_records=1)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
    records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
  File "/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 675, in _poll_once
    self._coordinator.poll()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 289, in poll
    self.ensure_active_group()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 390, in ensure_active_group
    future = self._send_join_group_request()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/base.py", line 453, in _send_join_group_request
    for protocol, metadata in self.group_protocols()
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/consumer.py", line 154, in group_protocols
    metadata = assignor.metadata(self._joined_subscription)
  File "/venv/lib/python3.8/site-packages/kafka/coordinator/assignors/sticky/sticky_assignor.py", line 660, in metadata
    user_data = data.encode()
  File "/venv/lib/python3.8/site-packages/kafka/util.py", line 50, in __call__
    return self.method()(self.target(), *args, **kwargs)
  File "/venv/lib/python3.8/site-packages/kafka/protocol/struct.py", line 42, in _encode_self
    return self.SCHEMA.encode(
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 146, in encode
    return b''.join([
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 147, in <listcomp>
    field.encode(item[i])
  File "/venv/lib/python3.8/site-packages/kafka/protocol/types.py", line 185, in encode
    [Int32.encode(len(items))] +
TypeError: object of type 'dict_itemiterator' has no len()
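The same TypeError can be reproduced without Kafka. On Python 3, six.iteritems(d) returns iter(d.items()), a one-shot iterator with no length, whereas a dict view (d.items(), which is what six.viewitems returns on Python 3) or a list does support len(). The partitions_by_topic dictionary below is a hypothetical stand-in for the assignor's per-topic partition map, and this is only a sketch of the failure mode, not necessarily the change made in the fix.

```python
from collections import defaultdict

# Hypothetical stand-in for a per-topic partition map.
partitions_by_topic = defaultdict(list)
partitions_by_topic["first_topic"].extend([0, 1])

# What six.iteritems() yields on Python 3: an iterator over the items.
items_iterator = iter(partitions_by_topic.items())
print(type(items_iterator).__name__)  # prints dict_itemiterator

error_message = ""
try:
    len(items_iterator)
except TypeError as exc:
    error_message = str(exc)
print(error_message)  # prints object of type 'dict_itemiterator' has no len()

# A dict view or a materialized list both support len().
print(len(partitions_by_topic.items()))  # prints 1
print(len(list(partitions_by_topic.items())))  # prints 1
```

Materializing with list() costs a copy but also allows the items to be iterated more than once; a dict view avoids the copy while still reporting its length.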

Maybe @aynroot can help here.

@jeffwidman
Contributor

Wow, fantastic bug report! Thank you so much.

@jeffwidman
Contributor

@joein does #2154 fix it for you?

@joein
Author

joein commented Nov 2, 2020

Yeah, it doesn't crash anymore. Feel free to close this issue :)

Thanks @aynroot @jeffwidman
