Kinda like how chaotic and pretty this graph is. It’s memory usage on my dev server for the past week, as I kept doing big OpenAddresses runs stressing the system. No huge insights, just looks neat.
I’m re-running OpenAddress many times trying to be sure the output stays roughly consistent. It’s never the same twice; servers are unreliable and the data is changing. Also some code changes make small cosmetic differences like rounding error.
What works best for me is comparing the line counts in the output files:
wc -l out/*/out.csv > wc.txt
diff –suppress-common-lines -y oa-full-790/wc.txt oa-new-esri/wc.txt
This only highlights sources that output different number of lines; if the contents in the columns is garbled you won’t see that. But it’s a good way to get an overview of what changed between two runs.
this post is a work in progress
When I’m back from India late February I’d like to re-design OpenAddress machine to a more decoupled architecture. Right now the system runs one big Python process which runs all sources from download all the way to output CSV. (There is some state carried over from run to run, but not much.) It works pretty well and is simple, and a full run just takes 2–3 hours. But the multiprocessing stuff is brittle and the whole system is both overkill (most sources never change, why reprocess?) and also non-responsive (no fast way to test a new source).
Move to a decoupled architecture where tasks are run as fully independent Unix processes. They share state via some centralized database / cloud storage thingy, and also post results in S3 for web serving.
As with the current architecture, the unit of work in the job system is a single source file such as us-ca-san_francisco. It runs as an independent Unix process with no awareness of other sources. It downloads, conforms, summarizes, reports, and uploads its results to the database and to S3.
In addition we also need a task queueing system, something that decides when to run source jobs and posts them on the task queue. That process works mostly by reading the database and deciding what work needs doing. Some rules: “immediately process a new source”. “Check a source weekly for new data and reprocess”. “Clear the whole queue and re-process it”.
Finally we need a task executor that looks at the queue and launches tasks as needed. This could be part of the task queuer, but it helps to think of it as a separate entity. When a task is finished it puts the result on the task record log.
I’m totally agnostic about the data store we use to track runs. It’s a very low demand system just a few hundred megs of data and a handful of transactions a minute. It will require something with consistency and responsiveness. The simple choice is a PostGIS database running persistently on some 24/7 server. I’m not sure what the cloud services equivalent would be, so running with the PostGIS idea, here’s some schema sketches:
Completed task records
- Task ID
- Time started
- Time finished
- Source name
- Success / failure
- S3 output paths: out.csv result, debug logs, etc
- Whatever else write_state does.
- There’s a zillion ways to do a task queue in a database. Rows are placed in the table by the task queuer, then removed by the task launcher when they are dispatched.
A task should be a single Python program. It takes a source.json specification file and any job metadata that’s needed (hopefully none?) and executes it. For debugging purposes this program should be able to be run standalone on a developer’s machine with no dependencies on the queuing system, the task manager database, or S3.
These jobs will be a natural to run on cheap EC2 spot instances. Right now tasks take 0.5 – 90 minutes to run. A bit wasteful to spin up a whole EC2 instance for 30 seconds of work, but maybe that’s OK. We could also run tasks on a single workhorse machine, that’s effectively what we’re doing now. A single mid-level Linux box can run 16 tasks in parallel. (In fact parallelism is good; the jobs are a mix of load types.)
This proposal assumes that a whole source from start to finish is run as a single Python process. But in fact processing a source consists of several sub tasks, and it might be good to run them as separate sub-tasks. Here’s what the subtasks are:
- Download from source URL / ESRI store
- Sample the source data for user inspection
- Convert the source JSON/CSV/SHP data to an extracted.csv file
- Conform the extracted.csv data to OpenAddresses out.csv format
- Upload out.csv and job data to S3
There’s times i’ve wanted to be able to execute these subtasks separately. Particularly the download part, that is slow and unreliable. To some extent the current data caching strategy in Mark 1 and 2 is dealing with that, and it may be sufficient. But you could imagine breaking every source into 5 tasks and running them separately on a job queue. Worth considering.
Why “mark 3”?
OpenAddress Machine mark 1 was Mike’s original work, wrapping the Node code in a bunch of Python scripts to do regular runs and post results in a web dashboard. It is awesome.
We’ve been working on mark 2, the “ditch-node” branch, where we rewrote all the Node code in Python. The overall job architecture is about the same as mark 1, the management of downloads and files and stuff. It’s one big Python process with jobs run in parallel using multiprocessing. We did make some changes to how tasks are conceived and run.
Some notes after this post was written
- Ian notes that GitHub triggers and Jenkins could trigger a build on a new source
Python’s multiprocessing.Pool has a design wrinkle that’s a bit awkward. If you have a pool with N tasks and one of those task subprocesses dies unexpectedly (say, to a SIGTERM or something) then the pool hangs. It looks like N-1 tasks have finished and there’s still one waiting. But it will never complete and your parent process will effectively be stuck. Note that normal termination doesn’t do that, including random exceptions, SIGINT from Ctrl-C, etc. A normal “kill” triggers this though, as certainly does a “kill -9”. Probably a segfault in the Python interpreter will too.
Personally I think this is a bad design choice, but it’s not by accident. There was a huge discussion about this behavior three years ago. I haven’t read it all, but most of the comments seem to be about the wisdom and difficulty of recovering from a bad state. The ticket got closed after someone committed some changes to concurrent.futures (Python 3’s preferred new library). Nothing changed in multiprocessing.Pool.
Recently this issue was revisited for multiprocessing.Pool with a new bug filed that includes a patch. The approach there (and concurrent.futures) is if a child dies unexpectedly, you want to kill the whole Pool immediately with a BrokenProcessPool exception. I’m not wild about this choice, but it’s definitely better than hanging.
None of this applies to the Python distribution we’re running today. The pool will hang. For OpenAddresses I suggest we work around the issue by simply not killing workers. If you want to abort a worker early, try SIGALRM. We could also install a SIGTERM handler to catch the simple “kill” case from an operator, but I’m not sure that’s wise.
In addition, OpenAddresses also has a SIGUSR1 handler that allows someone externally to shut down the whole pool. It’s good for recovering from this state.
Update: we hit this bug again in a new way. Some of the worker processes were getting killed by the Linux OOM Killer. The Python code doesn’t see any exception, it’s just a SIGTERM or something. Only way you know is a record in the syslog. (And the multiprocessing debug logs show a new worker started.)
I managed to get multiprocessing working for OpenAddress job processing. Looks like it’s working pretty well. We use multiprocessing.Pool to manage worker processes, then set up a SIGALRM to abort jobs that run too long. (Using a nifty decorator to add the timeout). I also added a SIGUSR1 handler in the master job process that lets us abort the run. At least once I’ve had the pool fail where there was work being done but none of the worker processes was trying to do it.
Here’s a happy screenshot of htop showing a run going. Note the nifty custom process names courtesy of setproctitle.
Our OpenAddresses system wants to run N jobs at once while downloading and processing our ~1000 sources. Mostly to do a bunch of network requests to various servers in parallel, but also for some CPU parallelism in multi-core systems. Normal jobs use very little RAM (100M?) but occasionally a big GeoJSON file blows up to 2G or 4G. Here’s 4 options for job management.
Unix processes. I’ve been running stuff in GNU parallel, forking off a new Python process for each job. 8 simultaneous processes on a 8 CPU system. That’s worked pretty well, see previous blog posts for notes. CPU usage around 250%, memory usage all over the place but generally under 2G. Full job finishes in 3, 4 hours.
Python threads. I just tried openaddr-process for the first time to run a full go (on the same Linux box). That code works by spawning 16 threads. So one big Python process. That’s not working quite so well. Running at a steady 120% CPU; because of Python’s GIL we can’t get more parallelism. The process also has 11G resident. I suspect much of that could be reclaimed, probably left over from when 2 or 3 big GeoJSON jobs ran at once. But that stresses the garbage collector; there’s something to be said about killing a process. (Update: the resident set size got as big as 17G at one point, enough to force swapping. It didn’t seem to thrash though. Also Python is releasing the memory again later, the RSS has gotten as small as 9G again.)
Python multiprocessing. I want to try this. It should be a relatively easy change from threading, but I am hoping gives some CPU parallelism and memory housekeeping. Also we need a way to kill jobs asynchronously and Python’s threading module is designed to make that possible. With multiprocessing you just send a SIGTERM and are done.
Decoupled task architecture. openaddr-process is structured to do the job as one big long-lived Python process. Even with multiprocessing, when an individual job finishes it’s passing Python objects back to the task manager, which waits 3+ hours to collect them all before formatting the crucial report. This is brittle. Long term I’d like to redesign the system to be more decoupled, so that individual jobs run as completely separate processes and autonomously store state in some persistent storage; files and maybe a row or two in a database. Then the overall Machine monitor can just fork off jobs as it needs and collect data off the database. That’s much more like a task queue architecture, a robust thing. I know how to build this with my own servers, less clear how to match it to the EC2 model of cloud computing.
Following on my OGR/Python vs Unicode work, I took a look at what Fiona is doing. Fiona is a nice Pythonic interface to OGR with a fair amount of Cython code to bridge the gap from the OGR C library to Python idioms. Fiona explicitly returns all strings as u’Unicode Strings’ in both Python 2 and 3 and has code to handle encodings.
Fiona tries to guess encodings. In theory you can override the guess with an encoding=’foo’ parameter when you open a source, but in practice that seems to do nothing. I tried passing in obviously wrong encodings like ‘ascii’ or ‘shift-jis’ and didn’t see any of the expected errors.
Below is the repr of the string Cassiopée as it comes through from the Shapefile in Fiona, with its own guessing. These all appear correct to me. Note that Python 2 encodes Unicode string reprs as ASCII using Python \x escaping, hence the \xe9 in the output. That’s a single character, the expected Unicode codepoint U+00e9 or é.
Good file (ca-qc-gatineau)
Python2. Fiona: u’Cassiop\xe9e’ OGR: b’76 Rue de Cassiop\xc3\xa9e’
Python3: ‘Cassiopée’ OGR: u’Cassiopée’.
Bad file (be-flanders)
Python2. Fiona: u’Now\xe9lei’ OGR: b’Now\xe9lei’
Python3. Fiona: u’Nowélei’ OGR: exception.
Fiona seems to be doing the right thing with both my inputs, returning proper Unicode strings in both Python 2 and Python 3. In Python 2 OGR seems to basically not be decoding at all, just returning byte strings that I’m supposed to decode myself. In Python 3 OGR is trying to return Unicode strings but throws an exception on my bad file.
I think from here, for our OGR code I should add something to explicitly decode strings to Unicode in Python 2 and file a bug in Python 3.
'Extract fields from shapefiles using Fiona, a Unicode test' import sys, fiona, logging from pprint import pprint logging.getLogger().setLevel(logging.DEBUG) logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) enc = sys.argv if len(sys.argv) > 2 else None with fiona.open(sys.argv, 'r', encoding=enc) as source: for f in source: for field in f['properties'].values(): sys.stdout.write('%s %r ' % (type(field), field)) sys.stdout.write('\n')
I’ve been trying to figure out how OGR deals with Unicode, particularly in shapefiles, to solve this issue with OpenAddress conversion. I haven’t found clear docs. There’s something about a SHAPE_ENCODING environment variable and something else about OGR trying to guess. More details in that linked GitHub issue. Anyway, here’s what I discovered.
I have a working non-ASCII shapefile that seems to be in UTF-8. It’s ca-qc-gatineau for us, stored at http://gatineau.ca/donneesouvertes/telechargement/ADRESSE.zip, and has street names like Rue de Cassiopée that our code is working for us.
When OGR/Python opens the file, the capability OLCStringsAsUTF8 is set to True.
In Python 2, GetField() is returning Python strings of type str, which means “bytes”. Those strings appear to be sequences of UTF-8 code points, the repr() of that non-ascii street name is ’76 Rue de Cassiop\xc3\xa9e’. Basically OGR isn’t trying to handle Unicode at all for us, or rather if it is it’s returning UTF-8 encoded byte strings and all is well.
In Python 3, GetField() is also returning Python strings of type ‘str’, which now means ‘unicode’. Their repr in Py3 is a Unicode string, ‘Rue de Cassiopée’, which makes sense.
In both cases OGR/Python is doing the right thing, or at least something consistent and sensible.
I have a non-ASCII shapefile that seems to be in Latin 1. It’s be-flanders for us, stored at https://downloadagiv.blob.core.windows.net/crab-adressenlijst/Shapefile/CRAB_Adressenlijst.zip. It has street names like ‘Nowélei (Jean Baptiste)’ although that é isn’t coming through right.
When OGR/Python opens the file, the capability OLCStringsAsUTF8 is set to False.
In Python 2, GetField still returns ‘str’. And the repr for that street is ‘Now\xe9lei (Jean Baptiste)’. That’s actually not awful; if I know to expect that behavior, I can decode that myself with ISO-8859-1 and get the Unicode string.
In Python 3, GetField() throws an exception.
Traceback (most recent call last): File /home/nelson/src/oa/shpenc.py, line 17, in &amp;lt;module&amp;gt; field = in_feature.GetField(i) File /usr/lib/python3/dist-packages/osgeo/ogr.py, line 3033, in GetField return self.GetFieldAsString(fld_index) File /usr/lib/python3/dist-packages/osgeo/ogr.py, line 2362, in GetFieldAsString return _ogr.Feature_GetFieldAsString(self, *args) UnicodeDecodeError: 'utf-8' codec can't decode byte 0xe9 in position 3: invalid continuation byte
I don’t quite understand what is happening here, but I guess something inside the OGR code is trying to decode that sequence assuming it’s UTF-8 and failing. Note my code is not really in the stack trace here, I”m just asking for the field contents. I think this is an OGR bug. Or else I think the Shapefile is corrupt. Setting SHAPE_ENCODING before invoking my code doesn’t help.
Worth noting QGIS displays the string as Nowlei, skipping the non-ascii character.
It appears that what OGR is doing is trying to guess and cope with the source encoding, presenting UTF-8 byte strings to Python 2 code and Unicode strings to Python 3 code. Which is great! Only things go wrong with the be-flanders file. OGR presents an ISO-8859-1 byte string to Python 2, which you can cope with. OGR crashes in Python 3.
Here’s the little OGR/Python program I’m using to examine shapefiles. It runs in both Python2 and Python3.
import sys from osgeo import ogr, osr ogr.UseExceptions() in_datasource = ogr.Open(sys.argv, 0) in_layer = in_datasource.GetLayer() inSpatialRef = in_layer.GetSpatialRef() in_layer_defn = in_layer.GetLayerDefn() in_feature = in_layer.GetNextFeature() print('OLCStringsAsUTF8? %r' % in_layer.TestCapability(ogr.OLCStringsAsUTF8)) while in_feature: row = dict() for i in range(0, in_layer_defn.GetFieldCount()): field = in_feature.GetField(i) sys.stdout.write('%s %r ' % (type(field), field)) in_feature = in_layer.GetNextFeature() sys.stdout.write('\n')
After the first run against 600+ sources we patched up a bunch of bugs, improved the running infrastructure. Then I ran another set last yesterday, amended a little bit today to re-run jp-* and kr-* after one more bug fix. I wasn’t quite as careful in measuring everything, but here’s some notes on output.
- 579 input sources: 484 out.csv files, 483 sample.json files
84% success rate; compare 66% from previous run.
- 102M output rows.
Compare 80M from previous run, 100M from Node
- Roughly 3.5 hours running time, maybe less.
3 threads stalled on bad servers so it’s not a good measure.
Throwing out the 3 stalled runs, the average source took 137s to execute. Standard deviation of 585s, so it’s a very broad distribution. That first run averaged 104s (SD 265). But a lot more of those runs failed in 1 second!
I’m curious if the new ESRI code is faster or slower than the old code. It’s definitely better in that it works with many more sources. Don’t really have data for it. The ESRI time is probably dominated by slow servers anyway.
Here’s the slowest non-ESRI sources. Given that these times include download times from some slow servers I’m pretty OK with them.
dk CSV 2000s 3.4M rows au-victoria SHP 1620s 3.4M rows es-25830 CSV 1173s 8.8M rows nl CSV 1156 14.8M rows
95 sources failed to produce an out.csv. Of those, 7 did succeed last time we ran, a potential regression. Here’s the cause of each failure:
- us-il-tazewell: ESRI source, JSON parsing error after many calls
- us-tx-colleyville us-tx-dallas us-tx-hurst: ESRI sources, found no records
- us-nc: missing “file” attribute for multiple shapefiles
- us-dc: bad download
- us-ca-san_diego: bad zip download
And finally here’s the list of all 95 sources that didn’t produce an out.csv. I should go through and hand-classify the failures again, but I bet most of 88 of them are for the same reason as last time.
au-queensland be-flanders ca-ab-calgary ca-ab-strathcona-county ca-bc-kelowna ca-bc-langley ca-bc-nanaimo ca-bc-okanagan_similkameen ca-bc-surrey ca-bc-west_kelowna ca-ns-halifax ca-on-niagra ca-sk-regina us-al-shelby us-ar us-ca-san_diego us-co-sanmiguel us-ct-avon us-dc us-fl-alachua us-fl-collier us-ga-glynn us-ia-linn us-ia-polk us-id-canyon us-il-tazewell us-in-hamilton us-in-madison us-in-st_joseph us-la-st_james us-ma us-mi-muskegon us-mn-dakota us-mn-metrogis us-mn-polk us-mn-pope us-mn-yellow_medicine us-mo-barry us-mo-st_louis_county us-ms-madison us-nc us-nc-1 us-nc-10 us-nc-2 us-nc-3 us-nc-4 us-nc-5 us-nc-6 us-nc-7 us-nc-8 us-nc-9 us-nc-charlotte us-nc-columbus us-nc-davie us-nc-wake_county us-ne-omaha us-nm-san_juan us-nv-henderson us-nv-lander us-nv-nye us-nv-washoe_county us-oh-clinton us-oh-hamilton us-pa-beaver us-ri us-tn-memphis us-tx-colleyville us-tx-dallas us-tx-denton us-tx-el_paso us-tx-hurst us-tx-keller us-tx-north_richland_hills us-tx-round_rock us-va-alexandria us-va-augusta us-va-city_of_falls_church us-va-city_of_petersburg us-va-richmond_city us-va-roanoke us-va-stafford us-wa-san_juan us-wa-snohmish us-wi-adams us-wi-calumet us-wi-crawford us-wi-dodge us-wi-jefferson us-wi-juneau us-wi-lincoln us-wi-oneida us-wi-richland us-wi-sauk us-wi-superior us-wy-laramie
The OpenAddresses code includes a Python task manager thats spins off a lot of Python threads, one per input file describing a job. We’d like to capture the debug log records for each thread to a separate file, for presentation in a dashboard. How to do that?
This demo program I wrote shows how. The key concept here is a Python logging.Handler() object that uses some per-Thread state to decide where to write the message. In this case I use the Thread name itself, but you could also use some state stored in Thread.local().
Also this feels related.. log4j had a way to store information in ThreadLocal variables that you could then write out to log files. Ie: you could add a special per-thread variable like “job name” or “HTTP session ID” and have the formatter print it out where appropriate. It was quite handy in some use cases. Python doesn’t exactly have that, but the cookbook has notes on doing something similar with Filters.
Update: see also Mike’s alternate approach. He creates a new Handler for every thread, then uses a Filter to only show messages from the thread he cares about. I like how the approach makes more use of logging’s machinery, it is a bit weird how I had to make a whole Handler that did something odd with the output. I think Mike’s approach means for N threads we’ll end up with N Handlers seeing every log message. That’d be inefficient but if there’s not 100+ threads I doubt it matters.