I'm playing around with a radio streaming project. Currently I'm creating a python backend. There are over 150,000 online radio station streams in the database. One feature I'm trying to add is to search the radio stations by their currently playing song. I'm using Dirble's streamscrobbler to grab the currently playing song from each radio station using a request and looking through the metadata.
Obviously this script will need to be multi-threaded in order to grab the currently playing songs in a feasible amount of time. It can take no more than 2 minutes. Preferably 1 minute to 1 minute 30 seconds if this is possible.
I've never worked on a project of this scale before. Creating too many threads eats up resources, so it seems best to use a ThreadPoolExecutor. I'm also using SQLAlchemy to insert these songs into a database. Apparently SQLAlchemy uses a connection pool that is enabled by default?
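To illustrate why a bounded pool is preferable to one OS thread per station, here is a small stdlib-only sketch (`check_station` is a hypothetical stand-in for the real per-station work): the pool reuses a fixed set of worker threads no matter how many tasks are submitted.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the per-station work: report which thread ran it.
def check_station(station_id):
    return (station_id, threading.current_thread().name)

# 50 workers service 1,000 tasks by reusing threads,
# instead of creating one OS thread per station.
with ThreadPoolExecutor(max_workers=50) as pool:
    futures = [pool.submit(check_station, i) for i in range(1000)]
    results = [f.result() for f in futures]

thread_names = {name for _, name in results}
assert len(results) == 1000
assert len(thread_names) <= 50  # at most 50 distinct worker threads existed
```

The same idea applies to the real workload: the bottleneck becomes how long each station's request blocks, not how many threads the OS will let you create.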
I'm scheduling this task using the lightweight scheduler python module by Daniel Bader. It seems to be working well.
Now, the problem I seem to be having is I get this error:
error: can't start new thread
I'm guessing this is because I'm using too many resources. What can I do? I could reduce the number of threads, but then the task doesn't seem to complete in the time I need, so reducing them would increase the time it takes to go through every stream URL.
```python
import re

from streamscrobbler import streamscrobbler
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import create_engine

db = create_engine('mysql://root:pass@localhost:3306/radio')

# get song name from station
def getCurrentSong(stream_url):
    streamscrobblerobj = streamscrobbler()
    stationinfo = streamscrobblerobj.getServerInfo(stream_url)
    metadata = stationinfo.get("metadata")
    match = re.search(r"'song': '(.*?)'", str(metadata))
    return match.group(1) if match else ''

def manageStation(station_id, station_link):
    current_song = getCurrentSong(station_link)
    with db.connect() as con:
        # parameterized query: no need to strip quotes and backslashes by hand
        con.execute(
            "INSERT INTO station_songs VALUES(%s, %s, '') "
            "ON DUPLICATE KEY UPDATE song_name = %s;",
            (station_id, current_song, current_song))

def update():
    print 'update starting'
    threadExecutor = ThreadPoolExecutor(max_workers=20000)
    with db.connect() as con:
        rs = con.execute("SELECT id, link FROM station_table")
        for row in rs.fetchall():
            threadExecutor.submit(manageStation, row[0], row[1])
```
You don't need a real thread for each task, since most of the time each thread is just waiting on I/O from a socket (the web request).
```python
from gevent import monkey; monkey.patch_socket()
import gevent
from gevent.pool import Pool

NUM_GLETS = 20
STATION_URLS = (
    'http://station1.com',
    ...
)

pool = Pool(NUM_GLETS)
tasks = [pool.spawn(analyze_station, url) for url in STATION_URLS]
gevent.joinall(tasks)
```
`analyze_station` is your code for fetching and analyzing the particular station.
The result is a single-threaded program, but instead of blocking on every single web request, another green thread runs while the socket is waiting on data. This is much more efficient than spawning real threads for mostly idle work.
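To see the effect, here is a self-contained sketch where `analyze_station` is a hypothetical stand-in that waits with `gevent.sleep` instead of a real socket read (with `monkey.patch_socket()`, a blocking HTTP request yields to other greenlets the same way). One hundred 0.1-second waits overlap across 20 greenlets, so the whole batch finishes in roughly half a second rather than ten:

```python
import time
import gevent
from gevent.pool import Pool

def analyze_station(url):
    # Stand-in for a blocking web request; a monkey-patched socket read
    # would yield control to other greenlets in exactly the same way.
    gevent.sleep(0.1)
    return url

pool = Pool(20)
urls = ['http://station%d.example' % i for i in range(100)]

start = time.time()
greenlets = [pool.spawn(analyze_station, u) for u in urls]
gevent.joinall(greenlets)
elapsed = time.time() - start

results = [g.value for g in greenlets]
assert len(results) == 100
assert elapsed < 2.0  # the 100 waits overlap, so well under 10s of serial time
```

All of this runs in one OS thread; the `Pool` bounds how many requests are in flight at once, which replaces the 20,000-worker executor.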