zalando-nakadi / bubuku Goto Github PK
Start, monitor and rebalance a Kafka cluster in an AWS setup
License: MIT License
The swap_partitions action should not run if the size stats of at least one broker are older than the previous run of the swap_partitions action. This check is needed to avoid selecting the wrong brokers as fat and slim because the size stats are stale.
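A minimal sketch of such a staleness check; the function and parameter names are illustrative, not the actual bubuku API:

```python
def stats_fresh_enough(stats_timestamps, last_swap_run):
    """Hypothetical guard for swap_partitions: stats_timestamps maps
    broker_id -> the time its size stats were last collected. The action
    should only run when every broker's stats are newer than the
    previous swap_partitions run."""
    return all(ts > last_swap_run for ts in stats_timestamps.values())
```

If any single broker's stats predate the last run, the whole swap is skipped rather than risking a wrong fat/slim selection.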
Right now rebalance is performed step by step with a step size of 1 partition.
It would probably be faster to transfer more partitions at a time (5, 10, or a number derived from the leader count).
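One possible shape for this, assuming the rebalance plan is a flat list of planned moves (names are hypothetical):

```python
def batch_moves(moves, batch_size):
    """Split planned partition moves into batches so that several
    partitions are transferred per rebalance step instead of one."""
    return [moves[i:i + batch_size] for i in range(0, len(moves), batch_size)]
```

Each rebalance step would then submit one batch instead of a single partition.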
Currently there is only one action that can be cancelled - rebalance. It should be possible to cancel any action type (even queued ones).
Right now these partitions are rebalanced as usual, but they actually can't be balanced, because all of their replicas are dead.
Right now, if leader election can't proceed (for some strange reason), the broker won't start at all. It would probably be a good idea to start anyway after waiting for leader election for some long period (10 minutes, for example).
graceful_terminate works with the initial version of the controller, but it may happen that the controller has already changed and no longer works, while the shutdown hook is already installed...
Right now it can move leadership. This is not acceptable, because leadership is already well balanced across nodes.
Currently the rebalance process treats all topic partitions as equal. To achieve better data distribution, rebalance should take the data size of each topic partition into account.
It would be good to have some kind of local environment to test new features or reproduce bugs. Currently we have no way to test what we did.
Checks should have a configurable execution period to allow more flexible check scheduling.
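A sketch of what a configurable period could look like; `check_if_time` exists in bubuku's controller, but this wrapper and its parameters are only an illustration:

```python
import time


class PeriodicCheck:
    """Hypothetical check wrapper with a per-check configurable period."""

    def __init__(self, check_fn, period_s):
        self.check_fn = check_fn
        self.period_s = period_s
        self._last_run = 0.0

    def check_if_time(self, now=None):
        """Run the wrapped check only if period_s has elapsed."""
        now = time.time() if now is None else now
        if now - self._last_run >= self.period_s:
            self._last_run = now
            return self.check_fn()
        return None
```

The controller loop would stay unchanged; each check simply skips itself until its own interval has passed.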
The change is RestartBrokerChange, but its string representation is RestartOnZkChange, and the file name is also confusing: restart_on_zk_change.py. These should be made consistent.
Hi,
we have a Kafka cluster and I want to move partitions to other nodes. As far as I understand, this is what bubuku-cli migrate is for.
However, when I call it like this:
$ bubuku-cli stats
Broker Id    Free kb       Used kb
52397540     464380884     42792420
52399021     4930335540    26419188
52400677     4801094116    155660612
52402004     478177812     28995492
52404200     486099964     21073340
$ bubuku-cli migrate --from "52399021,52400677" --to "52402004,52404200,52397540"
I get the following error:
INFO:kazoo.client:Zookeeper session lost, state: CLOSED
Traceback (most recent call last):
File "/usr/local/bin/bubuku-cli", line 11, in <module>
sys.exit(cli())
File "/usr/local/lib/python3.5/dist-packages/click/core.py", line 716, in __call__
return self.main(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/click/core.py", line 696, in main
rv = self.invoke(ctx)
File "/usr/local/lib/python3.5/dist-packages/click/core.py", line 1060, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/usr/local/lib/python3.5/dist-packages/click/core.py", line 889, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/usr/local/lib/python3.5/dist-packages/click/core.py", line 534, in invoke
return callback(*args, **kwargs)
File "/usr/local/lib/python3.5/dist-packages/bubuku/cli.py", line 94, in migrate_broker
RemoteCommandExecutorCheck.register_migration(zookeeper, from_.split(','), to.split(','), shrink, broker_id)
AttributeError: 'NoneType' object has no attribute 'split'
It seems that either from or to is not picked up correctly by click. Should I use a different format?
Thanks!
We had situations when the broker was restarted without any clear reason.
The check verifies whether the broker is running and registered in ZK. My assumption is that the broker was running normally, but the connection to ZK was lost, which triggered a restart of the broker, because it was not possible to check whether it was registered or not.
So I suggest extending the logging for this check in order to identify the reason for the restarts.
Sometimes leadership fails to become balanced.
The reason is that, according to the weight distribution, it is possible to move partitions that are no longer present on the broker.
Test rebalance with dead brokers
If there are no changes in the queue, then there is no need to take the lock for processing. Right now bubuku tries to take the lock on each step (every 5 seconds), which means that ZK cannot be deployed on t2 instances because of CPU credit usage.
There should be an autoscaling feature in bubuku to scale the cluster up/down according to the CPU/disk usage of the Kafka nodes.
Currently the kafka restart is not very safe, IMHO. With buku we had a situation where kafka started before the old process had finished its shutdown. We also do what buku does:
process.terminate()
process.wait()
So that doesn't guarantee that the old kafka has stopped.
I know that here we additionally check that the node no longer exists in ZK.
But I think, to be on the safe side, we can also check that the Java process has ended (even in a hacky way).
The problem is that if bubuku runs in docker, then the kafka instance will be terminated immediately (without writing its data).
In our case it was like this:
Aug 19 10:58:07 ip-172-31-139-127 docker/26f64e7b1654[888]: WARNING:kazoo.client:Connection dropped: socket connection error: None
Aug 19 10:58:07 ip-172-31-139-127 docker/26f64e7b1654[888]: INFO:kazoo.client:Connecting to 172.31.172.94:2181
Aug 19 10:58:07 ip-172-31-139-127 docker/26f64e7b1654[888]: WARNING:kazoo.client:Connection dropped: socket connection broken
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: INFO Opening socket connection to server ip-172-31-174-143.eu-west-1.compute.internal/172.31.174.143:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: Traceback (most recent call last):
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/kazoo/retry.py", line 123, in __call__
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return func(*args, **kwargs)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/kazoo/client.py", line 1026, in get
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self.get_async(path, watch).get()
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/kazoo/handlers/utils.py", line 72, in get
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: raise self._exception
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: kazoo.exceptions.SessionExpiredError
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]:
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: During handling of the above exception, another exception occurred:
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]:
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: Traceback (most recent call last):
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/bin/bubuku", line 11, in <module>
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: sys.exit(main())
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/daemon.py", line 81, in main
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: controller.loop()
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/controller.py", line 125, in loop
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: self.make_step(ip)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/controller.py", line 143, in make_step
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: self._add_change_to_queue(check.check_if_time())
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/controller.py", line 37, in check_if_time
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self.check()
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/features/restart_if_dead.py", line 58, in check
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: if self.broker.is_running_and_registered():
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/broker.py", line 29, in is_running_and_registered
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self.id_manager.is_registered()
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/id_generator.py", line 62, in is_registered
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self.zk.is_broker_registered(self.broker_id)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/zookeeper/__init__.py", line 166, in is_broker_registered
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: _, stat = self.exhibitor.get('/brokers/ids/{}'.format(broker_id))
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/bubuku/zookeeper/__init__.py", line 110, in get
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self.client.retry(self.client.get, *params)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/kazoo/client.py", line 273, in _retry
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: return self._retry.copy()(*args, **kwargs)
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: File "/usr/local/lib/python3.5/dist-packages/kazoo/retry.py", line 136, in __call__
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: raise RetryFailedError("Exceeded retry deadline")
Aug 19 10:58:08 ip-172-31-139-127 docker/26f64e7b1654[888]: kazoo.retry.RetryFailedError: Exceeded retry deadline
In case of any unpredictable error, bubuku should reinitialize itself without terminating kafka, and continue to work.
Now, if the exhibitor list can't be refreshed, AWSExhibitorAddressProvider will query data from master_exhibitors. But it would be much better to query AWS to get the new exhibitor list from the load balancer name.
And of course it's better not to start kafka instances if the exhibitor list is empty.
Right now the rebalance process considers the distributions 1,2,3 and 1,3,2 to be different and tries to move data (sometimes to [4,5,6]). Copying data for a pure reordering makes no sense, so it's better not to distinguish between 1,2,3 and 1,3,2 and to use (leader_count, overall_partition_count) as the optimization strategy.
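The order-insensitive comparison could look like this (an illustrative helper, not bubuku's real code):

```python
def same_replica_set(current, proposed):
    """Treat replica assignments that differ only in order, e.g.
    [1, 2, 3] vs [1, 3, 2], as identical so no data is copied for a
    pure reordering."""
    return frozenset(current) == frozenset(proposed)
```

Only assignments where this returns False would trigger an actual data movement; leadership placement would then be optimized separately via the (leader_count, overall_partition_count) key.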
To work correctly, the free space of a node needs to be refreshed more frequently than partitions are swapped. The problem is that partition sizes change dynamically right after a rebalance, which means that if there are several swap-data events in the queue, they will do bad things one after another, because broker free space is not updated after the rebalance process.
In case of unclean leader election it may work, but not in other cases.
Right now bubuku requires Exhibitor, but it must be possible to use it with plain ZooKeeper as well.
When a new node is added to the cluster and rebalance starts, the old nodes sometimes get new partitions. In my understanding, the rebalance should happen in such a way that partitions are only moved from the old nodes to the new one (otherwise the old nodes can run out of disk space).
On the very first start, kafka generates the file meta.properties, which holds the broker.id used on all subsequent starts.
During restarts kafka won't use the broker.id from server.properties; it will use the one from meta.properties. bubuku should respect this.
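A sketch of reading the generated broker.id back, assuming the simple key=value layout Kafka writes (real parsing may also need to handle comment lines and the version key):

```python
def read_generated_broker_id(path):
    """Parse broker.id from the properties file Kafka generates in its
    log directory on first start; returns None if the key is absent."""
    with open(path) as f:
        for line in f:
            line = line.strip()
            if line.startswith("broker.id="):
                return int(line.split("=", 1)[1])
    return None
```

bubuku could consult this value before deciding which broker id to register, instead of trusting server.properties alone.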
Right now bubuku supports configuration only via environment properties.
It must be possible to configure it by other means (e.g. file-based configuration).
Right now, on kafka start, bubuku tries to perform the following steps:
The initial wait timeout is 300 seconds, but for some environments it should be significantly bigger (30 minutes, for example), or the increase step must be progressive.
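A progressive increase could be as simple as a capped geometric sequence; the starting value matches the current 300 s default, while the factor and cap are illustrative:

```python
def progressive_timeouts(initial_s=300.0, factor=2.0, max_s=1800.0):
    """Yield wait timeouts that grow progressively from initial_s up to
    a cap (30 minutes here) instead of staying fixed at 300 seconds."""
    timeout = initial_s
    while True:
        yield min(timeout, max_s)
        timeout *= factor
```

Each retry of the startup wait would pull the next value from this generator.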
Expanding to the full list first, then shrinking to the target list:
First thing to do (for all partitions):
[b1, b2, b3] -> [b1, b2, b3, n1, n2, n3]
Second thing:
[b1, b2, b3, n1, n2, n3] -> [n1, n2, n3]
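The two steps above can be sketched as a function producing both phases of the reassignment (a sketch, not the migrate implementation):

```python
def two_phase_reassignment(old_replicas, new_replicas):
    """Expand-then-shrink migration: phase 1 grows the replica list to
    the union of old and new brokers, phase 2 shrinks it to the new
    brokers only."""
    expanded = list(old_replicas) + [b for b in new_replicas if b not in old_replicas]
    return expanded, list(new_replicas)
```

Phase 2 should only be applied once the new replicas are in sync, otherwise shrinking would drop the only up-to-date copies.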
If an unhandled exception occurs in the run_daemon_loop cycle, the bubuku process will try to restart its own work cycle and recreate all internal entities (keeping the existing kafka process in mind).
But if in that case AmazonEnvProvider.get_region() returns null, it won't be able to refresh the exhibitor address list, and that will start an endless cycle of exceptions (see AWSExhibitorAddressProvider.get_addresses_by_lb_name).
Right now it's impossible to list the currently running actions on bubuku instances. It should be possible via the CLI tool and the UI.
Every time before kafka starts, bubuku makes a lot of requests to zookeeper. The start process is delayed by 5 minutes (with 2000 topics and a cross-region ZK setup), and rebalance is delayed by 3 minutes. This is unworkable.
Right now it tries to restart kafka immediately after figuring out that the exhibitor list has changed. Actually, one needs to wait for some time before doing this.
When a broker instance dies, a rebalance is added to the actions queue, but the 'start' action prevents the rebalance from being processed (a started rebalance is removed because of the start in progress), and ongoing rebalances are removed during another (or this) broker's restart.
The root cause of this bug: the 'start' and 'rebalance' actions are added to the queue at the same time (first rebalance, then start); then in run() the rebalance says it will stop because there is a start in progress, and the start just starts.
Now rebalance is triggered only by certain events, like a broker list change or a new instance start. It would be good to have the possibility to run commands on bubuku remotely (for example, create special nodes in ZK with actions to run, and periodically check for them).
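The payload of such action nodes could be plain JSON; the node path and field names below are hypothetical, and the actual ZK read/write would go through kazoo:

```python
import json


def encode_action(name, payload):
    """Serialize a remote command as it could be stored in a ZK node
    (e.g. under a hypothetical /bubuku/actions path) for instances to
    pick up on their periodic check."""
    return json.dumps({"action": name, "payload": payload}).encode("utf-8")


def decode_action(data):
    """Inverse of encode_action: recover the action name and payload."""
    doc = json.loads(data.decode("utf-8"))
    return doc["action"], doc["payload"]
```

An instance that finds such a node during its loop would decode it, enqueue the corresponding change, and delete the node.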