I have been doing a lot of http retrieval lately and the most efficient way to do that is with gzip compression enabled. Fortunately Python makes that really easy. All you have to do is derive from urllib2.HTTPHandler and override http_open().
import httplib, urllib, urllib2
class GzipHandler(urllib2.HTTPHandler):
    """HTTP handler that requests gzip-compressed responses and inflates them.

    Install it with ``urllib2.build_opener(GzipHandler)``.
    """

    def http_open(self, req):
        """Open *req* over HTTP, transparently decompressing a gzip body.

        Adds an ``Accept-encoding: gzip`` header to the request.  When the
        response declares ``Content-Encoding: gzip`` the whole body is read
        and wrapped in a ``gzip.GzipFile``; otherwise the raw response is
        passed through.  Returns a ``urllib.addinfourl`` carrying the original
        headers, URL, status code and message.
        """
        # Local imports: the snippet's import line does not provide these
        # two names, so the gzip branch would otherwise raise NameError.
        import gzip
        from StringIO import StringIO

        req.add_header('Accept-encoding', 'gzip')
        r = self.do_open(httplib.HTTPConnection, req)
        # Content-coding tokens are case-insensitive, so normalise the value
        # before comparing.
        encoding = r.headers.get('Content-Encoding', '')
        if encoding.strip().lower() == 'gzip':
            fp = gzip.GzipFile(fileobj=StringIO(r.read()))
        else:
            fp = r
        resp = urllib.addinfourl(fp, r.headers, r.url, r.code)
        resp.msg = r.msg
        return resp
The Accept-encoding header tells the server that this client supports gzip compression, and if the Content-Encoding header is set to gzip the server returned a compressed response. Now you just need to build your opener.
def retrieve(url):
    """Fetch *url* through an opener built with GzipHandler.

    Returns whatever the opener's ``open()`` returns for the request.
    """
    gzip_opener = urllib2.build_opener(GzipHandler)
    return gzip_opener.open(urllib2.Request(url))
For more information see:
Download the
source.
I wanted to search Twitter and after reading their API docs I knew this was going to be a fun task given their native json support.
That means I would just have to handle pagination and creating the search query. Below is the resulting code.
twitter-search.py
#!/usr/bin/python2.7
import argparse
import datetime
import json
import urllib
import urlparse
class Twitter(object):
    """Small client for the Twitter search JSON endpoint with pagination."""

    # Base URL of the search endpoint; query strings are appended to it.
    search_url = 'http://search.twitter.com/search.json'

    def __init__(self, verbose=False):
        """Create a client.

        verbose -- when true, print request parameters and per-page counts.
        """
        self.verbose = verbose
        super(Twitter, self).__init__()

    def _fetch(self, url):
        """Download *url* and return its body decoded from JSON."""
        handle = urllib.urlopen(url)
        try:
            return json.loads(handle.read())
        finally:
            # Always release the connection, even if decoding fails.
            handle.close()

    @staticmethod
    def _limit_reached(results, max_results):
        """Return True when a limit is set and *results* has reached it."""
        # An explicit None check is required: comparing an int with None is
        # always true on Python 2, which made "no limit" stop after one page.
        return max_results is not None and len(results) >= max_results

    def search(self, query, until=None, rpp=100, max_results=None):
        """Run a search and return the collected result items as a list.

        query       -- search string sent as the ``q`` parameter.
        until       -- optional date/datetime; sent as ``until`` formatted
                       ``YYYY-MM-DD``.
        rpp         -- results per page, sent as the ``rpp`` parameter.
        max_results -- stop paging once this many items are collected and
                       return at most this many; ``None`` means follow every
                       ``next_page`` link.
        """
        params = {
            'q': query,
            'rpp': rpp,
        }
        if until:
            params['until'] = until.strftime('%Y-%m-%d')
        if self.verbose:
            print(params)

        url = '%s?%s' % (self.search_url, urllib.urlencode(params))
        response = self._fetch(url)
        results = list(response['results'])

        # Follow the server-provided pagination links until the limit is
        # reached or there are no more pages.
        while not self._limit_reached(results, max_results) \
                and 'next_page' in response:
            url = self.search_url + response['next_page']
            response = self._fetch(url)
            if self.verbose:
                print('%s: %s' % (url, len(response['results'])))
            results.extend(response['results'])

        if max_results is not None:
            # The last page may overshoot the requested maximum; trim it.
            del results[max(max_results, 0):]
        return results

    def search_last_day(self, *args, **kwargs):
        """Call search() with ``until`` set to 24 hours before now.

        Any ``until`` supplied by the caller is overwritten; all other
        arguments are passed through unchanged.
        """
        kwargs['until'] = datetime.datetime.now() - datetime.timedelta(days=1)
        return self.search(*args, **kwargs)
if __name__ == '__main__':
    # Command-line entry point: search the last day of tweets and report.
    parser = argparse.ArgumentParser(description='Search twitter')
    parser.add_argument('search', nargs=1)
    parser.add_argument('--rpp', dest='rpp', type=int, default=100, help='Results per page')
    parser.add_argument('-m', '--max-results', dest='max_results', type=int, default=100, help='Max results returned')
    parser.add_argument('-p', '--print-results', dest='print_results', action='store_true', help='Print the results')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Turn verbose on')
    args = parser.parse_args()

    twitter = Twitter(verbose=args.verbose)
    # nargs=1 stores the positional as a one-element list; pass the string
    # itself so the query is not sent as the list's repr.
    results = twitter.search_last_day(args.search[0], rpp=args.rpp, max_results=args.max_results)
    print('Found %s items' % (len(results)))
    if args.verbose:
        # The formatted dump has to be printed; json.dumps only returns it.
        print(json.dumps(results, indent=4))
    if args.print_results:
        for result in results:
            print('%s' % (result['text']))
Download the
source.
I needed to collect the output from ffmpeg for some profiling. It proved more challenging than I anticipated, as ffmpeg writes the data unflushed to stderr, making it unreadable using stdio. To get the data, the stderr file descriptor has to be set to non-blocking (O_NONBLOCK) using fcntl. Here is the resulting Python code.
sample.py
def encode(filename, callback=None):
    """Transcode *filename* to MP4 with ffmpeg, reporting progress.

    The output is written next to the input, using the input path with its
    extension replaced by ``.mp4``.  ffmpeg's stderr is read in non-blocking
    mode; each chunk that starts with a progress line is parsed and passed
    to *callback* as a dict of strings with the keys ``frame``, ``fps``,
    ``q``, ``size``, ``time`` and ``bitrate``.

    filename -- path of the input file.
    callback -- optional callable taking the progress dict.

    Returns None once ffmpeg has closed stderr and exited.
    """
    output = '%s.mp4' % os.path.splitext(filename)[0]
    # Build the argument vector directly rather than formatting a command
    # string and splitting it again: a quote or backslash in the filename
    # can then no longer change how the command line is tokenised.
    argv = [
        'ffmpeg', '-i', filename,
        '-acodec', 'libfaac', '-ab', '128kb',
        '-vcodec', 'mpeg4', '-b', '1200kb', '-mbd', '2', '-flags', '+4mv',
        '-trellis', '2', '-cmp', '2', '-subcmp', '2', '-s', '320x180',
        output,
    ]
    pipe = subprocess.Popen(
        argv,
        stderr=subprocess.PIPE,
        close_fds=True
    )
    # Switch stderr to non-blocking so read() returns whatever is available
    # instead of waiting for EOF.
    fcntl.fcntl(
        pipe.stderr.fileno(),
        fcntl.F_SETFL,
        fcntl.fcntl(pipe.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK,
    )
    # frame= 29 fps= 0 q=2.6 size= 114kB time=0.79 bitrate=1181.0kbits/s
    reo = re.compile(r"""
        \S+\s+(?P<frame>\d+)            # frame
        \s\S+\s+(?P<fps>\d+)            # fps
        \sq=(?P<q>\S+)                  # q
        \s\S+\s+(?P<size>\S+)           # size
        \stime=(?P<time>\S+)            # time
        \sbitrate=(?P<bitrate>[\d\.]+)  # bitrate
        """, re.X)
    try:
        while True:
            readx = select.select([pipe.stderr.fileno()], [], [])[0]
            if readx:
                chunk = pipe.stderr.read()
                if not chunk:
                    # Empty read after select() reported readable means EOF.
                    break
                m = reo.match(chunk)
                if m and callback:
                    callback(m.groupdict())
            time.sleep(.1)
    finally:
        # Release the pipe and reap the child so it does not linger as a
        # zombie process.
        pipe.stderr.close()
        pipe.wait()
The complete script is located
here.
Download the
source.
I just wanted a simple wrapper around syslog. The Python logging module is good, but it was too heavyweight for what I needed. Here is a simple logging class for syslog. It has an optional decorator to provide the function name to syslog, which I find useful for debugging.
syslogging.py
#!/usr/bin/python
import syslog
class SysLogging:
    """Thin convenience wrapper around the ``syslog`` module.

    Offers one method per syslog priority; every message is sent with the
    facility chosen at construction time.
    """

    def __init__(self, facility, prefix=None):
        """Store *facility*; call ``syslog.openlog(prefix)`` if prefix is truthy."""
        self.facility = facility
        if not prefix:
            return
        syslog.openlog(prefix)

    def _logit(self, priority, message):
        """Send *message*, formatted with ``'%s'``, at ``facility | priority``."""
        level = self.facility | priority
        text = '%s' % (message)
        syslog.syslog(level, text)

    def debug(self, message):
        """Log *message* at LOG_DEBUG."""
        self._logit(priority=syslog.LOG_DEBUG, message=message)

    def info(self, message):
        """Log *message* at LOG_INFO."""
        self._logit(priority=syslog.LOG_INFO, message=message)

    def notice(self, message):
        """Log *message* at LOG_NOTICE."""
        self._logit(priority=syslog.LOG_NOTICE, message=message)

    def warning(self, message):
        """Log *message* at LOG_WARNING."""
        self._logit(priority=syslog.LOG_WARNING, message=message)

    def error(self, message):
        """Log *message* at LOG_ERR."""
        self._logit(priority=syslog.LOG_ERR, message=message)

    def crit(self, message):
        """Log *message* at LOG_CRIT."""
        self._logit(priority=syslog.LOG_CRIT, message=message)

    def alert(self, message):
        """Log *message* at LOG_ALERT."""
        self._logit(priority=syslog.LOG_ALERT, message=message)

    def emerg(self, message):
        """Log *message* at LOG_EMERG."""
        self._logit(priority=syslog.LOG_EMERG, message=message)
def syslogging(func):
    """Decorator that supplies a default ``logger`` keyword argument.

    When the decorated function is called without a ``logger`` keyword, a
    ``SysLogging`` instance on facility LOG_LOCAL2, with the function's name
    as its syslog prefix, is created and passed in.  A ``logger`` supplied
    by the caller is passed through untouched.
    """
    # Local import keeps the block self-contained.
    import functools

    # wraps() preserves the decorated function's __name__ and __doc__.
    @functools.wraps(func)
    def caller(*args, **kwargs):
        if 'logger' not in kwargs:
            kwargs['logger'] = SysLogging(syslog.LOG_LOCAL2, func.__name__)
        return func(*args, **kwargs)
    return caller
@syslogging
def test_func(arg1, arg2=None, logger=None):
    """Demo: log both arguments, space-separated, at INFO level via *logger*."""
    message = '%s %s' % (arg1, arg2)
    logger.info(message)
if __name__ == '__main__':
    # Demo run: no logger is passed, so the decorator supplies one.
    test_func(1, arg2='two')
Download the
source.
When profiling it can be useful to log the amount of time that is spent in a function. With Python that is super easy to do with decorators.
logtime.py
#!/usr/bin/python
import time
import syslog
def logtime(func):
    """Decorator that logs the wall-clock duration of each call to syslog.

    After the wrapped function returns, a message of the form
    ``<function name>=<elapsed seconds>`` is sent to syslog with facility
    LOG_LOCAL2 at priority LOG_INFO.  The wrapped function's return value is
    passed through.  Nothing is logged if the function raises.
    """
    # Local import keeps the block self-contained.
    import functools

    # wraps() preserves the decorated function's __name__ and __doc__.
    @functools.wraps(func)
    def caller(*args, **kwargs):
        stime = time.time()
        ret = func(*args, **kwargs)
        syslog.syslog(
            syslog.LOG_LOCAL2 | syslog.LOG_INFO,
            '%s=%s\n' % (func.__name__, time.time() - stime))
        return ret
    return caller
@logtime
def test_func(arg1, arg2=None):
    """Demo: print both arguments separated by a space, then sleep one second."""
    # Single-argument print(...) gives the same output as the old
    # ``print arg1, arg2`` statement and also parses on Python 3.
    print('%s %s' % (arg1, arg2))
    time.sleep(1)
if __name__ == '__main__':
    # Demo run: the call takes about a second and its duration goes to syslog.
    test_func(1, arg2=2)
logtime will log the time spent in the function to syslog.
Jul 14 15:05:01 olomai python: test_func=1.00114893913
Download the
source.
My home on the web. Uses the
Django
web framework,
uwsgi as the WSGI server,
nginx as the media server and load
balancer,
pygments for the syntax highlighting.
Author and designer.
CrowdTube.tv streams trending videos to
your browser like a never ending tv show. This was
Cory Shaw's awesome idea. I was responsible for the backend development.
dmusic.bz
dMusic.bz is a
Pandora
clone written using Django and JavaScript. It uses
last.fm
to find the music relationships. Author and designer.
ilovephotos.com is a photo sharing and
tagging website. Facial detection was run on the photos so the bounding boxes were
already drawn around the faces eliminating a step in a tedious process. I authored
views and the process to run facial detection on the photos and upload them to S3.
Kindfish.com was
BlueLava's first photo site. It was
event based and slideshows would be created from the albums. I authored the views
and process to generate the slideshows from the photos on EC2.
Below is a snippet of my resume. Click here to view
the full version in pdf, its proper format.