Implementing the ZeroMQ spec has taught me a lot about parsing in Haskell and I thought I should share.
In general, a middleware is used to abstract away some complex behavior, simplifying the user-developer’s application. The middleware performs its task and, when satisfied, invokes the application layer; this is usually called “delivering” the message. If a message is corrupted or does not meet some prescribed contract, it may be discarded without notice.
Middleware is commonly used for an abstraction bordering on complicated. I’m talking about everything and the kitchen sink, a runtime which begins to dictate the design of your application. I might go so far as to say that TCP is a commonly used middleware — you may not worry about Nagle’s algorithm or the sliding window protocol, you just act on the bytes when they are “delivered”.
The ZMQ wire protocol defines one base network unit: the “frame”. The frame is essentially a “chunk” of the message. Either one frame is enough for “delivery” or it is part of an incoming sequence. ZeroMQ sets a continuation bit to signal that buffering is necessary. Frames are variable length, and the length is stated at the start of every frame. Either the first byte is the magic number 0xFF, in which case the length is read from the next eight bytes as a 64-bit value, or the first byte is itself the length, between 0 and 254 bytes inclusive.
These frames belong to one message, which will be delivered to the application when the FIN-bit containing frame is received. The interpretation of the reconstituted message will be described in a future post. For now, all I want to do is put the frames together and make a message.
A length-defined stream tells you right off the bat how much it’s sending you. Using the length you can determine how much memory to allocate. ZeroMQ uses length-prefixed streams on the wire protocol. The first byte in a frame has two possible interpretations: a magic number saying the stream is jumbo, or the actual length of the stream.
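To make the layout concrete, here is a rough sketch of the sending side, assuming the frame layout described in this post (length, then the more/final byte, then the body). The names encodeLength, word64be and encodeFrame are hypothetical helpers of mine, not part of ZMQHS or any library.

```haskell
import qualified Data.ByteString as B
import Data.Bits (shiftR)
import Data.Word (Word8, Word64)

-- Hypothetical helper: encode a length as one byte, or as the 0xFF
-- magic byte followed by eight big-endian bytes for jumbo frames.
encodeLength :: Int -> B.ByteString
encodeLength n
  | n < 0xFF  = B.singleton (fromIntegral n)
  | otherwise = B.cons 0xFF (B.pack (word64be (fromIntegral n)))

-- Split a Word64 into its eight bytes, most significant first.
word64be :: Word64 -> [Word8]
word64be w = [fromIntegral (w `shiftR` s) | s <- [56, 48 .. 0]]

-- Hypothetical helper: length, then the more/final byte, then the body.
encodeFrame :: Bool -> B.ByteString -> B.ByteString
encodeFrame more body =
  B.concat [encodeLength (B.length body), B.singleton flag, body]
  where flag = if more then 0x01 else 0x00
```

With these definitions, encodeFrame True (B.pack [65]) yields the three bytes 1, 1, 65: a one-byte length, the MORE flag, and the body.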
I can’t seem to find enough examples of length-encoded binary parsing in Haskell. With help from a couple friends (aka, everyone on freenode’s #haskell), I’ve figured out how to use fields from the parse to control parser behavior.
A few individuals on #haskell said “I don’t know why you’re using attoparsec” and I did not have much of an answer for them. The lazy parsers are a little too much magic for me as a beginner. I wanted something which would let me know if the match was complete, failed, or just not done yet, immediately after sending in some data, and attoparsec was sufficient. The examples provided for attoparsec are text oriented but with the help of the attoparsec-binary library I was able to move forward.
A parser is constructed inside of a do-block. The parser is an unapplied sequence of actions, built to parse a strict ByteString. You build the parser first, and you can keep it in memory. You feed the ByteString into the parser using the attoparsec function “parse”, whose type signature is as follows:
parse :: Parser a -> ByteString -> Result a
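The Result is what tells you whether the match is complete, failed, or just not done yet. Here is a small sketch of inspecting it, using a toy three-byte parser rather than the frame parser. The names describe, threeBytes, firstTry and secondTry are made up for illustration; Done, Fail, Partial and feed come from Data.Attoparsec.ByteString.

```haskell
import qualified Data.Attoparsec.ByteString as AP
import qualified Data.ByteString as B

-- Hypothetical helper: summarize the three possible outcomes of a parse.
describe :: Show a => AP.Result a -> String
describe (AP.Done rest value) = "done: " ++ show value ++ ", leftover " ++ show rest
describe (AP.Fail _ _ msg)    = "failed: " ++ msg
describe (AP.Partial _)       = "partial: needs more input"

-- A toy parser that wants exactly three bytes.
threeBytes :: AP.Parser B.ByteString
threeBytes = AP.take 3

-- Only two bytes are available, so the parser asks for more.
firstTry :: AP.Result B.ByteString
firstTry = AP.parse threeBytes (B.pack [1, 2])

-- Feeding two more bytes completes the parse and leaves one byte over.
secondTry :: AP.Result B.ByteString
secondTry = AP.feed firstTry (B.pack [3, 4])
```

Printing describe firstTry and describe secondTry in GHCi shows the parse moving from partial to done, which is the behavior I wanted for data arriving off a socket in pieces.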
Before we begin the parser, let’s look at the ZeroMQ frame format’s ABNF:
more-frame  = length more body
final-frame = length final body
length      = OCTET / (%xFF 8OCTET)
more        = %x01
final       = %x00
body        = *OCTET
Here’s the relevant parser:
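-- AP and APB are assumed to be these qualified imports.
import Control.Monad (guard)
import qualified Data.Attoparsec.ByteString as AP
import qualified Data.Attoparsec.Binary as APB

data FrameCont = FINAL | MORE | BADCONT
  deriving (Show, Eq)

-- Classify the raw continuation byte.
frame_cont 0x00 = FINAL
frame_cont 0x01 = MORE
frame_cont _    = BADCONT

-- Read the continuation byte; fail unless it is MORE or FINAL.
get_fc = do
  raw_cont <- AP.anyWord8
  guard (frame_cont raw_cont /= BADCONT) AP.<?> "State must be either MORE or FINAL"
  return raw_cont

parser = do
  first_byte <- fromIntegral <$> AP.anyWord8
  -- 0xFF is the magic number: the real length follows as a big-endian Word64.
  frame_size <- case first_byte of
    0xFF -> fromIntegral <$> APB.anyWord64be
    _    -> return first_byte
  -- Per the ABNF, the continuation byte comes after the complete length field.
  fc <- get_fc
  body <- AP.take frame_size
  return (frame_size, fc, body)

The parser in words: read a byte as the length; if it is the magic number, read the 8OCTET (Word64) length; read another byte to determine whether this is the final frame or a continuation (with some sanity checking); and finally read the body of the frame using the length.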
Edit: Peaker on Reddit was kind enough to point out that a Jumbo/Small distinction was not necessary for the FrameSize. My earlier code would return the monadic value inside of an algebraic datatype, but it’s much simpler to just coerce the varying-size lengths to integrals.
Another note: the first field of the triple, frame_size, is most likely unnecessary. The body is a ByteString and as such will carry around its own length. I’m leaving it here for posterity; the upstream code will be modified to remove it.
Yet another note: I really need to understand the applicative functions better. I mostly figured out when to reach for <$> and when not to by reading the combinatorrent source code. It’s interesting to see how I perform the conditional read operation by either returning the first byte (looks like I’m binding it yet again?) or calling fromIntegral on the successful read of anyWord64be.
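For what it’s worth, here is my current understanding as a small sketch: <$> is shorthand for binding a result and immediately returning a converted value, and return lifts an already-read value back into the parser so both branches of a case have the same type. The names lengthWithBind, lengthWithFmap and escaped are invented for this example, and the “escaped” format is a toy, not the ZeroMQ one.

```haskell
import qualified Data.Attoparsec.ByteString as AP
import qualified Data.ByteString as B

-- Bind the byte with <-, then convert it in a separate step.
lengthWithBind :: AP.Parser Int
lengthWithBind = do
  b <- AP.anyWord8
  return (fromIntegral b)

-- Same parser: <$> applies fromIntegral to the result in one step.
lengthWithFmap :: AP.Parser Int
lengthWithFmap = fromIntegral <$> AP.anyWord8

-- Toy conditional read: 0xFF means "the real value is in the next byte".
-- return lifts the already-read value so both branches are parsers.
escaped :: AP.Parser Int
escaped = do
  first <- lengthWithFmap
  case first of
    0xFF -> lengthWithBind
    _    -> return first
```

So the return in the case expression is not binding the byte again. It wraps a plain value so that branch has the same parser type as the branch that reads more input.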
The field interdependency is readily apparent: just look at the length field. You must perform some work to determine whether to read 0-254 bytes or 0-18446744073709551615 bytes. One of my favorite parts about this parser: it makes the endianness explicit when reading the length field. It’s easy to mess up the representation of a Jumbo frame. The ZMQ RFC doesn’t even mention endianness (although since this is network programming it is ASSUMED to be big endian).
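To show how much the byte order matters, here is a sketch that decodes the same eight length bytes both ways by hand. The helper names fromBigEndian, fromLittleEndian and lengthBytes are mine; this is only an illustration, not how attoparsec-binary implements anyWord64be.

```haskell
import qualified Data.ByteString as B
import Data.Bits (shiftL, (.|.))
import Data.Word (Word64)

-- Interpret bytes as an unsigned integer, most significant byte first.
fromBigEndian :: B.ByteString -> Word64
fromBigEndian = B.foldl' (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0

-- Interpret the same bytes least significant byte first.
fromLittleEndian :: B.ByteString -> Word64
fromLittleEndian = fromBigEndian . B.reverse

-- The eight length bytes of a jumbo frame whose body is 256 bytes long.
lengthBytes :: B.ByteString
lengthBytes = B.pack [0, 0, 0, 0, 0, 0, 1, 0]
```

Read big endian, lengthBytes is 256. Read little endian, the same bytes come out as 2^48 (281474976710656), and a parser would sit waiting for a body that never arrives.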
I’ve messed up endianness countless times when writing hand-rolled parsers in C, so it’s great to have that safety and succinctness for free. In my original design, the length representation was kept typesafe by moving the data into a FrameSize, either a Jumbo or a Small.
Once the parser actions are defined, we move them into the final return statement. When the parser is satisfied it will provide a triple: (frame_size, frame_continuation, body). Now we’re ready for test data!
I’m still a TDD guy at heart, and TDD for binary data is notoriously tricky if you don’t fetch data for playback. I use the ByteString.pack function to turn a list of numbers into a ByteString value. No QuickCheck just yet, but I’m excited to throw it in the mix.
test_one_complete = B.pack [1,1,65]
Then we can apply the parser, testing for values with the debugging function “parseTest”:
parseTest :: Show a => Parser a -> ByteString -> IO ()
As long as the triple can be stringified for presentation, we can feed parseTest a parser and data and look at the terminal to verify correct parsing.
AP.parseTest ZF.parser test_one_complete
-- Done "" (1,1,"A")
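The same approach stretches to the less happy paths. Below is a sketch of two more test inputs, a jumbo frame and a truncated frame. To keep it self-contained, frameParser is a stand-in that follows the ABNF order and folds the eight length bytes by hand instead of using attoparsec-binary; frameParser, test_jumbo and test_truncated are names I made up here, not ZMQHS code.

```haskell
import qualified Data.Attoparsec.ByteString as AP
import qualified Data.ByteString as B
import Data.Bits (shiftL, (.|.))
import Data.Word (Word8)

-- Stand-in for the frame parser, without attoparsec-binary: the eight
-- length bytes are folded together big-endian by hand.
frameParser :: AP.Parser (Int, Word8, B.ByteString)
frameParser = do
  first <- fromIntegral <$> AP.anyWord8
  size <- case first of
    0xFF -> B.foldl' (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0 <$> AP.take 8
    _    -> return first
  cont <- AP.satisfy (<= 0x01) AP.<?> "State must be either MORE or FINAL"
  body <- AP.take size
  return (size, cont, body)

-- A jumbo frame: magic byte, 64-bit length of 300, FINAL, then 300 bytes.
test_jumbo :: B.ByteString
test_jumbo = B.concat
  [ B.pack [0xFF, 0, 0, 0, 0, 0, 0, 1, 44]
  , B.singleton 0x00
  , B.replicate 300 65 ]

-- A frame cut off mid-body: it claims 3 bytes but carries only 1.
test_truncated :: B.ByteString
test_truncated = B.pack [3, 0, 65]
```

Running AP.parseTest frameParser on test_jumbo should report a completed parse with a 300-byte body, while test_truncated should come back as a partial result that is still waiting for two more bytes.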
Great!
Next up: reconstructing full messages and acting upon them. I hope this was instructive. Please feel free to look at the ZMQHS code online: https://github.com/xrl/zmqhs