Limiter/restreindre le taux de requêtes HTTP dans les requêtes Grequest
j'écris un petit script en Python 2.7.3 avec GRequests et lxml qui me permettra de recueillir quelques prix de carte de collection de divers sites Web et de les comparer. Le problème est que l'un des sites Web limite le nombre de requêtes et renvoie L'erreur HTTP 429 si je la dépasse.
y-a-t-il un moyen d'ajouter la limitation du nombre de requêtes dans GRequestes pour que je ne dépasse pas le nombre de requêtes par seconde que je spécifie? Aussi - comment puis-je faire GRequestes retry après un certain temps si HTTP 429 se produit?
sur une note secondaire-leur limite est ridiculement basse. Quelque chose comme 8 demandes par 15 secondes. Je l'ai brisée avec mon navigateur à plusieurs reprises juste rafraîchir la page en attendant des changements de prix.
4 réponses
je vais répondre à ma propre question puisque j'ai dû m'en rendre compte par moi-même et il semble y avoir très peu d'information sur ce qui se passe autour.
l'idée est La suivante. Chaque objet request utilisé avec GRequests peut prendre un objet session comme paramètre lorsqu'il est créé. D'autre part, les objets de Session peuvent avoir des adaptateurs HTTP montés qui sont utilisés lors des requêtes. En créant notre propre adaptateur, nous pouvons intercepter les requêtes et les limiter de la manière que nous trouvons la meilleure pour notre application. Dans mon cas je me suis retrouvé avec le code ci-dessous.
Objet utilisé pour la limitation:
DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)
class BurstThrottle(object):
max_hits = None
hits = None
burst_window = None
total_window = None
timestamp = None
def __init__(self, max_hits, burst_window, wait_window):
self.max_hits = max_hits
self.hits = 0
self.burst_window = burst_window
self.total_window = burst_window + wait_window
self.timestamp = datetime.datetime.min
def throttle(self):
now = datetime.datetime.utcnow()
if now < self.timestamp + self.total_window:
if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
self.hits += 1
return datetime.timedelta(0)
else:
return self.timestamp + self.total_window - now
else:
self.timestamp = now
self.hits = 1
return datetime.timedelta(0)
adaptateur HTTP:
class MyHttpAdapter(requests.adapters.HTTPAdapter):
throttle = None
def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
wait_window=DEFAULT_WAIT_WINDOW):
self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
max_retries=max_retries, pool_block=pool_block)
def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
request_successful = False
response = None
while not request_successful:
wait_time = self.throttle.throttle()
while wait_time > datetime.timedelta(0):
gevent.sleep(wait_time.total_seconds(), ref=True)
wait_time = self.throttle.throttle()
response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
verify=verify, cert=cert, proxies=proxies)
if response.status_code != 429:
request_successful = True
return response
Installation:
requests_adapter = adapter.MyHttpAdapter(
pool_connections=__CONCURRENT_LIMIT__,
pool_maxsize=__CONCURRENT_LIMIT__,
max_retries=0,
pool_block=False,
burst_window=datetime.timedelta(seconds=5),
wait_window=datetime.timedelta(seconds=20))
requests_session = requests.session()
requests_session.mount('http://', requests_adapter)
requests_session.mount('https://', requests_adapter)
unsent_requests = (grequests.get(url,
hooks={'response': handle_response},
session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)
jetez un coup d'oeil à ceci pour les demandes automatiques: https://pypi.python.org/pypi/RequestsThrottler/0.2.2
vous pouvez définir à la fois un délai fixe entre chaque requête ou un nombre de requêtes pour envoyer un nombre fixe de secondes (ce qui est essentiellement la même chose):
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)] # An example list of requests
with BaseThrottler(name='base-throttler', delay=1.5) as bt:
throttled_requests = bt.multi_submit(reqs)
la fonction multi_submit
renvoie une liste de ThrottledRequest
(voir doc: lien à la fin).
vous pouvez alors l'accès à l'réponses:
for tr in throttled_requests:
print tr.response
alternativement vous pouvez obtenir la même chose en spécifiant le nombre ou les requêtes à envoyer dans un temps déterminé (par exemple 15 requêtes toutes les 60 secondes):
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)] # An example list of requests
with BaseThrottler(name='base-throttler', reqs_over_time=(15, 60)) as bt:
throttled_requests = bt.multi_submit(reqs)
les deux solutions peuvent être mises en œuvre sans l'utilisation du with
déclaration:
import requests
from requests_throttler import BaseThrottler
request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)] # An example list of requests
bt = BaseThrottler(name='base-throttler', delay=1.5)
bt.start()
throttled_requests = bt.multi_submit(reqs)
bt.shutdown()
Pour plus de détails: http://pythonhosted.org/RequestsThrottler/index.html
ne ressemble pas à il n'y a pas de mécanisme simple pour gérer cette compilation dans les requêtes ou le code de grequests. Le seul crochet qui semble être autour est pour les réponses.
voici un super travail hacky pour au moins prouver qu'il est possible - j'ai modifié les grequests pour garder une liste de l'heure à laquelle une demande a été émise et dormir la création de L'AsyncRequest jusqu'à ce que les demandes par seconde étaient inférieures au maximum.
class AsyncRequest(object):
def __init__(self, method, url, **kwargs):
print self,'init'
waiting=True
while waiting:
if len([x for x in q if x > time.time()-15]) < 8:
q.append(time.time())
waiting=False
else:
print self,'snoozing'
gevent.sleep(1)
vous pouvez utiliser les grequests.imap() pour regarder ce de manière interactive
import time
import rg
urls = [
'http://www.heroku.com',
'http://python-tablib.org',
'http://httpbin.org',
'http://python-requests.org',
'http://kennethreitz.com',
'http://www.cnn.com',
]
def print_url(r, *args, **kwargs):
print(r.url),time.time()
hook_dict=dict(response=print_url)
rs = (rg.get(u, hooks=hook_dict) for u in urls)
for r in rg.imap(rs):
print r
j'aimerais qu'il y ait une solution plus élégante, mais jusqu'à présent je n'en trouve pas. J'ai regardé dans les sessions et les adaptateurs. Peut-être que le poolmanager pourrait être augmenté à la place?
aussi, je ne mettrais pas ce code en production - la liste 'q' ne sera jamais tronquée et finira par devenir assez grande. En plus, je ne sais pas si ça marche comme annoncé. On dirait que c'est le cas quand je regarde la sortie de la console.
Ugh. Simplement en regardant ce code je peux dire qu'il est 3h du matin. Le temps de goto lit.
j'ai eu un problème similaire. Voici ma solution. Dans votre cas, je voudrais faire:
def worker():
with rate_limit('slow.domain.com', 2):
response = requests.get('https://slow.domain.com/path')
text = response.text
# Use `text`
en présumant que vous avez plusieurs domaines à partir desquels vous faites le tri, je mettrais en place un dictionnaire de mappage (domain, delay)
donc vous n'atteignez pas vos limites de taux.
Ce code suppose que vous allez utiliser gevent et monkey patch.
from contextlib import contextmanager
from gevent.event import Event
from gevent.queue import Queue
from time import time
def rate_limit(resource, delay, _queues={}):
"""Delay use of `resource` until after `delay` seconds have passed.
Example usage:
def worker():
with rate_limit('foo.bar.com', 1):
response = requests.get('https://foo.bar.com/path')
text = response.text
# use `text`
This will serialize and delay requests from multiple workers for resource
'foo.bar.com' by 1 second.
"""
if resource not in _queues:
queue = Queue()
gevent.spawn(_watch, queue)
_queues[resource] = queue
return _resource_manager(_queues[resource], delay)
def _watch(queue):
"Watch `queue` and wake event listeners after delay."
last = 0
while True:
event, delay = queue.get()
now = time()
if (now - last) < delay:
gevent.sleep(delay - (now - last))
event.set() # Wake worker but keep control.
event.clear()
event.wait() # Yield control until woken.
last = time()
@contextmanager
def _resource_manager(queue, delay):
"`with` statement support for `rate_limit`."
event = Event()
queue.put((event, delay))
event.wait() # Wait for queue watcher to wake us.
yield
event.set() # Wake queue watcher.