Limiter/restreindre le taux de requêtes HTTP dans les requêtes Grequest

j'écris un petit script en Python 2.7.3 avec GRequests et lxml qui me permettra de recueillir quelques prix de carte de collection de divers sites Web et de les comparer. Le problème est que l'un des sites Web limite le nombre de requêtes et renvoie L'erreur HTTP 429 si je la dépasse.

y-a-t-il un moyen d'ajouter la limitation du nombre de requêtes dans GRequestes pour que je ne dépasse pas le nombre de requêtes par seconde que je spécifie? Aussi - comment puis-je faire GRequestes retry après un certain temps si HTTP 429 se produit?

sur une note secondaire-leur limite est ridiculement basse. Quelque chose comme 8 demandes par 15 secondes. Je l'ai brisée avec mon navigateur à plusieurs reprises juste rafraîchir la page en attendant des changements de prix.

22
demandé sur culix 2013-11-27 20:06:15

4 réponses

je vais répondre à ma propre question puisque j'ai dû m'en rendre compte par moi-même et il semble y avoir très peu d'information sur ce qui se passe autour.

l'idée est La suivante. Chaque objet request utilisé avec GRequests peut prendre un objet session comme paramètre lorsqu'il est créé. D'autre part, les objets de Session peuvent avoir des adaptateurs HTTP montés qui sont utilisés lors des requêtes. En créant notre propre adaptateur, nous pouvons intercepter les requêtes et les limiter de la manière que nous trouvons la meilleure pour notre application. Dans mon cas je me suis retrouvé avec le code ci-dessous.

Objet utilisé pour la limitation:

DEFAULT_BURST_WINDOW = datetime.timedelta(seconds=5)
DEFAULT_WAIT_WINDOW = datetime.timedelta(seconds=15)


class BurstThrottle(object):
    max_hits = None
    hits = None
    burst_window = None
    total_window = None
    timestamp = None

    def __init__(self, max_hits, burst_window, wait_window):
        self.max_hits = max_hits
        self.hits = 0
        self.burst_window = burst_window
        self.total_window = burst_window + wait_window
        self.timestamp = datetime.datetime.min

    def throttle(self):
        now = datetime.datetime.utcnow()
        if now < self.timestamp + self.total_window:
            if (now < self.timestamp + self.burst_window) and (self.hits < self.max_hits):
                self.hits += 1
                return datetime.timedelta(0)
            else:
                return self.timestamp + self.total_window - now
        else:
            self.timestamp = now
            self.hits = 1
            return datetime.timedelta(0)

adaptateur HTTP:

class MyHttpAdapter(requests.adapters.HTTPAdapter):
    throttle = None

    def __init__(self, pool_connections=requests.adapters.DEFAULT_POOLSIZE,
                 pool_maxsize=requests.adapters.DEFAULT_POOLSIZE, max_retries=requests.adapters.DEFAULT_RETRIES,
                 pool_block=requests.adapters.DEFAULT_POOLBLOCK, burst_window=DEFAULT_BURST_WINDOW,
                 wait_window=DEFAULT_WAIT_WINDOW):
        self.throttle = BurstThrottle(pool_maxsize, burst_window, wait_window)
        super(MyHttpAdapter, self).__init__(pool_connections=pool_connections, pool_maxsize=pool_maxsize,
                                            max_retries=max_retries, pool_block=pool_block)

    def send(self, request, stream=False, timeout=None, verify=True, cert=None, proxies=None):
        request_successful = False
        response = None
        while not request_successful:
            wait_time = self.throttle.throttle()
            while wait_time > datetime.timedelta(0):
                gevent.sleep(wait_time.total_seconds(), ref=True)
                wait_time = self.throttle.throttle()

            response = super(MyHttpAdapter, self).send(request, stream=stream, timeout=timeout,
                                                       verify=verify, cert=cert, proxies=proxies)

            if response.status_code != 429:
                request_successful = True

        return response

Installation:

requests_adapter = adapter.MyHttpAdapter(
    pool_connections=__CONCURRENT_LIMIT__,
    pool_maxsize=__CONCURRENT_LIMIT__,
    max_retries=0,
    pool_block=False,
    burst_window=datetime.timedelta(seconds=5),
    wait_window=datetime.timedelta(seconds=20))

requests_session = requests.session()
requests_session.mount('http://', requests_adapter)
requests_session.mount('https://', requests_adapter)

unsent_requests = (grequests.get(url,
                                 hooks={'response': handle_response},
                                 session=requests_session) for url in urls)
grequests.map(unsent_requests, size=__CONCURRENT_LIMIT__)
23
répondu Bartłomiej Siwek 2013-12-04 01:42:25

jetez un coup d'oeil à ceci pour les demandes automatiques: https://pypi.python.org/pypi/RequestsThrottler/0.2.2

vous pouvez définir à la fois un délai fixe entre chaque requête ou un nombre de requêtes pour envoyer un nombre fixe de secondes (ce qui est essentiellement la même chose):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', delay=1.5) as bt:
    throttled_requests = bt.multi_submit(reqs)

la fonction multi_submit renvoie une liste de ThrottledRequest (voir doc: lien à la fin).

vous pouvez alors l'accès à l'réponses:

for tr in throttled_requests:
    print tr.response

alternativement vous pouvez obtenir la même chose en spécifiant le nombre ou les requêtes à envoyer dans un temps déterminé (par exemple 15 requêtes toutes les 60 secondes):

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
with BaseThrottler(name='base-throttler', reqs_over_time=(15, 60)) as bt:
    throttled_requests = bt.multi_submit(reqs)

les deux solutions peuvent être mises en œuvre sans l'utilisation du with déclaration:

import requests
from requests_throttler import BaseThrottler

request = requests.Request(method='GET', url='http://www.google.com')
reqs = [request for i in range(0, 5)]  # An example list of requests
bt = BaseThrottler(name='base-throttler', delay=1.5)
bt.start()
throttled_requests = bt.multi_submit(reqs)
bt.shutdown()

Pour plus de détails: http://pythonhosted.org/RequestsThrottler/index.html

9
répondu se7entyse7en 2014-02-05 22:37:24

ne ressemble pas à il n'y a pas de mécanisme simple pour gérer cette compilation dans les requêtes ou le code de grequests. Le seul crochet qui semble être autour est pour les réponses.

voici un super travail hacky pour au moins prouver qu'il est possible - j'ai modifié les grequests pour garder une liste de l'heure à laquelle une demande a été émise et dormir la création de L'AsyncRequest jusqu'à ce que les demandes par seconde étaient inférieures au maximum.

class AsyncRequest(object):
    def __init__(self, method, url, **kwargs):
        print self,'init'
        waiting=True
        while waiting:
            if len([x for x in q if x > time.time()-15]) < 8:
                q.append(time.time())
                waiting=False
            else:
                print self,'snoozing'
                gevent.sleep(1)

vous pouvez utiliser les grequests.imap() pour regarder ce de manière interactive

import time
import rg

urls = [
        'http://www.heroku.com',
        'http://python-tablib.org',
        'http://httpbin.org',
        'http://python-requests.org',
        'http://kennethreitz.com',
        'http://www.cnn.com',
]

def print_url(r, *args, **kwargs):
        print(r.url),time.time()

hook_dict=dict(response=print_url)
rs = (rg.get(u, hooks=hook_dict) for u in urls)
for r in rg.imap(rs):
        print r

j'aimerais qu'il y ait une solution plus élégante, mais jusqu'à présent je n'en trouve pas. J'ai regardé dans les sessions et les adaptateurs. Peut-être que le poolmanager pourrait être augmenté à la place?

aussi, je ne mettrais pas ce code en production - la liste 'q' ne sera jamais tronquée et finira par devenir assez grande. En plus, je ne sais pas si ça marche comme annoncé. On dirait que c'est le cas quand je regarde la sortie de la console.

Ugh. Simplement en regardant ce code je peux dire qu'il est 3h du matin. Le temps de goto lit.

1
répondu synthesizerpatel 2013-11-28 10:50:28

j'ai eu un problème similaire. Voici ma solution. Dans votre cas, je voudrais faire:

def worker():
    with rate_limit('slow.domain.com', 2):
        response = requests.get('https://slow.domain.com/path')
        text = response.text
    # Use `text`

en présumant que vous avez plusieurs domaines à partir desquels vous faites le tri, je mettrais en place un dictionnaire de mappage (domain, delay) donc vous n'atteignez pas vos limites de taux.

Ce code suppose que vous allez utiliser gevent et monkey patch.

from contextlib import contextmanager
from gevent.event import Event
from gevent.queue import Queue
from time import time


def rate_limit(resource, delay, _queues={}):
    """Delay use of `resource` until after `delay` seconds have passed.

    Example usage:

    def worker():
        with rate_limit('foo.bar.com', 1):
            response = requests.get('https://foo.bar.com/path')
            text = response.text
        # use `text`

    This will serialize and delay requests from multiple workers for resource
    'foo.bar.com' by 1 second.

    """

    if resource not in _queues:
        queue = Queue()
        gevent.spawn(_watch, queue)
        _queues[resource] = queue

    return _resource_manager(_queues[resource], delay)


def _watch(queue):
    "Watch `queue` and wake event listeners after delay."

    last = 0

    while True:
        event, delay = queue.get()

        now = time()

        if (now - last) < delay:
            gevent.sleep(delay - (now - last))

        event.set()   # Wake worker but keep control.
        event.clear()
        event.wait()  # Yield control until woken.

        last = time()


@contextmanager
def _resource_manager(queue, delay):
    "`with` statement support for `rate_limit`."

    event = Event()
    queue.put((event, delay))

    event.wait() # Wait for queue watcher to wake us.

    yield

    event.set()  # Wake queue watcher.
0
répondu GrantJ 2015-11-17 21:29:35