Frog Wizard

Streaming Log Metrics to Datadog StatsD with Python

I wanted log observability without paying for log retention. Datadog's free tier includes StatsD ingestion, so I wrote a script that uses a generator to tail the log file, parses out what I care about, and ships metrics without retaining anything. The example here counts response status codes from nginx access logs.

Finding the Log File

Docker writes container logs to a JSON file on the host. The path changes on container recreation and log rotation, so the script looks it up dynamically via the Docker API:

import docker

def get_log_path():
    # The json-file driver records each container's host log path in its attrs.
    client = docker.from_env()
    containers = client.containers.list(filters={"name": "nginx"})
    return containers[0].attrs["LogPath"]

Tailing with Rotation Handling

The tail function opens the log file, seeks to the end, and yields new lines. Docker can rotate the log file out from under you, so the generator watches the inode: holding only the original file handle would mean reading from a renamed, soon-to-be-deleted file while new lines land at the original path. When the inode changes or the file disappears, it reopens:

import os
import time

from docker.errors import NotFound

def tail():
    def open_current():
        path = get_log_path()
        f = open(path)
        f.seek(0, 2)  # jump to the end; only new lines matter
        return path, f, os.fstat(f.fileno()).st_ino

    path, f, inode = open_current()
    while True:
        line = f.readline()
        if line:
            yield line.strip()
        else:
            time.sleep(0.25)
            try:
                if os.stat(path).st_ino != inode:
                    # Rotated: a fresh file now lives at the same path.
                    f.close()
                    f = open(path)
                    inode = os.fstat(f.fileno()).st_ino
            except FileNotFoundError:
                # Log file gone entirely, e.g. the container was recreated.
                f.close()
                while True:
                    try:
                        path, f, inode = open_current()
                        break
                    except (NotFound, IndexError, FileNotFoundError):
                        time.sleep(1)

If the container itself is gone (NotFound, IndexError), it just keeps retrying until it comes back.
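The inode check is easy to demonstrate on its own. This standalone sketch simulates a rotation the way Docker does it, by renaming the live file and creating a fresh one at the same path (the temp-file setup is just for the demo):

```python
import os
import tempfile

# Renaming the live file and recreating the path gives the path a new inode,
# while an old handle would keep pointing at the renamed file.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, "access.log")

with open(path, "w") as f:
    f.write("old line\n")
before = os.stat(path).st_ino

os.rename(path, path + ".1")   # simulate rotation
with open(path, "w") as f:     # a fresh file appears at the same path
    f.write("new line\n")
after = os.stat(path).st_ino

print(before != after)  # True: the inode at the watched path changed
```

The old inode can't be reused here because the renamed file still holds it, which is why comparing inodes is a reliable rotation signal.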

Parsing

Docker's json-file driver wraps each log line in a JSON envelope, so parse first strips that off, then a regex pulls the fields out of the nginx line itself. The rt=, uct=, and related fields come from a custom nginx log_format, not the stock combined format:

import json
import re

NGINX_RE = re.compile(
    r'(?P<remote_addr>\S+) - \S+ \[(?P<time_local>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d+) (?P<bytes>\d+) '
    r'"[^"]*" "[^"]*" "[^"]*" '
    r'rt=(?P<rt>\S+) uct="(?P<uct>[^"]*)" uht="(?P<uht>[^"]*)" urt="(?P<urt>[^"]*)" '
    r'upstream=(?P<upstream>\S+) rid=(?P<rid>\S+) cache=(?P<cache>\S+)'
)
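For reference, the regex implies an nginx log_format roughly like the one below. This is back-derived from the pattern, not the actual config; in particular, the third quoted field could be $http_x_forwarded_for or any other quoted variable:

```nginx
# Back-derived from the regex above; variable choices are guesses.
log_format observability '$remote_addr - $remote_user [$time_local] "$request" '
                         '$status $body_bytes_sent "$http_referer" '
                         '"$http_user_agent" "$http_x_forwarded_for" '
                         'rt=$request_time uct="$upstream_connect_time" '
                         'uht="$upstream_header_time" urt="$upstream_response_time" '
                         'upstream=$upstream_addr rid=$request_id cache=$upstream_cache_status';
```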

def parse(line):
    try:
        log_line = json.loads(line)["log"].strip()
    except (json.JSONDecodeError, KeyError):
        return None
    m = NGINX_RE.match(log_line)
    return m.groupdict() if m else None
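To see the two-stage unwrap in action, here is a synthetic Docker json-file line run through the same steps. The pattern is abridged to the fields the counter actually uses (the full regex above adds the timing and cache groups), and the sample line is made up:

```python
import json
import re

# Abridged NGINX_RE: just enough of the pattern to reach the status code.
STATUS_RE = re.compile(
    r'(?P<remote_addr>\S+) - \S+ \[(?P<time_local>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d+) (?P<bytes>\d+)'
)

# What the json-file driver writes to disk: the nginx line in a JSON envelope.
docker_line = json.dumps({
    "log": '203.0.113.7 - - [10/Oct/2024:13:55:36 +0000] "GET /api/items HTTP/1.1" 502 157\n',
    "stream": "stdout",
    "time": "2024-10-10T13:55:36.123456789Z",
})

inner = json.loads(docker_line)["log"].strip()  # stage 1: unwrap the envelope
m = STATUS_RE.match(inner)                      # stage 2: match the nginx line
print(m.group("method"), m.group("status"))     # GET 502
```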

Counting and Flushing

The main loop counts status codes over 10-second windows, then flushes them to StatsD as tagged gauges:

from datadog import statsd

WINDOW = 10

def flush(counts):
    for status, count in counts.items():
        statsd.gauge("nginx.status_count", count, tags=[f"status:{status}"])

def main():
    counts = {}
    window_end = time.monotonic() + WINDOW
    for line in tail():
        result = parse(line)
        if result:
            counts[result["status"]] = counts.get(result["status"], 0) + 1
        if time.monotonic() >= window_end:
            flush(counts)
            counts.clear()
            window_end = time.monotonic() + WINDOW

Every 10 seconds, Datadog gets a gauge like nginx.status_count tagged by status code, where the value is the number of log lines with that status in that window. One caveat: the window check runs only when tail() yields a line, so during an idle stretch the flush waits for the next request to arrive.
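For completeness, the statsd object used above is the one from the datadog Python package, which needs to be pointed at a running Agent (or any DogStatsD server) before main() runs. A minimal setup sketch, with the library's default host and port spelled out explicitly:

```python
from datadog import initialize, statsd

# DogStatsD speaks UDP, so sends are fire-and-forget: nothing fails loudly
# if no Agent is listening. Host and port shown are the library defaults.
initialize(statsd_host="127.0.0.1", statsd_port=8125)

statsd.gauge("nginx.status_count", 3, tags=["status:200"])
```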