Tuesday, October 20, 2020

Using asyncio with libgpiod on an OrangePI Zero

Python is a great tool for writing simple, quick test code. Combine it with libgpiod and you can toggle and monitor GPIO pins.

On recent Linux kernels, libgpiod is the library used from C/C++ and Python to toggle and monitor general-purpose I/O pins. There are plenty of examples for Python. I use Buildroot to create a Linux distro; one option under hardware is to install libgpiod, and if you also have python3 selected you can write Python scripts against it.

Here are two basic classes, in gpiowrapper.py:

import gpiod
import threading
import asyncio
import logging

class OutputPin:
    """A single GPIO line requested as an output through libgpiod.

    After construction, ``chip`` holds the opened ``gpiod.Chip`` and ``line``
    holds the requested line that ``set_level`` writes to.
    """

    def __init__(self, chip="gpiochip0", pin=0):
        """Open *chip* and request line *pin* as an output.

        Args:
            chip: GPIO chip name handed to ``gpiod.Chip``.
            pin: Line offset on that chip.

        Any exception raised by gpiod while opening the chip or requesting
        the line propagates to the caller.
        """
        logging.debug("args chip={0} pin={1}".format(chip, pin))

        # Calling the Chip class cannot produce None, so a None check here
        # would be dead code; an open failure is expected to raise instead.
        self.chip = gpiod.Chip(chip)
        logging.debug("type {0} {1}".format(type(self.chip), type(pin)))

        self.line = self.chip.get_line(pin)
        # The chip name doubles as the consumer label for the request.
        self.line.request(consumer=chip, type=gpiod.LINE_REQ_DIR_OUT)

    def set_level(self, level=0):
        """Drive the line to 1 when *level* is truthy, otherwise to 0."""
        self.line.set_value(1 if level else 0)

class EventPin:
    """A GPIO line whose rising-edge events can be awaited with asyncio.

    The line's event file descriptor is registered with the running event
    loop. Each time it becomes readable, ``event_cb`` reads the event and
    pushes its type onto ``on_wait`` (an ``asyncio.Queue``), which ``wait``
    consumes.

    Instances must be created while an event loop is running, because
    ``__init__`` calls ``asyncio.get_running_loop()``.
    """

    def __init__(self, chip="gpiochip0", pin=0):
        """Open *chip*, request rising-edge events on *pin* and hook the loop.

        Args:
            chip: GPIO chip name handed to ``gpiod.Chip``.
            pin: Line offset on that chip.

        Raises:
            Exception: Whatever gpiod or asyncio raised during setup. The
                error is logged first and then re-raised, so callers never
                receive a partially initialised object.
        """
        self._pin = pin
        try:
            # Calling the Chip class cannot produce None, so no None check
            # is needed; an open failure is expected to raise instead.
            self.chip = gpiod.Chip(chip)
            logging.debug("type {0} {1}".format(type(self.chip), type(pin)))

            self.line = self.chip.get_line(pin)
            # The chip name doubles as the consumer label for the request.
            self.line.request(consumer=chip, type=gpiod.LINE_REQ_EV_RISING_EDGE)
            self.on_wait = asyncio.Queue()

            # Have the loop call event_cb whenever the line's event fd is
            # readable.
            asyncio.get_running_loop().add_reader(
                self.line.event_get_fd(), self.event_cb)
        except Exception as ex:
            logging.error("Error: {0}".format(str(ex)))
            # Without line/on_wait the object is unusable, so fail here
            # rather than with an AttributeError in wait() later.
            raise

    def event_cb(self):
        """Read one pending line event and queue its type for ``wait``.

        Raises:
            TypeError: If the event type is neither rising nor falling edge.
        """
        event = self.line.event_read()
        if event.type == gpiod.LineEvent.RISING_EDGE:
            evstr = ' RISING EDGE'
        elif event.type == gpiod.LineEvent.FALLING_EDGE:
            # NOTE(review): only rising edges are requested in __init__, so
            # this branch is presumably never taken - confirm.
            evstr = 'FALLING EDGE'
        else:
            raise TypeError('Invalid event type')

        logging.debug(" event {0}".format(evstr))
        self.on_wait.put_nowait(event.type)

    async def wait(self):
        """Suspend until an event has been queued and return its type."""
        # Return the queued value so callers can tell which edge fired.
        return await self.on_wait.get()




 

The EventPin class can be used with asyncio to wait for edge-triggered GPIO events. The key line adds the event pin's file descriptor to the asyncio event loop, which invokes a callback whenever the FD becomes readable; the callback pushes the event into a queue so that await can be used.

Here is a simple blink.py:

import gpiod
import sys
import time
import argparse
import gpiowrapper
import asyncio
import logging

async def blink(io_chip, o_pin):
    """Toggle one output line forever, holding each level for 0.1 s.

    Args:
        io_chip: GPIO chip name forwarded to ``gpiowrapper.OutputPin``.
        o_pin: Line offset; converted with ``int()`` before use.

    The coroutine loops indefinitely and never returns normally.
    """
    logging.debug("blink")
    led = gpiowrapper.OutputPin(chip=io_chip, pin=int(o_pin))

    hold_seconds = 0.1
    while True:
        # One pass of the inner loop is a full low/high cycle.
        for level in (0, 1):
            led.set_level(level)
            await asyncio.sleep(hold_seconds)


async def toggleTask(o_pin, timeout):
    """Toggle *o_pin* low/high forever, holding each level for *timeout* s.

    Args:
        o_pin: Object exposing ``set_level(level)``.
        timeout: Seconds to sleep after each level change.

    Any ``Exception`` raised while toggling is logged together with its
    traceback and ends the coroutine; it is not re-raised.
    """
    try:
        while True:
            # One pass of the inner loop is a full low/high cycle.
            for level in (0, 1):
                o_pin.set_level(level)
                await asyncio.sleep(timeout)
    except Exception as ex:
        # logging.exception keeps the traceback, so the log shows where the
        # background task died instead of only the message.
        logging.exception("error {0}".format(str(ex)))

async def monitor_event(io_chip, output_pin, event_pin):
    """Toggle an output line in the background while waiting on an event pin.

    Args:
        io_chip: GPIO chip name used for both pins.
        output_pin: Line offset driven by the background ``toggleTask``.
        event_pin: Line offset wrapped in a ``gpiowrapper.EventPin``.

    Loops forever; after every ``wait()`` it logs a message when the value
    it received equals a known edge type.
    """
    # The output is requested before the event pin, then toggled every
    # half second by a background task.
    o_pin = gpiowrapper.OutputPin(io_chip, output_pin)
    evt_pin = gpiowrapper.EventPin(io_chip, event_pin)
    toggle_task = asyncio.create_task(toggleTask(o_pin, .5))

    edge_messages = {
        gpiod.LineEvent.RISING_EDGE: "Rising edge event",
        gpiod.LineEvent.FALLING_EDGE: "Falling edge event",
    }

    while True:
        val = await evt_pin.wait()
        message = edge_messages.get(val)
        if message is not None:
            logging.debug(message)

if __name__ == "__main__":
    # Script entry point: parse the command line, then run either the blink
    # demo or the event-monitor demo under asyncio.
    logging.basicConfig(format='%(asctime)s %(levelname)s %(filename)s:%(lineno)s - %(funcName)20s() %(message)s', level=logging.DEBUG)

    # Create the parser
    my_parser = argparse.ArgumentParser(
        description='Blink a GPIO output or monitor a GPIO event pin using asyncio and libgpiod')

    # Add the arguments (flag spellings kept as published so existing
    # command lines keep working)
    my_parser.add_argument('-chip', default='gpiochip0',
                           help='GPIO chip name (default: gpiochip0)')
    my_parser.add_argument('-opin', type=int, help='output pin (line offset)')
    my_parser.add_argument('--epin', type=int, help='event pin (line offset)')
    my_parser.add_argument('--blink',
                           help='any non-empty value runs the blink demo')
    my_parser.add_argument('--event',
                           help='any non-empty value runs the event monitor demo')

    # Execute the parse_args() method
    args = my_parser.parse_args()
    logging.debug("main")

    if not (args.blink or args.event):
        # Nothing was requested: show usage instead of exiting silently.
        my_parser.print_help()
    elif args.opin is None:
        # Both demos drive an output pin; fail with a usage error rather
        # than a TypeError deep inside the demo.
        my_parser.error('-opin is required')

    if args.blink:
        logging.debug("run blink")
        asyncio.run(blink(args.chip, args.opin))
    if args.event:
        if args.epin is None:
            my_parser.error('--epin is required with --event')
        logging.debug("monitor")
        asyncio.run(monitor_event(args.chip, args.opin, args.epin))

 

Load both files onto the Orange PI Zero. 

python blink.py -chip gpiochip0 -opin 1 --epin 0 --event True

This will use pin 11 (GPIO1) as the output and change its level every 0.5 seconds, i.e. one full on/off cycle per second. Pin 13 (GPIO0) will wait for a rising-edge trigger. If you connect both pins together, you will get one rising-edge event per second:

# python blink.py -chip gpiochip0 -opin 1 --epin 0 --event True
1970-01-01 00:47:21,419 DEBUG blink.py:60 -             <module>() main
1970-01-01 00:47:21,420 DEBUG blink.py:66 -             <module>() monitor
1970-01-01 00:47:21,421 DEBUG selector_events.py:59 -             __init__() Using selector: EpollSelector
1970-01-01 00:47:21,424 DEBUG gpiowrapper.py:10 -             __init__() args chip=gpiochip0 pin=1
1970-01-01 00:47:21,558 DEBUG gpiowrapper.py:16 -             __init__() type <class 'gpiod.Chip'> <class 'int'>
1970-01-01 00:47:21,648 DEBUG gpiowrapper.py:36 -             __init__() type <class 'gpiod.Chip'> <class 'int'>
1970-01-01 00:47:22,153 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:23,156 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:24,159 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:25,161 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:26,164 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:27,167 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:28,170 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:29,172 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:30,175 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:31,178 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:32,180 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:33,183 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:34,186 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:35,188 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:36,191 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:37,194 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:38,197 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:39,199 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:40,202 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:41,205 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1970-01-01 00:47:42,207 DEBUG gpiowrapper.py:57 -             event_cb()  event  RISING EDGE
1