version = '1.1.4.3'
author = 'ported by gcd0318, origined by Liao Xuefeng ([email protected])'
'''
Python client SDK for sina weibo API using OAuth 2.
'''
ported to python3
from io import StringIO, BytesIO
import gzip, time, json, hmac, base64, hashlib, urllib.request, urllib.parse, urllib.error, urllib.request, urllib.error, urllib.parse, logging, mimetypes, collections
class APIError(Exception):
'''
raise APIError if receiving json message indicating failure.
'''
def init(self, error_code, error, request):
self.error_code = error_code
self.error = error
self.request = request
Exception.init(self, error)
def __str__(self):
return 'APIError: %s: %s, request: %s' % (self.error_code, self.error, self.request)
def _parse_json(s):
' parse str into JsonDict '
def _obj_hook(pairs):
' convert json object to python object '
o = JsonDict()
for k, v in list(pairs.items()):
o[str(k)] = v
return o
return json.loads(s, object_hook=_obj_hook)
class JsonDict(dict):
' general json object that allows attributes to be bound to and also behaves like a dict '
def __getattr__(self, attr):
try:
return self[attr]
except KeyError:
raise AttributeError(r"'JsonDict' object has no attribute '%s'" % attr)
def __setattr__(self, attr, value):
self[attr] = value
def _encode_params(**kw):
'''
do url-encode parameters
_encode_params(a=1, b='R&D')
'a=1&b=R%26D'
_encode_params(a=u'\u4e2d\u6587', b=['A', 'B', 123])
'a=%E4%B8%AD%E6%96%87&b=A&b=B&b=123'
'''
args = []
for k, v in list(kw.items()):
qv = None
if isinstance(v, str):
qv = v
elif isinstance(v, collections.Iterable):
for i in v:
qv = str(i)
args.append('%s=%s' % (k, urllib.parse.quote(qv)))
qv = None
else:
qv = str(v)
if qv:
args.append('%s=%s' % (k, urllib.parse.quote(qv)))
return '&'.join(args)
def _encode_multipart(**kw):
boundary = '----------%s' % hex(int(time.time() * 1000))
data = []
for k, v in list(kw.items()):
data.append('--%s' % boundary)
if hasattr(v, 'read'):
# file-like object:
filename = getattr(v, 'name', '')
data.append('Content-Disposition: form-data; name="%s"; filename="hidden"' % k)
data.append('Content-Type: %s\r\n' % _guess_content_type(filename))
data.append(v.read().decode('iso-8859-1'))
else:
data.append('Content-Disposition: form-data; name="%s"\r\n' % k)
qv = urllib.parse.quote(v)
data.append(urllib.parse.quote(v) if isinstance(v, str) else v.decode('utf-8'))
data.append('--%s--\r\n' % boundary)
return '\r\n'.join(data).encode('iso-8859-1'), boundary
def _guess_content_type(url):
n = url.rfind('.')
if n==(-1):
return 'application/octet-stream'
ext = url[n:]
return mimetypes.types_map.get(ext, 'application/octet-stream')
_HTTP_GET = 0
_HTTP_POST = 1
_HTTP_UPLOAD = 2
def _http_get(url, authorization=None, *_kw):
logging.info('GET %s' % url)
return _http_call(url, _HTTP_GET, authorization, *_kw)
def _http_post(url, authorization=None, *_kw):
logging.info('POST %s' % url)
return _http_call(url, _HTTP_POST, authorization, *_kw)
def _http_upload(url, authorization=None, *_kw):
logging.info('MULTIPART POST %s' % url)
return _http_call(url, _HTTP_UPLOAD, authorization, *_kw)
def _read_body(obj):
using_gzip = obj.headers.get('Content-Encoding', '')=='gzip'
body = obj.read()
if using_gzip:
gzipper = gzip.GzipFile(fileobj=BytesIO(body))
fcontent = gzipper.read()
gzipper.close()
return fcontent.decode('utf-8')
return body.decode('utf-8')
def _http_call(the_url, method, authorization, **kw):
'''
send an http request and return a json object if no error occurred.
'''
params = None
boundary = None
if method==_HTTP_UPLOAD:
# fix sina upload url:
the_url = the_url.replace('https://api.', 'https://upload.api.')
params, boundary = _encode_multipart(**kw)
else:
params = _encode_params(**kw)
if '/remind/' in the_url:
# fix sina remind api:
the_url = the_url.replace('https://api.', 'https://rm.api.')
http_url = '%s?%s' % (the_url, params) if method==_HTTP_GET else the_url
http_body = None if method==_HTTP_GET else params
if http_body:
http_body = http_body.encode('utf-8')
req = urllib.request.Request(http_url, data=http_body)
req.add_header('Accept-Encoding', 'gzip')
if authorization:
req.add_header('Authorization', 'OAuth2 %s' % authorization)
if boundary:
req.add_header('Content-Type', 'multipart/form-data; boundary=%s' % boundary)
try:
resp = urllib.request.urlopen(req)
body = _read_body(resp)
r = _parse_json(body)
if hasattr(r, 'error_code'):
raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
return r
except urllib.error.HTTPError as e:
try:
r = _parse_json(_read_body(e))
except:
r = None
if hasattr(r, 'error_code'):
raise APIError(r.error_code, r.get('error', ''), r.get('request', ''))
raise e
class HttpObject(object):
def __init__(self, client, method):
self.client = client
self.method = method
def __getattr__(self, attr):
def wrap(**kw):
if self.client.is_expires():
raise APIError('21327', 'expired_token', attr)
return _http_call('%s%s.json' % (self.client.api_url, attr.replace('__', '/')), self.method, self.client.access_token, **kw)
return wrap
class APIClient(object):
'''
API client using synchronized invocation.
'''
def init(self, app_key, app_secret, redirect_uri=None, response_type='code', domain='api.weibo.com', version='2'):
self.client_id = str(app_key)
self.client_secret = str(app_secret)
self.redirect_uri = redirect_uri
self.response_type = response_type
self.auth_url = 'https://%s/oauth2/' % domain
self.api_url = 'https://%s/%s/' % (domain, version)
self.access_token = None
self.expires = 0.0
self.get = HttpObject(self, _HTTP_GET)
self.post = HttpObject(self, _HTTP_POST)
self.upload = HttpObject(self, _HTTP_UPLOAD)
def parse_signed_request(self, signed_request):
'''
parse signed request when using in-site app.
Returns:
dict object like { 'uid': 12345, 'access_token': 'ABC123XYZ', 'expires': unix-timestamp },
or None if parse failed.
'''
def _b64_normalize(s):
appendix = '=' * (4 - len(s) % 4)
return s.replace('-', '+').replace('_', '/') + appendix
sr = str(signed_request)
logging.info('parse signed request: %s' % sr)
enc_sig, enc_payload = sr.split('.', 1)
sig = base64.b64decode(_b64_normalize(enc_sig))
data = _parse_json(base64.b64decode(_b64_normalize(enc_payload)))
if data['algorithm'] != 'HMAC-SHA256':
return None
expected_sig = hmac.new(self.client_secret, enc_payload, hashlib.sha256).digest();
if expected_sig==sig:
data.user_id = data.uid = data.get('user_id', None)
data.access_token = data.get('oauth_token', None)
expires = data.get('expires', None)
if expires:
data.expires = data.expires_in = time.time() + expires
return data
return None
def set_access_token(self, access_token, expires):
self.access_token = str(access_token)
self.expires = float(expires)
def get_authorize_url(self, redirect_uri=None, **kw):
'''
return the authorization url that the user should be redirected to.
'''
redirect = redirect_uri if redirect_uri else self.redirect_uri
if not redirect:
raise APIError('21305', 'Parameter absent: redirect_uri', 'OAuth2 request')
response_type = kw.pop('response_type', 'code')
return '%s%s?%s' % (self.auth_url, 'authorize',
_encode_params(client_id = self.client_id,
response_type = response_type,
redirect_uri = redirect, **kw))
def _parse_access_token(self, r):
'''
new:return access token as a JsonDict: {"access_token":"your-access-token","expires_in":12345678,"uid":1234}, expires_in is represented using standard unix-epoch-time
'''
current = int(time.time())
expires = r.expires_in + current
remind_in = r.get('remind_in', None)
if remind_in:
rtime = int(remind_in) + current
if rtime < expires:
expires = rtime
return JsonDict(access_token=r.access_token, expires=expires, expires_in=expires, uid=r.get('uid', None))
def request_access_token(self, code, redirect_uri=None):
redirect = redirect_uri if redirect_uri else self.redirect_uri
if not redirect:
raise APIError('21305', 'Parameter absent: redirect_uri', 'OAuth2 request')
r = _http_post('%s%s' % (self.auth_url, 'access_token'), \
client_id = self.client_id, \
client_secret = self.client_secret, \
redirect_uri = redirect, \
code = code, grant_type = 'authorization_code')
return self._parse_access_token(r)
def refresh_token(self, refresh_token):
req_str = '%s%s' % (self.auth_url, 'access_token')
r = _http_post(req_str, \
client_id = self.client_id, \
client_secret = self.client_secret, \
refresh_token = refresh_token, \
grant_type = 'refresh_token')
return self._parse_access_token(r)
def is_expires(self):
return not self.access_token or time.time() > self.expires
def __getattr__(self, attr):
if '__' in attr:
return getattr(self.get, attr)
return _Callable(self, attr)
_METHOD_MAP = { 'GET': _HTTP_GET, 'POST': _HTTP_POST, 'UPLOAD': _HTTP_UPLOAD }
class _Executable(object):
def __init__(self, client, method, path):
self._client = client
self._method = method
self._path = path
def __call__(self, **kw):
method = _METHOD_MAP[self._method]
if method==_HTTP_POST and 'pic' in kw:
method = _HTTP_UPLOAD
return _http_call('%s%s.json' % (self._client.api_url, self._path), method, self._client.access_token, **kw)
def __str__(self):
return '_Executable (%s %s)' % (self._method, self._path)
__repr__ = __str__
class _Callable(object):
def __init__(self, client, name):
self._client = client
self._name = name
def __getattr__(self, attr):
if attr=='get':
return _Executable(self._client, 'GET', self._name)
if attr=='post':
return _Executable(self._client, 'POST', self._name)
name = '%s/%s' % (self._name, attr)
return _Callable(self._client, name)
def __str__(self):
return '_Callable (%s)' % self._name
__repr__ = __str__
if name=='main':
import doctest
doctest.testmod()