tornadis's Introduction

tornadis

What is it?

tornadis is a minimal async redis client for the tornado ioloop, designed for performance (it uses the C hiredis parser).

WARNING: tornadis is considered beta quality (the API can change)

Features

  • simple
  • good performance
  • coroutine friendly
  • production ready (timeouts, connection pool, error management)
  • nearly all redis features (pipeline, pubsub, standard commands)
  • autoconnection, autoreconnection
  • Python2 (>=2.7) and Python3 (>=3.2) support
  • Tornado >=4.2 (in master branch) and Tornado 4.1 + toro (in tornado41 branch) support

Not implemented

  • cluster support

Example

# Let's import tornado and tornadis
import tornado.gen
import tornado.ioloop
import tornadis


@tornado.gen.coroutine
def talk_to_redis():
    # let's (re)connect (autoconnect mode), call the ping redis command
    # and wait for the reply without blocking the tornado ioloop
    # Note: call() method on Client instance returns a Future object (and
    # should be used as a coroutine).
    result = yield client.call("PING")
    if isinstance(result, tornadis.TornadisException):
        # For specific reasons, tornadis almost never raises exceptions;
        # they are returned as the result instead
        print("got exception: %s" % result)
    else:
        # result is already a python object (a string in this simple example)
        print("Result: %s" % result)


# Build a tornadis.Client object with some options as kwargs
# host: redis host to connect
# port: redis port to connect
# autoconnect=True: put the Client object in auto(re)connect mode
client = tornadis.Client(host="localhost", port=6379, autoconnect=True)

# Start a tornado IOLoop, execute the coroutine and end the program
loop = tornado.ioloop.IOLoop.instance()
loop.run_sync(talk_to_redis)
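
Because call() hands errors back as results instead of raising them, some callers may prefer to convert an error result back into a raised exception. The sketch below is a hypothetical helper and not part of tornadis: raise_if_error and the stand-in FakeRedisError class are assumptions made for illustration. Real code would pass tornadis.TornadisException as error_type.

```python
class FakeRedisError(Exception):
    """Stand-in for tornadis.TornadisException (assumption for this sketch)."""


def raise_if_error(result, error_type=Exception):
    """Return result unchanged, or raise it if it is an error instance."""
    if isinstance(result, error_type):
        raise result
    return result
```

Inside a coroutine this could wrap a reply, for example raise_if_error((yield client.call("PING")), tornadis.TornadisException).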

Full documentation

Full documentation is available at http://tornadis.readthedocs.org

tornadis's People

Contributors

cshoe, gward, jammed343, sorlov-avito, spollard, thefab

tornadis's Issues

What does this error mean and how should I handle it?

Hi - Quick question, I occasionally get this exception, how should I handle it?

Thanks!

[E 150409 10:21:13 ioloop:588] Exception in callback None
Traceback (most recent call last):
File "/opt/anaconda/lib/python2.7/site-packages/tornado-4.1-py2.7-macosx-10.5-x86_64.egg/tornado/ioloop.py", line 840, in start
handler_func(fd_obj, events)
File "/opt/anaconda/lib/python2.7/site-packages/tornado-4.1-py2.7-macosx-10.5-x86_64.egg/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File "/opt/anaconda/lib/python2.7/site-packages/tornadis/connection.py", line 189, in _handle_events
self._handle_read()
File "/opt/anaconda/lib/python2.7/site-packages/tornadis/connection.py", line 201, in _handle_read
chunk = self._read(self.read_page_size)
File "/opt/anaconda/lib/python2.7/site-packages/tornadis/connection.py", line 244, in _read
raise ConnectionError("error during socket.recv: %s" % e)
ConnectionError: error during socket.recv: [Errno 60] Operation timed out
Error -> 'StopObject' object is not iterable

`add_sockets`: is advanced multi-process mode not supported?

According to the Tornado documentation, there are three ways to initialize an HTTPServer.
The tornadis example only uses the simple single-process mode.
When I test bind/start (simple multi-process), it works well.
But with add_sockets (advanced multi-process), I get an error.
Here is the test code:

from tornado.ioloop import IOLoop
from tornado.web import RequestHandler, Application, url
from tornado.httpserver import HTTPServer
import tornado.gen
import tornado.netutil
import tornado.process
import tornadis
import logging

logging.basicConfig(level=logging.WARNING)
POOL = tornadis.ClientPool(max_size=15)


class HelloHandler(RequestHandler):

    @tornado.gen.coroutine
    def get(self):
        with (yield POOL.connected_client()) as client:
            reply = yield client.call("PING")
            if not isinstance(reply, tornadis.TornadisException):
                self.write("Hello, %s" % reply)
        self.finish()


def make_app():
    return Application([
        url(r"/", HelloHandler),
        ])


def main():
    app = make_app()
    sockets = tornado.netutil.bind_sockets(8888)
    tornado.process.fork_processes(0)
    server = HTTPServer(app)
    server.add_sockets(sockets)
    IOLoop.current().start()

    # app.listen(8888)
    # IOLoop.current().start()

main()

and the error message was:

Traceback (most recent call last):
  File "/usr/lib64/python3.4/asyncio/selector_events.py", line 234, in add_reader
    key = self._selector.get_key(fd)
  File "/usr/lib64/python3.4/selectors.py", line 182, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '7 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "test.py", line 46, in <module>
    main()
  File "test.py", line 34, in main
    server.add_sockets(sockets)
  File "/usr/lib64/python3.4/site-packages/tornado/tcpserver.py", line 159, in add_sockets
    sock, self._handle_connection)
  File "/usr/lib64/python3.4/site-packages/tornado/netutil.py", line 258, in add_accept_handler
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)
  File "/usr/lib64/python3.4/site-packages/tornado/platform/asyncio.py", line 75, in add_handler
    fd, self._handle_events, fd, IOLoop.READ)
  File "/usr/lib64/python3.4/asyncio/selector_events.py", line 237, in add_reader
    (handle, None))
  File "/usr/lib64/python3.4/selectors.py", line 402, in register
    self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists
Traceback (most recent call last):
  File "/usr/lib64/python3.4/asyncio/selector_events.py", line 234, in add_reader
    key = self._selector.get_key(fd)
  File "/usr/lib64/python3.4/selectors.py", line 182, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '7 is not registered'
...
....
Traceback (most recent call last):
  File "test.py", line 46, in <module>
    main()
  File "test.py", line 34, in main
    server.add_sockets(sockets)
  File "/usr/lib64/python3.4/site-packages/tornado/tcpserver.py", line 159, in add_sockets
    sock, self._handle_connection)
  File "/usr/lib64/python3.4/site-packages/tornado/netutil.py", line 258, in add_accept_handler
    io_loop.add_handler(sock, accept_handler, IOLoop.READ)
  File "/usr/lib64/python3.4/site-packages/tornado/platform/asyncio.py", line 75, in add_handler
    fd, self._handle_events, fd, IOLoop.READ)
  File "/usr/lib64/python3.4/asyncio/selector_events.py", line 237, in add_reader
    (handle, None))
  File "/usr/lib64/python3.4/selectors.py", line 402, in register
    self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists

It seems much slower than sync redis

The code is as follows:

from tornado import gen, ioloop
from time import time

def common( i = 9999 ):
    from redis import StrictRedis
    r = StrictRedis( 'localhost' )

    t0 = time()
    while i > 0:
        i -= 1
        for j in ( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ):
            r.hset( 'Purifier:Test', 'xxxxxxxxx%d' % j, j )
        result = r.hmget( 'Purifier:Test', 'xxxxxxxxx1', 'xxxxxxxxx6', 'xxxxxxxxx4', 'xxxxxxxxx8')
        r.delete( 'Purifier:Test' )
    print( 'common %s' % ( time() - t0 ) )
    print( result )

# Renamed from "async", which is a reserved word since Python 3.7
@gen.coroutine
def async_bench( i = 9999 ):
    import tornadis
    client = tornadis.Client( host="localhost", port=6379, autoconnect=True )
    t0 = time()
    while i > 0:
        i -= 1
        for j in ( 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ):
            yield client.call( "hset", 'Purifier:Test', 'xxxxxxxxx%d' % j, j )
        result = yield client.call( 'hmget', 'Purifier:Test', 'xxxxxxxxx1', 'xxxxxxxxx6', 'xxxxxxxxx4', 'xxxxxxxxx8')
        # The Redis command is DEL; "delete" is only the redis-py method name
        yield client.call( 'del', 'Purifier:Test' )
    print( 'async %s' % ( time() - t0 ) )
    print( result )


common()      #7.98881101608 seconds
ioloop.IOLoop.instance().run_sync( async_bench )     #17.2736110687 seconds
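
One possible reading of these numbers, not confirmed in this thread: the loop above waits for every reply before sending the next command, so the async client pays event-loop overhead on each call without ever overlapping any I/O. The sketch below is only a simulation using the standard asyncio module, not tornadis. fake_call is a hypothetical stand-in for one Redis round trip, and the sketch shows that awaiting calls one by one gains nothing, while issuing them together does.

```python
import asyncio
import time


async def fake_call(delay=0.01):
    """Stand-in for one Redis round trip (hypothetical, no real I/O)."""
    await asyncio.sleep(delay)
    return "OK"


async def sequential(n):
    """Await each call before issuing the next, like the benchmark loop."""
    return [await fake_call() for _ in range(n)]


async def concurrent(n):
    """Issue all calls at once and wait for them together."""
    return await asyncio.gather(*(fake_call() for _ in range(n)))


def timed(coro_fn, n):
    """Run coro_fn(n) to completion and return the elapsed seconds."""
    start = time.perf_counter()
    asyncio.run(coro_fn(n))
    return time.perf_counter() - start
```

An async client tends to pay off when many independent coroutines (for example, concurrent HTTP handlers) share the event loop, rather than in a single sequential loop.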

Running pubsub_unsubscribe with no arguments seems to stop everything

In my on_close method:

@tornado.gen.coroutine
def on_close(self):
    if self.client.subscribed:
        print((yield self.client.pubsub_unsubscribe()))
        print('test')

Will not print 'test' for me. Everything after pubsub_unsubscribe with no arguments doesn't seem to run, though it should unsubscribe all channels.
But if I use pubsub_unsubscribe with a channel it seems to work.

tornadis can't connect in tests

If I run this:

import tornadis
from tornado.testing import AsyncTestCase, gen_test

class Test(AsyncTestCase):
    @gen_test
    def test_tornadis(self):
        client = tornadis.Client()
        yield client.call(["GET", "ciao"])

through tornado.testing (test module mytest), Tornado gives a tornado.ioloop.TimeoutError: Operation timed out after 5 seconds. But redis is up and running, and redis-cli connects perfectly fine.

Connection already opened

Hi,

Thanks for creating this library. I followed your example for using this as an HTTP request handler here: http://tornadis.readthedocs.org/en/latest/. The first GET request works without any issues, but each subsequent request leads to this error: "you are already connected"

What's the preferred way of opening a connection in a web-server?

high cpu usage when opening a connection

Hi,

What's the proper way of opening a redis connection? I've noticed that when I run the below, my cpu usage spikes to 100% and remains there. Is there a way of only opening this connection once, as opposed to each time I hit the handler?

import tornado.gen
import tornado.web
import tornadis

client = tornadis.Client()


class Handler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        # get_data() is a coroutine, so its result must be yielded
        data = yield get_data()
        self.write(data)
        self.finish()


@tornado.gen.coroutine
def get_data():
    yield client.connect()
    data = yield client.call( ... )
    raise tornado.gen.Return(data)
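
The README example above passes autoconnect=True so that the client connects by itself when needed, which avoids calling connect() in every request. For cases where the connection is managed by hand, the general idea of "connect at most once, even under concurrent requests" can be sketched as follows. This uses only the standard asyncio module: FakeClient and ConnectOnce are hypothetical names invented for this illustration and are not tornadis classes.

```python
import asyncio


class FakeClient:
    """Stand-in for a Redis client; counts how often connect() runs."""

    def __init__(self):
        self.connect_count = 0
        self.connected = False

    async def connect(self):
        self.connect_count += 1
        await asyncio.sleep(0)  # simulate yielding to the event loop
        self.connected = True

    async def call(self, *args):
        if not self.connected:
            raise RuntimeError("not connected")
        return "PONG"


class ConnectOnce:
    """Wrap a client so that connect() runs at most once."""

    def __init__(self, client):
        self._client = client
        self._lock = None

    async def call(self, *args):
        # Create the lock lazily so it belongs to the running event loop.
        if self._lock is None:
            self._lock = asyncio.Lock()
        async with self._lock:
            if not self._client.connected:
                await self._client.connect()
        return await self._client.call(*args)
```

The lock matters because several handlers may reach the "not connected yet" check before the first connect() has finished.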

Tornado 6 not supported by current version (0.8.1)

I saw tornado>=4.2,<6.0 in the requirements, but maybe this issue will be useful for someone.

Tornado 6 dropped some APIs:

tornado.gen

Some older portions of this module have been removed. This includes engine, YieldPoint, Callback, Wait, WaitAll, MultiYieldPoint, and Task.

Release notes Tornado 6.0

My system:

python 3.8.0
tornadis 0.8.1
tornado: 6.0.3

After updating Tornado, I get this error:

ERROR:root:Traceback (most recent call last):
  File "/src/venv/lib/python3.8/site-packages/tornado/web.py", line 1699, in _execute
    result = await result
  File "/src/venv/lib/python3.8/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/src/handlers/auth.py", line 284, in post
    yield self.redis.call("DEL", auth_token)
  File "/src/venv/lib/python3.8/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/src/venv/lib/python3.8/site-packages/tornado/gen.py", line 748, in run
    yielded = self.gen.send(value)
  File "/src/venv/lib/python3.8/site-packages/tornadis/client.py", line 215, in _call_with_autoconnect
    res = yield self._call(*args, **kwargs)
  File "/src/venv/lib/python3.8/site-packages/tornadis/client.py", line 285, in _call
    return tornado.gen.Task(fn, *arguments, **kwargs)
AttributeError: module 'tornado.gen' has no attribute 'Task'
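
For context, gen.Task called a function with an extra callback keyword argument and produced something a coroutine could wait on. A rough equivalent can be written with a plain Future. The sketch below uses the standard asyncio module and is not the fix adopted by tornadis: call_as_future and legacy_ping are hypothetical names for illustration.

```python
import asyncio


def call_as_future(fn, *args, **kwargs):
    """Call a callback-style function and return an awaitable Future.

    fn must accept a `callback` keyword argument and invoke it once with
    the result, which is the convention gen.Task relied on.
    """
    future = asyncio.get_running_loop().create_future()

    def on_done(result):
        # Ignore late or duplicate callbacks once the future is resolved.
        if not future.done():
            future.set_result(result)

    fn(*args, callback=on_done, **kwargs)
    return future


def legacy_ping(message, callback):
    """Hypothetical callback-style function used only for the demo."""
    asyncio.get_running_loop().call_soon(callback, "PONG:" + message)
```

Inside a coroutine, result = await call_as_future(legacy_ping, "hi") then plays the role that yield gen.Task(legacy_ping, "hi") used to play.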

Tight pinning of Tornado

I was curious if there was a specific reason why you keep such a tight revision pin against tornado. This is causing conflicts with the version of Tornado I am using. Couldn't you instead do >=4.1,<4.3 or something?

4.2 branch kind, channel, body = msg TypeError: 'NoneType' object is not iterable

I would like to use the 4.2 branch; actually, I would like to go straight to the tornado master branch. However, when using the 4.2 branch here with tornado 4.2 as installed from the requirements, I get an error on pubsub messages, which work fine on 4.1.

kind, channel, body = msg
TypeError: 'NoneType' object is not iterable

Traceback (most recent call last):
File ".../python2.7/site-packages/tornado/ioloop.py", line 592, in _run_callback
ret = callback()
File ".../python2.7/site-packages/tornado/stack_context.py", line 275, in null_wrapper
return fn(*args, **kwargs)
File ".../python2.7/site-packages/tornado/ioloop.py", line 598, in <lambda>
self.add_future(ret, lambda f: f.result())
File ".../python2.7/site-packages/tornado/concurrent.py", line 215, in result
raise_exc_info(self._exc_info)
File ".../python2.7/site-packages/tornado/gen.py", line 879, in run
yielded = self.gen.send(value)
File "project.py", line 127, in watch_redis
self.on_pubsub(msg)
File "project.py", line 131, in on_pubsub
kind, channel, body = msg
TypeError: 'NoneType' object is not iterable

Any ideas on why pubsub messages have become None, so we can get this working on 4.2? I'll also try the latest tornado straight from master.
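
Whatever the cause of the None value turns out to be, the unpacking in on_pubsub can be made defensive so that an unexpected reply is skipped rather than crashing the watcher. This is a minimal sketch that does not explain or fix the underlying behaviour: unpack_pubsub_message is a hypothetical helper name, and the three-item shape is taken from the kind, channel, body = msg line in the traceback.

```python
def unpack_pubsub_message(msg):
    """Return (kind, channel, body), or None if msg is not a 3-item reply."""
    # Error objects returned as results, or a missing message, are skipped.
    if msg is None or isinstance(msg, Exception):
        return None
    if not isinstance(msg, (list, tuple)) or len(msg) != 3:
        return None
    kind, channel, body = msg
    return kind, channel, body
```

The caller can then test the return value for None before dispatching the message.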

Best way of dealing with connection timeout issues

Sorry I've been out of touch for a while. I wanted to check-in and see if there were any updates on handling connection timeouts. My previous approach was as follows:

import tornadis
from tornadis.exceptions import ConnectionError

results = yield client4.call("HGETALL", redis_key)
if isinstance(results, ConnectionError):
    yield client4.connect()
    results = yield client4.call("HGETALL", redis_key)

Is there a more optimal solution, or should I continue with this approach?
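
The approach above can be generalised into a small retry helper so that the reconnect logic is not repeated at every call site. The sketch below runs on the standard asyncio module with stand-ins: FakeConnectionError, FlakyClient and call_with_retry are hypothetical names for illustration, and real code would pass the ConnectionError class imported above as error_type.

```python
import asyncio


class FakeConnectionError(Exception):
    """Stand-in for tornadis.exceptions.ConnectionError (assumption)."""


class FlakyClient:
    """Hypothetical client whose first call returns a connection error."""

    def __init__(self):
        self.calls = 0
        self.reconnects = 0

    async def connect(self):
        self.reconnects += 1

    async def call(self, *args):
        self.calls += 1
        if self.calls == 1:
            # Errors are returned as results, mirroring the snippet above.
            return FakeConnectionError("timed out")
        return {"field": "value"}


async def call_with_retry(client, *args, retries=1, error_type=Exception):
    """Call once, then reconnect and retry while an error result comes back."""
    result = await client.call(*args)
    for _ in range(retries):
        if not isinstance(result, error_type):
            break
        await client.connect()
        result = await client.call(*args)
    return result
```

If the last attempt still fails, the error object is returned to the caller, so the isinstance check remains necessary there.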

Best way forward with regards to native coroutines

I'd just appreciate to hear your opinion on this, as I'm planning to update my Tornado app for async/await (and looking forward to Tornado 5):

Would a fork of tornadis for Python 3.5+ using native coroutines be the best move forward (especially regarding the gotchas mentioned in the official Tornado docs - I'm not aware how much tornadis relies on some of that functionality)?

Or is this something that this project will tackle down the line?

Why can't I connect to the redis db?

tornadis 0.8.0
tornado 5.0.1
python 3.6.5

class MyVerification(RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        self.redisdb = tornadis.Client(db=1)
        result = yield self.redisdb.call("SET", self.myId, myValue)
        self.write(result)

Traceback (most recent call last):
File "/app/python3/lib/python3.6/site-packages/tornado/web.py", line 1543, in _execute
result = yield result
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
value = future.result()
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "/app/tornadoWeb/com/verification.py", line 30, in get
result = yield self.redisdb.call("SET", self.myId, myValue)
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
value = future.result()
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
yielded = self.gen.throw(*exc_info)
File "/app/python3/lib/python3.6/site-packages/tornadis/client.py", line 212, in _call_with_autoconnect
yield self.connect()
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
value = future.result()
File "/app/python3/lib/python3.6/site-packages/tornado/gen.py", line 315, in wrapper
yielded = next(result)
File "/app/python3/lib/python3.6/site-packages/tornadis/client.py", line 101, in connect
self.__connection = Connection(cb1, cb2, **kwargs)
File "/app/python3/lib/python3.6/site-packages/tornadis/connection.py", line 87, in __init__
self._ioloop)
TypeError: __init__() takes 3 positional arguments but 4 were given

How to set expiry time for keys?

I am using tornadis.Pipeline() to set data in redis. Is there a way to set an expiry time for the keys? The code is given below:
pipeline = tornadis.Pipeline()
pipeline.stack_call("SET", "key", "value")
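
On the Redis side, expiry can be set either with the EX option of SET (Redis 2.6.12 and later) or with a separate EXPIRE command, and both are ordinary command arguments. The sketch below only builds those argument tuples: set_with_expiry and expire_args are hypothetical helper names, not part of tornadis.

```python
def set_with_expiry(key, value, seconds):
    """Build the arguments for a Redis SET that expires after `seconds`."""
    if int(seconds) <= 0:
        raise ValueError("seconds must be a positive integer")
    return ("SET", key, value, "EX", int(seconds))


def expire_args(key, seconds):
    """Build the arguments for a separate EXPIRE command on an existing key."""
    return ("EXPIRE", key, int(seconds))
```

With the pipeline from the question, these would be stacked as pipeline.stack_call(*set_with_expiry("key", "value", 60)).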

python3.5 await support code

import tornadis
from redis import RedisError
from json import loads, dumps
import datetime
import time as mod_time  # used by expireat() and pexpireat()
import warnings  # used by sentinel()

class Redis:
    def __init__(self, *args, **kwds):
        self._client = tornadis.Client(*args, **kwds)

    # SERVER INFORMATION
    async def bgrewriteaof(self):
        "Tell the Redis server to rewrite the AOF file from data in memory."
        return await self._client.call('BGREWRITEAOF')

    async def bgsave(self):
        """
        Tell the Redis server to save its data to disk.  Unlike save(),
        this method is asynchronous and returns immediately.
        """
        return await self._client.call('BGSAVE')

    async def client_kill(self, address):
        "Disconnects the client at ``address`` (ip:port)"
        return await self._client.call('CLIENT', 'KILL', address)

    async def client_list(self):
        "Returns a list of currently connected clients"
        return await self._client.call('CLIENT', 'LIST')

    async def client_getname(self):
        "Returns the current connection name"
        return await self._client.call('CLIENT', 'GETNAME')

    async def client_setname(self, name):
        "Sets the current connection name"
        return await self._client.call('CLIENT', 'SETNAME', name)

    async def config_get(self, pattern="*"):
        "Return a dictionary of configuration based on the ``pattern``"
        return await self._client.call('CONFIG', 'GET', pattern)

    async def config_set(self, name, value):
        "Set config item ``name`` with ``value``"
        return await self._client.call('CONFIG', 'SET', name, value)

    async def config_resetstat(self):
        "Reset runtime statistics"
        return await self._client.call('CONFIG', 'RESETSTAT')

    async def config_rewrite(self):
        "Rewrite config file with the minimal change to reflect running config"
        return await self._client.call('CONFIG', 'REWRITE')

    async def dbsize(self):
        "Returns the number of keys in the current database"
        return await self._client.call('DBSIZE')

    async def debug_object(self, key):
        "Returns version specific meta information about a given key"
        return await self._client.call('DEBUG', 'OBJECT', key)

    async def echo(self, value):
        "Echo the string back from the server"
        return await self._client.call('ECHO', value)

    async def flushall(self):
        "Delete all keys in all databases on the current host"
        return await self._client.call('FLUSHALL')

    async def flushdb(self):
        "Delete all keys in the current database"
        return await self._client.call('FLUSHDB')

    async def info(self, section=None):
        """
        Returns a dictionary containing information about the Redis server

        The ``section`` option can be used to select a specific section
        of information

        The section option is not supported by older versions of Redis Server,
        and will generate ResponseError
        """
        if section is None:
            return await self._client.call('INFO')
        else:
            return await self._client.call('INFO', section)

    async def lastsave(self):
        """
        Return a Python datetime object representing the last time the
        Redis database was saved to disk
        """
        return await self._client.call('LASTSAVE')

    async def object(self, infotype, key):
        "Return the encoding, idletime, or refcount about the key"
        return await self._client.call('OBJECT', infotype, key)

    async def ping(self):
        "Ping the Redis server"
        return await self._client.call('PING')

    async def save(self):
        """
        Tell the Redis server to save its data to disk,
        blocking until the save is complete
        """
        return await self._client.call('SAVE')

    async def sentinel(self, *args):
        "Redis Sentinel's SENTINEL command."
        warnings.warn(
            DeprecationWarning('Use the individual sentinel_* methods'))

    async def sentinel_get_master_addr_by_name(self, service_name):
        "Returns a (host, port) pair for the given ``service_name``"
        return await self._client.call('SENTINEL', 'GET-MASTER-ADDR-BY-NAME',
                                       service_name)

    async def sentinel_master(self, service_name):
        "Returns a dictionary containing the specified masters state."
        return await self._client.call('SENTINEL', 'MASTER', service_name)

    async def sentinel_masters(self):
        "Returns a list of dictionaries containing each master's state."
        return await self._client.call('SENTINEL', 'MASTERS')

    async def sentinel_monitor(self, name, ip, port, quorum):
        "Add a new master to Sentinel to be monitored"
        return await self._client.call('SENTINEL', 'MONITOR', name, ip, port, quorum)

    async def sentinel_remove(self, name):
        "Remove a master from Sentinel's monitoring"
        return await self._client.call('SENTINEL', 'REMOVE', name)

    async def sentinel_sentinels(self, service_name):
        "Returns a list of sentinels for ``service_name``"
        return await self._client.call('SENTINEL', 'SENTINELS', service_name)

    async def sentinel_set(self, name, option, value):
        "Set Sentinel monitoring parameters for a given master"
        return await self._client.call('SENTINEL', 'SET', name, option, value)

    async def sentinel_slaves(self, service_name):
        "Returns a list of slaves for ``service_name``"
        return await self._client.call('SENTINEL', 'SLAVES', service_name)

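    async def shutdown(self):
        "Shutdown the server"
        # tornadis returns errors as results instead of raising them, so the
        # expected connection loss shows up as a TornadisException instance.
        result = await self._client.call('SHUTDOWN')
        if isinstance(result, tornadis.TornadisException):
            return
        raise RedisError("SHUTDOWN seems to have failed.")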

    async def slaveof(self, host=None, port=None):
        """
        Set the server to be a replicated slave of the instance identified
        by the ``host`` and ``port``. If called without arguments, the
        instance is promoted to a master instead.
        """
        if host is None and port is None:
            return await self._client.call('SLAVEOF', 'NO', 'ONE')
        return await self._client.call('SLAVEOF', host, port)

    async def slowlog_get(self, num=None):
        """
        Get the entries from the slowlog. If ``num`` is specified, get the
        most recent ``num`` items.
        """
        args = ['SLOWLOG', 'GET']
        if num is not None:
            args.append(num)
        return await self._client.call(*args)

    async def slowlog_len(self):
        "Get the number of items in the slowlog"
        return await self._client.call('SLOWLOG', 'LEN')

    async def slowlog_reset(self):
        "Remove all items in the slowlog"
        return await self._client.call('SLOWLOG', 'RESET')

    async def time(self):
        """
        Returns the server time as a 2-item tuple of ints:
        (seconds since epoch, microseconds into this second).
        """
        return await self._client.call('TIME')

    # BASIC KEY COMMANDS
    async def append(self, key, value):
        """
        Appends the string ``value`` to the value at ``key``. If ``key``
        doesn't already exist, create it with a value of ``value``.
        Returns the new length of the value at ``key``.
        """
        return await self._client.call('APPEND', key, value)

    async def bitcount(self, key, start=None, end=None):
        """
        Returns the count of set bits in the value of ``key``.  Optional
        ``start`` and ``end`` parameters indicate which bytes to consider
        """
        params = [key]
        if start is not None and end is not None:
            params.append(start)
            params.append(end)
        elif (start is not None and end is None) or \
                (end is not None and start is None):
            raise RedisError("Both start and end must be specified")
        return await self._client.call('BITCOUNT', *params)

    async def bitop(self, operation, dest, *keys):
        """
        Perform a bitwise operation using ``operation`` between ``keys`` and
        store the result in ``dest``.
        """
        return await self._client.call('BITOP', operation, dest, *keys)

    async def bitpos(self, key, bit, start=None, end=None):
        """
        Return the position of the first bit set to 1 or 0 in a string.
        ``start`` and ``end`` define the search range. The range is interpreted
        as a range of bytes and not a range of bits, so start=0 and end=2
        means to look at the first three bytes.
        """
        if bit not in (0, 1):
            raise RedisError('bit must be 0 or 1')
        params = [key, bit]

        if start is not None:
            params.append(start)

        if start is not None and end is not None:
            params.append(end)
        elif start is None and end is not None:
            raise RedisError("start argument is not set, "
                             "when end is specified")
        return await self._client.call('BITPOS', *params)

    async def decr(self, name, amount=1):
        """
        Decrements the value of ``key`` by ``amount``.  If no key exists,
        the value will be initialized as 0 - ``amount``
        """
        return await self._client.call('DECRBY', name, amount)

    async def delete(self, *names):
        "Delete one or more keys specified by ``names``"
        return await self._client.call('DEL', *names)

    async def __delitem__(self, name):
        await self.delete(name)

    async def dump(self, name):
        """
        Return a serialized version of the value stored at the specified key.
        If key does not exist a nil bulk reply is returned.
        """
        return await self._client.call('DUMP', name)

    async def exists(self, name):
        "Returns a boolean indicating whether key ``name`` exists"
        return await self._client.call('EXISTS', name)
    # No __contains__ alias: ``in`` cannot await a coroutine, so the
    # un-awaited result would always be truthy.

    async def expire(self, name, time):
        """
        Set an expire flag on key ``name`` for ``time`` seconds. ``time``
        can be represented by an integer or a Python timedelta object.
        """
        if isinstance(time, datetime.timedelta):
            time = time.seconds + time.days * 24 * 3600
        return await self._client.call('EXPIRE', name, time)

    async def expireat(self, name, when):
        """
        Set an expire flag on key ``name``. ``when`` can be represented
        as an integer indicating unix time or a Python datetime object.
        """
        if isinstance(when, datetime.datetime):
            when = int(mod_time.mktime(when.timetuple()))
        return await self._client.call('EXPIREAT', name, when)


    async def get_json(self, name):

        r = await self.get(name)
        if r:
            r = r.decode('utf-8')
            r = loads(r)
        return r

    async def get(self, name):
        """
        Return the value at key ``name``, or None if the key doesn't exist
        """
        return await self._client.call('GET', name)

    async def __getitem__(self, name):
        """
        Return the value at key ``name``, raises a KeyError if the key
        doesn't exist.
        """
        value = await self.get(name)
        if value is not None:
            return value
        raise KeyError(name)

    async def getbit(self, name, offset):
        "Returns a boolean indicating the value of ``offset`` in ``name``"
        return await self._client.call('GETBIT', name, offset)

    async def getrange(self, key, start, end):
        """
        Returns the substring of the string value stored at ``key``,
        determined by the offsets ``start`` and ``end`` (both are inclusive)
        """
        return await self._client.call('GETRANGE', key, start, end)

    async def getset(self, name, value):
        """
        Sets the value at key ``name`` to ``value``
        and returns the old value at key ``name`` atomically.
        """
        return await self._client.call('GETSET', name, value)

    async def incr(self, name, amount=1):
        """
        Increments the value of ``key`` by ``amount``.  If no key exists,
        the value will be initialized as ``amount``
        """
        return await self._client.call('INCRBY', name, amount)

    async def incrby(self, name, amount=1):
        """
        Increments the value of ``key`` by ``amount``.  If no key exists,
        the value will be initialized as ``amount``
        """

        # An alias for ``incr()``, because it is already implemented
        # as INCRBY redis command.
        return await self.incr(name, amount)

    async def incrbyfloat(self, name, amount=1.0):
        """
        Increments the value at key ``name`` by floating ``amount``.
        If no key exists, the value will be initialized as ``amount``
        """
        return await self._client.call('INCRBYFLOAT', name, amount)

    async def keys(self, pattern='*'):
        "Returns a list of keys matching ``pattern``"
        return await self._client.call('KEYS', pattern)

    async def mget(self, keys, *args):
        """
        Returns a list of values ordered identically to ``keys``
        """
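        # Accept keys as a single iterable, as positional arguments, or both.
        if isinstance(keys, (str, bytes)):
            args = [keys] + list(args)
        else:
            args = list(keys) + list(args)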
        return await self._client.call('MGET', *args)

    async def mset(self, *args, **kwargs):
        """
        Sets key/values based on a mapping. Mapping can be supplied as a single
        dictionary argument or as kwargs.
        """
        if args:
            if len(args) != 1 or not isinstance(args[0], dict):
                raise RedisError('MSET requires **kwargs or a single dict arg')
            kwargs.update(args[0])
        items = []
        for pair in kwargs.items():
            items.extend(pair)
        return await self._client.call('MSET', *items)

    async def msetnx(self, *args, **kwargs):
        """
        Sets key/values based on a mapping if none of the keys are already set.
        Mapping can be supplied as a single dictionary argument or as kwargs.
        Returns a boolean indicating if the operation was successful.
        """
        if args:
            if len(args) != 1 or not isinstance(args[0], dict):
                raise RedisError('MSETNX requires **kwargs or a single '
                                 'dict arg')
            kwargs.update(args[0])
        items = []
        for pair in kwargs.items():
            items.extend(pair)
        return await self._client.call('MSETNX', *items)

    async def move(self, name, db):
        "Moves the key ``name`` to a different Redis database ``db``"
        return await self._client.call('MOVE', name, db)

    async def persist(self, name):
        "Removes an expiration on ``name``"
        return await self._client.call('PERSIST', name)

    async def pexpire(self, name, time):
        """
        Set an expire flag on key ``name`` for ``time`` milliseconds.
        ``time`` can be represented by an integer or a Python timedelta
        object.
        """
        if isinstance(time, datetime.timedelta):
            ms = int(time.microseconds / 1000)
            time = (time.seconds + time.days * 24 * 3600) * 1000 + ms
        return await self._client.call('PEXPIRE', name, time)

    async def pexpireat(self, name, when):
        """
        Set an expire flag on key ``name``. ``when`` can be represented
        as an integer representing unix time in milliseconds (unix time * 1000)
        or a Python datetime object.
        """
        if isinstance(when, datetime.datetime):
            ms = int(when.microsecond / 1000)
            when = int(mod_time.mktime(when.timetuple())) * 1000 + ms
        return await self._client.call('PEXPIREAT', name, when)

    async def psetex(self, name, time_ms, value):
        """
        Set the value of key ``name`` to ``value`` that expires in ``time_ms``
        milliseconds. ``time_ms`` can be represented by an integer or a Python
        timedelta object
        """
        if isinstance(time_ms, datetime.timedelta):
            ms = int(time_ms.microseconds / 1000)
            time_ms = (time_ms.seconds + time_ms.days * 24 * 3600) * 1000 + ms
        return await self._client.call('PSETEX', name, time_ms, value)

    async def pttl(self, name):
        "Returns the number of milliseconds until the key ``name`` will expire"
        return await self._client.call('PTTL', name)

    async def randomkey(self):
        "Returns the name of a random key"
        return await self._client.call('RANDOMKEY')

    async def rename(self, src, dst):
        """
        Rename key ``src`` to ``dst``
        """
        return await self._client.call('RENAME', src, dst)

    async def renamenx(self, src, dst):
        "Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
        return await self._client.call('RENAMENX', src, dst)

    async def restore(self, name, ttl, value):
        """
        Create a key using the provided serialized value, previously obtained
        using DUMP.
        """
        return await self._client.call('RESTORE', name, ttl, value)

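    async def set_json(self, name, value, ex=None, px=None, nx=False, xx=False):
        """
        Serialize ``value`` with ``dumps`` (non-ASCII characters are kept
        as-is) and store it at key ``name``. ``ex``, ``px``, ``nx`` and
        ``xx`` are passed through to ``set``.
        """
        return await self.set(name, dumps(value, ensure_ascii=False),
                              ex, px, nx, xx)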

    async def set(self, name, value, ex=None, px=None, nx=False, xx=False):
        """
        Set the value at key ``name`` to ``value``

        ``ex`` sets an expire flag on key ``name`` for ``ex`` seconds.

        ``px`` sets an expire flag on key ``name`` for ``px`` milliseconds.

        ``nx`` if set to True, set the value at key ``name`` to ``value`` if it
            does not already exist.

        ``xx`` if set to True, set the value at key ``name`` to ``value`` if it
            already exists.
        """
        pieces = [name, value]
        if ex:
            pieces.append('EX')
            if isinstance(ex, datetime.timedelta):
                ex = ex.seconds + ex.days * 24 * 3600
            pieces.append(ex)
        if px:
            pieces.append('PX')
            if isinstance(px, datetime.timedelta):
                ms = int(px.microseconds / 1000)
                px = (px.seconds + px.days * 24 * 3600) * 1000 + ms
            pieces.append(px)

        if nx:
            pieces.append('NX')
        if xx:
            pieces.append('XX')
        return await self._client.call('SET', *pieces)

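    async def __setitem__(self, name, value):
        # ``obj[name] = value`` cannot await this coroutine; call
        # ``await obj.__setitem__(name, value)`` or use ``set`` directly.
        await self.set(name, value)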

    async def setbit(self, name, offset, value):
        """
        Flag the ``offset`` in ``name`` as ``value``. Returns a boolean
        indicating the previous value of ``offset``.
        """
        value = 1 if value else 0
        return await self._client.call('SETBIT', name, offset, value)

    async def setex(self, name, time, value):
        """
        Set the value of key ``name`` to ``value`` that expires in ``time``
        seconds. ``time`` can be represented by an integer or a Python
        timedelta object.
        """
        if isinstance(time, datetime.timedelta):
            time = time.seconds + time.days * 24 * 3600
        return await self._client.call('SETEX', name, time, value)

    async def setnx(self, name, value):
        "Set the value of key ``name`` to ``value`` if key doesn't exist"
        return await self._client.call('SETNX', name, value)

    async def setrange(self, name, offset, value):
        """
        Overwrite bytes in the value of ``name`` starting at ``offset`` with
        ``value``. If ``offset`` plus the length of ``value`` exceeds the
        length of the original value, the new value will be larger than before.
        If ``offset`` exceeds the length of the original value, null bytes
        will be used to pad between the end of the previous value and the start
        of what's being injected.

        Returns the length of the new string.
        """
        return await self._client.call('SETRANGE', name, offset, value)

    async def strlen(self, name):
        "Return the number of bytes stored in the value of ``name``"
        return await self._client.call('STRLEN', name)

    async def substr(self, name, start, end=-1):
        """
        Return a substring of the string at key ``name``. ``start`` and ``end``
        are 0-based integers specifying the portion of the string to return.
        """
        return await self._client.call('SUBSTR', name, start, end)

    async def ttl(self, name):
        "Returns the number of seconds until the key ``name`` will expire"
        return await self._client.call('TTL', name)

    async def type(self, name):
        "Returns the type of key ``name``"
        return await self._client.call('TYPE', name)

    async def watch(self, *names):
        """
        Deprecated: WATCH must be issued from a Pipeline object. This method
        only emits a DeprecationWarning and sends nothing to Redis.
        """
        warnings.warn(DeprecationWarning('Call WATCH from a Pipeline object'))

    async def unwatch(self):
        """
        Deprecated: UNWATCH must be issued from a Pipeline object. This method
        only emits a DeprecationWarning and sends nothing to Redis.
        """
        warnings.warn(
            DeprecationWarning('Call UNWATCH from a Pipeline object'))

    # LIST COMMANDS
    async def blpop(self, keys, timeout=0):
        """
        LPOP a value off of the first non-empty list
        named in the ``keys`` list.

        If none of the lists in ``keys`` has a value to LPOP, then block
        for ``timeout`` seconds, or until a value gets pushed on to one
        of the lists.

        If timeout is 0, then block indefinitely.
        """
        if timeout is None:
            timeout = 0
        if isinstance(keys, basestring):
            keys = [keys]
        else:
            keys = list(keys)
        keys.append(timeout)
        return await self._client.call('BLPOP', *keys)

    async def brpop(self, keys, timeout=0):
        """
        RPOP a value off of the first non-empty list
        named in the ``keys`` list.

        If none of the lists in ``keys`` has a value to RPOP, then block
        for ``timeout`` seconds, or until a value gets pushed on to one
        of the lists.

        If timeout is 0, then block indefinitely.
        """
        if timeout is None:
            timeout = 0
        if isinstance(keys, basestring):
            keys = [keys]
        else:
            keys = list(keys)
        keys.append(timeout)
        return await self._client.call('BRPOP', *keys)

    async def brpoplpush(self, src, dst, timeout=0):
        """
        Pop a value off the tail of ``src``, push it on the head of ``dst``
        and then return it.

        This command blocks until a value is in ``src`` or until ``timeout``
        seconds elapse, whichever is first. A ``timeout`` value of 0 blocks
        forever.
        """
        if timeout is None:
            timeout = 0
        return await self._client.call('BRPOPLPUSH', src, dst, timeout)

    async def lindex(self, name, index):
        """
        Return the item from list ``name`` at position ``index``

        Negative indexes are supported and will return an item at the
        end of the list
        """
        return await self._client.call('LINDEX', name, index)

    async def linsert(self, name, where, refvalue, value):
        """
        Insert ``value`` in list ``name`` either immediately before or after
        [``where``] ``refvalue``

        Returns the new length of the list on success or -1 if ``refvalue``
        is not in the list.
        """
        return await self._client.call('LINSERT', name, where, refvalue, value)

    async def llen(self, name):
        "Return the length of the list ``name``"
        return await self._client.call('LLEN', name)

    async def lpop(self, name):
        "Remove and return the first item of the list ``name``"
        return await self._client.call('LPOP', name)

    async def lpush(self, name, *values):
        "Push ``values`` onto the head of the list ``name``"
        return await self._client.call('LPUSH', name, *values)

    async def lpushx(self, name, value):
        "Push ``value`` onto the head of the list ``name`` if ``name`` exists"
        return await self._client.call('LPUSHX', name, value)

    async def lrange(self, name, start, end):
        """
        Return a slice of the list ``name`` between
        position ``start`` and ``end``

        ``start`` and ``end`` can be negative numbers just like
        Python slicing notation
        """
        return await self._client.call('LRANGE', name, start, end)

    async def lrem(self, name, count, value):
        """
        Remove the first ``count`` occurrences of elements equal to ``value``
        from the list stored at ``name``.

        The count argument influences the operation in the following ways:
            count > 0: Remove elements equal to value moving from head to tail.
            count < 0: Remove elements equal to value moving from tail to head.
            count = 0: Remove all elements equal to value.
        """
        return await self._client.call('LREM', name, count, value)

    async def lset(self, name, index, value):
        "Set position ``index`` of list ``name`` to ``value``"
        return await self._client.call('LSET', name, index, value)

    async def ltrim(self, name, start, end):
        """
        Trim the list ``name``, removing all values not within the slice
        between ``start`` and ``end``

        ``start`` and ``end`` can be negative numbers just like
        Python slicing notation
        """
        return await self._client.call('LTRIM', name, start, end)

    async def rpop(self, name):
        "Remove and return the last item of the list ``name``"
        return await self._client.call('RPOP', name)

    async def rpoplpush(self, src, dst):
        """
        RPOP a value off of the ``src`` list and atomically LPUSH it
        on to the ``dst`` list.  Returns the value.
        """
        return await self._client.call('RPOPLPUSH', src, dst)

    async def rpush(self, name, *values):
        "Push ``values`` onto the tail of the list ``name``"
        return await self._client.call('RPUSH', name, *values)

    async def rpushx(self, name, value):
        "Push ``value`` onto the tail of the list ``name`` if ``name`` exists"
        return await self._client.call('RPUSHX', name, value)

    async def sort(self, name, start=None, num=None, by=None, get=None,
                   desc=False, alpha=False, store=None, groups=False):
        """
        Sort and return the list, set or sorted set at ``name``.

        ``start`` and ``num`` allow for paging through the sorted data

        ``by`` allows using an external key to weight and sort the items.
            Use an "*" to indicate where in the key the item value is located

        ``get`` allows for returning items from external keys rather than the
            sorted data itself.  Use an "*" to indicate where in the key
            the item value is located

        ``desc`` allows for reversing the sort

        ``alpha`` allows for sorting lexicographically rather than numerically

        ``store`` allows for storing the result of the sort into
            the key ``store``

        ``groups`` if set to True and if ``get`` contains at least two
            elements, sort will return a list of tuples, each containing the
            values fetched from the arguments to ``get``.

        """
        if (start is not None and num is None) or \
                (num is not None and start is None):
            raise RedisError("``start`` and ``num`` must both be specified")

        pieces = [name]
        if by is not None:
            pieces.append(Token('BY'))
            pieces.append(by)
        if start is not None and num is not None:
            pieces.append(Token('LIMIT'))
            pieces.append(start)
            pieces.append(num)
        if get is not None:
            # If get is a string assume we want to get a single value.
            # Otherwise assume it's an iterable and we want to get multiple
            # values. We can't just iterate blindly because strings are
            # iterable.
            if isinstance(get, basestring):
                pieces.append(Token('GET'))
                pieces.append(get)
            else:
                for g in get:
                    pieces.append(Token('GET'))
                    pieces.append(g)
        if desc:
            pieces.append(Token('DESC'))
        if alpha:
            pieces.append(Token('ALPHA'))
        if store is not None:
            pieces.append(Token('STORE'))
            pieces.append(store)

        if groups:
            if not get or isinstance(get, basestring) or len(get) < 2:
                raise DataError('when using "groups" the "get" argument '
                                'must be specified and contain at least '
                                'two keys')

        options = {'groups': len(get) if groups else None}
        return await self._client.call('SORT', *pieces, **options)

    # SCAN COMMANDS
    async def scan(self, cursor=0, match=None, count=None):
        """
        Incrementally return lists of key names. Also return a cursor
        indicating the scan position.

        ``match`` allows for filtering the keys by pattern

        ``count`` provides a hint for the minimum number of returns
        """
        pieces = [cursor]
        if match is not None:
            pieces.extend([Token('MATCH'), match])
        if count is not None:
            pieces.extend([Token('COUNT'), count])
        return await self._client.call('SCAN', *pieces)

    #async def scan_iter(self, match=None, count=None):
    #    """
    #    Make an iterator using the SCAN command so that the client doesn't
    #    need to remember the cursor position.

    #    ``match`` allows for filtering the keys by pattern

    #    ``count`` provides a hint for the minimum number of returns
    #    """
    #    cursor = '0'
    #    while cursor != 0:
    #        cursor, data = self.scan(cursor=cursor, match=match, count=count)
    #        for item in data:
    #            yield item

    async def sscan(self, name, cursor=0, match=None, count=None):
        """
        Incrementally return lists of elements in a set. Also return a cursor
        indicating the scan position.

        ``match`` allows for filtering the keys by pattern

        ``count`` provides a hint for the minimum number of returns
        """
        pieces = [name, cursor]
        if match is not None:
            pieces.extend([Token('MATCH'), match])
        if count is not None:
            pieces.extend([Token('COUNT'), count])
        return await self._client.call('SSCAN', *pieces)

    # async def sscan_iter(self, name, match=None, count=None):
    #     """
    #     Make an iterator using the SSCAN command so that the client doesn't
    #     need to remember the cursor position.

    #     ``match`` allows for filtering the keys by pattern

    #     ``count`` provides a hint for the minimum number of returns
    #     """
    #     cursor = '0'
    #     while cursor != 0:
    #         cursor, data = self.sscan(name, cursor=cursor,
    #                                   match=match, count=count)
    #         for item in data:
    #             yield item

    async def hscan(self, name, cursor=0, match=None, count=None):
        """
        Incrementally return key/value slices in a hash. Also return a cursor
        indicating the scan position.

        ``match`` allows for filtering the keys by pattern

        ``count`` provides a hint for the minimum number of returns
        """
        pieces = [name, cursor]
        if match is not None:
            pieces.extend([Token('MATCH'), match])
        if count is not None:
            pieces.extend([Token('COUNT'), count])
        return await self._client.call('HSCAN', *pieces)

    # async def hscan_iter(self, name, match=None, count=None):
    #     """
    #     Make an iterator using the HSCAN command so that the client doesn't
    #     need to remember the cursor position.

    #     ``match`` allows for filtering the keys by pattern

    #     ``count`` provides a hint for the minimum number of returns
    #     """
    #     cursor = '0'
    #     while cursor != 0:
    #         cursor, data = self.hscan(name, cursor=cursor,
    #                                   match=match, count=count)
    #         for item in data.items():
    #             yield item

    async def zscan(self, name, cursor=0, match=None, count=None,
                    score_cast_func=float):
        """
        Incrementally return lists of elements in a sorted set. Also return a
        cursor indicating the scan position.

        ``match`` allows for filtering the keys by pattern

        ``count`` provides a hint for the minimum number of returns

        ``score_cast_func`` a callable used to cast the score return value
        """
        pieces = [name, cursor]
        if match is not None:
            pieces.extend([Token('MATCH'), match])
        if count is not None:
            pieces.extend([Token('COUNT'), count])
        options = {'score_cast_func': score_cast_func}
        return await self._client.call('ZSCAN', *pieces, **options)

    # async def zscan_iter(self, name, match=None, count=None,
    #                score_cast_func=float):
    #     """
    #     Make an iterator using the ZSCAN command so that the client doesn't
    #     need to remember the cursor position.

    #     ``match`` allows for filtering the keys by pattern

    #     ``count`` provides a hint for the minimum number of returns

    #     ``score_cast_func`` a callable used to cast the score return value
    #     """
    #     cursor = '0'
    #     while cursor != 0:
    #         cursor, data = self.zscan(name, cursor=cursor, match=match,
    #                                   count=count,
    #                                   score_cast_func=score_cast_func)
    #         for item in data:
    #             yield item

    # SET COMMANDS
    async def sadd(self, name, *values):
        "Add ``value(s)`` to set ``name``"
        return await self._client.call('SADD', name, *values)

    async def scard(self, name):
        "Return the number of elements in set ``name``"
        return await self._client.call('SCARD', name)

    async def sdiff(self, keys, *args):
        "Return the difference of sets specified by ``keys``"
        args = list_or_args(keys, args)
        return await self._client.call('SDIFF', *args)

    async def sdiffstore(self, dest, keys, *args):
        """
        Store the difference of sets specified by ``keys`` into a new
        set named ``dest``.  Returns the number of keys in the new set.
        """
        args = list_or_args(keys, args)
        return await self._client.call('SDIFFSTORE', dest, *args)

    async def sinter(self, keys, *args):
        "Return the intersection of sets specified by ``keys``"
        args = list_or_args(keys, args)
        return await self._client.call('SINTER', *args)

    async def sinterstore(self, dest, keys, *args):
        """
        Store the intersection of sets specified by ``keys`` into a new
        set named ``dest``.  Returns the number of keys in the new set.
        """
        args = list_or_args(keys, args)
        return await self._client.call('SINTERSTORE', dest, *args)

    async def sismember(self, name, value):
        "Return a boolean indicating if ``value`` is a member of set ``name``"
        return await self._client.call('SISMEMBER', name, value)

    async def smembers(self, name):
        "Return all members of the set ``name``"
        return await self._client.call('SMEMBERS', name)

    async def smove(self, src, dst, value):
        "Move ``value`` from set ``src`` to set ``dst`` atomically"
        return await self._client.call('SMOVE', src, dst, value)

    async def spop(self, name):
        "Remove and return a random member of set ``name``"
        return await self._client.call('SPOP', name)

    async def srandmember(self, name, number=None):
        """
        If ``number`` is None, returns a random member of set ``name``.

        If ``number`` is supplied, returns a list of ``number`` random
        members of set ``name``. Note this is only available when running
        Redis 2.6+.
        """
        # Test against None so that an explicit ``number`` of 0 is still sent.
        args = [number] if number is not None else []
        return await self._client.call('SRANDMEMBER', name, *args)

    async def srem(self, name, *values):
        "Remove ``values`` from set ``name``"
        return await self._client.call('SREM', name, *values)

    async def sunion(self, keys, *args):
        "Return the union of sets specified by ``keys``"
        args = list_or_args(keys, args)
        return await self._client.call('SUNION', *args)

    async def sunionstore(self, dest, keys, *args):
        """
        Store the union of sets specified by ``keys`` into a new
        set named ``dest``.  Returns the number of keys in the new set.
        """
        args = list_or_args(keys, args)
        return await self._client.call('SUNIONSTORE', dest, *args)

    # SORTED SET COMMANDS
    async def zadd(self, name, *args, **kwargs):
        """
        Set any number of score, element-name pairs to the key ``name``. Pairs
        can be specified in two ways:

        As *args, in the form of: score1, name1, score2, name2, ...
        or as **kwargs, in the form of: name1=score1, name2=score2, ...

        The following example would add four values to the 'my-key' key:
        redis.zadd('my-key', 1.1, 'name1', 2.2, 'name2', name3=3.3, name4=4.4)
        """
        pieces = []
        if args:
            if len(args) % 2 != 0:
                raise RedisError("ZADD requires an equal number of "
                                 "values and scores")
            pieces.extend(args)
        for pair in iteritems(kwargs):
            pieces.append(pair[1])
            pieces.append(pair[0])
        return await self._client.call('ZADD', name, *pieces)

    async def zcard(self, name):
        "Return the number of elements in the sorted set ``name``"
        return await self._client.call('ZCARD', name)

    async def zcount(self, name, min, max):
        """
        Returns the number of elements in the sorted set at key ``name`` with
        a score between ``min`` and ``max``.
        """
        return await self._client.call('ZCOUNT', name, min, max)

    async def zincrby(self, name, value, amount=1):
        "Increment the score of ``value`` in sorted set ``name`` by ``amount``"
        return await self._client.call('ZINCRBY', name, amount, value)

    async def zinterstore(self, dest, keys, aggregate=None):
        """
        Intersect multiple sorted sets specified by ``keys`` into
        a new sorted set, ``dest``. Scores in the destination will be
        aggregated based on the ``aggregate``, or SUM if none is provided.
        """
        return await self._zaggregate('ZINTERSTORE', dest, keys, aggregate)

    async def zlexcount(self, name, min, max):
        """
        Return the number of items in the sorted set ``name`` between the
        lexicographical range ``min`` and ``max``.
        """
        return await self._client.call('ZLEXCOUNT', name, min, max)


    async def zrangebylex(self, name, min, max, start=None, num=None):
        """
        Return the lexicographical range of values from sorted set ``name``
        between ``min`` and ``max``.

        If ``start`` and ``num`` are specified, then return a slice of the
        range.
        """
        if (start is not None and num is None) or \
                (num is not None and start is None):
            raise RedisError("``start`` and ``num`` must both be specified")
        pieces = ['ZRANGEBYLEX', name, min, max]
        if start is not None and num is not None:
            pieces.extend([Token('LIMIT'), start, num])
        return await self._client.call(*pieces)

    async def zrangebyscore(self, name, min, max, start=None, num=None,
                            withscores=False, score_cast_func=float):
        """
        Return a range of values from the sorted set ``name`` with scores
        between ``min`` and ``max``.

        If ``start`` and ``num`` are specified, then return a slice
        of the range.

        ``withscores`` indicates to return the scores along with the values.
        The return type is a list of (value, score) pairs

        ``score_cast_func`` a callable used to cast the score return value
        """
        if (start is not None and num is None) or \
                (num is not None and start is None):
            raise RedisError("``start`` and ``num`` must both be specified")
        pieces = ['ZRANGEBYSCORE', name, min, max]
        if start is not None and num is not None:
            pieces.extend([Token('LIMIT'), start, num])
        if withscores:
            pieces.append(Token('WITHSCORES'))
        options = {
            'withscores': withscores,
            'score_cast_func': score_cast_func
        }
        return await self._client.call(*pieces, **options)

    async def zrank(self, name, value):
        """
        Returns a 0-based value indicating the rank of ``value`` in sorted set
        ``name``
        """
        return await self._client.call('ZRANK', name, value)

    async def zrem(self, name, *values):
        "Remove member ``values`` from sorted set ``name``"
        return await self._client.call('ZREM', name, *values)

    async def zremrangebylex(self, name, min, max):
        """
        Remove all elements in the sorted set ``name`` between the
        lexicographical range specified by ``min`` and ``max``.

        Returns the number of elements removed.
        """
        return await self._client.call('ZREMRANGEBYLEX', name, min, max)

    async def zremrangebyrank(self, name, min, max):
        """
        Remove all elements in the sorted set ``name`` with ranks between
        ``min`` and ``max``. Values are 0-based, ordered from smallest score
        to largest. Values can be negative indicating the highest scores.
        Returns the number of elements removed
        """
        return await self._client.call('ZREMRANGEBYRANK', name, min, max)

    async def zremrangebyscore(self, name, min, max):
        """
        Remove all elements in the sorted set ``name`` with scores
        between ``min`` and ``max``. Returns the number of elements removed.
        """
        return await self._client.call('ZREMRANGEBYSCORE', name, min, max)

    async def zrevrange(self, name, start, end, withscores=False,
                        score_cast_func=float):
        """
        Return a range of values from sorted set ``name`` between
        ``start`` and ``end`` sorted in descending order.
        """
        return await self._zrange(name, start, end, withscores,
                                  score_cast_func, "ZREVRANGE")

    async def zrange(self, name, start, end, withscores=False,
                     score_cast_func=float):
        """
        Return a range of values from sorted set ``name`` between
        ``start`` and ``end`` sorted in ascending order.
        """
        return await self._zrange(name, start, end, withscores,
                                  score_cast_func, "ZRANGE")

    async def _zrange(self, name, start, end, withscores=False,
                      score_cast_func=float, action="ZREVRANGE"):
        """
        Return a range of values from sorted set ``name`` between
        ``start`` and ``end``, in the order implied by ``action``
        (``ZRANGE`` ascending, ``ZREVRANGE`` descending).

        ``start`` and ``end`` can be negative, indicating the end of the range.

        ``withscores`` indicates to return the scores along with the values.
        The return type is a list of (value, score) pairs

        ``score_cast_func`` a callable used to cast the score return value
        """
        pieces = [action, name, start, end]
        if withscores:
            pieces.append('WITHSCORES')
        r = await self._client.call(*pieces)
        if withscores:
            # The reply is a flat list [value1, score1, value2, score2, ...];
            # fold it into (value, cast_score) pairs.
            r = [(r[i], score_cast_func(r[i + 1]))
                 for i in range(0, len(r) - 1, 2)]
        return r

    async def zrevrangebyscore(self, name, max, min, start=None, num=None,
                               withscores=False, score_cast_func=float):
        """
        Return a range of values from the sorted set ``name`` with scores
        between ``min`` and ``max`` in descending order.

        If ``start`` and ``num`` are specified, then return a slice
        of the range.

        ``withscores`` indicates to return the scores along with the values.
        The return type is a list of (value, score) pairs

        ``score_cast_func`` a callable used to cast the score return value
        """
        if (start is not None and num is None) or \
                (num is not None and start is None):
            raise RedisError("``start`` and ``num`` must both be specified")
        pieces = ['ZREVRANGEBYSCORE', name, max, min]
        if start is not None and num is not None:
            pieces.extend([Token('LIMIT'), start, num])
        if withscores:
            pieces.append(Token('WITHSCORES'))
        options = {
            'withscores': withscores,
            'score_cast_func': score_cast_func
        }
        return await self._client.call(*pieces, **options)

    async def zrevrank(self, name, value):
        """
        Returns a 0-based value indicating the descending rank of
        ``value`` in sorted set ``name``
        """
        return await self._client.call('ZREVRANK', name, value)

    async def zscore(self, name, value):
        "Return the score of element ``value`` in sorted set ``name``"
        return await self._client.call('ZSCORE', name, value)

    async def zunionstore(self, dest, keys, aggregate=None):
        """
        Union multiple sorted sets specified by ``keys`` into
        a new sorted set, ``dest``. Scores in the destination will be
        aggregated based on the ``aggregate``, or SUM if none is provided.
        """
        return await self._zaggregate('ZUNIONSTORE', dest, keys, aggregate)

    async def _zaggregate(self, command, dest, keys, aggregate=None):
        pieces = [command, dest, len(keys)]
        if isinstance(keys, dict):
            keys, weights = iterkeys(keys), itervalues(keys)
        else:
            weights = None
        pieces.extend(keys)
        if weights:
            pieces.append(Token('WEIGHTS'))
            pieces.extend(weights)
        if aggregate:
            pieces.append(Token('AGGREGATE'))
            pieces.append(aggregate)
        return await self._client.call(*pieces)

    # HYPERLOGLOG COMMANDS
    async def pfadd(self, name, *values):
        "Adds the specified elements to the specified HyperLogLog."
        return await self._client.call('PFADD', name, *values)

    async def pfcount(self, name):
        """
        Return the approximated cardinality of
        the set observed by the HyperLogLog at key.
        """
        return await self._client.call('PFCOUNT', name)

    async def pfmerge(self, dest, *sources):
        "Merge N different HyperLogLogs into a single one."
        return await self._client.call('PFMERGE', dest, *sources)

    # HASH COMMANDS
    async def hdel(self, name, *keys):
        "Delete ``keys`` from hash ``name``"
        return await self._client.call('HDEL', name, *keys)

    async def hexists(self, name, key):
        "Returns a boolean indicating if ``key`` exists within hash ``name``"
        return await self._client.call('HEXISTS', name, key)

    async def hget(self, name, key):
        "Return the value of ``key`` within the hash ``name``"
        return await self._client.call('HGET', name, key)

    async def hgetall(self, name):
        "Return a Python dict of the hash's name/value pairs"
        return await self._client.call('HGETALL', name)

    async def hincrby(self, name, key, amount=1):
        "Increment the value of ``key`` in hash ``name`` by ``amount``"
        return await self._client.call('HINCRBY', name, key, amount)

    async def hincrbyfloat(self, name, key, amount=1.0):
        """
        Increment the value of ``key`` in hash ``name`` by floating ``amount``
        """
        return await self._client.call('HINCRBYFLOAT', name, key, amount)

    async def hkeys(self, name):
        "Return the list of keys within hash ``name``"
        return await self._client.call('HKEYS', name)

    async def hlen(self, name):
        "Return the number of elements in hash ``name``"
        return await self._client.call('HLEN', name)

    async def hset(self, name, key, value):
        """
        Set ``key`` to ``value`` within hash ``name``
        Returns 1 if HSET created a new field, otherwise 0
        """
        return await self._client.call('HSET', name, key, value)

    async def hsetnx(self, name, key, value):
        """
        Set ``key`` to ``value`` within hash ``name`` if ``key`` does not
        exist.  Returns 1 if HSETNX created a field, otherwise 0.
        """
        return await self._client.call('HSETNX', name, key, value)

    async def hmset(self, name, mapping):
        """
        Set key to value within hash ``name`` for each corresponding
        key and value from the ``mapping`` dict.
        """
        if not mapping:
            raise DataError("'hmset' with 'mapping' of length 0")
        items = []
        for pair in iteritems(mapping):
            items.extend(pair)
        return await self._client.call('HMSET', name, *items)

    async def hmget(self, name, keys, *args):
        "Returns a list of values ordered identically to ``keys``"
        args = list_or_args(keys, args)
        return await self._client.call('HMGET', name, *args)

    async def hvals(self, name):
        "Return the list of values within hash ``name``"
        return await self._client.call('HVALS', name)

    async def publish(self, channel, message):
        """
        Publish ``message`` on ``channel``.
        Returns the number of subscribers the message was delivered to.
        """
        return await self._client.call('PUBLISH', channel, message)

    async def eval(self, script, numkeys, *keys_and_args):
        """
        Execute the Lua ``script``, specifying the ``numkeys`` the script
        will touch and the key names and argument values in ``keys_and_args``.
        Returns the result of the script.

        In practice, use the object returned by ``register_script``. This
        function exists purely for Redis API completion.
        """
        return await self._client.call('EVAL', script, numkeys, *keys_and_args)

    async def evalsha(self, sha, numkeys, *keys_and_args):
        """
        Use the ``sha`` to execute a Lua script already registered via EVAL
        or SCRIPT LOAD. Specify the ``numkeys`` the script will touch and the
        key names and argument values in ``keys_and_args``. Returns the result
        of the script.

        In practice, use the object returned by ``register_script``. This
        function exists purely for Redis API completion.
        """
        return await self._client.call('EVALSHA', sha, numkeys, *keys_and_args)

    async def script_exists(self, *args):
        """
        Check if a script exists in the script cache by specifying the SHAs of
        each script as ``args``. Returns a list of boolean values indicating
        whether each script already exists in the cache.
        """
        return await self._client.call('SCRIPT EXISTS', *args)

    async def script_flush(self):
        "Flush all scripts from the script cache"
        return await self._client.call('SCRIPT FLUSH')

    async def script_kill(self):
        "Kill the currently executing Lua script"
        return await self._client.call('SCRIPT KILL')

    async def script_load(self, script):
        "Load a Lua ``script`` into the script cache. Returns the SHA."
        return await self._client.call('SCRIPT LOAD', script)

    async def register_script(self, script):
        """
        Register a Lua ``script`` specifying the ``keys`` it will touch.
        Returns a Script object that is callable and hides the complexity of
        dealing with scripts, keys, and shas. This is the preferred way to work
        with Lua scripts.
        """
        return Script(self, script)
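
The way ``_zaggregate`` above assembles its argument list can be checked without a Redis server. The following is only a minimal sketch: ``build_zaggregate_args`` is a hypothetical helper name, plain strings stand in for the ``Token`` objects, and the built-in dict methods replace ``iterkeys``/``itervalues``. It is not the wrapper's actual code.

```python
def build_zaggregate_args(command, dest, keys, aggregate=None):
    """Return the flat argument list for a command such as ZUNIONSTORE.

    Hypothetical helper: mirrors the logic of _zaggregate but returns the
    list instead of sending it to a client.
    """
    pieces = [command, dest, len(keys)]
    if isinstance(keys, dict):
        # A dict maps key name -> weight; split it into two aligned lists.
        names, weights = list(keys.keys()), list(keys.values())
    else:
        names, weights = list(keys), None
    pieces.extend(names)
    if weights:
        pieces.append("WEIGHTS")  # plain string stands in for Token('WEIGHTS')
        pieces.extend(weights)
    if aggregate:
        pieces.append("AGGREGATE")  # plain string stands in for Token('AGGREGATE')
        pieces.append(aggregate)
    return pieces


# Passing a dict attaches a weight to each source key.
print(build_zaggregate_args("ZUNIONSTORE", "out", {"a": 1, "b": 2}, aggregate="MAX"))
```

Passing a list of key names instead of a dict skips the WEIGHTS clause entirely, so only the key count and the names are sent.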


Document or deal with hiredis.ReplyError

import tornado
import tornadis
import logging
logging.basicConfig(level=logging.CRITICAL)

@tornado.gen.coroutine
def talk_to_redis():
    # "PNG" is not a valid redis command, so the server sends an error reply
    result = yield client.call("PNG")
    print(type(result))
    print("Result: %s" % result)

loop = tornado.ioloop.IOLoop.instance()
client = tornadis.Client()
loop.run_sync(talk_to_redis)
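
One possible way to deal with such replies on the caller side is to turn any exception object that comes back as a result into a raised exception. This is a minimal sketch under assumptions, not tornadis behaviour: ``unwrap_reply`` is a hypothetical helper, and ``FakeReplyError`` is a stand-in for ``hiredis.ReplyError`` (assumed here to be an ``Exception`` subclass) so that the example runs without hiredis or a redis server.

```python
def unwrap_reply(result):
    """Raise the reply if it is an exception object, otherwise return it.

    Hypothetical helper: tornadis itself returns such objects as results.
    """
    if isinstance(result, Exception):
        raise result
    return result


class FakeReplyError(Exception):
    """Stand-in for hiredis.ReplyError so the sketch needs no server."""


# Simulate one normal reply and one error reply.
for reply in ("PONG", FakeReplyError("ERR unknown command 'PNG'")):
    try:
        print("Result: %s" % unwrap_reply(reply))
    except FakeReplyError as exc:
        print("got error reply: %s" % exc)
```

Inside a coroutine, the same helper could wrap the value yielded by ``client.call(...)``, so that error replies and ``tornadis.TornadisException`` results go through one ``except`` path.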

how to run in production?

I run multiple tornado workers in production. What is the best way to share the connection pool between them? Should each worker open its own redis connection?
