Wide finder, parallelism and languages

Tim Bray, who knows about web search and was part of the XML specification process, is experimenting with exploiting parallelism in his wide finder project.

I'm interested in parallelism (running similar processes on multiple pieces of hardware) and concurrency (having multiple collaborating threads of control in a system). Many interesting problems are intrinsically concurrent, and hardware isn't getting much faster but is getting wider, so it will provide more parallel processing bandwidth.

The original problem is one of extracting the N most popular pages (not images) fetched from Tim's ongoing blog from the server's log files. These files are a few gigabytes in size.

Tim's ruby code runs in 13.5 seconds on a 1.67GHz PowerBook, which is a twin core G4. My use of my twin core 1.42GHz G4 desktop petered out last year when I got a single core AMD64 laptop, as most of the time the laptop was more responsive. The laptop runs Ubuntu 7.04 64 bit.

Measuring the problem:

As it's not a lot of ruby code, it's easy to write a little C++ code to find out where the performance issues are.

First off I was playing with simple string matching (tbray1.cpp) vs KMP (tbray2.cpp). But it's a requirement that the code should be as simple as possible, and KMP doesn't actually help on modern CPUs as it's a jumpy algorithm, so the third, simpler approach just calls strncmp (tbray3.cpp). As far as using time to measure can determine, it works just as well.
fortinbras:$ time bin/tbray3 datasets/original/thousand-o10k.ap
matches: 1100000

real 2m28.201s
user 0m10.193s
sys 0m4.752s
At that size of dataset, the laptop's CPU didn't get above 800MHz (it can run at 800MHz, 1600MHz, or 2200MHz, and stayed at the lowest). Smaller datasets which are already in the disk read-ahead buffer do cause the CPU to shift up, so some of Steve Vinoski's figures for erlang times are not indicative of the problem. In practice you don't run the same set of data through the system again and again, so to measure fairly you have to either load something else into the file cache to clear it (cat something-big > /dev/null), or use full-size datasets.

Counting non-comment lines which aren't a single { or }:
fortinbras:$ grep -v "^ *\(\($\|[}{] *$\)\|\(//.*\)\)" src/benchmarks/tbray3.cpp | wc -l
So a simple, sequential C++ implementation using a memory mapped file is 48 lines of code (my bracing formatting habits shouldn't count against the language), and isn't too slow.
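
For readers without the tbray3.cpp source to hand, the strncmp approach can be sketched roughly as below. This is a minimal illustration rather than the actual file: the function name count_matches is made up here, and the buffer is assumed to be whatever the memory mapped file gives you, so it takes a pointer and a length rather than a NUL-terminated string.

```cpp
#include <cstddef>
#include <cstring>

// Count non-overlapping occurrences of `pattern` in [data, data + len).
// `count_matches` is a hypothetical name, not taken from tbray3.cpp.
std::size_t count_matches(const char* data, std::size_t len, const char* pattern) {
    const std::size_t plen = std::strlen(pattern);
    if (plen == 0 || len < plen) return 0;

    std::size_t matches = 0;
    for (std::size_t i = 0; i + plen <= len; ++i) {
        // Cheap first-character test before paying for the full comparison.
        if (data[i] == pattern[0] && std::strncmp(data + i, pattern, plen) == 0) {
            ++matches;
            i += plen - 1;  // resume just past the match (the loop adds the last step)
        }
    }
    return matches;
}
```

Called as count_matches(mapped, size, "GET /ongoing/When/"), it only counts hits; it doesn't extract the page name or rank anything, which is the "bit more" the ruby does.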

Running the ruby for reference, which does a bit more in terms of counting occurrences:
fortinbras:$ time ruby src/benchmarks/finder.rb datasets/original/thousand-o10k.ap
real 1m20.744s
user 0m24.710s
sys 0m4.420s
Presumably Tim's disk speed to CPU speed ratio is higher; on my laptop the ruby is IO bound and the CPU isn't maxed out, though it does shift to a faster speed. Fortinbras processes the same size dataset that Tim was using in 8.6 seconds.

But the ruby is doing IO much faster than my simple memory mapped code, so I changed to block IO rather than a memory mapped file (at the cost of a 25% longer program):
fortinbras:$ time bin/tbray4 datasets/original/thousand-o10k.ap
matches: 1100000

real 1m6.780s
user 0m9.593s
sys 0m2.464s

fortinbras:$ grep -v "^ *\(\($\|[}{] *$\)\|\(//.*\)\)" src/benchmarks/tbray4.cpp | wc -l
Again, the C++ implementation doesn't stress the CPU enough to get out of first gear, and it's about 14 seconds faster than the ruby, due entirely to less user CPU. I'd guess that the underlying C code the ruby interpreter calls for its line reading is similar. I don't know whether ruby strings are internally UTF-16 rather than UTF-8; if so, that alone would account for the extra CPU cost. I'm actually quite impressed that ruby isn't glacially slow, but I guess most of the work is between the lines of the program.

The C++ code is also fewer lines of code than Steve Vinoski's 84 line erlang example, though the C++ doesn't show use of multi-core parallel processing. Parallel code in C++ can take more work. Concurrent code in C++ definitely takes more work than in erlang.

Given an infinitely capable CPU and the same disk, ruby's 81 seconds or C++'s 67 will reduce to roughly 55, the time spent waiting on the disk (for the C++, 67 seconds real less about 12 seconds of user and sys CPU). If the CPU is using 12 seconds, to get anything from parallel CPUs you'd need to get the data from disk in less than 12 seconds, which is probably not far off what current flash is capable of. I don't believe there's a software solution to make the IO much faster on the laptop's hardware.

Running on a Sun Netra T1 105, with a 440MHz Sparc CPU:
tercel-2:~/projects/tbray$ time bin/tbray4 datasets/thousand-o10k.ap
matches: 1100000

real 1m23.930s
user 0m57.070s
sys 0m24.636s
Much more CPU time, but the total real time is in the same ball park - the ten year old server has a fast wide SCSI disk but a slower CPU, so is close to being CPU bound.

If you have multiple cores and multiple disks to read from, you can launch multiple batch scripts to process different days' logs in parallel, like make -j or using MPI Scatter/Reduce.

MPI version

There's a simple extension of tbrayN.cpp with MPI at tbray5.cpp. I'm an MPI novice - most of the threading problems I've had to solve require concurrency rather than parallel processing. MPI defaults to use as many processes as there are cores, but you can force it to use a specific number of processes. On a single core machine, it slows from 1.2 seconds on a million line file with a single process to 14 seconds with 16 processes, to 54 seconds with 64 processes. Trying to launch 1000 processes causes the machine to spend forever context switching. Running lots of processes without context switching is what erlang is good at, but for optimal CPU throughput you only want as many processes as you have cores.

The MPI version loses in terms of conciseness, and has two separate code paths - the process to read the file into chunks, and those to scan the chunks. It's not as nice to read as the erlang. It comes in at 92 lines of code, 6 of which are error reporting that the erlang example lacks (presumably the error is passed to the read-eval-print-loop to handle), so it is within a couple of lines of the erlang. Using pthreads would probably require more lines, as it lacks suitable message passing primitives.

Parallel languages, concurrent languages

IO aside, the actual processing seems to be a problem in parallelism - the task can be split up into work packets which can be processed independently, with a reduction at the end to a single result set. Only about one in nine lines contribute to the counts, so the dataset for the reduction is significantly smaller than the input dataset, so parallelising the matching and maybe one pass of sorting and counting should allow a speed-up on a multi-core machine.
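
The split, process independently, then reduce shape described above can be sketched with standard C++ futures rather than MPI. This is only an illustration under several assumptions: the log is already in memory as lines, a page key is taken to be whatever follows "GET /ongoing/When/" up to the next space, anything containing a dot is treated as an image and skipped, and the names count_chunk and top_pages are invented for the example.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <map>
#include <string>
#include <utility>
#include <vector>

using Counts = std::map<std::string, std::size_t>;
using Hit = std::pair<std::string, std::size_t>;

// Map step: count page hits in lines [first, last). Hypothetical helper.
Counts count_chunk(const std::vector<std::string>& lines, std::size_t first, std::size_t last) {
    static const std::string prefix = "GET /ongoing/When/";
    Counts counts;
    for (std::size_t i = first; i < last; ++i) {
        const std::size_t at = lines[i].find(prefix);
        if (at == std::string::npos) continue;
        const std::size_t start = at + prefix.size();
        const std::size_t end = lines[i].find(' ', start);
        const std::string page = lines[i].substr(
            start, end == std::string::npos ? std::string::npos : end - start);
        // Assumption: a dot in the key means an image or other non-page fetch.
        if (page.empty() || page.find('.') != std::string::npos) continue;
        ++counts[page];
    }
    return counts;
}

// Reduce step: merge the per-worker counts and return the n most popular pages.
std::vector<Hit> top_pages(const std::vector<std::string>& lines, std::size_t workers, std::size_t n) {
    workers = std::max<std::size_t>(1, workers);
    const std::size_t per = (lines.size() + workers - 1) / workers;

    std::vector<std::future<Counts>> parts;
    for (std::size_t first = 0; first < lines.size(); first += per) {
        const std::size_t last = std::min(lines.size(), first + per);
        parts.push_back(std::async(std::launch::async, count_chunk, std::cref(lines), first, last));
    }

    Counts total;
    for (auto& part : parts) {
        const Counts partial = part.get();
        for (const auto& kv : partial) total[kv.first] += kv.second;
    }

    std::vector<Hit> ranked(total.begin(), total.end());
    n = std::min(n, ranked.size());
    std::partial_sort(ranked.begin(), ranked.begin() + static_cast<std::ptrdiff_t>(n), ranked.end(),
                      [](const Hit& a, const Hit& b) {
                          return a.second != b.second ? a.second > b.second : a.first < b.first;
                      });
    ranked.resize(n);
    return ranked;
}
```

The reduction only ever sees the per-worker maps, which is why it stays small compared with the input. Whether this buys anything still depends on getting the data off the disk fast enough to keep more than one core busy.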

Languages such as Sun's Fortress have intrinsically parallel constructs and atomic blocks to hide concurrency issues from the developer, and take advantage of multi-core hardware. Fortress has built in parallel for and reduction, equivalent to MPI code but without the programmer having to explicitly manage the processes.

In Fortress, the code for wide finder should be no more complicated than the ruby code; it's up to the implementation to parallelise the loop. Unfortunately, the current Fortress implementation is an interpreter on top of the JVM, and isn't speed optimised - it takes a couple of seconds to parse the scripts, and then interprets them rather than generating byte-codes.

Actor model languages such as Erlang, Alef, and Scala are used where the problem is best expressed in terms of concurrent processes. Their implementation is designed to allow many, many concurrent processes on a finite number of hardware cores - they reduce requirements for locking, have strategies to stop blocking operations stalling other actors' execution, and solve many of the issues that using OS level threads for concurrent, communicating, mobile, robust processes exposes. I've written actor based code professionally where the problem fits that model.

The wide finder problem has no inter-process communication requirement until the workers terminate, it doesn't require more processes than there are CPUs, it doesn't write anything so doesn't suffer from multiple writer threads and locking issues, and it doesn't need other execution to continue if one worker blocks - the blocking will be due to IO, and nothing else could make progress anyway.

Erlang and Scala are optimised for the opposite problem to the wide finder problem - concurrency within a core rather than parallelism across cores. A better language choice would be one such as Fortress, which gives parallelism across cores without having to express the problem in anything other than terms of the operations on the data.


This problem shows up one of the reasons I'm working on compression for the quad-store - any data mining using spinning disks is IO bound; using compression is one way to shift work load off the IO pipe and onto the CPU. It's also why I spent a few tenners on a second fast disk for each of the Netras in the cluster, rather than a couple of thousand on a new multi-core box. I can't afford 30 dollars/gigabyte for a solid state wide-drive.

It's also an interesting comparison of compiled and dynamic languages - running ruby is CPU heavy, as heavy as running on the 10 year old hardware in my attic, but is allegedly easier to program in. I think I'd prefer it to Perl if I was to write text extraction and reporting scripts, but hate nearly all programming languages anyway once I have to use them. Some are more useful for certain problem classes.

Some of the discussion has been about approaches towards splitting the log file into chunks, and splitting it into lines.

None of the hits in the sample file require you to split it into lines - the pattern doesn't match anything it shouldn't if you ignore line breaks in the code. To split the file into lines you need to inspect every character; to match the pattern you may only need to inspect one in 18: if the character you look at isn't in "GET /ongoing/When/", you can skip 18 characters and test again. (Strictly, that skip-ahead is the basis of Boyer-Moore style searches; textbook KMP still inspects every character.) If the pattern matching can be done better than the line scanning, then that should give a boost on the CPU limited machine. It doesn't give a boost on the Netra, and I'm not sure why, possibly because KMP makes branch prediction worse.
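
The "look at one character in 18" idea is usually written as a Boyer-Moore-Horspool search, so here is a small sketch of that variant. It is not the code from tbray2.cpp, and the name count_matches_horspool is made up for the example.

```cpp
#include <array>
#include <cstddef>
#include <cstring>
#include <string>

// Count non-overlapping occurrences of `pattern` in [data, data + len) using a
// Boyer-Moore-Horspool skip table. Hypothetical helper for illustration only.
std::size_t count_matches_horspool(const char* data, std::size_t len, const std::string& pattern) {
    const std::size_t m = pattern.size();
    if (m == 0 || len < m) return 0;

    // shift[c] is how far the window can move when the byte under its last
    // position is c. Bytes that are not in the pattern allow a full jump of m.
    std::array<std::size_t, 256> shift;
    shift.fill(m);
    for (std::size_t i = 0; i + 1 < m; ++i)
        shift[static_cast<unsigned char>(pattern[i])] = m - 1 - i;

    std::size_t matches = 0;
    std::size_t pos = 0;
    while (pos + m <= len) {
        const unsigned char last = static_cast<unsigned char>(data[pos + m - 1]);
        if (last == static_cast<unsigned char>(pattern[m - 1]) &&
            std::memcmp(data + pos, pattern.data(), m) == 0) {
            ++matches;
            pos += m;  // continue after the match
        } else {
            pos += shift[last];
        }
    }
    return matches;
}
```

In the best case it touches one byte per 18 for this pattern, but each step is a table lookup followed by a data-dependent jump, which fits the "jumpy" behaviour described above.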

All the code posted has been exact solutions. If you split a billion line file into 100 equal chunks for 100 cores to process, as you might using the built-in MPI Scatter function, you'll lose maybe 11 hits that get cut in half (as only one in nine requests matches the pattern), and you probably won't care - the most popular pages will have thousands of hits, so a dozen here or there won't be missed. Similarly, if you only want a 90% confidence for the most popular hits, you may only need to sample some of the log file. Whether an approximate solution is good enough is the sort of question you need to ask a customer before optimising for the exact case; it may well simplify the solution.
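
To make the size of that error concrete, here is a sketch that counts hits in equal slices with no overlap between them, so any hit cut by a slice boundary is simply lost. The names count_in_range and count_chunked_approx are invented for the example, and the text is assumed to be in memory. Each boundary can cut at most one hit, so the shortfall is never more than the number of chunks minus one.

```cpp
#include <algorithm>
#include <cstddef>
#include <string>

// Exact count of non-overlapping occurrences of `pattern` lying wholly inside
// text[first, last). Hypothetical helper for illustration.
std::size_t count_in_range(const std::string& text, const std::string& pattern,
                           std::size_t first, std::size_t last) {
    if (pattern.empty() || first >= last) return 0;
    auto it = text.begin() + static_cast<std::ptrdiff_t>(first);
    const auto end = text.begin() + static_cast<std::ptrdiff_t>(last);
    std::size_t matches = 0;
    while ((it = std::search(it, end, pattern.begin(), pattern.end())) != end) {
        ++matches;
        it += static_cast<std::ptrdiff_t>(pattern.size());
    }
    return matches;
}

// Approximate count: split the text into `chunks` equal slices and count each
// on its own. Hits that straddle a slice boundary are deliberately not counted.
std::size_t count_chunked_approx(const std::string& text, const std::string& pattern,
                                 std::size_t chunks) {
    chunks = std::max<std::size_t>(1, chunks);
    const std::size_t per = (text.size() + chunks - 1) / chunks;
    std::size_t matches = 0;
    for (std::size_t first = 0; first < text.size(); first += per)
        matches += count_in_range(text, pattern, first, std::min(text.size(), first + per));
    return matches;
}
```

Comparing count_chunked_approx(text, pattern, 100) with count_in_range(text, pattern, 0, text.size()) on a real log would show how many hits the naive split actually drops.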




Erik Engbrecht said...

I think you somewhat mischaracterized Scala. Scala Actors are a part of the standard library, but not the language. Scala itself is a hybrid functional/OO language with decent DSL capabilities, thus enabling an Actor programming model to be built as a library. I also believe it does quite well at spreading work across many threads.

If you look at my solution in Scala, you'll see that at a high level it takes an approach very similar to Fortress - it automatically parallelizes a for loop. Under the hood it uses Actors to manage the parallelism.

You can find it on my blog.

Wednesday, 5 December 2007 at 22:39:00 GMT  
