prattom prattom -4 years ago 140
Python Question

using threading in pygame

I have a Raspberry Pi, and I am sending pulses to one of its GPIO pins. I have Python code that detects an interrupt on that pin; there are at most 2 interrupts every second. Now I want to pass this value (the total number of interrupts) to a pygame application.

Currently, the interrupt-detection code writes the total number of interrupts to a file each time an interrupt is detected, and the pygame application then reads that number from the file. My question is: how can I integrate the interrupt-detection code into the pygame application using threads, since I want both the pygame application and the interrupt-detection code to run in parallel? I read somewhere that pygame is not thread-safe.

My code for interrupt detection:

# NOTE(review): `GPIO` and `sleep` are not imported in this snippet --
# presumably `import RPi.GPIO as GPIO` and `from time import sleep` appear
# earlier in the real file; confirm.
GPIO.setmode(GPIO.BCM)
count = 0  # total number of edges seen on pin 2 so far
GPIO.setup(2, GPIO.IN, pull_up_down=GPIO.PUD_UP)


def my_callback(channel):
    """Bump the global interrupt counter and persist it to ``hello.txt``.

    ``channel`` is supplied by ``GPIO.add_event_detect`` and is not used.
    """
    global count
    count += 1
    # The context manager closes (and therefore flushes) the file on every
    # call, so the reader sees the new value and no file handles are leaked.
    with open('hello.txt', 'w') as f:
        f.write(str(count))


# GPIO.BOTH registers the callback for both edge directions.
GPIO.add_event_detect(2, GPIO.BOTH, callback=my_callback)

try:
    # Keep the process alive; the counting itself happens in my_callback.
    while True:
        print("Waiting for input.")
        sleep(60)
finally:
    # Reached when the loop is interrupted (e.g. Ctrl-C); previously this
    # call sat after an infinite loop and could never run.
    GPIO.cleanup()


pygame application code:

# Display loop that shows the interrupt total written to hello.txt by the
# interrupt-detection script.
# NOTE(review): `pygame`, `sys` and `red` are not defined in this snippet --
# presumably they are imported/defined earlier in the real file; confirm.
pygame.init()
size = [640, 640]
screen = pygame.display.set_mode(size)
pygame.display.set_caption("Test")
done = False
clock = pygame.time.Clock()
font = pygame.font.SysFont("consolas", 25, True)
frame_rate = 20
frame_count = 0
count = 0  # last interrupt total successfully read from the file
while not done:
    for event in pygame.event.get():  # User did something
        if event.type == pygame.QUIT:  # If user clicked close
            done = True  # Flag that we are done so we exit this loop
            pygame.quit()
            sys.exit()

    # The writer truncates hello.txt before writing, so a read can find the
    # file missing or momentarily empty. In that case keep the previous
    # count instead of crashing. `with` closes the file every frame.
    try:
        with open("hello.txt", "r") as f:
            count = int(f.read())
    except (IOError, ValueError):
        pass

    # Clear the previous frame; otherwise each new number is drawn on top of
    # the old one.
    screen.fill((0, 0, 0))
    output_string = "ACTUAL %s" % count
    text = font.render(output_string, True, red)
    screen.blit(text, [250, 420])
    frame_count += 1
    clock.tick(frame_rate)
    pygame.display.flip()

pygame.quit()

Answer Source

You can use, e.g., the thread-safe Queue class to let your threads communicate with each other.

quick'n'dirty example:

from Queue import Empty
from Queue import Queue
from threading import Thread
from time import sleep

import pygame
from pygame.color import Color

# Channel between the two threads: the GPIO callback puts one item per
# detected edge, and the pygame main loop takes items off to update its count.
q = Queue()

def worker():
    """Watch BCM pin 2 and put one item on ``q`` for every detected edge.

    Intended to be used as a thread target. Configures the pin as an input
    with the pull-up enabled, registers an edge callback, and then idles
    until the loop is interrupted, at which point the GPIO state is
    released.

    NOTE(review): ``GPIO`` is not imported anywhere in this snippet --
    presumably ``import RPi.GPIO as GPIO`` is required; confirm.
    """
    GPIO.setmode(GPIO.BCM)
    GPIO.setup(2, GPIO.IN, pull_up_down=GPIO.PUD_UP)

    def my_callback(channel):
        # The payload is irrelevant; the consumer only counts items.
        q.put(True)

    # GPIO.BOTH registers the callback for both edge directions.
    GPIO.add_event_detect(2, GPIO.BOTH, callback=my_callback)

    try:
        # Nothing to do here but stay alive; events arrive via my_callback.
        while True:
            print("Waiting for input.")
            sleep(60)
    finally:
        # Previously placed after the infinite loop, where it was
        # unreachable; now it runs whenever the loop is left by an exception.
        GPIO.cleanup()


# Run the GPIO watcher in the background. It is a daemon thread so that it
# does not keep the process alive once the pygame loop below has finished.
t = Thread(target=worker)
t.daemon = True
t.start()

pygame.init()

screen = pygame.display.set_mode([640,640])
clock  = pygame.time.Clock()
font   = pygame.font.SysFont("consolas", 25, True)
count  = 0  # total number of interrupts received from the worker so far
pygame.display.set_caption("Test")

done = False
while not done:
    screen.fill(Color('black'))
    for event in pygame.event.get(): # User did something
        if event.type == pygame.QUIT: # If user clicked close
            done = True

    # Drain everything the GPIO callback queued since the last frame.
    # get_nowait() raises Empty instead of blocking, so the window keeps
    # redrawing and handling events while no interrupts arrive; a plain
    # q.get() would stall the whole loop until the next interrupt.
    try:
        while True:
            q.get_nowait()
            count += 1
    except Empty:
        pass

    output_string = "ACTUAL          %s"  % count
    text = font.render(output_string, True, Color('red'))
    screen.blit(text, [250,420])
    clock.tick(20)
    pygame.display.flip()

# Shut pygame down cleanly once the user has closed the window.
pygame.quit()
Recommended from our users: Dynamic Network Monitoring from WhatsUp Gold from IPSwitch. Free Download