
txredisapi's Introduction

txredisapi


For the latest source code, see http://github.com/IlyaSkriblovsky/txredisapi

txredisapi is a non-blocking client driver for the redis database, written in Python. It uses Twisted for the asynchronous communication with redis.

It started as a fork of the original redis protocol for twisted, and evolved into a more robust, reliable, and complete solution for applications like web servers. These types of applications often need a fault-tolerant pool of connections with multiple redis servers, making it possible to easily develop and maintain distributed systems.

Most of the redis commands are supported, as well as other features such as silent reconnection, connection pools, and automatic sharding.

This driver is distributed as part of the cyclone web framework.

Changelog

See CHANGELOG.md

Features

  • Connection Pools
  • Lazy Connections
  • Automatic Sharding
  • Automatic Reconnection
  • Connection using Redis Sentinel
  • Publish/Subscribe (PubSub)
  • Transactions
  • Unix Socket Connections

Install

Bear in mind that txredisapi.py is pure-python, in a single file. Thus, there's absolutely no need to install it. Instead, just copy it to your project directory and start using it.

Latest source code is at https://github.com/IlyaSkriblovsky/txredisapi.

If you have cyclone, you probably already have it too. Try the following:

$ python
>>> import cyclone.redis
>>> cyclone.redis.version
'1.0'

However, if you really really insist on installing, get it from pypi:

pip install txredisapi

Unit Tests

Twisted Trial unit tests are available. Just start redis, and run trial ./tests. If unix sockets are disabled in redis, it will silently skip those tests.

Make sure you run redis-cli flushall to clean up redis after the tests.

Usage

First thing to do is choose what type of connection you want. The driver supports single connection, connection pools, sharded connections (with automatic distribution based on a built-in consistent hashing algorithm), sharded connection pools, and all of these different types can be lazy, which is explained later (because I'm lazy now).

Basically, you want normal connections for simple batch clients that connect to redis, execute a couple of commands and disconnect - like crawlers, etc.

Example:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.Connection()
    print rc

    yield rc.set("foo", "bar")
    v = yield rc.get("foo")
    print "foo:", repr(v)

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

Easily switch between redis.Connection() and redis.ConnectionPool() with absolutely no changes to the logic of your program.

These are all the supported methods for connecting to Redis:

Connection(host, port, dbid, reconnect, charset)
lazyConnection(host, port, dbid, reconnect, charset)

ConnectionPool(host, port, dbid, poolsize, reconnect, charset)
lazyConnectionPool(host, port, dbid, poolsize, reconnect, charset)

ShardedConnection(hosts, dbid, reconnect, charset)
lazyShardedConnection(hosts, dbid, reconnect, charset)

ShardedConnectionPool(hosts, dbid, poolsize, reconnect, charset)
lazyShardedConnectionPool(hosts, dbid, poolsize, reconnect, charset)

UnixConnection(path, dbid, reconnect, charset)
lazyUnixConnection(path, dbid, reconnect, charset)

UnixConnectionPool(unix, dbid, poolsize, reconnect, charset)
lazyUnixConnectionPool(unix, dbid, poolsize, reconnect, charset)

ShardedUnixConnection(paths, dbid, reconnect, charset)
lazyShardedUnixConnection(paths, dbid, reconnect, charset)

ShardedUnixConnectionPool(paths, dbid, poolsize, reconnect, charset)
lazyShardedUnixConnectionPool(paths, dbid, poolsize, reconnect, charset)

The arguments are:

  • host: the IP address or hostname of the redis server. [default: localhost]
  • port: port number of the redis server. [default: 6379]
  • path: path of redis server's socket [default: /tmp/redis.sock]
  • dbid: database id of redis server. [default: 0]
  • poolsize: how many connections to make. [default: 10]
  • reconnect: auto-reconnect if connection is lost. [default: True]
  • charset: string encoding. Do not decode/encode strings if None. [default: utf-8]
  • hosts (for sharded): list of host:port pairs. [default: None]
  • paths (for sharded): list of pathnames. [default: None]
  • password: password for the redis server. [default: None]
  • ssl_context_factory: Either a boolean indicating whether to use SSL/TLS or a specific ClientContextFactory. [default: False]
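
For illustration, here is a minimal sketch combining several of these arguments; the host and password below are placeholders, not real defaults:

import txredisapi as redis

# A lazy pool of 5 connections to a password-protected server (placeholder
# host/password), selecting database 1 and returning raw strings (charset=None).
rc = redis.lazyConnectionPool(
    host="redis.example.com",
    port=6379,
    dbid=1,
    poolsize=5,
    reconnect=True,
    charset=None,
    password="secret",
)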

Connection Handlers

All connection methods return a connection handler object at some point.

Normal connections (not lazy) return a deferred, which is fired with the connection handler after the connection is established.

In the case of connection pools, the callback is only fired after all connections are set up and ready.

The connection handler is the client interface to redis. It accepts all the commands supported by redis, such as get, set, etc. It is the rc object in the example below.

Connection handlers will automatically select one of the available connections in connection pools, and automatically reconnect to redis when necessary.

If the connection with redis is lost, all commands will raise the ConnectionError exception, to indicate that there's no active connection. However, if the reconnect argument was set to True during initialization, it will continuously try to reconnect in the background.

Example:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


def sleep(n):
    d = defer.Deferred()
    reactor.callLater(n, lambda *ign: d.callback(None))
    return d


@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()
    print rc

    # set
    yield rc.set("foo", "bar")

    # sleep, so you can kill redis
    print "sleeping for 5s, kill redis now..."
    yield sleep(5)

    try:
        v = yield rc.get("foo")
        print "foo:", v

        yield rc.disconnect()
    except redis.ConnectionError, e:
        print str(e)


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

Lazy Connections

This type of connection will immediately return the connection handler object, even before the connection is made.

It will start the connection (or connections, in the case of connection pools) in the background, and automatically reconnect if necessary.

You want lazy connections when you're writing servers, like web servers, or any other type of server that should not wait for the redis connection during the initialization of the program.

The example below is a web application, which will expose redis set, get and delete commands over HTTP.

If the database connection is down (either because redis is not running, or for whatever other reason), the web application will start normally. If the connection is lost during operation, nothing will change.

When there's no connection, all commands will fail, therefore the web application will respond with HTTP 503 (Service Unavailable). It will return to normal once the connection with redis is re-established.

Try killing the redis server after the application is running, and make a couple of requests. Then start redis again and give it another try.

Example:

#!/usr/bin/env python
# coding: utf-8

import sys

import cyclone.web
import cyclone.redis
from twisted.internet import defer
from twisted.internet import reactor
from twisted.python import log


class Application(cyclone.web.Application):
    def __init__(self):
        handlers = [ (r"/text/(.+)", TextHandler) ]

        RedisMixin.setup()
        cyclone.web.Application.__init__(self, handlers, debug=True)


class RedisMixin(object):
    redis_conn = None

    @classmethod
    def setup(cls):
        RedisMixin.redis_conn = cyclone.redis.lazyConnectionPool()


# Provide GET, SET and DELETE redis operations via HTTP
class TextHandler(cyclone.web.RequestHandler, RedisMixin):
    @defer.inlineCallbacks
    def get(self, key):
        try:
            value = yield self.redis_conn.get(key)
        except Exception, e:
            log.msg("Redis failed to get('%s'): %s" % (key, str(e)))
            raise cyclone.web.HTTPError(503)

        self.set_header("Content-Type", "text/plain")
        self.write("%s=%s\r\n" % (key, value))

    @defer.inlineCallbacks
    def post(self, key):
        value = self.get_argument("value")
        try:
            yield self.redis_conn.set(key, value)
        except Exception, e:
            log.msg("Redis failed to set('%s', '%s'): %s" % (key, value, str(e)))
            raise cyclone.web.HTTPError(503)

        self.set_header("Content-Type", "text/plain")
        self.write("%s=%s\r\n" % (key, value))

    @defer.inlineCallbacks
    def delete(self, key):
        try:
            n = yield self.redis_conn.delete(key)
        except Exception, e:
            log.msg("Redis failed to del('%s'): %s" % (key, str(e)))
            raise cyclone.web.HTTPError(503)

        self.set_header("Content-Type", "text/plain")
        self.write("DEL %s=%d\r\n" % (key, n))


def main():
    log.startLogging(sys.stdout)
    reactor.listenTCP(8888, Application(), interface="127.0.0.1")
    reactor.run()


if __name__ == "__main__":
    main()

This is the server running in one terminal:

$ ./helloworld.py
2012-02-17 15:40:25-0500 [-] Log opened.
2012-02-17 15:40:25-0500 [-] Starting factory <redis.Factory instance at 0x1012f0560>
2012-02-17 15:40:25-0500 [-] __main__.Application starting on 8888
2012-02-17 15:40:25-0500 [-] Starting factory <__main__.Application instance at 0x100f42290>
2012-02-17 15:40:53-0500 [RedisProtocol,client] 200 POST /text/foo (127.0.0.1) 1.20ms
2012-02-17 15:41:01-0500 [RedisProtocol,client] 200 GET /text/foo (127.0.0.1) 0.97ms
2012-02-17 15:41:09-0500 [RedisProtocol,client] 200 DELETE /text/foo (127.0.0.1) 0.65ms
(killed redis-server)
2012-02-17 15:48:48-0500 [HTTPConnection,0,127.0.0.1] Redis failed to get('foo'): Not connected
2012-02-17 15:48:48-0500 [HTTPConnection,0,127.0.0.1] 503 GET /text/foo (127.0.0.1) 2.99ms

And these are the requests, from curl in another terminal.

Set:

$ curl -D - -d "value=bar" http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 9
Content-Type: text/plain

foo=bar

Get:

$ curl -D - http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 9
Etag: "b63729aa7fa0e438eed735880951dcc21d733676"
Content-Type: text/plain

foo=bar

Delete:

$ curl -D - -X DELETE http://localhost:8888/text/foo
HTTP/1.1 200 OK
Content-Length: 11
Content-Type: text/plain

DEL foo=1

When redis is not running:

$ curl -D - http://localhost:8888/text/foo
HTTP/1.1 503 Service Unavailable
Content-Length: 89
Content-Type: text/html; charset=UTF-8

<html><title>503: Service Unavailable</title>
<body>503: Service Unavailable</body></html>

Sharded Connections

Sharded connections can be normal or lazy connections, and they can also be sharded connection pools. Not all commands are supported on sharded connections.

If the command you're trying to run is not supported on sharded connections, the connection handler will raise the NotImplementedError exception.

Simple example with automatic sharding of keys between two redis servers:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.ShardedConnection(["localhost:6379", "localhost:6380"])
    print rc
    print "Supported methods on sharded connections:", rc.ShardedMethods

    keys = []
    for x in xrange(100):
        key = "foo%02d" % x
        yield rc.set(key, "bar%02d" % x)
        keys.append(key)

    # yey! mget is supported!
    response = yield rc.mget(keys)
    for val in response:
        print val

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

Transactions

For obvious reasons, transactions are NOT supported on sharded connections. But they work pretty well on normal or lazy connections, and on connection pools.

NOTE: redis uses the following methods for transactions:

  • WATCH: synchronization
  • MULTI: start the transaction
  • EXEC: commit the transaction
  • DISCARD: you got it.

Because exec is a reserved word in Python, the command to commit is commit.

Example:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()

    # Remove the keys
    yield rc.delete(["a1", "a2", "a3"])

    # Start transaction
    t = yield rc.multi()

    # These will return "QUEUED" - even t.get(key)
    yield t.set("a1", "1")
    yield t.set("a2", "2")
    yield t.set("a3", "3")
    yield t.get("a1")

    # Try to call get() while in a transaction.
    # It will fail if it's not a connection pool, or if all connections
    # in the pool are in a transaction.
    # Note that it's rc.get(), not the transaction object t.get().
    try:
        v = yield rc.get("foo")
        print "foo=", v
    except Exception, e:
        print "can't get foo:", e

    # Commit, and get all responses from transaction.
    r = yield t.commit()
    print "commit=", repr(r)

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

A "COUNTER" example, using WATCH/MULTI:

 #!/usr/bin/env python
 # coding: utf-8

 import txredisapi as redis

 from twisted.internet import defer
 from twisted.internet import reactor


 @defer.inlineCallbacks
 def main():
     rc = yield redis.ConnectionPool()

     # Reset keys
     yield rc.set("a1", 0)

     # Synchronize and start transaction
     t = yield rc.watch("a1")

     # Load previous value
     a1 = yield t.get("a1")

     # start the transactional pipeline
     yield t.multi()

     # modify and retrieve the new a1 value
     yield t.set("a1", a1 + 1)
     yield t.get("a1")

     print "simulating concurrency, this will abort the transaction"
     yield rc.set("a1", 2)

     try:
         r = yield t.commit()
         print "commit=", repr(r)
     except redis.WatchError, e:
         a1 = yield rc.get("a1")
         print "transaction has failed."
         print "current a1 value: ", a1

     yield rc.disconnect()


 if __name__ == "__main__":
     main().addCallback(lambda ign: reactor.stop())
     reactor.run()

Calling commit will return a list with the replies of all commands executed in the transaction. discard, on the other hand, will normally return just an OK.
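
For completeness, a minimal sketch of aborting a transaction instead of committing it, assuming the transaction object exposes discard() alongside commit() (same imports as in the examples above):

@defer.inlineCallbacks
def abort_example():
    rc = yield redis.ConnectionPool()

    t = yield rc.multi()
    yield t.set("a1", "1")

    # Changed our mind: throw away the queued commands.
    r = yield t.discard()
    print "discard=", repr(r)   # normally just 'OK'

    yield rc.disconnect()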

Pipelining

txredisapi automatically pipelines commands by sending the next command without waiting for the reply to the previous one. This works even on single connections and increases performance by reducing the number of round-trip delays. There are two exceptions, though:

  • no commands will be sent after a blocking blpop, brpop or brpoplpush until its response is received;
  • transactions started with multi/commit also block the connection, making all other commands wait until the transaction is executed.
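
For illustration, a minimal sketch (key names are arbitrary) that issues several commands on a single connection without yielding in between, letting the driver pipeline them, and then collects the replies with defer.gatherResults:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    rc = yield redis.Connection()

    # Fire several commands without waiting for each reply; txredisapi
    # writes them back-to-back on the same connection.
    deferreds = [rc.set("k%d" % i, i) for i in range(5)]
    deferreds += [rc.get("k%d" % i) for i in range(5)]

    results = yield defer.gatherResults(deferreds)
    print "replies:", results

    yield rc.disconnect()


if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()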

When you need to load tons of data into Redis, it might be more efficient to send commands in batches, grouping them together offline, to save on TCP packets and network stack overhead. You can do this with the pipeline method, which explicitly accumulates commands and sends them to the server in a single batch. Be careful not to accumulate too many commands: an unreasonable batch size may eat up an unexpected amount of memory on both the client and server side. Group commands in batches of, for example, 10k commands instead of sending all your data at once. The speed will be nearly the same, but the additional memory used will be at most the amount needed to queue those 10k commands.

To send commands in a batch:

#!/usr/bin/env python
# coding: utf-8

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor

@defer.inlineCallbacks
def main():
    rc = yield redis.ConnectionPool()

    # Start grouping commands
    pipeline = yield rc.pipeline()

    pipeline.set("foo", 123)
    pipeline.set("bar", 987)
    pipeline.get("foo")
    pipeline.get("bar")

    # Write those 2 sets and 2 gets to redis all at once, and wait
    # for all replies before continuing.
    results = yield pipeline.execute_pipeline()

    print "foo:", results[2] # should be 123
    print "bar:", results[3] # should be 987

    yield rc.disconnect()

if __name__ == "__main__":
    main().addCallback(lambda ign: reactor.stop())
    reactor.run()

Authentication

This is how to authenticate:

#!/usr/bin/env python

import txredisapi
from twisted.internet import defer
from twisted.internet import reactor


@defer.inlineCallbacks
def main():
    redis = yield txredisapi.Connection(password="foobared")
    yield redis.set("foo", "bar")
    print (yield redis.get("foo"))
    reactor.stop()


if __name__ == "__main__":
    main()
    reactor.run()

Connection using Redis Sentinel

txredisapi can discover the Redis master and slave addresses using Redis Sentinel and automatically fail over in case of server failure.

#!/usr/bin/env python

from twisted.internet import defer
from twisted.internet.task import react
import txredisapi

@defer.inlineCallbacks
def main(reactor):
    sentinel = txredisapi.Sentinel([("sentinel-a", 26379), ("sentinel-b", 26379), ("sentinel-c", 26379)])
    redis = sentinel.master_for("service_name")
    yield redis.set("foo", "bar")
    print (yield redis.get("foo"))
    yield redis.disconnect()
    yield sentinel.disconnect()
    
react(main)

Usual connection arguments like dbid=N or poolsize=N can be specified in the master_for() call. Use sentinel.slave_for() to connect to one of the slaves instead of the master.

Add min_other_sentinels=N to the Sentinel constructor call to make it obey information only from sentinels that are currently connected to the specified number of other sentinels. This minimizes the risk of split-brain in case of network partition.
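
A short sketch of how these options might be combined (service name and sentinel addresses are placeholders; slave_for() is assumed to accept the same connection arguments as master_for()):

sentinel = txredisapi.Sentinel(
    [("sentinel-a", 26379), ("sentinel-b", 26379), ("sentinel-c", 26379)],
    min_other_sentinels=1,
)

master = sentinel.master_for("service_name", dbid=1, poolsize=5)
replica = sentinel.slave_for("service_name", poolsize=2)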

Credits

Thanks to (in no particular order):

  • Alexandre Fiori

    • Author of txredisapi
  • Gleicon Moraes

    • Bug fixes, testing, and RestMQ.
    • For writing the Consistent Hashing algorithm used for sharding.
  • Dorian Raymer and Ludovico Magnocavallo

    • Authors of the original redis protocol for twisted.
  • Vanderson Mota

    • Initial pypi setup, and patches.
  • Jeethu Rao

    • Contributed test cases and other ideas, like support for travis-ci
  • Jeremy Archer

    • Minor bugfixes.
  • Christoph Tavan (@ctavan)

    • Idea and test case for nested multi bulk replies, minor command enhancements.
  • dgvncsz0f

    • WATCH/UNWATCH commands
  • Ilia Glazkov

    • Free connection selection algorithm for pools.
    • Non-unicode charset fixes.
    • SCAN commands
  • Matt Pizzimenti (mjpizz)

    • pipelining support
  • Nickolai Novik (jettify)

    • update of SET command
  • Evgeny Tataurov (etataurov)

    • Ability to use hiredis protocol parser
  • Ilya Skriblovsky (IlyaSkriblovsky)

    • Sentinel support

txredisapi's People

Contributors

a-kr, ctavan, dbravender, dgvncsz0f, e-max, ellietheyeen, etataurov, fatlotus, fiorix, fzambia, gleicon, iksteen, ilyaskriblovsky, ioga, jeethu, jettify, kbourgoin, lxfontes, minus7, mjpizz, roeltm, shadowjonathan, steiza, tobixx, tommyvn, tonylazarew, tpena


txredisapi's Issues

exceptions.AttributeError: instance has no attribute 'factory'

After upgrading from 1.2 to 1.3 I constantly get errors across self.factory. I can't run the txredisapi tests because of d11wtq/dockerpty#7, but the test cases are extremely simple.

import txredisapi as redis
from twisted.internet import protocol
from twisted.internet import reactor


class RedisListenerProtocol(redis.SubscriberProtocol):
    pass

if __name__ == "__main__":
    host, port = "127.0.0.1", 6379
    protocol.ClientCreator(reactor, RedisListenerProtocol).connectTCP(host, port)
    reactor.run()
Unhandled error in Deferred:


Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 587, in doConnect
    self._connectDone()
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 616, in _connectDone
    self.protocol.makeConnection(self)
  File "C:\Python27\lib\site-packages\twisted\internet\protocol.py", line 487, in makeConnection
    self.connectionMade()
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 1274, in unwindGenerator
    return _inlineCallbacks(None, gen, Deferred())
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 1128, in _inlineCallbacks
    result = g.send(result)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 248, in connectionMade
    if self.factory.password is not None:
exceptions.AttributeError: RedisListenerProtocol instance has no attribute 'factory'
Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 306, in addCallbacks
    self._runCallbacks()
  File "C:\Python27\lib\site-packages\twisted\internet\defer.py", line 588, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 430, in _continueFiring
    callable(*args, **kwargs)
  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 625, in disconnectAll
    failure.Failure(main.CONNECTION_LOST))
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 101, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 84, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 479, in connectionLost
    self._commonConnection.connectionLost(self, reason)
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 293, in connectionLost
    protocol.connectionLost(reason)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 285, in connectionLost
    self.factory.delConnection(self)
exceptions.AttributeError: RedisListenerProtocol instance has no attribute 'factory'

import txredisapi as redis
from twisted.internet import protocol
from twisted.internet import reactor


class RedisListenerProtocol(redis.SubscriberProtocol):
    def connectionMade(self):
        self.subscribe('channel_name')

if __name__ == "__main__":
    host, port = "127.0.0.1", 6379
    protocol.ClientCreator(reactor, RedisListenerProtocol).connectTCP(host, port)
    reactor.run()
Unhandled Error
Traceback (most recent call last):
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 101, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 84, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\internet\selectreactor.py", line 149, in _doReadOrWrite
    why = getattr(selectable, method)()
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 209, in doRead
    return self._dataReceived(data)
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 215, in _dataReceived
    rval = self.protocol.dataReceived(data)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 176, in dataReceived
    return self.rawDataReceived(data)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 375, in rawDataReceived
    self.bulkDataReceived(bulk_buffer)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 384, in bulkDataReceived
    el = self.tryConvertData(data)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 395, in tryConvertData
    if self.factory.convertNumbers:
exceptions.AttributeError: RedisListenerProtocol instance has no attribute 'factory'

Unhandled Error
Traceback (most recent call last):
  File "C:\test2.py", line 13, in <module>
    reactor.run()
  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 1194, in run
    self.mainLoop()
  File "C:\Python27\lib\site-packages\twisted\internet\base.py", line 1206, in mainLoop
    self.doIteration(t)
  File "C:\Python27\lib\site-packages\twisted\internet\selectreactor.py", line 143, in doSelect
    _logrun(selectable, _drdw, selectable, method)
--- <exception caught here> ---
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 101, in callWithLogger
    return callWithContext({"system": lp}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\log.py", line 84, in callWithContext
    return context.call({ILogContext: newCtx}, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "C:\Python27\lib\site-packages\twisted\python\context.py", line 81, in callWithContext
    return func(*args,**kw)
  File "C:\Python27\lib\site-packages\twisted\internet\selectreactor.py", line 154, in _doReadOrWrite
    self._disconnectSelectable(selectable, why, method=="doRead")
  File "C:\Python27\lib\site-packages\twisted\internet\posixbase.py", line 260, in _disconnectSelectable
    selectable.connectionLost(failure.Failure(why))
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 479, in connectionLost
    self._commonConnection.connectionLost(self, reason)
  File "C:\Python27\lib\site-packages\twisted\internet\tcp.py", line 293, in connectionLost
    protocol.connectionLost(reason)
  File "C:\Python27\lib\site-packages\txredisapi.py", line 285, in connectionLost
    self.factory.delConnection(self)
exceptions.AttributeError: RedisListenerProtocol instance has no attribute 'factory'

ResponseError: NOWRITE You can't write against a non-write redis

Hi:

I encountered a problem that returned this error:

  File "/env/lib/python2.7/site-packages/twisted/internet/defer.py", line 577, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/env/lib/python2.7/site-packages/txredisapi.py", line 463, in handle_reply
    raise r
txredisapi.ResponseError: NOWRITE You can't write against a non-write redis.

and I use txredisapi.ConnectionPool to build a connection pool.

The redis setup is a sharded cluster. The master sometimes hits a hardware failure, which causes a master-slave switch. The problem is that the master-slave switch leaves the cluster read-only for about 30 seconds, so this error occurs.

I have checked the code; the error is raised from:

    @staticmethod
    def handle_reply(r):
        if isinstance(r, Exception):
            raise r
        return r

My problem is: how can I handle this error? Disconnect the current connection and reconnect it? Or disconnect all the connections from the pool and recreate the pool?

RecursionError if many calls were waiting for free connection

This code either hangs (py2) or crashes with RecursionError (py3):

from __future__ import print_function

from twisted.internet import defer

import txredisapi

@defer.inlineCallbacks
def main():
    redis = txredisapi.lazyConnection()
    redis.set('c', 0)

    yield defer.gatherResults(
        redis.incr('c').addCallback(print)
        for i in range(0, 1000)
    )

Each non-blocking call to Redis does connectionQueue.get() to get a connection and then immediately does connectionQueue.put() because it's non-blocking. If there are other pending calls, DeferredQueue.put() immediately passes the released connection to the next pending call in the same execution stack. This next call in its turn immediately calls connectionQueue.put() again.

So, if many queries were waiting for a free connection (because either the connection wasn't made yet, like in the code above, or it is blocked by a transaction), then when the connection is ready, all these queries will be processed recursively in a single execution stack. Python's default recursion limit of 1000 gets exhausted after processing 73-74 queries.

The dumb solution: replace
self._factory.connectionQueue.put(connection) (txredisapi.py:1864)
with reactor.callLater(0, self._factory.connectionQueue.put, connection).
This solves the issue, but is very ugly.

Add support for BRPOPLPUSH

Hey guys-

I couldn't help noting that RedisProtocol has support for BRPOP and RPOPLPUSH, but not BRPOPLPUSH. It's a fairly easy workaround — simply executing protocol.execute_command("BRPOPLPUSH", source, destination) should do the trick — but I wasn't sure if it was worth a fork.

Anyway, the code to be added should look something like the following:

def brpoplpush(self, source, destination):
    """
    Pop a value from a list, push it to another list and return it;
    or block until one is available
    """
    return self.execute_command("BRPOPLPUSH", source, destination)

mget takes str not list?

Examples show passing a list of keys for mget, but that seems to fail, saying it needs a string. How is mget supposed to be used?

exceptions.TypeError: crc32() argument 1 must be string or read-only buffer, not list

ConnectionError exception not thrown when reconnect=True

Hey folks,

In contrast to version 1.2 from pip, the current version from the repo does not throw a ConnectionError if a call fails; or rather, it seems like the call is performed with a really long timeout. The code used to test this is shown below. My question is whether this behavior is intended or a bug. Of course I have read the last commits and saw changes to the timeout params, but nothing in the commit messages led me to believe that this behavior is intended. Can you agree we have a bug here?

#!/usr/bin/env python
# coding: utf-8

from itertools import count
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
import txredisapi as redis

rc = redis.ConnectionPool(reconnect=True)


@defer.inlineCallbacks
def talk_to_redis(redis_conn, counter):

    try:
        yield redis_conn.set("count", next(counter))
        v = yield redis_conn.get("count")
        print("count: {}".format(v))

    except redis.ConnectionError as e:
        print(str(e))


def loop(redis_conn):
    LoopingCall(talk_to_redis, redis_conn, count()).start(0.8)

if __name__ == "__main__":
    rc.addCallback(loop)
    reactor.run()

Here is the output when my code snippet is run against the current version of txredisapi from git repo

(ocmg)brunsgaard@archbook ~/ocmg> python redis_reconn.py
count: 0
count: 1
count: 2
count: 3
count: 4
count: 5

Here I stopped redis. I started it again after ~180 seconds

count: 6
count: 7
count: 8
count: 9
count: 10
^C⏎

And here is the output when my code snippet is run against version 1.2 from pip.

(ocmg)brunsgaard@archbook ~/ocmg> python redis_reconn.py
count: 0
count: 1
count: 2
count: 3
count: 4
count: 5
Not connected
Not connected
Not connected
Not connected
Not connected
Not connected
Not connected
Not connected
Not connected
count: 15
count: 16
count: 17
count: 18
^C⏎

Disconnects do not return ConnectionError for lazy connections

The documentation for lazy connections says that

If the connection with redis is lost, all commands will raise the ConnectionError exception, to indicate that there's no active connection. However, if the reconnect argument was set to True during initialization, it will continuously try to reconnect in the background.

My understanding is that even if reconnect is set to True, I should expect a ConnectionError. This would be useful in the case when it is important to reasonably quickly return an HTTP request with a failure if there are no available redis connections. However, instead the deferred never fires and txredisapi keeps trying to reconnect. For example, examples/twistedweb_server.py will not return an error to a GET request if there is no active redis connection.

Timeout in each request

Is it possible to put a timeout on each query? For example, in setex or get. If redis does not respond within the timeout, cancel the request and throw a timeout exception.
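
One generic approach at the Twisted level (not a txredisapi feature) is Deferred.addTimeout, available in Twisted 16.5 and newer; a minimal sketch, keeping in mind the separate issue below about cancelled calls desynchronizing replies:

from twisted.internet import defer, reactor

# rc is an already-established txredisapi connection, e.g. from redis.Connection()
d = rc.get("some_key")
d.addTimeout(2, reactor)  # cancel the call if no reply arrives within 2 seconds

def on_timeout(failure):
    failure.trap(defer.TimeoutError)
    print "redis call timed out"

d.addErrback(on_timeout)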

PyPi new release

There have been a lot of changes and no new release since January 2013.
Maybe it's time to release 1.2.

SubscriberProtocol hangs when password is set on factory

SubscriberProtocol.replyReceived swallows the "OK" reply for the AUTH command, causing factory.deferred never to be completed, thus not giving the connection to the user.
If authentication is enabled, AUTH is necessary to subscribe. MonitorProtocol probably has the same issue. A dbid being set causes the same problem. Of course, SELECTing a DB does not make sense when using SubscriberProtocol, but in my case it happened because I reused connection credentials from a "data" connection.

A quick fix that seems to work is to add an else clause to replyReceived:

        else:
            self.replyQueue.put(reply)

Example:

from twisted.internet import defer, reactor
from txredisapi import SubscriberProtocol, RedisFactory

@defer.inlineCallbacks
def main():
    factory = RedisFactory(None, dbid=None, poolsize=1, password="test")
    factory.protocol = SubscriberProtocol
    reactor.connectTCP("localhost", 6379, factory)
    print("Connecting...")
    conn = yield factory.deferred
    print("Connected.")
    yield conn.subscribe("test")
    print("Subscribed.")

main()
reactor.run()

Redis Module support

Thanks for this great lib.
Does txredisapi support the modules feature added in redis 4.0?
I am specifically looking for redisSQL support.

Provide some convenience wrappers for SubscriberFactory?

The convenience wrappers for Connection in the library's __init__.py are handy for making client connections (standard redis connections). It would be super nice to have something similar for SubscriberFactory connections, possibly with an argument to specify a message receive callback hook.

Thanks for your consideration.

canceling a deferred redis method causes replies to get out of sync

Test code:

@defer.inlineCallbacks
def test_cancel(self):
    db = yield redis.Connection(REDIS_HOST, REDIS_PORT)

    prefix = 'txredisapi:cancel'

    # Simple Set + Get
    key = prefix + '1'
    value = 'first'

    # set should return 'OK'
    res = yield db.set(key, value)
    self.assertEquals('OK', res)

    # get should return the value
    val = yield db.get(key)
    self.assertEquals(val, value)

    # Cancel a method, replies get out of sync
    d = db.time()
    d.addErrback(lambda _: True)
    d.cancel()

    # Set should return 'OK'
    # but note that failure shows it actually returns the result of the cancelled method...
    key = prefix + '2'
    value = 'second'
    res = yield db.set(key, value)
    self.assertEquals('OK', res)

    # Get should return value
    # but note that it actually returns the result of the previous set method...
    val = yield db.get(key)
    self.assertEquals(val, value)

    yield db.disconnect()

The issue appears to be that txredisapi uses a DeferredQueue to manage replies. The redis method returns a deferred from the DeferredQueue, waiting on the next reply to be written by the protocol to the queue. But if the method is cancelled before it consumes its reply from the queue, twisted will remove the deferred from the DeferredQueue waiting list. The reply data, however, is still written to the queue when received and is delivered to the next waiting deferred instead of the deferred that was cancelled and removed from the list. Now all future method calls yield incorrect results.

See https://github.com/twisted/twisted/blob/trunk/twisted/internet/defer.py#L1493-L1505

Bug in lineReceived (with mget 100000 keys)

Hi,

I really appreciate your txredisapi.

I am trying to manage a sharding solution with many keys.

But before that, I tested the API and I think that I have found a bug.
In fact, I have two machines or more.
I put 10000 keys in them, and I try to get them with mget (like the sharding test does).

But when it gets to the mget, I get errors like:
"txredisapi.InvalidResponse: Cannot convert data '8$8' to integer"

This one happens many times.

I checked the lineReceived function and the line is actually composed of that.
Sometimes I also get lines like "bar79683" (the value of a variable ...)

I really don't know where to look, because I don't know by which function the line is sent.

Killing an application with a subscriber factory yields a non clean shutdown

If you run the subscriber.py example and kill the server with ctrl+c you get the following:

2013-09-11 23:29:43-0700 [-] Log opened.
2013-09-11 23:29:43-0700 [-] twistd 13.0.0 (/Users/nicklas/.wrapp/virtualenvs/wrapport/bin/python2.7 2.7.5) starting up.
2013-09-11 23:29:43-0700 [-] reactor class: twisted.internet.selectreactor.SelectReactor.
2013-09-11 23:29:43-0700 [-] Starting factory <__builtin__.myFactory instance at 0x10a8850e0>
2013-09-11 23:29:43-0700 [Uninitialized] waiting for messages...
2013-09-11 23:29:43-0700 [Uninitialized] use the redis client to send messages:
2013-09-11 23:29:43-0700 [Uninitialized] $ redis-cli publish zz test
2013-09-11 23:29:43-0700 [Uninitialized] $ redis-cli publish foo.bar hello world
2013-09-11 23:29:43-0700 [-] pattern=subscribe, channel=zz message=1
2013-09-11 23:29:43-0700 [-] pattern=psubscribe, channel=foo.* message=2
^C2013-09-11 23:29:45-0700 [-] Received SIGINT, shutting down.
2013-09-11 23:29:45-0700 [myProtocol,client] lost connection: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionLost'>: Connection to the other side was lost in a non-clean fashion: Connection lost.
2013-09-11 23:29:45-0700 [myProtocol,client] ]
2013-09-11 23:29:45-0700 [myProtocol,client] <twisted.internet.tcp.Connector instance at 0x10a515b90> will retry in 3 seconds
2013-09-11 23:29:45-0700 [myProtocol,client] Stopping factory <__builtin__.myFactory instance at 0x10a8850e0>
2013-09-11 23:29:45-0700 [-] Main loop terminated.
2013-09-11 23:29:45-0700 [-] Server Shut Down.

Is there any way to prevent this "non clean" shutdown?
It would be kinda nice to be able to wait for all pending requests to finish before closing it.

I can add that this doesn't happen when using redis.Connection.

CLIENT SETNAME/CLIENT GETNAME

I'd like to be able to implement the CLIENT SETNAME and GETNAME operations, but it doesn't work well with a connection pool. What would be the current recommended way to make sure the client connections are named? Is there one?

hmset a string, hget a int type

# coding=UTF-8

from twisted.internet import defer, reactor
import txredisapi

redis_conf = {
    'host': 'localhost',
    'port': 6379,
    'dbid': 0
}

redis_db = txredisapi.lazyConnectionPool(**redis_conf)

@defer.inlineCallbacks
def test_txredis_hmset():
    k = 'test'
    v = {'a': '1'}
    yield redis_db.hmset(k, v)
    query_v = yield redis_db.hget(k, 'a')
    print type(query_v) # <type 'int'>

if __name__ == '__main__':
    reactor.callLater(0, test_txredis_hmset)
    reactor.callLater(1, reactor.stop)
    reactor.run()

python_version = 2.7.10
txredisapi_version = 2.4.3

reusing connections in a transaction state (?)

Good morning,

I am using:

PyPy 5.10.0
txredisapi 1.4.4
Twisted 17.5.0

When under load (lots of redis traffic) there are occasions where, when doing a burst of SCANs to get a list of keys from Redis, instead of getting back cursor information and data, we get the string "QUEUED". From what I understand so far the string "QUEUED" is a reply from Redis when I am issuing commands into a transaction. I am using transactions to periodically update a group of Redis keys asynchronously.

We are using makeUnixConnection() with a pool size of 20.

Here is how I am doing SCANs:

    while not endoflist:
        try:
            result = yield globalredisconnection.scan(cursor, pattern, limit)

            cursor = int(result[0])
            keys += result[1]

            if cursor == 0:
                # reached end of list
                endoflist = True
        except Exception as e:
            logger.error("Something went wrong while doing SCAN (%s): %s" % (result, e))
            logger.exception(e)

And here is a sample print out of the error occurring:

2018-05-11 01:28:34,278 ERROR: Something went wrong while doing SCAN (QUEUED): invalid literal for int() with base 10: 'Q'
2018-05-11 01:28:34,278 ERROR: invalid literal for int() with base 10: 'Q'
Traceback (most recent call last):
  File "/opt/backend/backend.py", line 3187, in getKeyList
    cursor = int(result[0])
ValueError: invalid literal for int() with base 10: 'Q'

Can anyone help to shed some light onto this issue ?

txredis client hangs up after "multi" command fail

Hi,

you can launch that code:

from twisted.internet import reactor, defer
import time
from txredisapi import ConnectionPool as Redis

class A(object):
    def __init__(self, db):
        self.db = db

    @defer.inlineCallbacks
    def do(self):
        print 'hello'
        try:
            pipe = yield self.db.multi()
            t = int(time.time())
            val = None
            if pipe:
                yield pipe.set('b', t)
                yield pipe.get('b')
                val = yield pipe.commit()
                print val
        except Exception as e:
            print e

        reactor.callLater(1.0, self.do)

@defer.inlineCallbacks
def main():
    db = yield Redis(dbid=1, poolsize=2)
    print "connected: %s" % db

    a = A(db)
    reactor.callLater(1.0, a.do)

main()
reactor.run()

and restart the redis server to reproduce the issue

I did a fix for that: 8e04e65

Redis reconnect broken when dbid != 0, or password is used.

Hi all,

I would really discourage the use of any other database than 0, as well as the use of a password. Reconnect is broken.

Right now we have a very large database which takes a while to come up. So if we shut down the database and bring it up again, redis needs some time to load it into memory. Every command will be replied to with something like this:

Failure: exceptions.ValueError: Redis error: could not set dbid=0: LOADING Redis is loading the dataset in memory 

So, obviously we cannot set the database id right after a TCP reconnect, but we have to wait for redis to be loaded into memory.

I have no patch for it to fix yet, but all feedback is welcome!

Cheers,
Jonathan

Lazy connection pool - pipeline issue

txredisapi (trying the master branch, which has the fix for the pipeline pool) hangs when there are more concurrent pipeline commands executed than the maximum connection pool size.

How to reproduce:
1- Create a lazyConnection pool with pool size 2
2- Execute any more than 10 pipeline requests
Simple sample code to reproduce the above is added below. The code should output all the 100 requests but it outputs only around 60.

import txredisapi as redis

from twisted.internet import defer
from twisted.internet import reactor
def print_result(result,count=0):
    print "got the result for count",count

def process(pipeline):
    pipeline.smembers('key1')
    return pipeline.execute_pipeline()

@defer.inlineCallbacks
def main():

    rc =  redis.lazyConnectionPool(poolsize=1)

    for i in range(100):
        rc.pipeline().addCallback(process).addCallback(print_result,count = i)
    yield rc.disconnect()


if __name__ == "__main__":
    main()
    reactor.run()

Support setnx in RedisShardingAPI

Is there a reason why setnx isn't supported in the sharding api? It seems like it could/should be. Wasn't sure if there was a reason why it wouldn't work.

support R, W from Dynamo/Riak?

http://github.com/fiorix/txredisapi/blob/master/txredisapi/api.py#L92

The auto-sharding looks nice. But for some fault-tolerance, it might be nice to support writing to multiple nodes in the hashring. This idea comes from Amazon Dynamo and has been recently made famous by Riak.

The idea is to write to W≥1 nodes and read from R≥1 nodes.

That way, if one of the nodes goes down there's a copy of the data on at least some of the other nodes, and your app has higher availability.

I think this would be an easy patch since I see iter_nodes in hashring. It could also be made to be backwards compatible with R=1, W=1 as defaults.
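
A rough sketch of the R/W idea, using a hypothetical hash ring object with an iter_nodes(key) method that yields connection handlers; this is not the txredisapi API, just an illustration of the scheme:

from twisted.internet import defer

W = 2  # number of replicas each write goes to
R = 1  # number of replicas consulted on read


@defer.inlineCallbacks
def replicated_set(hashring, key, value):
    # Send the write to the first W nodes the ring yields for this key.
    nodes = list(hashring.iter_nodes(key))[:W]
    yield defer.gatherResults([node.set(key, value) for node in nodes])


@defer.inlineCallbacks
def replicated_get(hashring, key):
    # Ask up to R nodes and return the first non-None answer.
    for node in list(hashring.iter_nodes(key))[:R]:
        value = yield node.get(key)
        if value is not None:
            defer.returnValue(value)
    defer.returnValue(None)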

SubscriberFactory doesn't provide 'remove', but redisprotocol calls it.

SubscriberFactory uses SubscriberProtocol, which in turn inherits from RedisProtocol, which in RedisProtocol.connectionLost calls self.factory.remove. SubscriberFactory does not provide this method (while the RedisFactory class does).

In my code, I had a class that inherited SubscriberFactory, so I was able to put in a noop remove method.

Are Redis connection pools necessary with twisted asynchronous I/O?

In node.js, node_redis module just use a single connection. Both Node and Redis are effectively single thread. I don't think you'll gain anything by having multiple connections.

Why does txredisapi use a connection pool?

If I set the pool size to 1 in txredisapi, I notice that the single connection won't be put back before receiving the redis reply; wouldn't that reduce the asynchronous concurrency?

Using a txredis single connection, twisted will automatically pipeline requests and replies:
01-05 15:17:28.106092 IN TWISTED senddata LEN:, 1272
01-05 15:17:28.106181 IN TWISTED recvdata LEN:, 684
Using a txredisapi single connection, the next request must wait for redis to respond.

I think txredisapi must use the connection pool to get better performance, so it is not suitable for asynchronous single-threaded mode.

ConnectionLost during authentication causes txredisapi to stop retrying

Hi all, today I noticed that my application using txredisapi suddenly stopped.

The log showed:

[BaseRedisProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector instance at 0x7f15fcda1440> will retry in 8 seconds
[BaseRedisProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector instance at 0x7f15fcd85e60> will retry in 9 seconds
[BaseRedisProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector instance at 0x7f15fcda16c8> will retry in 9 seconds
[BaseRedisProtocol (TLSMemoryBIOProtocol),client] <twisted.internet.tcp.Connector instance at 0x7f15fcd85f38> will retry in 9 seconds
Stopping factory <redis.txredisapi.RedisFactory instance at 0x7f15fcd85d40>
[-] Discarding dead connection.
message repeated 4 times: [ [-] Discarding dead connection.]

So far so good, but on reconnect I got:
Redis error: could not auth: Lost connection

Well, I thought the redis api ought to reconnect if the proper flag was set...
On inspection of the BaseRedisProtocol class I noticed the following

    @defer.inlineCallbacks
    def connectionMade(self):
        if self.factory.password is not None:
            try:
                response = yield self.auth(self.factory.password)
                if isinstance(response, ResponseError):
                    raise response
            except Exception as e:
                self.factory.continueTrying = False
                self.transport.loseConnection()

                msg = "Redis error: could not auth: %s" % (str(e))
                self.factory.connectionError(msg)
                if self.factory.isLazy:
                    log.msg(msg)
                defer.returnValue(None)

The retries are stopped if an exception is raised during auth...

Version 1.1 on PyPI

The current version on PyPI is v.1.0 and v.1.1 contains some great changes. It would be nice to get em out there.

A question about Redis Sentinel

I tested txredisapi's Sentinel feature and found that Sentinel does not work correctly. When I use
txredisapi.Sentinel([("xxxxxxxx", 7379)], password = "1234")
txredisapi sends the AUTH command to the sentinel, but sentinels do not actually support AUTH,
so the sentinel returns an error and the master cannot be obtained.
The password option should really be for the master or slave connections instead.
I also have a question about the code:
self.sentinels = [ lazyConnection(host, port, **connection_kwargs) for host, port in sentinel_addresses ]

When the Sentinel object is initialized, this creates factories that connect to the sentinels, but shouldn't it use SentinelConnectionFactory?

def master_for(self, service_name, factory_class=SentinelConnectionFactory, dbid=None, poolsize=1, **connection_kwargs):
    factory = factory_class(sentinel_manager=self, service_name=service_name, is_master=True, uuid=None, dbid=dbid, poolsize=poolsize, **connection_kwargs)
    return self._connect_factory_and_return_handler(factory, poolsize)

And when getting the master, shouldn't it return a RedisFactory rather than a SentinelConnectionFactory?

lazyShardConnectionPool not using consistent hash algorithm?

Hi, I've been using lazyShardedConnectionPool for a while with only one Redis. Now I'm doing some tests with many servers and I found that txredisapi set commands are sending data just to the last server I pass to lazyShardedConnectionPool. But when I run the sharding example it works fine; note that it uses ShardedConnection instead of lazyShardedConnectionPool.

Please, see my code in: https://gist.github.com/3241776 (please don't judge it, lol)

yield blocked if the connection fails

With the code below, when __testRedis is called after __establishConnectionPool, where an invalid host/port configuration is used, only the 'now' time is printed; print x is never reached.

@staticmethod
def sleep(duration):
    d = defer.Deferred()
    reactor.callLater(duration, lambda *ign: d.callback(None))
    return d

@defer.inlineCallbacks
def __establishConnectionPool(self, config):
    redisConfig = config['redis']
    self.redisConnection = yield txredisapi.lazyConnectionPool(host=redisConfig['host'], port=int(redisConfig['port']), reconnect=True)

@defer.inlineCallbacks
def __testRedis(self):
    self.sleep(5)
    try:
        now = time.time()
        print now
        yield self.redisConnection.set('testme', now)
        x = yield self.redisConnection.get('testme')
        print x
        if x != now:
            raise Exception('Redis dead')
    except Exception as e:
        print "Exception: %s" % str(e)
    finally:
        try:
            yield self.redisConnection.disconnect()
        except Exception:
            pass

please fix comment in mset function

there is this line

    HMSET replaces old values with new values.

in the mset function, in which the MSET command is executed

this comment does not look correct to me

Resending of password when reconnect=True

The connection protocol does not resend the password when the connection is lost. Further, when the database was changed using select(), it doesn't remember this after the connection is lost.

We use the following code for setting up a database connection:

model_storage = yield txredisapi.ConnectionPool(
        getattr(settings, 'MODEL_STORAGE_REDIS_HOST', 'localhost'),
        getattr(settings, 'MODEL_STORAGE_REDIS_PORT', 6379),
        None,
        True)

model_password = getattr(settings, 'MODEL_STORAGE_REDIS_PASSWORD', None)
if model_password:
    yield model_storage.auth(model_password)
yield model_storage.select(getattr(settings, 'MODEL_STORAGE_REDIS_DB', 3))

After a connection loss, we don't have the password or dbid anymore.

Add support for a Script class

I made this object for my own use that mimics the one from the redis-py driver: https://bitbucket.org/wanderua/wanderu.bamboo/raw/74aeda47a1764284d137fec437d73eb8047d5b98/wanderu/bamboo/txscript.py
That's from an open-source project that I am making for using Redis as a queuing mechanism with reliability. I would gladly contribute it or some version of it to the core library if you think that would be useful.

The redis driver has a Script class that is pretty useful: https://github.com/andymccurdy/redis-py/blob/master/redis/client.py#L2676
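
For reference, a rough sketch of what such a wrapper could look like on top of execute_command (as used in the BRPOPLPUSH workaround above); this is a hypothetical helper, not part of txredisapi:

import hashlib

from twisted.internet import defer


class Script(object):
    """Caches a Lua script's SHA1 and runs it via EVALSHA, falling back
    to EVAL (which also loads the script) if the server doesn't know it."""

    def __init__(self, connection, script):
        # script is the Lua source as a str
        self.connection = connection
        self.script = script
        self.sha = hashlib.sha1(script).hexdigest()

    @defer.inlineCallbacks
    def __call__(self, keys=(), args=()):
        params = list(keys) + list(args)
        try:
            result = yield self.connection.execute_command(
                "EVALSHA", self.sha, len(keys), *params)
        except Exception:
            # Most likely NOSCRIPT: fall back to sending the full script.
            result = yield self.connection.execute_command(
                "EVAL", self.script, len(keys), *params)
        defer.returnValue(result)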

Cluster Moved Command Issue MOVED 9203 10.12.5.66:6379

Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: File "/usr/lib/python2.7/site-packages/jasmin/vendor/txredisapi.py", line 400, in handle_reply
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: raise r
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: jasmin.vendor.txredisapi.ResponseError: MOVED 9203 10.12.5.66:6379
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: Unhandled error in Deferred:
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: Traceback (most recent call last):
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: File "/usr/lib/python2.7/site-packages/jasmin/vendor/txredisapi.py", line 395, in replyReceived
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: self.replyQueue.put(reply)
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: File "/usr/lib64/python2.7/site-packages/twisted/internet/defer.py", line 1515, in put
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: self.waiting.pop(0).callback(obj)
Feb 07 14:00:21 DEVSRV-JASMIN02 jasmind[7291]: File "/usr/lib64/python2.7/site-packages/twisted/internet/defer.py", line 393, in callback

ZADD thrashes on redis >= 3.0.2 with NX/CH/INCR/XX

Issue

I had some issues with ZADD not actually adding values to the set if NX was set on redis > 3.0.2. When XX, NX, CH, or INCR was used they were interpreted to be the score and were then intermingled with extra pairs of arguments throughout the function.

Workaround

Just stripping out zadd's internals does get it working:

diff --git a/src/replay/src/lib/txredisapi.py b/src/replay/src/lib/txredisapi.py
index cf233d6..3557589 100644
--- a/src/replay/src/lib/txredisapi.py
+++ b/src/replay/src/lib/txredisapi.py
@@ -1118,11 +1118,22 @@ class BaseRedisProtocol(LineReceiver, policies.TimeoutMixin):
         return self.execute_command("SSCAN", key, *args)
 
     # Commands operating on sorted zsets (sorted sets)
+    def zadd(self, key, *args):
-    def zadd(self, key, score, member, *args):
         """
         Add the specified member to the Sorted Set value at key
         or update the score if it already exist
         """
-        if args:
-            # Args should be pairs (have even number of elements)
-            if len(args) % 2:
-                return defer.fail(InvalidData(
-                    "Invalid number of arguments to ZADD"))
-            else:
-                l = [score, member]
-                l.extend(args)
-                args = l
-        else:
-            args = [score, member]
         return self.execute_command("ZADD", key, *args)
 
     def zrem(self, key, *args):

Solution

By the previous behavior I'm guessing you want to keep the error checking in there - would you like it re-added and PR'd with a check for NX/CH/XX/INCR?

Not honoring timeout setting

First of all, thanks for a great project.
I recently ended up using sentinel, but txredisapi is losing the connection frequently and trying to reconnect.

My code is something like this

    sentinel = txredisapi.Sentinel(sentinel_addresses=[("foo", 26379), ("bar", 26380), ("baz", 26381)], convertNumbers=False)
    redis = sentinel.master_for("my-cluster")

Below are logs

Starting factory <txredisapi.RedisFactory instance at 0x7f33d1f64200>
<twisted.internet.tcp.Connector instance at 0x7f33d1f64320> will retry in 9 seconds
Stopping factory <txredisapi.RedisFactory instance at 0x7f33d1f64200>
Starting factory <txredisapi.RedisFactory instance at 0x7f33d1f64680>
<twisted.internet.tcp.Connector instance at 0x7f33d1f647a0> will retry in 8 seconds
Stopping factory <txredisapi.RedisFactory instance at 0x7f33d1f64680>
Starting factory <txredisapi.RedisFactory instance at 0x7f33d1f64200>
Starting factory <txredisapi.RedisFactory instance at 0x7f33d1f64680>
<twisted.internet.tcp.Connector instance at 0x7f33d1f647a0> will retry in 9 seconds
Stopping factory <txredisapi.RedisFactory instance at 0x7f33d1f64680>

There is a very old question related to this on Stack Overflow, but the accepted solution is not working. The timeout value in redis.conf is 0 by default.

If I use lazyConnectionPool without sentinel, it works well. I am able to reproduce this issue on versions 3.2.8 and 4.0.9.

Any help will be appreciated. Thanks.

incr returns None instead of new value

I have this method:

@classmethod
@defer.inlineCallbacks
def get_new_id(cls, object_model):
    new_id = yield cls.connection.incr('{}.ID'.format(object_model.__name__), 1)
    print new_id
    defer.returnValue(new_id) 

cls.connection is a txredisapi connection.
It prints None.

pipelining transactions

I am trying to pipeline multiple transactions using txredisapi. I am using transactions because I want to check if a hash key exists before I add another key/value to it.

Here is my code:

pipeline = yield redisconnection.pipeline()
...
yield pipeline.watch(key)
transaction = yield pipeline.multi()
keyexists = yield transaction.exists(key)
if keyexists:
    transaction.hset(key, 'clients', json.dumps(clientslist))
r = yield transaction.commit()

...
yield pipeline.execute_pipeline()

I adjusted my code based on the example here: https://stackoverflow.com/questions/10987441/redis-only-allow-operation-on-existing-keys
What I am observing is that the program always hangs after the pipeline.multi() call.

Anyone have any thoughts ?

Sometimes unicode sometimes str

It appears that sometimes the commands will return unicode and sometimes standard strings. I have no idea if this is intended or not, but it seems to become a standard string when it contains characters that are not allowed in unicode. This is annoying when you, for example, get all members of a set and they do not have the same type. Tested on Python version 2.7.6.

Number types in hgetall not always equal to hmset.

I cannot tell whether the bug is in txredisapi, in redis or in our code.

Set the value '4', as a string in this hash mapping

redis.hmset('key', { "a": "4" })

Retrieve the hash again.

redis.hgetall('key')

This returns { "a": 4 }, so the string turned into an integer.
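
A possible workaround, assuming the convertNumbers connection argument (used elsewhere in txredisapi, e.g. in the Sentinel issue below) controls this automatic numeric conversion:

# coding=UTF-8

from twisted.internet import defer, reactor
import txredisapi


@defer.inlineCallbacks
def main():
    # convertNumbers=False asks the driver to leave replies as strings
    # instead of coercing numeric-looking values to int/float.
    rc = yield txredisapi.Connection(convertNumbers=False)
    yield rc.hmset('key', {"a": "4"})
    print (yield rc.hgetall('key'))   # expected: {'a': '4'}
    yield rc.disconnect()
    reactor.stop()


if __name__ == "__main__":
    main()
    reactor.run()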

what would be the estimated effort for Python 3 compatibility?

Since Twisted now works with Python 3.4, I wonder how much effort it would be to make this project py3 compatible as well.

I noticed mainly two things:
(1) except Exception, e
(2) str vs byte

I'm not asking anyone to do the work, just curious whether someone has an idea if this is hard.
I can educate myself about writing py2/3 compatible code and help as well.

Thanks!
