Derrick Petzold

Open source enthusiast and developer


somaxconn - That pesky limit.

Views: 12,042 Comments: 0

Posted October 23, 2012 by derrick


Not too long ago, after upgrading from Lucid LTS to Precise LTS, our uWSGI servers started erroring with the following message:

Sun Sep 23 03:29:31 2012 - *** uWSGI listen queue of socket 3 full !!! (129/128) ***

After some light research I found that the backlog limit passed to the listen syscall had been exceeded. The listen queue size defaults to 128 for uWSGI, and that number has a maximum system limit set by /proc/sys/net/core/somaxconn. This led me to ask a few questions I thought were worth researching some more.

  • Why is the default limit set to 128?
  • What exactly does the limit do?
  • What are the repercussions of raising the limit?
  • What is the optimal limit for our machines?

This is what I found:

From the listen manpage

The backlog argument defines the maximum length to which the queue of pending connections for sockfd may grow.  If a connection request arrives when the queue is full,  the  client may  receive  an  error  with an indication of ECONNREFUSED or, if the underlying protocol supports retransmission, the request may be ignored so that a later reattempt  at  connection succeeds.
...
If the backlog argument is greater than the value in /proc/sys/net/core/somaxconn, then it is  silently  truncated  to that value; the default value in this file is 128.  In kernels before 2.4.25, this limit was a hard coded value, SOMAXCONN, with the value 128.

Please note the second paragraph: the value is silently truncated if it exceeds the system max. This caused some service degradation for us, as one of the servers had its listen limit increased but not the system limit. More on that later.
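To see the silent truncation for yourself, here is a minimal Python sketch (mine, not from the original post): listen() happily accepts a backlog far above the system cap and raises no error.

import socket

# The system-wide cap that listen() silently truncates to.
with open('/proc/sys/net/core/somaxconn') as f:
    somaxconn = int(f.read())

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('127.0.0.1', 0))
sock.listen(8192)  # succeeds even though the effective backlog is capped at somaxconn
print('asked for 8192, system cap is %d' % somaxconn)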

From UNP (Stevens' UNIX Network Programming)

When a SYN arrives from a client, TCP creates a new entry on the incomplete queue and then responds with the second segment of the three-way handshake: the server's SYN with an ACK of the client's SYN (Section 2.6). This entry will remain on the incomplete queue until the third segment of the three-way handshake arrives (the client's ACK of the server's SYN), or until the entry times out. (Berkeley-derived implementations have a timeout of 75 seconds for these incomplete entries.) If the three-way handshake completes normally, the entry moves from the incomplete queue to the end of the completed queue. When the process calls accept, which we will describe in the next section, the first entry on the completed queue is returned to the process, or if the queue is empty, the process is put to sleep until an entry is placed onto the completed queue. 
There are several points to consider regarding the handling of these two queues. 

The backlog argument to the listen function has historically specified the maximum value for the sum of both queues.
There has never been a formal definition of what the backlog means. The 4.2BSD man page says that it "defines the maximum length the queue of pending connections may grow to." Many man pages and even the POSIX specification copy this definition verbatim, but this definition does not say whether a pending connection is one in the SYN_RCVD state, one in the ESTABLISHED state that has not yet been accepted, or either. The historical definition in this bullet is the Berkeley implementation, dating back to 4.2BSD, and copied by many others.

Berkeley-derived implementations add a fudge factor to the backlog: It is multiplied by 1.5 (p. 257 of TCPv1 and p. 462 of TCPV2). For example, the commonly specified backlog of 5 really allows up to 8 queued entries on these systems, as we show in Figure 4.10.
The reason for adding this fudge factor appears lost to history [Joy 1994]. But if we consider the backlog as specifying the maximum number of completed connections that the kernel will queue for a socket ([Borman 1997b], as discussed shortly), then the reason for the fudge factor is to take into account incomplete connections on the queue.

Do not specify a backlog of 0, as different implementations interpret this differently (Figure 4.10). If you do not want any clients connecting to your listening socket, close the listening socket.

Assuming the three-way handshake completes normally (i.e., no lost segments and no retransmissions), an entry remains on the incomplete connection queue for one RTT, whatever that value happens to be between a particular client and server. Section 14.4 of TCPv3 shows that for one Web server, the median RTT between many clients and the server was 187 ms. (The median is often used for this statistic, since a few large values can noticeably skew the mean.)

Historically, sample code always shows a backlog of 5, as that was the maximum value supported by 4.2BSD. This was adequate in the 1980s when busy servers would handle only a few hundred connections per day. But with the growth of the World Wide Web (WWW), where busy servers handle millions of connections per day, this small number is completely inadequate (pp. 187–192 of TCPv3). Busy HTTP servers must specify a much larger backlog, and newer kernels must support larger values.
Many current systems allow the administrator to modify the maximum value for the backlog.

A problem is: What value should the application specify for the backlog, since 5 is often inadequate? There is no easy answer to this. HTTP servers now specify a larger value, but if the value specified is a constant in the source code, to increase the constant requires recompiling the server. Another method is to assume some default but allow a command-line option or an environment variable to override the default. It is always acceptable to specify a value that is larger than supported by the kernel, as the kernel should silently truncate the value to the maximum value that it supports, without returning an error (p. 456 of TCPv2).

So even some years ago 5 was a questionable limit, and that somewhat explains the silent truncation. Further inspection of the kernel source led to this comment:

net/core/request_sock.c:

/*
 * Maximum number of SYN_RECV sockets in queue per LISTEN socket.
 * One SYN_RECV socket costs about 80bytes on a 32bit machine.
 * It would be better to replace it with a global counter for all sockets
 * but then some measure against one socket starving all other sockets
 * would be needed.
 *
 * The minimum value of it is 128. Experiments with real servers show that
 * it is absolutely not enough even at 100conn/sec. 256 cures most
 * of problems.
 * This value is adjusted to 128 for low memory machines,
 * and it will increase in proportion to the memory of machine.
 * Note : Dont forget somaxconn that may limit backlog too.
 */

So now things are becoming clearer. The backlog defines the number of clients that can be in the handshake phase of a TCP connection, and from what I can tell from Stevens the value seems to be mostly of legacy significance, from when servers had memory measured in megabytes rather than the gigabytes we have today.

Continuing the inspection shows that the queue is a list of request_sock structures.

net/request_sock.h

/* struct request_sock - mini sock to represent a connection request
 */
struct request_sock {
        struct request_sock             *dl_next; /* Must be first member! */
        u16                             mss;
        u8                              retrans;
        u8                              cookie_ts; /* syncookie: encode tcpopts in timestamp */
        /* The following two fields can be easily recomputed I think -AK */
        u32                             window_clamp; /* window clamp at creation time */
        u32                             rcv_wnd;          /* rcv_wnd offered first time */
        u32                             ts_recent;
        unsigned long                   expires;
        const struct request_sock_ops   *rsk_ops;
        struct sock                     *sk;
        u32                             secid;
        u32                             peer_secid;
};

This is where the queue is allocated.

net/core/request_sock.c

int reqsk_queue_alloc(struct request_sock_queue *queue,
                      unsigned int nr_table_entries)
{
        size_t lopt_size = sizeof(struct listen_sock);
        struct listen_sock *lopt;

        nr_table_entries = min_t(u32, nr_table_entries, sysctl_max_syn_backlog);
        nr_table_entries = max_t(u32, nr_table_entries, 8);
        nr_table_entries = roundup_pow_of_two(nr_table_entries + 1);
        lopt_size += nr_table_entries * sizeof(struct request_sock *);
        if (lopt_size > PAGE_SIZE)
                lopt = vzalloc(lopt_size);
        else
                lopt = kzalloc(lopt_size, GFP_KERNEL);
        if (lopt == NULL)
                return -ENOMEM;

        for (lopt->max_qlen_log = 3;
             (1 << lopt->max_qlen_log) < nr_table_entries;
             lopt->max_qlen_log++);

        get_random_bytes(&lopt->hash_rnd, sizeof(lopt->hash_rnd));
        rwlock_init(&queue->syn_wait_lock);
        queue->rskq_accept_head = NULL;
        lopt->nr_table_entries = nr_table_entries;

        write_lock_bh(&queue->syn_wait_lock);
        queue->listen_opt = lopt;
        write_unlock_bh(&queue->syn_wait_lock);

        return 0;
}
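To make that sizing logic concrete, here is a small Python paraphrase (mine, not kernel code); the max_syn_backlog default of 128 is only a placeholder for the value of the net.ipv4.tcp_max_syn_backlog sysctl referenced as sysctl_max_syn_backlog above.

def syn_table_size(backlog, max_syn_backlog=128):
    # Paraphrase of reqsk_queue_alloc(): clamp the requested backlog to
    # max_syn_backlog, enforce a floor of 8, then round up to the next
    # power of two after adding one.
    n = min(backlog, max_syn_backlog)
    n = max(n, 8)
    n += 1
    size = 1
    while size < n:
        size <<= 1
    return size

print(syn_table_size(128))                          # 256 entries for the default backlog of 128
print(syn_table_size(8192, max_syn_backlog=8192))   # 16384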

Conclusions

So now we have some answers. The somaxconn defines the number of request_sock structures allocated for each listen call. The queue is persistent through the life of the listening socket, and the default value of 128 is not applicable to today's servers. Increasing that number 5x would have an insignificant effect on system resources while ensuring your server stays up under load.

From the kernel comment, each entry takes about 80 bytes on a 32-bit machine. Doubling that for 64 bits still leads to an inconsequential number for today's machines: increasing the limit to 8k would lead to roughly 1.25 MB of memory usage.
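The arithmetic behind that estimate, as a quick Python check (160 bytes per entry is my doubled assumption, not a measured figure):

entries = 8192
bytes_per_entry = 80 * 2   # kernel comment's 32-bit figure, doubled for 64-bit
print(entries * bytes_per_entry / 1024.0)       # 1280.0 KB
print(entries * bytes_per_entry / 1024.0 ** 2)  # 1.25 MB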

Actions Taken

Since there is no error if the listen backlog exceeds the system max, I submitted a patch to uWSGI to hard fail if the number passed through the --listen parameter exceeds the system max. The patch was accepted, so the case that caused the second failure will not happen again.
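The real patch lives in uWSGI's C source; conceptually the startup check amounts to something like this Python sketch (illustrative only, not the actual patch):

def check_listen_backlog(requested):
    # Fail hard instead of letting the kernel silently truncate.
    with open('/proc/sys/net/core/somaxconn') as f:
        somaxconn = int(f.read())
    if requested > somaxconn:
        raise SystemExit(
            'listen backlog %d exceeds net.core.somaxconn (%d); '
            'raise the sysctl or lower --listen' % (requested, somaxconn))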

Going Forward

I would like to know the maximum number of slots actually being used in the queue. All I know for certain is that the number is greater than 128 and less than 8192, so the optimal size is somewhere in between. Considering that the cost of increasing the size is so low, though, this is a lower priority.

However, I do think it would be an interesting case study. For instance, would the queue be better served by a linked list that grows and shrinks with demand? In the highly available environments we work in, this seems like an area for improvement. I just cannot see any good reason for loss of service due to the listen backlog queue limit, especially one that defaults to 128.

For uWSGI I am happy with the first submission, but now I am thinking the --listen parameter should accept a soft value like 'max' that would default to the system max or maybe even override it. Again, my motivation is that I cannot see a good reason for the short limit in the first place.

An auto random character field for Django

Views: 1,003 Comments: 0

Posted October 20, 2012 by derrick


Some time ago I wrote about generating external ids. An external id is a unique id not related to the database id, for example something a link shortening service would use: http://bit.ly/RaFx8j. The original post had the logic in a model Manager, and I got some feedback saying it would be better implemented as a Django Field. Finally, after just a couple of years, I had a chance to implement it as such. It is based on the AutoSlugField from the excellent django-extensions app.

class RandomCharField(BaseUniqueField):

    def __init__(self, *args, **kwargs):
        kwargs.setdefault('blank', True)
        kwargs.setdefault('editable', False)

        self.lower = kwargs.pop('lower', False)
        self.digits_only = kwargs.pop('digits_only', False)
        self.alpha_only = kwargs.pop('alpha_only', False)
        self.include_punctuation = kwargs.pop('include_punctuation', False)
        self.length = kwargs.pop('length', 8)
        kwargs['max_length'] = self.length

        # legacy
        kwargs.pop('include_digits', False)

        if self.digits_only:
            self.valid_chars = string.digits
        else:
            self.valid_chars = string.lowercase

            if not self.lower:
                self.valid_chars += string.uppercase

            if not self.alpha_only:
                self.valid_chars += string.digits

                if self.include_punctuation:
                   self.valid_chars += string.punctuation

        super(RandomCharField, self).__init__(*args, **kwargs)

    def generate_chars(self, *args, **kwargs):
        return ''.join([random.choice(list(self.valid_chars)) for x in range(self.length)])

    def pre_save(self, model_instance, add):
        if not add:
            return getattr(model_instance, self.attname)

        initial = self.generate_chars()
        value = self.find_unique(model_instance, initial, self.generate_chars)
        setattr(model_instance, self.attname, value)
        return value

    def get_internal_type(self):
        return "CharField"

    def south_field_triple(self):
        "Returns a suitable description of this field for South."
        # We'll just introspect the _actual_ field.
        from south.modelsinspector import introspector
        field_class = '%s.RandomCharField' % (self.__module__)
        args, kwargs = introspector(self)
        kwargs.update({
            'alpha_only': repr(self.alpha_only),
            'digits_only': repr(self.digits_only),
            'include_punctuation': repr(self.include_punctuation),
            'length': repr(self.length),
            'lower': repr(self.lower),
        })
        # That's our definition!
        return (field_class, args, kwargs)

Below is some sample output:

>>> class RandomCharTestModel(models.Model):
>>>     chars = RandomCharField(length=6)
BVm9GE

>>> class RandomCharTestModelAlpha(models.Model):
>>>     chars = RandomCharField(length=12, alpha_only=True)
CxPWKJHDPnNO

>>> class RandomCharTestModelDigits(models.Model):
>>>     chars = RandomCharField(length=4, digits_only=True)
7097

>>> class RandomCharTestModelPunctuation(models.Model):
>>>     chars = RandomCharField(length=12, include_punctuation=True)
k[ZS.TR,0LHO    

>>> class RandomCharTestModelLower(models.Model):
>>>     chars = RandomCharField(length=12, lower=True, alpha_only=True)
pzolbemetmok

>>> class RandomCharTestModelLowerAlphaDigits(models.Model):
>>>     chars = RandomCharField(length=12, lower=True, include_punctuation=False)
wfaytk3msiin
For the full implementation, including tests, see my Django utility library on GitHub.

Django request logging and json

Views: 6,680 Comments: 0

Posted January 15, 2012 by derrick


I wrote a decorator that makes the request available to logging by using a LoggerAdapter, with a RequestInfo class that makes the request object dict-like.

import logging
import socket

from django.http import HttpRequest


class RequestInfo(object):

    def __init__(self, request):
        self.request = request

    def __getitem__(self, name):

        if name == 'request.host':
            return socket.gethostname()

        if name.startswith('request.meta.'):
            val = name.split('.')[2]
            try:
                return self.request.META[val.upper()]
            except KeyError as e:
                return None
        return eval('self.%s' % (name))

    def _get_attrs(self, obj):
        attrs = []
        for attr in dir(obj):
            try:
                if not attr.startswith('_') and \
                        not callable(getattr(obj, attr)):
                    attrs.append(attr)
            except AttributeError:
                pass
        return attrs

    def __iter__(self):
        keys = ['request.host']
        keys.extend(['request.%s' % (a) for a in
                self._get_attrs(self.request)])
        keys.extend(['request.session.%s' % (a) for a in
            self._get_attrs(self.request.session)])
        keys.extend(['request.user.%s' % (a) for a in
            self._get_attrs(self.request.user)])
        keys.extend(['request.meta.%s' % (a.lower()) for a in
            self.request.META.keys()])
        return keys.__iter__()

def logger(name):
    def wrap(func):
        def caller(*args, **kwargs):
            request = None
            for arg in args:
                if isinstance(arg, HttpRequest):
                    request = arg
            if 'logger' not in kwargs:
                if request is not None:
                    kwargs['logger'] = logging.LoggerAdapter(
                            logging.getLogger(name), RequestInfo(request))
                else:
                    kwargs['logger'] = logging.getLogger(name)
            return func(*args, **kwargs)
        return caller
    return wrap
It's meant to be used in views like this:
@log.logger(__name__)
def home(request, logger=None):
    logger.info('user performed some action')
Update your logging.yaml to use the new format. For more info on that please see my previous post.
request-json:
    (): log.CustomJsonFormatter
    format: "'created', 'request.host', 'module', 'funcName', 'lineno', 'levelname', 'request.meta.remote_addr',  'request.meta.http_user_agent', 'request.user.username', 'request.path', 'message'"
The resulting log message should look something like this:
{"host": "fastcgi22", "request.meta.http_user_agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/535.15 (KHTML, like Gecko) Ubuntu/11.04 Chromium/18.0.996.0 Chrome/18.0.996.0 Safari/535.15", "request.path": "/", "created": 1326706577.275584, "module": "log", "funcName": "caller", "request.user.username": "derrick", "lineno": 114, "request.meta.remote_addr": "76.93.216.138", "message": "user performed some action", "levelname": "INFO"}
Now Django requests are fully integrated with logging. Happy trails.
Download the source.

Django logging, json, and syslog

Views: 2,393 Comments: 0

Posted January 12, 2012 by derrick


In this post I will show you how to configure logging so your application servers can send info in a nicely parseable JSON format to a centralized server for analysis.

So first things first. Let's begin by moving the logging config out of settings.py and into its own file. Why? Because it's easier to read and edit in YAML than in Python/JSON, and it shows up great in vim/emacs. So let's replace this:

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'filters': {
        'require_debug_false': {
            '()': 'django.utils.log.RequireDebugFalse',
        }
    },
    'handlers': {
        'mail_admins': {
            'level': 'ERROR',
            'filters': ['require_debug_false'],
            'class': 'django.utils.log.AdminEmailHandler'
        }
    },
    'loggers': {
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': True,
        },
    }
}
with this
import yaml
SITE_ROOT = os.path.dirname(os.path.realpath(__file__))
LOGGING = yaml.load(open(SITE_ROOT + '/logging.yaml', 'r'))
and put this into logging.yaml
version: 1
disable_existing_loggers: True
        
filters:
    require_debug_false:
        (): django.utils.log.RequireDebugFalse

handlers: 
    mail_admins:
        level: ERROR
        filters: [require_debug_false]
        class: django.utils.log.AdminEmailHandler

loggers:
    django.request:
        handlers: [mail_admins]
        propagate: True
        level: ERROR
Yay, now we have a logging config that's easy to read and edit. To make the JSON logging as painless as possible, use Madzak's excellent python-json-logger module. I went the distance and defined my own _fmt parser:
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def parse(self):
        return eval(self._fmt)
That allows the format to be defined as a Python string. Append the JSON formatter, syslog handler, and app logger to their respective locations in logging.yaml:
formatters:
    json:
        (): log.CustomJsonFormatter
        format: "'created', 'module', 'funcName', 'lineno', 'levelname', 'message'"

handlers:
    syslog:
        level: DEBUG
        class: logging.handlers.SysLogHandler
        facility: local0
        formatter: json
        address: [loghost, 514]

loggers:
    app_name:
        handlers: [syslog]
        propagate: True
        level: DEBUG
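On the application side nothing extra is needed; code just asks for the configured logger by name. A minimal sketch, assuming your logger really is named app_name as in the placeholder YAML above:

import logging

# The name must match the logger defined in logging.yaml.
logger = logging.getLogger('app_name')
logger.info('application started')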
Next, on to your loghost. I use my db server, but any server with space for the logs will do. We need to configure it to accept remote messages and enable the local0 facility. I don't know why the local? facilities are not enabled by default, but no matter. Ubuntu's default syslogd is rsyslog, and if that is what you've got, put the following in /etc/rsyslog.d/local0.conf:
$ModLoad imudp
$UDPServerRun 514
$template msg,"{'%msg%\n"
local0.*        /var/log/local0;msg
Notice the "{" in the template. That should certainly not be there but for some reason rsyslog strips some of the message with just logging the %msg% like that. Why I have no idea and I wasn't going to go to the source to find out (syslog-ng did something similar so I am really at a loss). Now be sure to create the file and restart rsyslog.
sudo touch /var/log/local0
sudo chown syslog.adm /var/log/local0
sudo restart rsyslog
logger -p local0.info test test

Okay, now it's time to test. Log messages from your app server should now be going to the central log host, but you would be the luckiest person alive if all of that worked without a hitch. If it doesn't, be sure to check firewall rules, security groups, and the like.

Download the source.

Adding http gzip support in Python

Views: 3,355 Comments: 1

Posted May 11, 2011 by derrick


I have been doing a lot of HTTP retrieval lately, and the most efficient way to do that is with gzip compression enabled. Fortunately Python makes that really easy: all you have to do is derive from urllib2.HTTPHandler and override http_open().

import gzip
import httplib
import urllib
import urllib2
from StringIO import StringIO
class GzipHandler(urllib2.HTTPHandler):
    def http_open(self, req):
        req.add_header('Accept-encoding', 'gzip')
        r = self.do_open(httplib.HTTPConnection, req)
        if 'Content-Encoding' in r.headers and \
                r.headers['Content-Encoding'] == 'gzip':
            fp = gzip.GzipFile(fileobj=StringIO(r.read()))
        else:
            fp = r
        resp = urllib.addinfourl(fp, r.headers, r.url, r.code)
        resp.msg = r.msg
        return resp

The Accept-encoding header tells the server that this client supports gzip compression, and if the Content-Encoding header in the response is set to gzip, the server returned a compressed response. Now you just need to build your opener.

def retrieve(url):
    request = urllib2.Request(url)
    opener = urllib2.build_opener(GzipHandler)
    return opener.open(request)
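Usage then looks like any other urllib2 fetch; a quick sketch (the URL is just a placeholder):

response = retrieve('http://example.com/')
body = response.read()
print('fetched %d decompressed bytes' % len(body))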


Download the source.

Searching Twitter with Python

Views: 4,282 Comments: 0

Posted May 4, 2011 by derrick


I wanted to search Twitter and, after reading their API docs, I knew this was going to be a fun task given their native JSON support.

That meant I would just have to handle pagination and build the search query. Below is the resulting code.

twitter-search.py

#!/usr/bin/python2.7

import argparse
import datetime
import json
import urllib
import urlparse

class Twitter(object):

    search_url = 'http://search.twitter.com/search.json'

    def __init__(self, verbose=False):
        self.verbose = verbose
        super(Twitter, self).__init__()

    def search(self, query, until=None, rpp=100, max_results=None):

        results = []
        params = {
            'q': query,
            'rpp': rpp,
        }
        if until:
            params['until'] = until.strftime('%Y-%m-%d')

        if self.verbose:
            print(params)

        url = '%s?%s' % (self.search_url, urllib.urlencode(params))
        response = json.loads(urllib.urlopen(url).read())
        results.extend(response['results'])

        if max_results and len(results) >= max_results:
            return results
       
        while 'next_page' in response:
            url = self.search_url + response['next_page']
            response = json.loads(urllib.urlopen(url).read())
       
            if self.verbose:
                print('%s: %s' % (url, len(response['results'])))

            results.extend(response['results'])
            if max_results and len(results) >= max_results:
                break
        return results

    def search_last_day(self, *args, **kwargs):
        kwargs['until'] = datetime.datetime.now() - datetime.timedelta(days=1)
        return self.search(*args, **kwargs)

if __name__ == '__main__':

    parser = argparse.ArgumentParser(description='Search twitter')
    parser.add_argument('search', nargs=1)
    parser.add_argument('--rpp', dest='rpp', type=int, default=100, help='Results per page')
    parser.add_argument('-m', '--max-results', dest='max_results', type=int, default=100, help='Max results returned')
    parser.add_argument('-p', '--print-results', dest='print_results', action='store_true', help='Print the results')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Turn verbose on')
    args = parser.parse_args()

    twitter = Twitter(verbose=args.verbose)
    results = twitter.search_last_day(args.search[0], rpp=args.rpp, max_results=args.max_results)
    print('Found %s items' % (len(results)))
    if args.verbose:
        print(json.dumps(results, indent=4))
    if args.print_results:
        for result in results:
            print('%s' % (result['text']))
Download the source.

Generating external ids with django

Views: 2,310 Comments: 0

Posted September 21, 2010 by derrick


Sometimes you want an alternative to the traditional consecutive database ids, e.g. to keep people from walking your site. Below is the code to generate an external id (a random 5-character field) when a model instance is created. I have used this on a couple of sites now and I am pretty happy with it.

models.py

from django.contrib.auth.models import User
from django.contrib.contenttypes import generic
from django.contrib.contenttypes.models import ContentType
from django.db import models

import random

class YourModelManager(models.Manager):

    @staticmethod
    def _random_id(prefix='', length=5):
        alphabet = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
        for x in range(length):
            prefix += random.choice(alphabet)
        return prefix

    @classmethod
    def _generate_external_id(cls):
        external_id = cls._random_id()
        while (YourModel.objects.filter(external_id=external_id).count() > 0):
            external_id = cls._random_id()
        return external_id

    def create(self, **kwargs):
        kwargs['external_id'] = self._generate_external_id()
        return self.get_query_set().create(**kwargs)

    def get_or_create(self, **kwargs):
        try:
            return self.get_query_set().get(**kwargs), False
        except self.model.DoesNotExist:
            kwargs['external_id'] = self._generate_external_id()
            return self.get_query_set().get_or_create(**kwargs)

class YourModel(models.Model):
    external_id = models.CharField(max_length=5, unique=True)
    objects = YourModelManager()
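Usage is then transparent; a hypothetical example (the id value shown is made up):

instance = YourModel.objects.create()
print(instance.external_id)  # e.g. 'k3Fq9'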
Download the source.

Capturing the output from ffmpeg

Views: 6,450 Comments: 0

Posted August 20, 2010 by derrick


I needed to collect the output from ffmpeg for some profiling. It proved more challenging than I anticipated, as ffmpeg writes its progress data unflushed to stderr, making it unreadable using stdio. To get the data, the stderr file descriptor has to be set to O_NONBLOCK using fcntl. Here is the resulting Python code.

sample.py

import fcntl
import os
import re
import select
import shlex
import subprocess
import time

def encode(filename, callback=None):
    cmd = 'ffmpeg -i "%s" -acodec libfaac -ab 128kb ' + \
          '-vcodec mpeg4 -b 1200kb -mbd 2 -flags +4mv ' + \
          '-trellis 2 -cmp 2 -subcmp 2 -s 320x180 "%s.mp4"'
    pipe = subprocess.Popen(
        shlex.split(cmd % (filename, os.path.splitext(filename)[0])),
        stderr=subprocess.PIPE,
        close_fds=True
    )
    fcntl.fcntl(
        pipe.stderr.fileno(),
        fcntl.F_SETFL,
        fcntl.fcntl(pipe.stderr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK,
    )
    # frame=   29 fps=  0 q=2.6 size=     114kB time=0.79 bitrate=1181.0kbits/s
    reo = re.compile("""\S+\s+(?P<frame>\d+)  # frame
                         \s\S+\s+(?P<fps>\d+)           # fps
                         \sq=(?P<q>\S+)                    # q
                         \s\S+\s+(?P<size>\S+)          # size
                         \stime=(?P<time>\S+)           # time
                         \sbitrate=(?P<bitrate>[\d\.]+) # bitrate
                         """, re.X)
    while True:
        readx = select.select([pipe.stderr.fileno()], [], [])[0]
        if readx:
            chunk = pipe.stderr.read()
            if chunk == '':
                break
            m = reo.match(chunk)
            if m and callback:
                callback(m.groupdict())
        time.sleep(.1)
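Calling it might look like the sketch below (the filename and callback are made up for illustration):

def print_progress(stats):
    # stats is the dict of named groups matched by the regex above
    print('frame %(frame)s, %(fps)s fps, size %(size)s, time %(time)s' % stats)

encode('input.avi', callback=print_progress)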
The complete script is located here.
Download the source.

Learning Erlang

Views: 1,769 Comments: 0

Posted August 2, 2010 by derrick


I have been really loving Python lately, but after reading this post I thought it would be a good idea to check out Erlang. I have heard its concurrency and network support are out of this world, and being in a knowledge-based industry, extra knowledge never hurts. This simple code snippet shows how much Erlang differs from traditional procedural languages.
average(X) -> sum(X) / len(X).
sum([H|T]) -> H + sum(T);
sum([]) -> 0.
len([_|T]) -> 1 + len(T);
len([]) -> 0.
average takes a list X and calls sum and len. Both of those are recursive functions that split the list into the first element H and the remainder T. Variables must start with a capital letter, and '_' denotes a value that is not used. Notice that no temporary variables were used in this example. Talk about putting the "f" in functional. I can't wait to get to the concurrent stuff.
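For comparison, here is a rough Python equivalent of the same recursion (my own sketch); the Erlang version gets the head/tail split for free through pattern matching, while Python has to slice the list explicitly:

def total(xs):
    # H + sum(T) in the Erlang version
    return xs[0] + total(xs[1:]) if xs else 0

def length(xs):
    return 1 + length(xs[1:]) if xs else 0

def average(xs):
    return total(xs) / float(length(xs))

print(average([1, 2, 3, 4]))  # 2.5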
Download the source.

Upload files to S3 with progressbar

Views: 4,516 Comments: 0

Posted July 28, 2010 by derrick


Here is a script I wrote to upload files to S3. It differs from s3put in that it shows a progress indicator with the ETA and percentage uploaded, and a summary after the transfer is complete.

s3upload.py

#!/usr/bin/python

import os
import sys
import optparse
import progressbar
import time

from boto.s3.connection import S3Connection
from boto.exception import S3ResponseError
from boto.s3.key import Key

AWS_ACCESS_KEY_ID = ''
AWS_SECRET_ACCESS_KEY = ''

pbar = None

def sizeof_fmt(num):
    for x in ['bytes','KB','MB','GB','TB']:
        if num < 1024.0:
            return "%3.1f%s" % (num, x)
        num /= 1024.0

def progress_callback(current, total):
    try:
        pbar.update(current)
    except AssertionError, e:
        print e

def upload_file(filename, bucket, prefix=None, reduced_redundancy=False):

    global pbar

    key = Key(bucket)
    if prefix:
        key.key = '%s/%s' % (prefix, filename)
    else:
        key.key = '%s' % (filename)

    size = os.stat(filename).st_size
    if size == 0:
        print 'Bad filesize for "%s"' % (filename)
        return 0

    widgets = [
        unicode(filename, errors='ignore').encode('utf-8'), ' ',
        progressbar.FileTransferSpeed(),
        ' <<<', progressbar.Bar(), '>>> ',
        progressbar.Percentage(), ' ', progressbar.ETA()
    ]
    pbar = progressbar.ProgressBar(widgets=widgets, maxval=size)
    pbar.start()

    try:
        key.set_contents_from_filename(
            filename,
            cb=progress_callback,
            num_cb=100,
            reduced_redundancy=reduced_redundancy,
        )
        key.set_acl('public-read')
    except IOError, e:
        print e
        return 0

    pbar.finish()
    return size

if __name__ == '__main__':

    parser = optparse.OptionParser(usage='usage: %prog [options] ')
    parser.add_option('-p', '--prefix', dest='prefix')
    parser.add_option('-r', '--reduced_rendundancy', dest='reduced_redundancy', action='store_true', default=False)
    (options, args) = parser.parse_args()

    if len(args) < 2:
        parser.print_help()
        sys.exit(1)

    conn = S3Connection(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    try:
        bucket = conn.get_bucket(args[0])
    except S3ResponseError, e:
        if e.error_code == 'NoSuchBucket':
            bucket = conn.create_bucket(args[0])
        else:
            raise e

    stime = time.time()
    total_bytes = 0
    count = 0
    for arg in args[1:]:
        size = upload_file(arg, bucket, options.prefix, options.reduced_redundancy)
        total_bytes += size
        count += 1

    if len(args) > 2:
        print
        print '%s files %s at %.2f kb/s' % (count, sizeof_fmt(total_bytes), (total_bytes / 1024) / (time.time() - stime))
To install it, run these commands:
sudo pip install boto progressbar
git clone git://gist.github.com/510222.git s3upload-gist
vim s3upload-gist/s3upload # Add your AWS settings
chmod 755 s3upload-gist/s3upload
sudo mv s3upload-gist/s3upload /usr/local/bin
rm -rf s3upload-gist
Download the source.

Django Facebook Authentication Backend

Views: 4,161 Comments: 2

Posted July 15, 2010 by derrick


Here is a Django authentication backend I wrote using Facebook's amazingly simple Graph API. It logs the user in using their Facebook credentials, so your site doesn't have to worry about creating user profiles, validating them, etc.

Define the Facebook tokens in settings.py and replace <app_name> with the name of your app. You will probably want to modify the scope on the authorize link in the template; see the authentication permissions link.

backends.py

from django.conf import settings
from django.contrib.auth import models as auth_models

import cgi
import urllib
import simplejson

from <app_name> import models

class FacebookBackend:

    def authenticate(self, token=None):

        facebook_session = models.FacebookSession.objects.get(
            access_token=token,
        )

        profile = facebook_session.query('me')

        try:
            user = auth_models.User.objects.get(username=profile['id'])
        except auth_models.User.DoesNotExist, e:
            user = auth_models.User(username=profile['id'])

        user.set_unusable_password()
        user.email = profile['email']
        user.first_name = profile['first_name']
        user.last_name = profile['last_name']
        user.save()

        try:
            models.FacebookSession.objects.get(uid=profile['id']).delete()
        except models.FacebookSession.DoesNotExist, e:
            pass

        facebook_session.uid = profile['id']
        facebook_session.user = user
        facebook_session.save()

        return user

    def get_user(self, user_id):

        try:
            return auth_models.User.objects.get(pk=user_id)
        except auth_models.User.DoesNotExist:
            return None

models.py

from django.db import models
from django.contrib.auth.models import User

class FacebookSessionError(Exception):
    def __init__(self, error_type, message):
        self.message = message
        self.type = error_type
    def get_message(self):
        return self.message
    def get_type(self):
        return self.type
    def __unicode__(self):
        return u'%s: "%s"' % (self.type, self.message)

class FacebookSession(models.Model):

    access_token = models.CharField(max_length=103, unique=True)
    expires = models.IntegerField(null=True)

    user = models.ForeignKey(User, null=True)
    uid = models.BigIntegerField(unique=True, null=True)

    class Meta:
        unique_together = (('user', 'uid'), ('access_token', 'expires'))

    def query(self, object_id, connection_type=None, metadata=False):
        import urllib
        import simplejson

        url = 'https://graph.facebook.com/%s' % (object_id)
        if connection_type:
            url += '/%s' % (connection_type)

        params = {'access_token': self.access_token}
        if metadata:
            params['metadata'] = 1

        url += '?' + urllib.urlencode(params)
        response = simplejson.load(urllib.urlopen(url))
        if 'error' in response:
            error = response['error']
            raise FacebookSessionError(error['type'], error['message'])
        return response

views.py

from django.contrib import auth
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext

import cgi
import simplejson
import urllib

from <app_name> import settings

def login(request):
    error = None

    if request.user.is_authenticated():
        return HttpResponseRedirect('/yay/')

    if request.GET:
        if 'code' in request.GET:
            args = {
                'client_id': settings.FACEBOOK_APP_ID,
                'redirect_uri': settings.FACEBOOK_REDIRECT_URI,
                'client_secret': settings.FACEBOOK_API_SECRET,
                'code': request.GET['code'],
            }

            url = 'https://graph.facebook.com/oauth/access_token?' + \
                    urllib.urlencode(args)
            response = cgi.parse_qs(urllib.urlopen(url).read())
            access_token = response['access_token'][0]
            expires = response['expires'][0]

            facebook_session = models.FacebookSession.objects.get_or_create(
                access_token=access_token,
            )[0]

            facebook_session.expires = expires
            facebook_session.save()

            user = auth.authenticate(token=access_token)
            if user:
                if user.is_active:
                    auth.login(request, user)
                    return HttpResponseRedirect('/yay/')
                else:
                    error = 'AUTH_DISABLED'
            else:
                error = 'AUTH_FAILED'
        elif 'error_reason' in request.GET:
            error = 'AUTH_DENIED'

    template_context = {'settings': settings, 'error': error}
    return render_to_response('login.html', template_context, context_instance=RequestContext(request))

settings.py

FACEBOOK_APP_ID = ''
FACEBOOK_API_KEY = ''
FACEBOOK_API_SECRET = ''
FACEBOOK_REDIRECT_URI = 'http://example.com/login/'

AUTHENTICATION_BACKENDS = (
    '<app_name>.backends.FacebookBackend',
)

login.html

{% if error %}
      {% if error == 'AUTH_FAILED' %}
          <p>Authentication failed</p>
      {% else %}{% if error == 'AUTH_DISABLED' %}
          <p>Your account is disabled</p>
      {% else %}{% if error == 'AUTH_DENIED' %}
          <p>You did not allow access</p>
       {% endif %}{% endif %}{% endif %}
  {% else %}
    <a href="https://graph.facebook.com/oauth/authorize?client_id={{ settings.FACEBOOK_APP_ID }}&redirect_uri={{ settings.FACEBOOK_REDIRECT_URI }}&scope=publish_stream,email&display=popup">
      <img src="http://developers.facebook.com/images/devsite/login-button.png"/>
    </a>
  {% endif %}
Download the source.

Python function runtime logging decorator

Views: 2,517 Comments: 0

Posted July 15, 2010 by derrick


When profiling, it can be useful to log the amount of time spent in a function. With Python that is super easy to do with decorators.

logtime.py

#!/usr/bin/python

import time
import syslog

def logtime(func):
    def caller(*args, **kwargs):
        stime = time.time()
        ret = func(*args, **kwargs)
        syslog.syslog(
            syslog.LOG_LOCAL2 | syslog.LOG_INFO,
            '%s=%s\n' % (func.__name__, time.time() - stime))
        return ret
    return caller

@logtime
def test_func(arg1, arg2=None):
    print arg1, arg2
    time.sleep(1)

if __name__ == '__main__':
    test_func(1, 2)
logtime will log the time spent in the function to syslog.
Jul 14 15:05:01 olomai python: test_func=1.00114893913
Download the source.

Comparison of IN, GROUP BY and COUNT using Hibernate, Django and SQLAlchemy

Views: 3,221 Comments: 0

Posted July 7, 2010 by derrick


The other day I wrote about how to do an IN and GROUP BY query using Java's de facto ORM, Hibernate. I thought it would be interesting to see how other ORMs handled the same query. This is the query I want to generate:
SELECT COUNT(*),state FROM download_request WHERE id IN (<id list>) GROUP BY state;
Below is the code, output and SQL generated for the three ORMs.

Hibernate

class HibernateDAO implements ApplicationDAO {
public Map getStateCounts(final Collection ids) {
  HibernateSession hibernateSession = new HibernateSession();
  Session session = hibernateSession.getSession();
  Criteria criteria = session.createCriteria(DownloadRequestEntity.class)
	.add(Restrictions.in("id", ids));
  ProjectionList projectionList = Projections.projectionList();
  projectionList.add(Projections.groupProperty("state"));
  projectionList.add(Projections.rowCount());
  criteria.setProjection(projectionList);
  List results = criteria.list();
  Map stateMap = new HashMap();
  for(Object[] obj: results) {
      	DownloadState downloadState = (DownloadState)obj[0];
       	stateMap.put(downloadState.getDescription().toLowerCase(), (Integer)obj[1]);
  }
  hibernateSession.closeSession();
  return stateMap;
}
public static void main(String args[]) {
    HibernateDAO downloadRequestDAO = new HibernateDAO();	
    Collection ids = new ArrayList();
    for (int i = 1000;  i < 1010; i++ )
        ids.add(i);
    Map stateCounts =  downloadRequestDAO.getStateCounts(ids);
    for (String state: stateCounts.keySet()) {
        System.out.println(state + ": " + stateCounts.get(state));
    }
}
}

Output

failed: 5
downloaded: 1
completed: 4

SQL

select this_.state as y0_, count(*) as y1_ from download_request this_ 
where this_.id in (1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009) 
group by this_.state

Django

counts = models.DownloadRequest.objects.filter(
    id__in=range(1000, 1010),
).values('state').annotate(Count('state'))
for count in counts:
    print count

Output

{'state': u'FAILED', 'state__count': 5}
{'state': u'COMPLETED', 'state__count': 4}
{'state': u'DOWNLOADED', 'state__count': 1}

SQL

SELECT `download_request`.`state`, COUNT(`download_request`.`state`) 
AS `state__count` FROM `download_request` 
WHERE `download_request`.`id` IN (1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009) 
GROUP BY `download_request`.`state` ORDER BY NULL

SQLAlchemy

query = session.query(
    func.count(DownloadRequest.state), DownloadRequest.state,
).filter(
    DownloadRequest.id.in_(range(1000,1010)),
).group_by(DownloadRequest.state)
for count in query.all():
    print count

Output

(4L, 'COMPLETED')
(1L, 'DOWNLOADED')
(5L, 'FAILED')

SQL

SELECT count(download_request.state) AS count_1, download_request.state 
AS download_request_state FROM download_request 
WHERE download_request.id IN (1000, 1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009) 
GROUP BY download_request.state
As you can see, SQLAlchemy is the most similar to SQL, Django's is the briefest, and Hibernate (obviously) is the most Java-like. Of the three I'd say I like SQLAlchemy the best: it is the closest to SQL, and coming from an SQL background that feels the most natural to me. However, all three get the job done and it is always great to have options.
Download the source.

IN, GROUP BY and COUNT with Hibernate

Views: 15,809 Comments: 0

Posted July 2, 2010 by derrick


I needed to make the following SQL query with Hibernate
SELECT COUNT(*), state FROM download_request WHERE id IN (<id list>) GROUP BY state;
and being new to Hibernate, it came out a lot differently than I thought it would. To perform the IN query, a Criteria query needs to be created:
Criteria criteria = session.createCriteria(DownloadRequestEntity.class)
    .add(Restrictions.in("id", ids))
For the count and group by, a Projection needs to be added to the criteria:
ProjectionList projectionList = Projections.projectionList();
projectionList.add(Projections.groupProperty("state"));
projectionList.add(Projections.rowCount());
criteria.setProjection(projectionList);
This is the resulting code
public Map getStateCounts(final Collection ids) {
  HibernateSession hibernateSession = new HibernateSession();
  Session session = hibernateSession.getSession();
  Criteria criteria = session.createCriteria(DownloadRequestEntity.class)
	.add(Restrictions.in("id", ids));
  ProjectionList projectionList = Projections.projectionList();
  projectionList.add(Projections.groupProperty("state"));
  projectionList.add(Projections.rowCount());
  criteria.setProjection(projectionList);
  List results = criteria.list();
  Map stateMap = new HashMap();
  for(Object[] obj: results) {
      	DownloadState downloadState = (DownloadState)obj[0];
       	stateMap.put(downloadState.getDescription().toLowerCase(), (Integer)obj[1]);
  }
  hibernateSession.closeSession();
  return stateMap;
}
Something completely different from what I expected. That's what I love about solving problems: sometimes the solution is something you might never expect.
Download the source.

Init script for fastcgi and php on Ubuntu

Views: 3,118 Comments: 0

Posted July 2, 2010 by derrick


This is an init script to run spawn-fcgi and PHP on Ubuntu. It's adapted from Aaron Schaefer's excellent post on how to run WordPress on nginx, the configuration this site runs on.

fastcgi-php

#!/bin/sh

### BEGIN INIT INFO
# Provides: FastCGI servers for PHP
# Required-Start: networking
# Required-Stop: networking
# Default-Start: 2 3 4 5
# Default-Stop: S 0 1 6
# Short-Description: Start FastCGI servers with PHP.
# Description: Start PHP with spawn-fcgi. For use with nginx and lighttpd.
#
#
### END INIT INFO
#
# Author: Derrick Petzold
#
# http://derrickpetzold.com/index.php/2010/07/02/init-script-fastcgi-php-ubuntu/
#

RUN_USER=nobody
RUN_GROUP=nogroup

LISTEN_ADDRESS=127.0.0.1
LISTEN_PORT=53217

SPAWN_FCGI=/opt/lighttpd/bin/spawn-fcgi
PHP_CGI=/opt/php5/bin/php-cgi
PID_FILE=/var/run/fastcgi-php.pid

d_start() {
    if [ -f $PID_FILE ]; then
      echo -n " already running"
    else
        start-stop-daemon --start -p $PID_FILE \
            --exec /usr/bin/env -- $SPAWN_FCGI -f $PHP_CGI \
            -u $RUN_USER -g $RUN_GROUP -a $LISTEN_ADDRESS -p $LISTEN_PORT \
            -P $PID_FILE
    fi
}

d_stop() {
    start-stop-daemon --stop --quiet --pidfile $PID_FILE \
                      || echo -n " not running"
    if [ -f $PID_FILE ]; then
        rm $PID_FILE
    fi
}

case "$1" in
  start)
    echo -n "Starting FastCGI: $0 "
    d_start
    echo "."
    ;;

  stop)
    echo -n "Stopping FastCGI: $0 "
    d_stop
    echo "."
    ;;
  restart)
    echo -n "Restarting FastCGI: $0 "
    d_stop
    sleep 1
    d_start
    ;;
  *)
    echo "usage: $0 {start|stop|restart}"
    ;;
esac
To install it, follow the instructions below:
git clone git://gist.github.com/510245.git fastcgi-php.gist
vim fastcgi-php.gist/fastcgi-php # Update with your pathnames
chmod 755 fastcgi-php.gist/fastcgi-php
mv fastcgi-php.gist/fastcgi-php /etc/init.d/
update-rc.d fastcgi-php defaults
/etc/init.d/fastcgi-php start
rm -rf fastcgi-php.gist
Download the source.

derrickpetzold.com

My home on the web. Uses the Django web framework, uwsgi as the WSGI server, nginx as the media server and load balancer, pygments for the syntax highlighting. Author and designer.

crowdtube.tv

CrowdTube.tv streams trending videos to your browser like a never-ending TV show. This was Cory Shaw's awesome idea. I was responsible for the backend development.

dmusic.bz

dMusic.bz is a Pandora clone written using Django and JavaScript. It uses last.fm to find the music relationships. Author and designer.

ilovephotos.com

ilovephotos.com is a photo sharing and tagging website. Facial detection was run on the photos so bounding boxes were already drawn around the faces, eliminating a step in a tedious process. I authored the views and the process to run facial detection on the photos and upload them to S3.

kindfish.com

Kindfish.com was BlueLava's first photo site. It was event-based, and slideshows would be created from the albums. I authored the views and the process to generate the slideshows from the photos on EC2.

Below is a snippet of my resume. Click here to view the full version in PDF, its proper format.
