
Comments (2)

Mr-Dai commented on July 22, 2024

The current implementation uses a single thread (etcd3.watch.Watcher.run) both to execute user-defined callbacks and to receive the watch id from etcd when the user invokes add_callback:

def run(self):
    try:
        for response in self._watch_response_iterator:
            if response.created:
                self._watch_id_callbacks[response.watch_id] = self._callback
                self._watch_id_queue.put(response.watch_id)   # Return the new watch id to user

            callback = self._watch_id_callbacks.get(response.watch_id)
            if callback:
                for event in response.events:
                    callback(events.new_event(event))  # Invoke the user-defined callback
    except grpc.RpcError as e:
        # ...

While the watcher thread is running a user-defined callback, it is blocked at callback(events.new_event(event)), the last statement of the try clause. If that callback itself calls add_callback, the call blocks indefinitely (unless the user set the timeout parameter), waiting for the watcher to return the watch id. That can never happen, because Watcher.run is the very thread that is blocked:

def add_callback(self, key, callback, range_end=None, start_revision=None, progress_notify=False,
                 filters=None, prev_kv=False):
    with self._watch_id_lock:
        # ....
        self._watch_requests_queue.put((request, callback))
        return self._watch_id_queue.get(timeout=self.timeout)  # Deadlock if invoked by `Watcher.run`
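
The pattern can be reproduced without etcd. The following is only a minimal sketch: ToyWatcher, emit and demo are hypothetical names, not part of python-etcd3, and the toy omits the lock and the gRPC stream. It assumes nothing beyond the standard library. A finite timeout is used so the demonstration terminates instead of hanging:

```python
import queue
import threading


class ToyWatcher:
    """Hypothetical stand-in: one thread both hands out watch ids and runs callbacks."""

    def __init__(self, timeout=None):
        self.timeout = timeout
        self._inbox = queue.Queue()           # requests and events for the loop thread
        self._watch_id_queue = queue.Queue()  # watch ids handed back to add_callback
        self._callbacks = {}
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        next_id = 0
        while True:
            kind, payload = self._inbox.get()
            if kind == "create":
                self._callbacks[next_id] = payload
                self._watch_id_queue.put(next_id)
                next_id += 1
            else:  # "event": payload is (watch_id, value)
                watch_id, value = payload
                callback = self._callbacks.get(watch_id)
                if callback:
                    callback(value)  # the loop cannot serve "create" until this returns

    def add_callback(self, callback):
        self._inbox.put(("create", callback))
        return self._watch_id_queue.get(timeout=self.timeout)

    def emit(self, watch_id, value):
        self._inbox.put(("event", (watch_id, value)))


def demo(timeout=0.2):
    watcher = ToyWatcher(timeout=timeout)
    outcome = queue.Queue()

    def reentrant(_value):
        # Runs on the loop thread, so nobody is left to answer the "create" request.
        try:
            outcome.put(watcher.add_callback(lambda v: None))
        except queue.Empty:
            outcome.put("timed out")

    first_id = watcher.add_callback(reentrant)  # called from the main thread: works
    watcher.emit(first_id, "x")
    return first_id, outcome.get(timeout=5)
```

With timeout=None the inner add_callback would wait forever, which matches the behaviour described above.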

from python-etcd3.

Mr-Dai commented on July 22, 2024

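I managed to work around this problem by using two Etcd3Client instances, so that a callback running on one client's watcher thread registers new watches through the other client: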

import random
import threading

import etcd3

class EtcdClient(object):

    def __init__(self, host='127.0.0.1', port=2379, timeout=None):
        random.seed()
        self._etcds = [etcd3.client(host=host, port=port, timeout=timeout),
                       etcd3.client(host=host, port=port, timeout=timeout)]
        self._watcher_idents = {etcd.watcher.ident: etcd
                                for etcd in self._etcds}
        # The two watcher threads must have distinct idents
        # (dict.keys() is not subscriptable in Python 3, so check the size instead)
        assert len(self._watcher_idents) == 2

    @property
    def _etcd(self):
        current_ident = threading.current_thread().ident
        if current_ident not in self._watcher_idents:
            return self._etcds[random.randint(0, 1)]  # Load balancing ;-P
        else:
            for watcher_idents, etcd in self._watcher_idents.items():
                if watcher_idents != current_ident:
                    return etcd  # Use the other client to handle the request

    def add_watch_callback(self, *args, **kwargs):
        etcd = self._etcd
        # Forward the caller's arguments to whichever client was selected
        watch_id = etcd.add_watch_callback(*args, **kwargs)
        return etcd, watch_id  # The caller should treat this tuple as a single opaque handle

    def cancel_watch(self, watch_id):
        etcd, watch_id = watch_id
        etcd.cancel_watch(watch_id)

The way I implemented cancel_watch is not perfect, but it works.
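
Another possible approach, which I have not tested against python-etcd3, is to keep a single client and make sure the watcher thread never runs user code itself. The sketch below wraps a callback so that it is handed to a worker thread and the watcher loop returns immediately. offload is a hypothetical helper name; only the standard library is assumed:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# A single worker keeps events in the order the watcher delivered them.
_executor = ThreadPoolExecutor(max_workers=1)


def offload(callback, executor=_executor):
    """Return a wrapper that schedules `callback` on `executor` and returns at once."""

    def wrapper(event):
        # The calling (watcher) thread only submits the work; it does not wait for it.
        return executor.submit(callback, event)

    return wrapper
```

The wrapped function would be passed in place of the original callback, for example client.add_watch_callback(key, offload(my_callback)). Because my_callback then runs on the worker thread, it should be able to call add_watch_callback while the watcher thread stays free to return the watch id. Exceptions raised by the callback end up in the returned Future rather than in the watcher thread.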

Hope this can help.

