Python Pattern Observer: Des Exemples, Des Conseils?
y a-t-il des exemples exemplaires de L'Observateur GoF implémenté en Python? J'ai un code bit qui contient actuellement des bits de code de débogage dans la classe key (qui génère actuellement des messages à stderr si un env magique est activé). En outre, la classe dispose d'une interface pour progressivement retourner des résultats ainsi que leur stockage (mémoire) pour le post-traitement. (La classe elle-même est un gestionnaire de tâches pour exécuter simultanément des commandes sur des machines distantes sur ssh).
Actuellement, l'utilisation de la classe ressemble à quelque chose comme:
job = SSHJobMan(hostlist, cmd)
job.start()
while not job.done():
for each in job.poll():
incrementally_process(job.results[each])
time.sleep(0.2) # or other more useful work
post_process(job.results)
un modèle d'usage alernatif est:
job = SSHJobMan(hostlist, cmd)
job.wait() # implicitly performs a start()
process(job.results)
tout Cela fonctionne très bien pour l'utilité actuelle. Cependant, elle ne manque de souplesse. Par exemple, j'appuie actuellement un format de sortie bref ou une barre de progression comme résultats incrémentiels, je soutiens également
sorties brèves, complètes et "message fusionné" pour la fonction post_process()
.
cependant, j'aimerais prise en charge de multiples flux de résultats/sorties (barre de progression vers le terminal, Débogage et avertissements vers un fichier journal, sorties de tâches réussies vers un fichier/répertoire, messages d'erreur et autres résultats de tâches non réussies vers un autre, etc.).
cela ressemble à une situation qui nécessite un observateur ... ont instances de ma classe d'accepter l'enregistrement à partir d'autres objets et de les rappeler à certains types d'événements comme ils se produisent.
je regarde PyPubSub puisque j'ai vu plusieurs références à cela dans des questions connexes. Je ne suis pas sûr d'être prêt à ajouter la dépendance externe à mon utilitaire mais je pourrais voir de la valeur dans l'utilisation de leur interface comme modèle pour le mien si cela va le rendre plus facile à utiliser pour les autres. (Le projet est conçu à la fois comme un utilitaire de ligne de commande autonome et une classe pour écrire d'Autres scripts/utilitaires).
bref, je sais faire ce que je veux ... mais il existe de nombreuses les moyens de l'accomplir. Je veux des suggestions sur ce qui est le plus susceptible de fonctionner pour les autres utilisateurs du code à long terme.
le code lui-même est à: classh .
8 réponses
cependant il manque de flexibilité.
bien... en fait, cela ressemble à un bon design pour moi si une API asynchrone est ce que vous voulez. Il est habituellement. Peut-être que tout ce dont vous avez besoin est de passer de stderr au module logging
de Python, qui a en quelque sorte son propre modèle de publication/Abonnement, ce qui avec Logger.addHandler()
et ainsi de suite.
si vous voulez soutenir les observateurs, mon conseil est de garder il simple. Vous n'avez vraiment besoin que de quelques lignes de code.
class Event(object):
pass
class Observable(object):
def __init__(self):
self.callbacks = []
def subscribe(self, callback):
self.callbacks.append(callback)
def fire(self, **attrs):
e = Event()
e.source = self
for k, v in attrs.iteritems():
setattr(e, k, v)
for fn in self.callbacks:
fn(e)
votre classe D'emploi peut sous-classe Observable
. Si quelque chose d'intéressant se produit, appelez self.fire(type="progress", percent=50)
ou autre.
je pense que les gens dans les autres réponses exagèrent. Vous pouvez facilement réaliser des événements en Python avec moins de 15 lignes de code.
Vous simple avez deux classes: Event
et Observer
. Toute classe qui veut écouter un événement, doit hériter de l'Observateur et de l'ensemble de l'écouter (observer) pour un événement précis. Lorsqu'un Event
est instancié et lancé, tous les observateurs écoutant cet événement exécuteront les fonctions de rappel spécifiées.
class Observer():
_observers = []
def __init__(self):
self._observers.append(self)
self._observables = {}
def observe(self, event_name, callback):
self._observables[event_name] = callback
class Event():
def __init__(self, name, data, autofire = True):
self.name = name
self.data = data
if autofire:
self.fire()
def fire(self):
for observer in Observer._observers:
if self.name in observer._observables:
observer._observables[self.name](self.data)
exemple :
class Room(Observer):
def __init__(self):
print("Room is ready.")
Observer.__init__(self) # Observer's init needs to be called
def someone_arrived(self, who):
print(who + " has arrived!")
room = Room()
room.observe('someone arrived', room.someone_arrived)
Event('someone arrived', 'Lenard')
sortie:
Room is ready.
Lenard has arrived!
encore quelques approches...
exemple: le module de journalisation
peut-être que tout ce dont vous avez besoin est de passer de stderr au module logging
de Python, qui a un modèle de publication/Abonnement puissant.
il est facile de commencer à produire des journaux.
# producer
import logging
log = logging.getLogger("myjobs") # that's all the setup you need
class MyJob(object):
def run(self):
log.info("starting job")
n = 10
for i in range(n):
log.info("%.1f%% done" % (100.0 * i / n))
log.info("work complete")
du côté des consommateurs, il y a un peu plus de travail. Malheureusement, la configuration de l'enregistreur de sortie prend, comme, 7 lignes de code entières à faire. ;)
# consumer
import myjobs, sys, logging
if user_wants_log_output:
ch = logging.StreamHandler(sys.stderr)
ch.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
ch.setFormatter(formatter)
myjobs.log.addHandler(ch)
myjobs.log.setLevel(logging.INFO)
myjobs.MyJob().run()
d'un autre côté, il y a une quantité incroyable de choses dans le paquet de journalisation. Si vous avez besoin d'envoyer des données de journal à un ensemble rotatif de dossiers, une adresse e-mail, et le journal D'événement de Windows, vous êtes couverts.
exemple: observateur le plus simple possible
Mais vous n'avez pas besoin d'utiliser une bibliothèque. Un moyen extrêmement simple de soutenir les observateurs est d'appeler une méthode qui ne rien.
# producer
class MyJob(object):
def on_progress(self, pct):
"""Called when progress is made. pct is the percent complete.
By default this does nothing. The user may override this method
or even just assign to it."""
pass
def run(self):
n = 10
for i in range(n):
self.on_progress(100.0 * i / n)
self.on_progress(100.0)
# consumer
import sys, myjobs
job = myjobs.MyJob()
job.on_progress = lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
parfois, au lieu d'écrire une lambda, on peut dire job.on_progress = progressBar.update
, ce qui est bien.
c'est aussi simple que possible. Un inconvénient est qu'il ne supporte pas naturellement plusieurs auditeurs s'abonnant aux mêmes événements.
Exemple: C#, comme les événements
avec un peu de code de support, vous pouvez obtenir des évènements de type C en Python. Voici le code:
# glue code
class event(object):
def __init__(self, func):
self.__doc__ = func.__doc__
self._key = ' ' + func.__name__
def __get__(self, obj, cls):
try:
return obj.__dict__[self._key]
except KeyError, exc:
be = obj.__dict__[self._key] = boundevent()
return be
class boundevent(object):
def __init__(self):
self._fns = []
def __iadd__(self, fn):
self._fns.append(fn)
return self
def __isub__(self, fn):
self._fns.remove(fn)
return self
def __call__(self, *args, **kwargs):
for f in self._fns[:]:
f(*args, **kwargs)
Le producteur déclare l'événement à l'aide d'un décorateur:
# producer
class MyJob(object):
@event
def progress(pct):
"""Called when progress is made. pct is the percent complete."""
def run(self):
n = 10
for i in range(n+1):
self.progress(100.0 * i / n)
#consumer
import sys, myjobs
job = myjobs.MyJob()
job.progress += lambda pct: sys.stdout.write("%.1f%% done\n" % pct)
job.run()
cela fonctionne exactement comme le code" simple observer "ci-dessus, mais vous pouvez ajouter autant d'auditeurs que vous voulez en utilisant +=
. (Contrairement à C#, il n'y a pas de type de gestionnaire d'événements, vous n'avez pas à new EventHandler(foo.bar)
lorsque vous vous abonnez à un événement, et vous n'avez pas à vérifier null Avant de virer l'événement. Comme C#, les événements ne font pas d'exceptions.)
comment choisir
si logging
fait tout ce dont vous avez besoin, utilisez cela. Sinon faire la chose la plus simple qui fonctionne pour vous. La chose à noter est que vous n'avez pas besoin de prendre une grande dépendance externe.
Que Diriez-vous d'une implémentation où les objets ne sont pas gardés vivants juste parce qu'ils observent quelque chose? Ci-dessous, veuillez trouver une implémentation du pattern observer avec les fonctionnalités suivantes:
- L'Usage est pythonique. Pour ajouter un observateur à une méthode liée
.bar
de l'instancefoo
, il suffit de fairefoo.bar.addObserver(observer)
. - les observateurs ne sont pas maintenus en vie en tant qu'observateurs. En d'autres termes, le code de l'observateur utilise non de solides références.
- pas de sous-classe nécessaire (descripteurs ftw).
- peut être utilisé avec des types inhashables.
- Peut être utilisé autant de fois que vous voulez dans une seule classe.
- (bonus) à partir d'aujourd'hui le code existe dans un bon paquet téléchargeable, installable sur github .
voici le code (le paquet github ou PyPI package ont le plus jusqu'à la date de mise en œuvre):
import weakref
import functools
class ObservableMethod(object):
"""
A proxy for a bound method which can be observed.
I behave like a bound method, but other bound methods can subscribe to be
called whenever I am called.
"""
def __init__(self, obj, func):
self.func = func
functools.update_wrapper(self, func)
self.objectWeakRef = weakref.ref(obj)
self.callbacks = {} #observing object ID -> weak ref, methodNames
def addObserver(self, boundMethod):
"""
Register a bound method to observe this ObservableMethod.
The observing method will be called whenever this ObservableMethod is
called, and with the same arguments and keyword arguments. If a
boundMethod has already been registered to as a callback, trying to add
it again does nothing. In other words, there is no way to sign up an
observer to be called back multiple times.
"""
obj = boundMethod.__self__
ID = id(obj)
if ID in self.callbacks:
s = self.callbacks[ID][1]
else:
wr = weakref.ref(obj, Cleanup(ID, self.callbacks))
s = set()
self.callbacks[ID] = (wr, s)
s.add(boundMethod.__name__)
def discardObserver(self, boundMethod):
"""
Un-register a bound method.
"""
obj = boundMethod.__self__
if id(obj) in self.callbacks:
self.callbacks[id(obj)][1].discard(boundMethod.__name__)
def __call__(self, *arg, **kw):
"""
Invoke the method which I proxy, and all of it's callbacks.
The callbacks are called with the same *args and **kw as the main
method.
"""
result = self.func(self.objectWeakRef(), *arg, **kw)
for ID in self.callbacks:
wr, methodNames = self.callbacks[ID]
obj = wr()
for methodName in methodNames:
getattr(obj, methodName)(*arg, **kw)
return result
@property
def __self__(self):
"""
Get a strong reference to the object owning this ObservableMethod
This is needed so that ObservableMethod instances can observe other
ObservableMethod instances.
"""
return self.objectWeakRef()
class ObservableMethodDescriptor(object):
def __init__(self, func):
"""
To each instance of the class using this descriptor, I associate an
ObservableMethod.
"""
self.instances = {} # Instance id -> (weak ref, Observablemethod)
self._func = func
def __get__(self, inst, cls):
if inst is None:
return self
ID = id(inst)
if ID in self.instances:
wr, om = self.instances[ID]
if not wr():
msg = "Object id %d should have been cleaned up"%(ID,)
raise RuntimeError(msg)
else:
wr = weakref.ref(inst, Cleanup(ID, self.instances))
om = ObservableMethod(inst, self._func)
self.instances[ID] = (wr, om)
return om
def __set__(self, inst, val):
raise RuntimeError("Assigning to ObservableMethod not supported")
def event(func):
return ObservableMethodDescriptor(func)
class Cleanup(object):
"""
I manage remove elements from a dict whenever I'm called.
Use me as a weakref.ref callback to remove an object's id from a dict
when that object is garbage collected.
"""
def __init__(self, key, d):
self.key = key
self.d = d
def __call__(self, wr):
del self.d[self.key]
pour utiliser ceci, nous décorons simplement des méthodes que nous voulons rendre observables avec @event
. Voici un exemple
class Foo(object):
def __init__(self, name):
self.name = name
@event
def bar(self):
print("%s called bar"%(self.name,))
def baz(self):
print("%s called baz"%(self.name,))
a = Foo('a')
b = Foo('b')
a.bar.addObserver(b.bar)
a.bar()
à Partir de wikipedia :
from collections import defaultdict
class Observable (defaultdict):
def __init__ (self):
defaultdict.__init__(self, object)
def emit (self, *args):
'''Pass parameters to all observers and update states.'''
for subscriber in self:
response = subscriber(*args)
self[subscriber] = response
def subscribe (self, subscriber):
'''Add a new subscriber to self.'''
self[subscriber]
def stat (self):
'''Return a tuple containing the state of each observer.'''
return tuple(self.values())
L'Observable est utilisé comme ceci.
myObservable = Observable ()
# subscribe some inlined functions.
# myObservable[lambda x, y: x * y] would also work here.
myObservable.subscribe(lambda x, y: x * y)
myObservable.subscribe(lambda x, y: float(x) / y)
myObservable.subscribe(lambda x, y: x + y)
myObservable.subscribe(lambda x, y: x - y)
# emit parameters to each observer
myObservable.emit(6, 2)
# get updated values
myObservable.stat() # returns: (8, 3.0, 4, 12)
basé sur la réponse de Jason, j'ai implémenté L'exemple D'événements de type C#comme un module python complet incluant de la documentation et des tests. J'aime la fantaisie pythonic trucs :)
donc, si vous voulez une solution prête à l'emploi, vous pouvez simplement utiliser le code sur github .
Exemple: tordu journal des observateurs
pour inscrire un observateur yourCallable()
(appelant qui accepte un dictionnaire) pour recevoir tous les événements log (en plus de tout autre observateur):
twisted.python.log.addObserver(yourCallable)
exemple: exemple producteur/consommateur complet
De Torsadée Python liste de diffusion:
#!/usr/bin/env python
"""Serve as a sample implementation of a twisted producer/consumer
system, with a simple TCP server which asks the user how many random
integers they want, and it sends the result set back to the user, one
result per line."""
import random
from zope.interface import implements
from twisted.internet import interfaces, reactor
from twisted.internet.protocol import Factory
from twisted.protocols.basic import LineReceiver
class Producer:
"""Send back the requested number of random integers to the client."""
implements(interfaces.IPushProducer)
def __init__(self, proto, cnt):
self._proto = proto
self._goal = cnt
self._produced = 0
self._paused = False
def pauseProducing(self):
"""When we've produced data too fast, pauseProducing() will be
called (reentrantly from within resumeProducing's transport.write
method, most likely), so set a flag that causes production to pause
temporarily."""
self._paused = True
print('pausing connection from %s' % (self._proto.transport.getPeer()))
def resumeProducing(self):
self._paused = False
while not self._paused and self._produced < self._goal:
next_int = random.randint(0, 10000)
self._proto.transport.write('%d\r\n' % (next_int))
self._produced += 1
if self._produced == self._goal:
self._proto.transport.unregisterProducer()
self._proto.transport.loseConnection()
def stopProducing(self):
pass
class ServeRandom(LineReceiver):
"""Serve up random data."""
def connectionMade(self):
print('connection made from %s' % (self.transport.getPeer()))
self.transport.write('how many random integers do you want?\r\n')
def lineReceived(self, line):
cnt = int(line.strip())
producer = Producer(self, cnt)
self.transport.registerProducer(producer, True)
producer.resumeProducing()
def connectionLost(self, reason):
print('connection lost from %s' % (self.transport.getPeer()))
factory = Factory()
factory.protocol = ServeRandom
reactor.listenTCP(1234, factory)
print('listening on 1234...')
reactor.run()
Une approche fonctionnelle de l'observateur du design:
def add_listener(obj, method_name, listener):
# Get any existing listeners
listener_attr = method_name + '_listeners'
listeners = getattr(obj, listener_attr, None)
# If this is the first listener, then set up the method wrapper
if not listeners:
listeners = [listener]
setattr(obj, listener_attr, listeners)
# Get the object's method
method = getattr(obj, method_name)
@wraps(method)
def method_wrapper(*args, **kwags):
method(*args, **kwags)
for l in listeners:
l(obj, *args, **kwags) # Listener also has object argument
# Replace the original method with the wrapper
setattr(obj, method_name, method_wrapper)
else:
# Event is already set up, so just add another listener
listeners.append(listener)
def remove_listener(obj, method_name, listener):
# Get any existing listeners
listener_attr = method_name + '_listeners'
listeners = getattr(obj, listener_attr, None)
if listeners:
# Remove the listener
next((listeners.pop(i)
for i, l in enumerate(listeners)
if l == listener),
None)
# If this was the last listener, then remove the method wrapper
if not listeners:
method = getattr(obj, method_name)
delattr(obj, listener_attr)
setattr(obj, method_name, method.__wrapped__)
ces méthodes peuvent ensuite être utilisées pour ajouter un auditeur à n'importe quelle méthode de classe. Par exemple:
class MyClass(object):
def __init__(self, prop):
self.prop = prop
def some_method(self, num, string):
print('method:', num, string)
def listener_method(obj, num, string):
print('listener:', num, string, obj.prop)
my = MyClass('my_prop')
add_listener(my, 'some_method', listener_method)
my.some_method(42, 'with listener')
remove_listener(my, 'some_method', listener_method)
my.some_method(42, 'without listener')
et la sortie est:
method: 42 with listener
listener: 42 with listener my_prop
method: 42 without listener