pyserial - comment lire la dernière ligne envoyée depuis un périphérique série

J'ai un Arduino connecté à mon ordinateur qui exécute une boucle, envoyant une valeur sur le port série à l'ordinateur tous les 100 ms.

je veux faire un script Python qui ne lira à partir du port série que quelques secondes, donc je veux juste voir la dernière chose envoyée par L'Arduino.

Comment faites-vous cela à Pyserial?

Voici le code que j'ai essayé qui ne fonctionne pas. Il lit les lignes séquentiellement.

import serial
import time

ser = serial.Serial('com4',9600,timeout=1)
while 1:
    time.sleep(10)
    print ser.readline() #How do I get the most recent line sent from the device?
17
demandé sur Seanny123 2009-07-07 21:16:43

10 réponses

peut - être que je ne comprends pas votre question, mais comme c'est une ligne de série, vous devrez lire tout envoyé de L'Arduino séquentiellement-il sera tamponné dans L'Arduino jusqu'à ce que vous le lisiez.

si vous voulez avoir un affichage d'état qui montre la dernière chose envoyée - utilisez un thread qui incorpore le code dans votre question (moins le sommeil), et gardez la dernière ligne complète lire comme la dernière ligne de L'Arduino.

mise à Jour:mtasic 's l'exemple de code est assez bonne, mais si l'Arduino a envoyé un partiel de la ligne lors de l' inWaiting() est appelé, vous obtiendrez une ligne tronquée. Au lieu de cela, ce que vous voulez faire est de mettre la dernière complet en ligne last_received, et de garder la partielle de la ligne buffer pour qu'il puisse être ajouté à la prochaine fois autour de la boucle. Quelque chose comme ceci:

def receiving(ser):
    global last_received

    buffer_string = ''
    while True:
        buffer_string = buffer_string + ser.read(ser.inWaiting())
        if '\n' in buffer_string:
            lines = buffer_string.split('\n') # Guaranteed to have at least 2 entries
            last_received = lines[-2]
            #If the Arduino sends lots of empty lines, you'll lose the
            #last filled line, so you could make the above statement conditional
            #like so: if lines[-2]: last_received = lines[-2]
            buffer_string = lines[-1]

Concernant l'utilisation de readline(): voici ce que la documentation de Pyserial a à dire (légèrement édité pour la clarté et avec une mention à readlines ()):

soyez prudent lorsque vous utilisez"readline". Faire spécifiez un délai lors de l'ouverture du port série, sinon il pourrait bloquer pour toujours si aucun personnage de newline n'est reçu. Notez également que "readlines()" ne fonctionne qu'avec un délai d'attente. Il dépend de l'existence d'un délai d'attente et interprète cela comme EOF (fin du fichier).

ce qui me semble tout à fait raisonnable!

28
répondu Vinay Sajip 2015-03-15 15:53:56
from serial import *
from threading import Thread

last_received = ''

def receiving(ser):
    global last_received
    buffer = ''

    while True:
        # last_received = ser.readline()
        buffer += ser.read(ser.inWaiting())
        if '\n' in buffer:
            last_received, buffer = buffer.split('\n')[-2:]

if __name__ ==  '__main__':
    ser = Serial(
        port=None,
        baudrate=9600,
        bytesize=EIGHTBITS,
        parity=PARITY_NONE,
        stopbits=STOPBITS_ONE,
        timeout=0.1,
        xonxoff=0,
        rtscts=0,
        interCharTimeout=None
    )

    Thread(target=receiving, args=(ser,)).start()
11
répondu mtasic85 2009-07-07 18:02:58

ces solutions monopolisent le CPU en attendant les caractères.

Vous devriez faire au moins un appel de blocage pour lire(1)

while True:
    if '\n' in buffer: 
        pass # skip if a line already in buffer
    else:
        buffer += ser.read(1)  # this will block until one more char or timeout
    buffer += ser.read(ser.inWaiting()) # get remaining buffered chars

...et de faire la distinction chose qu'avant.

7
répondu Rufus 2011-01-25 18:59:24

Vous pouvez utiliser ser.flushInput() pour débusquer toutes les données sérielles qui se trouvent actuellement dans le tampon.

après avoir effacé les anciennes données, vous pouvez utiliser ser.readline () pour obtenir les données les plus récentes du périphérique série.

je pense que c'est un peu plus simple que les autres solutions proposées ici. A travaillé pour moi, j'espère que c'est approprié pour vous.

4
répondu user3524946 2014-04-11 19:53:24

cette méthode vous permet de contrôler séparément le temps d'attente pour la collecte de toutes les données pour chaque ligne, et un temps d'attente différent pour les lignes supplémentaires.

# get the last line from serial port
lines = serial_com()
lines[-1]              

def serial_com():
    '''Serial communications: get a response'''

    # open serial port
    try:
        serial_port = serial.Serial(com_port, baudrate=115200, timeout=1)
    except serial.SerialException as e:
        print("could not open serial port '{}': {}".format(com_port, e))

    # read response from serial port
    lines = []
    while True:
        line = serial_port.readline()
        lines.append(line.decode('utf-8').rstrip())

        # wait for new data after each line
        timeout = time.time() + 0.1
        while not serial_port.inWaiting() and timeout > time.time():
            pass
        if not serial_port.inWaiting():
            break 

    #close the serial port
    serial_port.close()   
    return lines
3
répondu fja0568 2015-01-19 21:42:14

Vous aurez besoin d'une boucle pour lire tout ce qui est envoyé, le dernier appel à readline() bloquant jusqu'à l'expiration du délai. Donc:

def readLastLine(ser):
    last_data=''
    while True:
        data=ser.readline()
        if data!='':
            last_data=data
        else:
            return last_data
2
répondu quamrana 2009-07-07 17:44:05

légère modification au code de mtasic & Vinay Sajip:

alors que j'ai trouvé ce code très utile pour une application similaire, j'ai eu besoin de les lignes revenant d'un périphérique série qui enverrait des informations périodiquement.

j'ai choisi d'enlever le premier élément du dessus, de l'enregistrer, puis de rejoindre les autres éléments comme le nouveau tampon et de continuer à partir de là.

je me rends compte que c'est ce Qu'était Greg j'ai demandé, mais j'ai pensé que ça valait la peine de le partager comme une note secondaire.

def receiving(ser):
    global last_received

    buffer = ''
    while True:
        buffer = buffer + ser.read(ser.inWaiting())
        if '\n' in buffer:
            lines = buffer.split('\n')
            last_received = lines.pop(0)

            buffer = '\n'.join(lines)
2
répondu Crazy Joe Malloy 2009-07-17 18:40:17

en utilisant .inWaiting() à l'intérieur d'une boucle infinie peut être problématique. Il peut porc de l'ensemble de la CPU selon la mise en oeuvre. Je recommande plutôt d'utiliser une taille précise de données à lire. Dans ce cas, les éléments suivants doivent être fait par exemple:

ser.read(1024)
2
répondu Srinath 2012-10-06 20:54:46

Trop de complications

Quelle est la raison de diviser l'objet bytes par newline ou par d'autres manipulations de tableaux? J'écris la méthode la plus simple, qui permettra de résoudre votre problème:

import serial
s = serial.Serial(31)
s.write(bytes("ATI\r\n", "utf-8"));
while True:
    last = ''
    for byte in s.read(s.inWaiting()): last += chr(byte)
    if len(last) > 0:
        # Do whatever you want with last
        print (bytes(last, "utf-8"))
        last = ''
2
répondu LXSoft 2014-11-04 18:35:33

voici un exemple utilisant un wrapper qui vous permet de lire la ligne la plus récente sans CPU à 100%

class ReadLine:
    """
    pyserial object wrapper for reading line
    source: https://github.com/pyserial/pyserial/issues/216
    """
    def __init__(self, s):
        self.buf = bytearray()
        self.s = s

    def readline(self):
        i = self.buf.find(b"\n")
        if i >= 0:
            r = self.buf[:i + 1]
            self.buf = self.buf[i + 1:]
            return r
        while True:
            i = max(1, min(2048, self.s.in_waiting))
            data = self.s.read(i)
            i = data.find(b"\n")
            if i >= 0:
                r = self.buf + data[:i + 1]
                self.buf[0:] = data[i + 1:]
                return r
            else:
                self.buf.extend(data)

s = serial.Serial('/dev/ttyS0')
device = ReadLine(s)
while True:
    print(device.readline())
0
répondu bdoubleu 2018-07-12 10:40:03