Python générateur d'échantillons aléatoires (confortable avec des tailles de population énormes)
comme vous le savez peut-être random.sample(population,sample_size)
retourne rapidement un échantillon aléatoire, mais que faire si vous ne connaissez pas à l'avance la taille de l'échantillon? On finit par échantillonner toute la population, ou la mélanger, ce qui est la même chose. Mais cela peut être un gaspillage (si la majorité des tailles d'échantillons sont petites par rapport à la taille de la population) ou même irréalisable (si la taille de la population est énorme, s'épuisant de mémoire). En outre, que faire si votre code doit sauter d'ici à là avant de choisir l'élément suivant de la l'échantillon?
P. M. Je me suis heurté à la nécessité d'optimiser l'échantillon aléatoire tout en travaillant sur simulated annealing pour TSP . Dans mon code d'échantillonnage est redémarré des centaines de milliers de fois, et chaque fois, je ne sais pas si je vais avoir besoin de choisir un élément ou le 100% des éléments de la population.
5 réponses
au début, je divisais la population en blocs. La fonction pour faire l'échantillonnage de bloc peut facilement être un générateur, étant capable de traiter l'échantillon de taille arbitraire. Cela permet également de rendre la fonction d'un générateur.
Imagine infinite population, un bloc de population de 512 et taille d'échantillon de 8. Cela signifie que vous pouvez recueillir autant d'échantillons que vous avez besoin, et pour la réduction future encore échantillonner l'espace déjà échantillonné (pour 1024 blocs cela signifie 8196 échantillons à partir desquels vous pouvez à nouveau prélever des échantillons).
dans le même temps, cela permet un traitement parallèle qui peut être réalisable dans le cas d'échantillons très grands.
Exemple en considérant en mémoire de la population
import random
population = [random.randint(0, 1000) for i in range(0, 150000)]
def sample_block(population, block_size, sample_size):
block_number = 0
while 1:
try:
yield random.sample(population[block_number * block_size:(block_number + 1) * block_size], sample_size)
block_number += 1
except ValueError:
break
sampler = sample_block(population, 512, 8)
samples = []
try:
while 1:
samples.extend(sampler.next())
except StopIteration:
pass
print random.sample(samples, 200)
si la population était externe au script (fichier, bloc) la seule modification est que vous auriez à charger le morceau approprié dans une mémoire. Validation de principe à quoi pourrait ressembler l'échantillonnage d'une population infinie:
import random
import time
def population():
while 1:
yield random.randint(0, 10000)
def reduced_population(samples):
for sample in samples:
yield sample
def sample_block(generator, block_size, sample_size):
block_number = 0
block = []
while 1:
block.append(generator.next())
if len(block) == block_size:
s = random.sample(block, sample_size)
block_number += 1
block = []
print 'Sampled block {} with result {}.'.format(block_number, s)
yield s
samples = []
result = []
reducer = sample_block(population(), 512, 12)
try:
while 1:
samples.append(reducer.next())
if len(samples) == 1000:
sampler = sample_block(reduced_population(samples), 1000, 15)
result.append(list(sampler))
time.sleep(5)
except StopIteration:
pass
Idéalement, vous devriez également recueillir les échantillons et de nouveau les échantillonner.
c'est à ça que servent les générateurs, je crois. Voici un exemple D'échantillonnage Fisher-Yates-Knuth via generator / yield, vous obtenez des événements un par un et vous arrêtez quand vous le voulez.
code mis à jour
import random
import numpy
import array
class populationFYK(object):
"""
Implementation of the Fisher-Yates-Knuth shuffle
"""
def __init__(self, population):
self._population = population # reference to the population
self._length = len(population) # lengths of the sequence
self._index = len(population)-1 # last unsampled index
self._popidx = array.array('i', range(0,self._length))
# array module vs numpy
#self._popidx = numpy.empty(self._length, dtype=numpy.int32)
#for k in range(0,self._length):
# self._popidx[k] = k
def swap(self, idx_a, idx_b):
"""
Swap two elements in population
"""
temp = self._popidx[idx_a]
self._popidx[idx_a] = self._popidx[idx_b]
self._popidx[idx_b] = temp
def sample(self):
"""
Yield one sampled case from population
"""
while self._index >= 0:
idx = random.randint(0, self._index) # index of the sampled event
if idx != self._index:
self.swap(idx, self._index)
sampled = self._population[self._popidx[self._index]] # yielding it
self._index -= 1 # one less to be sampled
yield sampled
def index(self):
return self._index
def restart(self):
self._index = self._length - 1
for k in range(0,self._length):
self._popidx[k] = k
if __name__=="__main__":
population = [1,3,6,8,9,3,2]
gen = populationFYK(population)
for k in gen.sample():
print(k)
vous pouvez obtenir un échantillon de taille K d'une population de taille N en choisissant K des nombres aléatoires non répétitifs dans l'intervalle [0...N[ et les traiter comme des indices.
l'Option
vous pourriez générer un tel indice-échantillon en utilisant la méthode d'échantillon bien connue.
random.sample(xrange(N), K)
De la Python docs sur aléatoire.échantillon :
à choisir un exemple d'une plage d'entiers, utilise un objet xrange () comme argument. Cela est particulièrement rapide et efficace pour l'échantillonnage d'une grande population
Option b)
Si vous n'aimez pas le fait qu'aléatoire.exemple retourne déjà une liste au lieu d'un générateur paresseux de nombres aléatoires non répétitifs, vous pouvez aller fantaisie avec Format-préserver le cryptage pour chiffrer un compteur.
de cette façon, vous obtenez un vrai générateur d'index aléatoires, et vous pouvez choisir autant que vous voulez et arrêter à tout moment, sans obtenir de doubles, ce qui vous donne des ensembles d'échantillons de taille dynamique.
l'idée est de construire un schéma de cryptage pour crypter les nombres de 0 à N. maintenant, pour chaque fois que vous voulez obtenir un échantillon de votre population, vous choisissez une clé aléatoire pour votre cryptage et commencer à crypter les nombres de 0, 1, 2, ... à partir de (c'est le compteur). Puisque chaque bon cryptage crée une correspondance aléatoire 1:1, vous finissez avec des entiers aléatoires non répétitifs que vous pouvez utiliser comme index. Les exigences de stockage pendant cette génération paresseuse est juste la clé initiale plus la valeur actuelle du compteur.
L'idée a déjà été discutée dans générant des nombres aléatoires non répétitifs en Python . Il y a même un extrait de python lié: formatpreservingencryption.py
un exemple de code utilisant cet extrait pourrait être implémenté comme ceci:
def itersample(population):
# Get the size of the population
N = len(population)
# Get the number of bits needed to represent this number
bits = (N-1).bit_length()
# Generate some random key
key = ''.join(random.choice(string.ascii_letters + string.digits) for _ in range(32))
# Create a new crypto instance that encrypts binary blocks of width <bits>
# Thus, being able to encrypt all numbers up to the nearest power of two
crypter = FPEInteger(key=key, radix=2, width=bits)
# Count up
for i in xrange(1<<bits):
# Encrypt the current counter value
x = crypter.encrypt(i)
# If it is bigger than our population size, just skip it
# Since we generate numbers up to the nearest power of 2,
# we have to skip up to half of them, and on average up to one at a time
if x < N:
# Return the randomly chosen element
yield population[x]
j'ai écrit (en Python 2.7.9) un générateur d'échantillonneur aléatoire (d'index) dont la vitesse dépend uniquement de la taille de l'échantillon (il devrait être O(ns log(ns))
où ns
est la taille de l'échantillon). Ainsi, il est rapide lorsque la taille de l'échantillon est petite par rapport à la taille de la population, parce que il ne dépend pas du tout de la taille de la population . Il ne construit pas de collecte de population, il ne fait que choisir des indices aléatoires et utilise une sorte de méthode de bisect sur les indices échantillonnés pour éviter les doublons et les garder puis triés. Étant donné un population
itérable, voici comment utiliser itersample
générateur:
import random
sampler=itersample(len(population))
next_pick=sampler.next() # pick the next random (index of) element
ou
import random
sampler=itersample(len(population))
sample=[]
for index in sampler:
# do something with (index of) picked element
sample.append(index) # build a sample
if some_condition: # stop sampling when needed
break
si vous avez besoin des éléments réels et pas seulement des index, il suffit d'appliquer population
itérable à l'index lorsque nécessaire ( population[sampler.next()]
et population[index]
respectivement pour le premier et le deuxième exemple).
les résultats de certains tests montrent que la vitesse ne dépend pas de la population taille, donc si vous avez besoin de choisir au hasard seulement 10 éléments d'une population de 100 milliards, vous payez seulement pour 10 (rappelez-vous, nous ne savons pas à l'avance combien d'éléments nous allons choisir, Sinon vous feriez mieux d'utiliser random.sample
).
Sampling 1000 from 1000000
Using itersample 0.0324 s
Sampling 1000 from 10000000
Using itersample 0.0304 s
Sampling 1000 from 100000000
Using itersample 0.0311 s
Sampling 1000 from 1000000000
Using itersample 0.0329 s
D'autres essais confirment que le temps de fonctionnement est légèrement plus que linéaire avec la taille de l'échantillon:
Sampling 100 from 1000000000
Using itersample 0.0018 s
Sampling 1000 from 1000000000
Using itersample 0.0294 s
Sampling 10000 from 1000000000
Using itersample 0.4438 s
Sampling 100000 from 1000000000
Using itersample 8.8739 s
enfin, voici la fonction de générateur itersample
:
import random
def itersample(c): # c: population size
sampled=[]
def fsb(a,b): # free spaces before middle of interval a,b
fsb.idx=a+(b+1-a)/2
fsb.last=sampled[fsb.idx]-fsb.idx if len(sampled)>0 else 0
return fsb.last
while len(sampled)<c:
sample_index=random.randrange(c-len(sampled))
a,b=0,len(sampled)-1
if fsb(a,a)>sample_index:
yielding=sample_index
sampled.insert(0,yielding)
yield yielding
elif fsb(b,b)<sample_index+1:
yielding=len(sampled)+sample_index
sampled.insert(len(sampled),yielding)
yield yielding
else: # sample_index falls inside sampled list
while a+1<b:
if fsb(a,b)<sample_index+1:
a=fsb.idx
else:
b=fsb.idx
yielding=a+1+sample_index
sampled.insert(a+1,yielding)
yield yielding
Voici une autre idée. Donc pour la population énorme nous aimerions garder quelques informations sur les enregistrements sélectionnés. Dans votre cas, vous gardez un index entier par enregistrement sélectionné - 32bit ou 64bbit entier, plus un certain code pour faire la recherche raisonnable wrt sélectionné / pas sélectionné. Dans le cas d'un grand nombre d'enregistrements sélectionnés cette tenue de documents peut être prohibitif. Ce que je proposerais est d'utiliser le filtre de fleur pour des indeces sélectionnés. Les faux positifs sont possibles, mais pas les faux négatifs. risque d'obtenir des enregistrements dupliqués. Elle introduit un léger biais de faux positifs dossiers seront exclus de l'échantillonnage. Mais l'efficacité de la mémoire est bonne, moins de 10 bits par élément sont nécessaires pour une probabilité de 1% de faux positif. Donc, si vous choisissez 5% de la population et avez 1% faux positif, vous avez manqué 0.0005 de votre population, selon les exigences pourrait être ok. Si vous voulez un faux positif plus bas, utilisez plus de bits. Mais l'efficacité de la mémoire serait beaucoup mieux, bien que je m'attends à ce qu'Il ya plus code à exécuter par échantillon d'enregistrement.
Désolé, pas de code