Comments (17)
Hi @MykolaBordakov,
could you please give some details, like code snippets and ways to reproduce the issue?
from rq.
Hi @yurnov
Yes, here is a code example:
`from redis.rediscluster import RedisCluster
from rq.worker import Worker
from rq.job import Job
from rq.queue import Queue

def main():
    # env holds the deployment settings (its definition is not shown here)
    queues = [env.redis_queue]
    connection = RedisCluster(host=env.redis_service, port=env.redis_port, password=env.redis_password,
                              username=env.redis_user, ssl_keyfile=env.redis_key, ssl_certfile=env.redis_cert,
                              ssl=True, ssl_cert_reqs=None)
    worker = Worker(connection=connection,
                    queues=queues,
                    job_class=Job,
                    queue_class=Queue)
    worker.work(logging_level=env.log_level)

if __name__ == '__main__':
    main()`
The code fails only on the new rq version.
from rq.
I am experiencing the problem described above.
I am establishing a connection to a RedisCluster as follows:
from redis.cluster import RedisCluster
rc = RedisCluster(host='***', port=6379, ssl=True, decode_responses=True)
print(rc.info("server"))
print(rc.execute_command('CLUSTER', 'INFO'))
Outputs:
{'redis_version': 7.1, 'redis_mode': 'cluster', 'arch_bits': 64, 'run_id': 0}
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:2
cluster_size:1
cluster_current_epoch:0
cluster_my_epoch:0
cluster_stats_messages_sent:0
cluster_stats_messages_received:0
total_cluster_links_buffer_limit_exceeded:0
Then I try to create an RQ Worker over this connection:
from rq import Queue, Worker
q = Queue(connection=rc)
w = Worker(connection=rc, queues=[q])
w.work()
I end up with the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
Cell In[4], line 4
1 from rq import Queue, Worker
3 q = Queue(connection=rc)
----> 4 w = Worker(connection=rc, queues=[q])
5 w.work()
File ~/.pyenv/versions/3.11.7/lib/python3.11/site-packages/rq/worker.py:148, in BaseWorker.__init__(self, queues, name, default_result_ttl, connection, exc_handler, exception_handlers, default_worker_ttl, maintenance_interval, job_class, queue_class, log_job_description, job_monitoring_interval, disable_default_exception_handler, prepare_for_work, serializer, work_horse_killed_handler)
145 self.job_monitoring_interval = job_monitoring_interval
146 self.maintenance_interval = maintenance_interval
--> 148 connection = self._set_connection(connection)
149 self.connection = connection
150 self.redis_server_version = None
File ~/.pyenv/versions/3.11.7/lib/python3.11/site-packages/rq/worker.py:299, in BaseWorker._set_connection(self, connection)
297 if connection is None:
298 connection = get_current_connection()
--> 299 current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
300 if current_socket_timeout is None:
301 timeout_config = {"socket_timeout": self.connection_timeout}
AttributeError: 'RedisCluster' object has no attribute 'connection_pool'
This is a major blocker for our organization internally.
@yurnov, @selwin, is there any further information or reproduction I can provide?
I would also be glad to contribute a fix myself.
A more general but related question: can you foresee any other limitations of rq when it is used with a RedisCluster?
If so, what are these limitations, and what effort do you think would be required to address them?
from rq.
It seems that this PR should solve this issue. It is included in redis-py version 5.1.0b1 and later.
@MykolaBordakov @peter-gy could you please test your issue with https://github.com/redis/redis-py/releases/tag/v5.1.0b3?
from rq.
@yurnov I just ran pip install redis==5.1.0b3, but the same error is present.
import redis
redis.__version__
outputs '5.1.0b3', so I am sure the source changes got picked up.
from rq.
Hey, so it seems like _set_connection(connection: Redis) does not work with a RedisCluster connection.
Unfortunately I don't use Redis cluster, nor do I have access to a dev environment with Redis cluster. If you can open a pull request fixing this, I'll gladly merge this in.
Essentially what this part of the code is doing is trying to recreate a Redis connection to be used by other parts of the Worker.
from rq.
@selwin We'd be happy to sponsor the project if that can help pay for a small Redis Cluster for testing. It's really easy to set up on AWS. We can also help with the setup if necessary.
from rq.
@ghalimi if you can provide a Redis Cluster that I can use to test, I can try to debug this issue sometime in the next few days.
from rq.
Hi @selwin
You can run a cluster with Docker. Here is a valid compose file: https://github.com/bitnami/containers/blob/main/bitnami/redis-cluster/docker-compose.yml
from rq.
Hi @selwin
I have made a fix. The issue occurs because in RedisCluster the connection_pool attribute moved to another class called Node. I have updated this method in worker.py:
def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
    """Configures the Redis connection to have a socket timeout.
    This should time out the connection in case any specific command hangs at any given time (e.g. BLPOP).
    If the connection provided already has a socket_timeout defined, skips.

    Args:
        connection (Optional[Redis]): The Redis Connection.
    """
    if connection is None:
        connection = get_current_connection()
    try:
        # Standalone Redis client: the pool lives on the connection itself.
        current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
        if current_socket_timeout is None:
            timeout_config = {"socket_timeout": self.connection_timeout}
            connection.connection_pool.connection_kwargs.update(timeout_config)
    except AttributeError:
        # RedisCluster has no connection_pool; each node has its own client and pool.
        nodes = connection.get_nodes()
        for node in nodes:
            current_socket_timeout = node.redis_connection.connection_pool.connection_kwargs.get("socket_timeout")
            if current_socket_timeout is None:
                timeout_config = {"socket_timeout": self.connection_timeout}
                node.redis_connection.connection_pool.connection_kwargs.update(timeout_config)
    return connection
This update fixes that issue, but now I have a new one. The worker starts fine, but when it tries to use the lmove method in the Queue class, it gets this error: "redis.exceptions.RedisClusterException: BLMOVE - all keys must map to the same key slot"
This error comes from redis-py. Maybe you have some ideas about it?
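
For readers following along, the "same key slot" rule can be illustrated with a small sketch. Redis Cluster assigns each key to one of 16384 slots using CRC16 of the key, and if the key contains a non-empty {...} section (a hash tag), only that section is hashed. The key names below are hypothetical and chosen only for illustration; they are not necessarily the keys rq uses.

```python
import binascii

NUM_SLOTS = 16384  # Redis Cluster always has 16384 hash slots


def key_slot(key: bytes) -> int:
    """Return the cluster hash slot for a key (CRC16/XMODEM mod 16384)."""
    start = key.find(b"{")
    if start != -1:
        end = key.find(b"}", start + 1)
        # Only a non-empty {...} section counts as a hash tag.
        if end != -1 and end != start + 1:
            key = key[start + 1:end]
    # crc_hqx with an initial value of 0 is the CRC16/XMODEM variant.
    return binascii.crc_hqx(key, 0) % NUM_SLOTS


# Hypothetical key names, for illustration only.
plain = [b"rq:queue:default", b"rq:intermediate:default"]
tagged = [b"rq:queue:{default}", b"rq:intermediate:{default}"]

print({k: key_slot(k) for k in plain})   # nothing forces these to match
print({k: key_slot(k) for k in tagged})  # both hash only b"default"
```

A multi-key command such as BLMOVE is only accepted when every key it touches resolves to the same slot, which the tagged pair guarantees and the plain pair does not.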
from rq.
Ah, in this case, if Redis cluster is used, you can't use queue.lmove. You should fall back to queue.lpop().
Line 1364 in 2f5fecc
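
A minimal sketch of the fallback suggested here, assuming a redis-py style client. The helper names is_cluster and pop_job_id and the key arguments are hypothetical and are not rq's actual API; the cluster check relies on get_nodes(), the same method the worker.py patch above calls.

```python
from typing import Any, Optional


def is_cluster(connection: Any) -> bool:
    # Assumption: only cluster clients expose get_nodes().
    return hasattr(connection, "get_nodes")


def pop_job_id(connection: Any, queue_key: str, intermediate_key: str,
               timeout: int) -> Optional[Any]:
    """Pop the next job id, avoiding multi-key commands on a cluster."""
    if is_cluster(connection):
        # BLPOP on a single key touches one slot, so the cluster accepts it.
        result = connection.blpop([queue_key], timeout=timeout)
        # redis-py returns a (key, value) pair, or None on timeout.
        return None if result is None else result[1]
    # Standalone Redis: BLMOVE atomically moves the id to a second list.
    return connection.blmove(queue_key, intermediate_key, timeout)
```

The trade-off in this sketch is that on a cluster the popped id is no longer parked in a second list while the job is in flight, so it is weaker than the BLMOVE path.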
from rq.
Hi @MykolaBordakov,
could you please prepare a PR with your fix in worker.py and queue.py?
from rq.
Yes. I have cleaned up my code and can try to open a PR.
from rq.
Hi @selwin
How can I create a PR to your repo?
Right now I get this:
remote: Permission to rq/rq.git denied to MykolaBordakov.
fatal: unable to access 'https://github.com/rq/rq.git/': The requested URL returned error: 403
I am trying to push a new branch with my commit. The command I ran: git push --set-upstream origin support-redis-cluster
from rq.
Hi @selwin
I have prepared a PR. It is here: #2030
from rq.
Hi @selwin
Did you see my PR?
Thank you.
from rq.
Related Issues (20)
- Don't call cleanup from count and get_job_id
- redis.exceptions.ResponseError: Command # 1 (SADD rq:queues rq:queue:test) of pipeline caused error: MOVED 8713 mxxx.euw1.cache.amazonaws.com:6379 HOT 1
- strange interactions between last_cleaned_at, maintenance_interval and rq:clean_registries lock key HOT 1
- Dependencies specified with Dependency(allow_failure=True) are never enqueued when a job is moved to FailedJobRegistry due to AbandonedJobError
- Can we pass custom args to a callback (on_failure) HOT 2
- Worker rq:worker:... found an unhandled exception, quitting (ModuleNotFoundError: No module named 'resource') HOT 5
- Jobs being lost when killing workers HOT 2
- Make registry cleaning lock expiry configurable HOT 2
- Programmatically create more workers at run-time by using Django_rq
- Seg fault 11 on setprocname HOT 1
- execute code with workhorse PID after the workhorse exits HOT 1
- ValueError: Invalid attribute name for job callback HOT 1
- Job.get_status() does not always return JobStatus Enum
- database connection issues when using rq with flask-sqlalchemy HOT 1
- Work-horse termination HOT 1
- DeprecationWarning: datetime.datetime.utcnow() HOT 1
- Rq Worker.all cannot find worker
- Are two connections per worker needed? HOT 1
- keys of command in MULTI calls must be in same slot HOT 3