tampon circulaire efficace?
Je veux créer un tampon circulaire efficace en python (dans le but de prendre des moyennes des valeurs entières dans le tampon).
Est-ce un moyen efficace d'utiliser une liste pour collecter des valeurs?
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
Qu'est-ce qui serait plus efficace (et pourquoi)?
12 réponses
Je voudrais utiliser collections.deque
avec un maxlen
arg
>>> import collections
>>> d = collections.deque(maxlen=10)
>>> d
deque([], maxlen=10)
>>> for i in xrange(20):
... d.append(i)
...
>>> d
deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10)
Il y a une recette dans les documents pour deque
qui est similaire à ce que vous voulez. Mon affirmation selon laquelle c'est le plus efficace repose entièrement sur le fait qu'il est implémenté en C par un équipage incroyablement qualifié qui a l'habitude de lancer du code de premier ordre.
Sauter de la tête d'une liste provoque la liste entière à copier, donc est inefficace
Vous devriez plutôt utiliser une Liste / Tableau de taille fixe et un index qui se déplace dans le tampon lorsque vous ajoutez / supprimez des éléments
La deque de Python est lente. Vous pouvez également utiliser numpy.rouler au lieu Comment faire pivoter les nombres dans un tableau numpy de shape (N,) ou (n,1)?
Dans ce benchmark, deque est 448ms. Numpy.rouleau est 29 ms http://scimusing.wordpress.com/2013/10/25/ring-buffers-in-pythonnumpy/
Ok avec l'utilisation de la classe deque, mais pour les requeriments de la question (moyenne) c'est ma solution:
>>> from collections import deque
>>> class CircularBuffer(deque):
... def __init__(self, size=0):
... super(CircularBuffer, self).__init__(maxlen=size)
... @property
... def average(self): # TODO: Make type check for integer or floats
... return sum(self)/len(self)
...
>>>
>>> cb = CircularBuffer(size=10)
>>> for i in range(20):
... cb.append(i)
... print "@%s, Average: %s" % (cb, cb.average)
...
@deque([0], maxlen=10), Average: 0
@deque([0, 1], maxlen=10), Average: 0
@deque([0, 1, 2], maxlen=10), Average: 1
@deque([0, 1, 2, 3], maxlen=10), Average: 1
@deque([0, 1, 2, 3, 4], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5], maxlen=10), Average: 2
@deque([0, 1, 2, 3, 4, 5, 6], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7], maxlen=10), Average: 3
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8], maxlen=10), Average: 4
@deque([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], maxlen=10), Average: 4
@deque([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], maxlen=10), Average: 5
@deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11], maxlen=10), Average: 6
@deque([3, 4, 5, 6, 7, 8, 9, 10, 11, 12], maxlen=10), Average: 7
@deque([4, 5, 6, 7, 8, 9, 10, 11, 12, 13], maxlen=10), Average: 8
@deque([5, 6, 7, 8, 9, 10, 11, 12, 13, 14], maxlen=10), Average: 9
@deque([6, 7, 8, 9, 10, 11, 12, 13, 14, 15], maxlen=10), Average: 10
@deque([7, 8, 9, 10, 11, 12, 13, 14, 15, 16], maxlen=10), Average: 11
@deque([8, 9, 10, 11, 12, 13, 14, 15, 16, 17], maxlen=10), Average: 12
@deque([9, 10, 11, 12, 13, 14, 15, 16, 17, 18], maxlen=10), Average: 13
@deque([10, 11, 12, 13, 14, 15, 16, 17, 18, 19], maxlen=10), Average: 14
Basé sur la réponse de MoonCactus , Voici une classe circularlist
. La différence avec sa version est qu'ici c[0]
donnera toujours l'élément ajouté le plus ancien, c[-1]
l'élément ajouté le plus récent, c[-2]
l'avant-dernier... Ceci est plus naturel pour les applications.
c = circularlist(4)
c.append(1); print c, c[0], c[-1] #[1] (1 items) first 1, last 1
c.append(2); print c, c[0], c[-1] #[1, 2] (2 items) first 1, last 2
c.append(3); print c, c[0], c[-1] #[1, 2, 3] (3 items) first 1, last 3
c.append(8); print c, c[0], c[-1] #[1, 2, 3, 8] (4 items) first 1, last 8
c.append(10); print c, c[0], c[-1] #[10, 2, 3, 8] (4 items) first 2, last 10
c.append(11); print c, c[0], c[-1] #[10, 11, 3, 8] (4 items) first 3, last 11
Classe:
class circularlist(object):
def __init__(self, size):
"""Initialization"""
self.index = 0
self.size = size
self._data = []
def append(self, value):
"""Append an element"""
if len(self._data) == self.size:
self._data[self.index] = value
else:
self._data.append(value)
self.index = (self.index + 1) % self.size
def __getitem__(self, key):
"""Get element by index, relative to the current index"""
if len(self._data) == self.size:
return(self._data[(key + self.index) % self.size])
else:
return(self._data[key])
def __repr__(self):
"""Return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
Vous pouvez également voir cette assez ancienne recette Python .
Voici ma propre version avec NumPy array:
#!/usr/bin/env python
import numpy as np
class RingBuffer(object):
def __init__(self, size_max, default_value=0.0, dtype=float):
"""initialization"""
self.size_max = size_max
self._data = np.empty(size_max, dtype=dtype)
self._data.fill(default_value)
self.size = 0
def append(self, value):
"""append an element"""
self._data = np.roll(self._data, 1)
self._data[0] = value
self.size += 1
if self.size == self.size_max:
self.__class__ = RingBufferFull
def get_all(self):
"""return a list of elements from the oldest to the newest"""
return(self._data)
def get_partial(self):
return(self.get_all()[0:self.size])
def __getitem__(self, key):
"""get element"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
s = self._data.__repr__()
s = s + '\t' + str(self.size)
s = s + '\t' + self.get_all()[::-1].__repr__()
s = s + '\t' + self.get_partial()[::-1].__repr__()
return(s)
class RingBufferFull(RingBuffer):
def append(self, value):
"""append an element when buffer is full"""
self._data = np.roll(self._data, 1)
self._data[0] = value
J'ai eu ce problème avant de faire de la programmation en série. À l'époque il y a un peu plus d'un an, je ne pouvais pas non plus trouver d'implémentations efficaces, donc j'ai fini par écrire un en tant qu'extension C et il est également disponible sur pypi sous une licence MIT. C'est super basique, ne gère que les tampons de caractères signés 8 bits, mais est de longueur flexible, donc vous pouvez utiliser Struct ou quelque chose en plus si vous avez besoin d'autre chose que des caractères. Je vois maintenant avec une recherche google qu'il y en a plusieurs options ces jours-ci cependant, de sorte que vous voudrez peut-être regarder ceux aussi.
Celui - ci ne nécessite aucune bibliothèque. Il développe une liste, puis cycle à l'intérieur par index.
L'empreinte est très petite (pas de bibliothèque), et elle s'exécute deux fois plus vite que dequeue au moins. C'est bon pour calculer des moyennes mobiles en effet, mais sachez que les éléments ne sont pas triés par âge comme ci-dessus.
class CircularBuffer(object):
def __init__(self, size):
"""initialization"""
self.index= 0
self.size= size
self._data = []
def record(self, value):
"""append an element"""
if len(self._data) == self.size:
self._data[self.index]= value
else:
self._data.append(value)
self.index= (self.index + 1) % self.size
def __getitem__(self, key):
"""get element by index like a regular array"""
return(self._data[key])
def __repr__(self):
"""return string representation"""
return self._data.__repr__() + ' (' + str(len(self._data))+' items)'
def get_all(self):
"""return a list of all the elements"""
return(self._data)
Pour obtenir la valeur moyenne, par exemple:
q= CircularBuffer(1000000);
for i in range(40000):
q.record(i);
print "capacity=", q.size
print "stored=", len(q.get_all())
print "average=", sum(q.get_all()) / len(q.get_all())
Résultats dans:
capacity= 1000000
stored= 40000
average= 19999
real 0m0.024s
user 0m0.020s
sys 0m0.000s
C'est environ 1/3 du temps de l'équivalent avec dequeue.
Bien qu'il y ait déjà un grand nombre de bonnes réponses ici, je n'ai pas pu trouver de comparaison directe des timings pour les options mentionnées. Donc, s'il vous plaît trouver mon humble tentative de comparaison ci-dessous.
À des fins de test uniquement, la classe peut basculer entre un tampon basé sur list
, un tampon basé sur collections.deque
et un tampon basé sur Numpy.roll
.
Notez que la méthode update
n'ajoute qu'une seule valeur à la fois, pour rester simple.
import numpy
import timeit
import collections
class CircularBuffer(object):
buffer_methods = ('list', 'deque', 'roll')
def __init__(self, buffer_size, buffer_method):
self.content = None
self.size = buffer_size
self.method = buffer_method
def update(self, scalar):
if self.method == self.buffer_methods[0]:
# Use list
try:
self.content.append(scalar)
self.content.pop(0)
except AttributeError:
self.content = [0.] * self.size
elif self.method == self.buffer_methods[1]:
# Use collections.deque
try:
self.content.append(scalar)
except AttributeError:
self.content = collections.deque([0.] * self.size,
maxlen=self.size)
elif self.method == self.buffer_methods[2]:
# Use Numpy.roll
try:
self.content = numpy.roll(self.content, -1)
self.content[-1] = scalar
except IndexError:
self.content = numpy.zeros(self.size, dtype=float)
# Testing and Timing
circular_buffer_size = 100
circular_buffers = [CircularBuffer(buffer_size=circular_buffer_size,
buffer_method=method)
for method in CircularBuffer.buffer_methods]
timeit_iterations = 1e4
timeit_setup = 'from __main__ import circular_buffers'
timeit_results = []
for i, cb in enumerate(circular_buffers):
# We add a convenient number of convenient values (see equality test below)
code = '[circular_buffers[{}].update(float(j)) for j in range({})]'.format(
i, circular_buffer_size)
# Testing
eval(code)
buffer_content = [item for item in cb.content]
assert buffer_content == range(circular_buffer_size)
# Timing
timeit_results.append(
timeit.timeit(code, setup=timeit_setup, number=int(timeit_iterations)))
print '{}: total {:.2f}s ({:.2f}ms per iteration)'.format(
cb.method, timeit_results[-1],
timeit_results[-1] / timeit_iterations * 1e3)
, Sur mon système, cela donne:
list: total 1.06s (0.11ms per iteration)
deque: total 0.87s (0.09ms per iteration)
roll: total 6.27s (0.63ms per iteration)
La question initiale était:" efficace " tampon circulaire. Selon cette efficacité demandée, la réponse d'aaronasterling semble être définitivement correcte. Utilisation d'une classe dédiée programmée en Python et comparaison du traitement du temps avec les collections.deque montre une accélération x5. 2 fois avec deque! Voici un code très simple pour tester ceci:
class cb:
def __init__(self, size):
self.b = [0]*size
self.i = 0
self.sz = size
def append(self, v):
self.b[self.i] = v
self.i = (self.i + 1) % self.sz
b = cb(1000)
for i in range(10000):
b.append(i)
# called 200 times, this lasts 1.097 second on my laptop
from collections import deque
b = deque( [], 1000 )
for i in range(10000):
b.append(i)
# called 200 times, this lasts 0.211 second on my laptop
Pour transformer une deque en liste, utilisez simplement:
my_list = [v for v in my_deque]
Vous obtiendrez alors O (1) un accès aléatoire aux éléments deque. De bien sûr, cela n'a de valeur que si vous devez faire de nombreux accès aléatoires à la deque après l'avoir définie une fois.
Vous répondez n'est pas juste. Circulaire tampon principal ont deux priciples ( https://en.wikipedia.org/wiki/Circular_buffer )
- La longueur du tampon est réglée;
- premier entré premier sorti;
- Lorsque vous ajoutez ou supprimez un élément, les autres éléments ne doivent pas déplacer leur position
Votre code ci-dessous:
def add_to_buffer( self, num ):
self.mylist.pop( 0 )
self.mylist.append( num )
Considérons une situation où la liste est pleine, en utilisant votre code:
self.mylist = [1, 2, 3, 4, 5]
Maintenant, nous ajoutons 6, la liste est modifiée
self.mylist = [2, 3, 4, 5, 6]
Le les éléments attendent 1 dans la liste a changé leur position
Votre code est une file d'attente, pas un tampon de cercle.
La réponse de Basj, je pense est la plus efficace.
Par la façon dont, un cercle tampon peut imporve la performance de l'opération pour ajouter un élément.
Cela applique le même principe à certains tampons destinés à contenir les messages texte les plus récents.
import time
import datetime
import sys, getopt
class textbffr(object):
def __init__(self, size_max):
#initialization
self.posn_max = size_max-1
self._data = [""]*(size_max)
self.posn = self.posn_max
def append(self, value):
#append an element
if self.posn == self.posn_max:
self.posn = 0
self._data[self.posn] = value
else:
self.posn += 1
self._data[self.posn] = value
def __getitem__(self, key):
#return stored element
if (key + self.posn+1) > self.posn_max:
return(self._data[key - (self.posn_max-self.posn)])
else:
return(self._data[key + self.posn+1])
def print_bffr(bffr,bffer_max):
for ind in range(0,bffer_max):
stored = bffr[ind]
if stored != "":
print(stored)
print ( '\n' )
def make_time_text(time_value):
return(str(time_value.month).zfill(2) + str(time_value.day).zfill(2)
+ str(time_value.hour).zfill(2) + str(time_value.minute).zfill(2)
+ str(time_value.second).zfill(2))
def main(argv):
#Set things up
starttime = datetime.datetime.now()
log_max = 5
status_max = 7
log_bffr = textbffr(log_max)
status_bffr = textbffr(status_max)
scan_count = 1
#Main Loop
# every 10 secounds write a line with the time and the scan count.
while True:
time_text = make_time_text(datetime.datetime.now())
#create next messages and store in buffers
status_bffr.append(str(scan_count).zfill(6) + " : Status is just fine at : " + time_text)
log_bffr.append(str(scan_count).zfill(6) + " : " + time_text + " : Logging Text ")
#print whole buffers so far
print_bffr(log_bffr,log_max)
print_bffr(status_bffr,status_max)
time.sleep(2)
scan_count += 1
if __name__ == '__main__':
main(sys.argv[1:])