tampon circulaire efficace?

Je veux créer un tampon circulaire efficace en python (dans le but de prendre des moyennes des valeurs entières dans le tampon).

Est-ce un moyen efficace d'utiliser une liste pour collecter des valeurs?

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Qu'est-ce qui serait plus efficace (et pourquoi)?

77
demandé sur Kromster 2010-11-11 07:17:18

12 réponses

Je voudrais utiliser collections.deque avec un maxlen arg

>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
...     d.append(i)
... 
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)

Il y a une recette dans les documents pour deque qui est similaire à ce que vous voulez. Mon affirmation selon laquelle c'est le plus efficace repose entièrement sur le fait qu'il est implémenté en C par un équipage incroyablement qualifié qui a l'habitude de lancer du code de premier ordre.

153
répondu aaronasterling 2010-11-11 05:07:45

Sauter de la tête d'une liste provoque la liste entière à copier, donc est inefficace

Vous devriez plutôt utiliser une Liste / Tableau de taille fixe et un index qui se déplace dans le tampon lorsque vous ajoutez / supprimez des éléments

10
répondu John La Rooy 2010-11-11 04:28:06

La deque de Python est lente. Vous pouvez également utiliser numpy.rouler au lieu Comment faire pivoter les nombres dans un tableau numpy de shape (N,) ou (n,1)?

Dans ce benchmark, deque est 448ms. Numpy.rouleau est 29 ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/

8
répondu Orvar Korvar 2017-05-23 12:34:37

Ok avec l'utilisation de la classe deque, mais pour les requeriments de la question (moyenne) c'est ma solution:

>>> from collections import deque
>>> class CircularBuffer(deque):
...     def __init__(self, size=0):
...             super(CircularBuffer, self).__init__(maxlen=size)
...     @property
...     def average(self):  # TODO: Make type check for integer or floats
...             return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
...     cb.append(i)
...     print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
5
répondu SmartElectron 2013-09-02 15:54:14

Basé sur la réponse de MoonCactus , Voici une classe circularlist. La différence avec sa version est qu'ici c[0] donnera toujours l'élément ajouté le plus ancien, c[-1] l'élément ajouté le plus récent, c[-2] l'avant-dernier... Ceci est plus naturel pour les applications.

c = circularlist(4)
c.append(1); print c, c[0], c[-1]    #[1] (1 items)              first 1, last 1
c.append(2); print c, c[0], c[-1]    #[1, 2] (2 items)           first 1, last 2
c.append(3); print c, c[0], c[-1]    #[1, 2, 3] (3 items)        first 1, last 3
c.append(8); print c, c[0], c[-1]    #[1, 2, 3, 8] (4 items)     first 1, last 8
c.append(10); print c, c[0], c[-1]   #[10, 2, 3, 8] (4 items)    first 2, last 10
c.append(11); print c, c[0], c[-1]   #[10, 11, 3, 8] (4 items)   first 3, last 11

Classe:

class circularlist(object):
    def __init__(self, size):
        """Initialization"""
        self.index = 0
        self.size = size
        self._data = []

    def append(self, value):
        """Append an element"""
        if len(self._data) == self.size:
            self._data[self.index] = value
        else:
            self._data.append(value)
        self.index = (self.index + 1) % self.size

    def __getitem__(self, key):
        """Get element by index, relative to the current index"""
        if len(self._data) == self.size:
            return(self._data[(key + self.index) % self.size])
        else:
            return(self._data[key])

    def __repr__(self):
        """Return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
4
répondu Basj 2017-05-23 11:47:19

Vous pouvez également voir cette assez ancienne recette Python .

Voici ma propre version avec NumPy array:

#!/usr/bin/env python

import numpy as np

class RingBuffer(object):
    def __init__(self, size_max, default_value=0.0, dtype=float):
        """initialization"""
        self.size_max = size_max

        self._data = np.empty(size_max, dtype=dtype)
        self._data.fill(default_value)

        self.size = 0

    def append(self, value):
        """append an element"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value 

        self.size += 1

        if self.size == self.size_max:
            self.__class__  = RingBufferFull

    def get_all(self):
        """return a list of elements from the oldest to the newest"""
        return(self._data)

    def get_partial(self):
        return(self.get_all()[0:self.size])

    def __getitem__(self, key):
        """get element"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        s = self._data.__repr__()
        s = s + '\t' + str(self.size)
        s = s + '\t' + self.get_all()[::-1].__repr__()
        s = s + '\t' + self.get_partial()[::-1].__repr__()
        return(s)

class RingBufferFull(RingBuffer):
    def append(self, value):
        """append an element when buffer is full"""
        self._data = np.roll(self._data, 1)
        self._data[0] = value
3
répondu scls 2014-03-02 17:20:01

J'ai eu ce problème avant de faire de la programmation en série. À l'époque il y a un peu plus d'un an, je ne pouvais pas non plus trouver d'implémentations efficaces, donc j'ai fini par écrire un en tant qu'extension C et il est également disponible sur pypi sous une licence MIT. C'est super basique, ne gère que les tampons de caractères signés 8 bits, mais est de longueur flexible, donc vous pouvez utiliser Struct ou quelque chose en plus si vous avez besoin d'autre chose que des caractères. Je vois maintenant avec une recherche google qu'il y en a plusieurs options ces jours-ci cependant, de sorte que vous voudrez peut-être regarder ceux aussi.

1
répondu sirlark 2016-12-07 20:30:20

Celui - ci ne nécessite aucune bibliothèque. Il développe une liste, puis cycle à l'intérieur par index.

L'empreinte est très petite (pas de bibliothèque), et elle s'exécute deux fois plus vite que dequeue au moins. C'est bon pour calculer des moyennes mobiles en effet, mais sachez que les éléments ne sont pas triés par âge comme ci-dessus.

class CircularBuffer(object):
    def __init__(self, size):
        """initialization"""
        self.index= 0
        self.size= size
        self._data = []

    def record(self, value):
        """append an element"""
        if len(self._data) == self.size:
            self._data[self.index]= value
        else:
            self._data.append(value)
        self.index= (self.index + 1) % self.size

    def __getitem__(self, key):
        """get element by index like a regular array"""
        return(self._data[key])

    def __repr__(self):
        """return string representation"""
        return self._data.__repr__() + ' (' + str(len(self._data))+' items)'

    def get_all(self):
        """return a list of all the elements"""
        return(self._data)

Pour obtenir la valeur moyenne, par exemple:

q= CircularBuffer(1000000);
for i in range(40000):
    q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())

Résultats dans:

capacity= 1000000
stored= 40000
average= 19999

real 0m0.024s
user 0m0.020s
sys  0m0.000s

C'est environ 1/3 du temps de l'équivalent avec dequeue.

1
répondu MoonCactus 2016-12-15 14:50:32

Bien qu'il y ait déjà un grand nombre de bonnes réponses ici, je n'ai pas pu trouver de comparaison directe des timings pour les options mentionnées. Donc, s'il vous plaît trouver mon humble tentative de comparaison ci-dessous.

À des fins de test uniquement, la classe peut basculer entre un tampon basé sur list, un tampon basé sur collections.deque et un tampon basé sur Numpy.roll.

Notez que la méthode update n'ajoute qu'une seule valeur à la fois, pour rester simple.

import numpy
import timeit
import collections


class CircularBuffer(object):
    buffer_methods = ('list', 'deque', 'roll')

    def __init__(self, buffer_size, buffer_method):
        self.content = None
        self.size = buffer_size
        self.method = buffer_method

    def update(self, scalar):
        if self.method == self.buffer_methods[0]:
            # Use list
            try:
                self.content.append(scalar)
                self.content.pop(0)
            except AttributeError:
                self.content = [0.] * self.size
        elif self.method == self.buffer_methods[1]:
            # Use collections.deque
            try:
                self.content.append(scalar)
            except AttributeError:
                self.content = collections.deque([0.] * self.size,
                                                 maxlen=self.size)
        elif self.method == self.buffer_methods[2]:
            # Use Numpy.roll
            try:
                self.content = numpy.roll(self.content, -1)
                self.content[-1] = scalar
            except IndexError:
                self.content = numpy.zeros(self.size, dtype=float)

# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
                                   buffer_method=method)
                    for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
    # We add a convenient number of convenient values (see equality test below)
    code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
        i, circular_buffer_size)
    # Testing
    eval(code)
    buffer_content = [item for item in cb.content]
    assert buffer_content == range(circular_buffer_size)
    # Timing
    timeit_results.append(
        timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
    print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
        cb.method, timeit_results[-1],
        timeit_results[-1] / timeit_iterations * 1e3)

, Sur mon système, cela donne:

list:  total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll:  total 6.27s (0.63ms per iteration)
1
répondu Dennis 2018-04-12 16:59:32

La question initiale était:" efficace " tampon circulaire. Selon cette efficacité demandée, la réponse d'aaronasterling semble être définitivement correcte. Utilisation d'une classe dédiée programmée en Python et comparaison du traitement du temps avec les collections.deque montre une accélération x5. 2 fois avec deque! Voici un code très simple pour tester ceci:

class cb:
    def __init__(self, size):
        self.b = [0]*size
        self.i = 0
        self.sz = size
    def append(self, v):
        self.b[self.i] = v
        self.i = (self.i + 1) % self.sz

b = cb(1000)
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 1.097 second on my laptop

from collections import deque
b = deque( [], 1000 )
for i in range(10000):
    b.append(i)
# called 200 times, this lasts 0.211 second on my laptop

Pour transformer une deque en liste, utilisez simplement:

my_list = [v for v in my_deque]

Vous obtiendrez alors O (1) un accès aléatoire aux éléments deque. De bien sûr, cela n'a de valeur que si vous devez faire de nombreux accès aléatoires à la deque après l'avoir définie une fois.

0
répondu Schmouk 2016-05-15 22:08:21

Vous répondez n'est pas juste. Circulaire tampon principal ont deux priciples ( https://en.wikipedia.org/wiki/Circular_buffer )

  1. La longueur du tampon est réglée;
  2. premier entré premier sorti;
  3. Lorsque vous ajoutez ou supprimez un élément, les autres éléments ne doivent pas déplacer leur position

Votre code ci-dessous:

def add_to_buffer( self, num ):
    self.mylist.pop( 0 )
    self.mylist.append( num )

Considérons une situation où la liste est pleine, en utilisant votre code:

self.mylist = [1, 2, 3, 4, 5]

Maintenant, nous ajoutons 6, la liste est modifiée

self.mylist = [2, 3, 4, 5, 6]

Le les éléments attendent 1 dans la liste a changé leur position

Votre code est une file d'attente, pas un tampon de cercle.

La réponse de Basj, je pense est la plus efficace.

Par la façon dont, un cercle tampon peut imporve la performance de l'opération pour ajouter un élément.

0
répondu Johnny Wong 2017-03-09 03:30:49

Cela applique le même principe à certains tampons destinés à contenir les messages texte les plus récents.

import time
import datetime
import sys, getopt

class textbffr(object):
    def __init__(self, size_max):
        #initialization
        self.posn_max = size_max-1
        self._data = [""]*(size_max)
        self.posn = self.posn_max

    def append(self, value):
        #append an element
        if self.posn == self.posn_max:
            self.posn = 0
            self._data[self.posn] = value   
        else:
            self.posn += 1
            self._data[self.posn] = value

    def __getitem__(self, key):
        #return stored element
        if (key + self.posn+1) > self.posn_max:
            return(self._data[key - (self.posn_max-self.posn)])
        else:
            return(self._data[key + self.posn+1])


def print_bffr(bffr,bffer_max): 
    for ind in range(0,bffer_max):
        stored = bffr[ind]
        if stored != "":
            print(stored)
    print ( '\n' )

def make_time_text(time_value):
    return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
      + str(time_value.hour).zfill(2) +  str(time_value.minute).zfill(2)
      + str(time_value.second).zfill(2))


def main(argv):
    #Set things up 
    starttime = datetime.datetime.now()
    log_max = 5
    status_max = 7
    log_bffr = textbffr(log_max)
    status_bffr = textbffr(status_max)
    scan_count = 1

    #Main Loop
    # every 10 secounds write a line with the time and the scan count.
    while True: 

        time_text = make_time_text(datetime.datetime.now())
        #create next messages and store in buffers
        status_bffr.append(str(scan_count).zfill(6) + " :  Status is just fine at : " + time_text)
        log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")

        #print whole buffers so far
        print_bffr(log_bffr,log_max)
        print_bffr(status_bffr,status_max)

        time.sleep(2)
        scan_count += 1 

if __name__ == '__main__':
    main(sys.argv[1:])  
0
répondu David Torrens 2018-01-19 01:04:19