An explanation of the need for a WorkerChain class

At my job I contribute to the development of a real-time trading system. The system is pretty monolithic, and pretty messy, but it is also rather effective.

One of the problems we have faced in our development is how to efficiently deal with event-driven pricing and quoting. The system might be roughly broken down into three modules:

  1. Market data – receiving, processing, and analyzing real-time data from a market data vendor
  2. Pricing – running internal pricing algorithms
  3. Quoting – packaging and sending quotes to the exchange

Version 1

Back in the early days of our software, we did things pretty lazily. Essentially, of the three modules above, only the market data module was what might be called “event-driven,” because that’s how the vendor provided it to us. (In other words, when a new quote arrived or a trade was executed, the vendor sent us a message.) Pricing and quoting occurred on timers, as follows:

  1. Every so often, the pricing module would enumerate over each product we were trading and run the applicable pricing algorithm on it.
  2. Every so often, the quoting module would enumerate over each product we were trading and send a quote to the exchange under certain conditions, including the following:
    • The product was currently flagged as quoting (by the user)
    • Our price had changed since our last quote
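In rough pseudocode (all names here are illustrative, not our actual code), Version 1 amounted to two independent timer loops:

```csharp
// Illustrative sketch of the timer-driven design; every name is hypothetical.
var pricingTimer = new System.Threading.Timer(_ =>
{
    foreach (var product in products)
        RunPricingModel(product);   // recalculate our theoretical price
}, null, 0, PricingIntervalMs);

var quotingTimer = new System.Threading.Timer(_ =>
{
    foreach (var product in products)
        if (product.IsQuoting && product.PriceChangedSinceLastQuote)
            SendQuote(product);     // package and send a quote to the exchange
}, null, 0, QuotingIntervalMs);
```

Note that nothing connects the arrival of market data to either timer; that disconnect is exactly where the latency described below comes from.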

This is a pretty bad way of doing things, for obvious and not-so-obvious reasons. Among the most obvious reasons is that it introduced significant and unnecessary latency. Consider the following scenario:

  1. At time T, we receive a new market tick for product XYZ from the vendor.
  2. M milliseconds later, our pricing timer goes off, and we recalculate our theoretical price for XYZ.
  3. N milliseconds later, our quoting timer goes off, and we send our new quote for XYZ.

The end result of this setup was that we were often sending our quotes an unnecessarily long time (M + N milliseconds in the scenario above) after receiving new market data.

Version 2

The obvious way to resolve this problem was to set our pricing module to do its work immediately upon receiving new market data, and to set our quoting module to do its work immediately upon completion of the pricing module’s work. This gave rise to scenarios like the following:

  1. At time T, we receive a new market tick for XYZ.
  2. At time ~T (T plus a negligible amount of time to pass work along to the pricing module), we recalculate our theoretical price for XYZ.
  3. At time ~T, we send our new quote for XYZ.

Looks great, right? Wrong. There was a big problem with this approach, though it’s probably not obvious. Here’s some pseudocode showing how “Version 2” was set up:

// some code elsewhere
marketDataProvider.MarketDataReceived += OnMarketDataReceived;

void OnMarketDataReceived(MarketData marketData) {
    ProcessMarketData(marketData);
    RunPricingModel();
    RunQuotingEngine();
}

Basically, trying to price and quote a product in the same moment its market data arrived created a performance bottleneck, which caused our incoming connection from the vendor to become clogged. This in turn made our data stale, and as you can surely imagine, working with stale data can be devastating for real-time traders. So “Version 2” needed a change.

Version 2.1

The most straightforward way to patch up Version 2 seemed to be to launch new threads for the pricing and quoting modules so that our market data connection would remain open and not get clogged. Something like this:

void OnMarketDataReceived(MarketData marketData) {
    ProcessMarketData(marketData);
    ThreadPool.QueueUserWorkItem(arg => RunPricingModel());
}

void RunPricingModel() {
    // lots of code ...
    ThreadPool.QueueUserWorkItem(arg => RunQuotingEngine());
}

This worked, but with one subtle problem. To understand it, consider this scenario:

  1. At time T, we receive a new market tick for XYZ.
  2. At time ~T (T plus a negligible amount of time to pass work along to the pricing module), we recalculate our theoretical price for XYZ.
  3. At time T + ~0 (i.e., an incredibly short time after T), we receive another market tick for XYZ.
  4. At time ~T + ~0, while the pricing model is already running, we start running it again.
  5. At time T + ~0 + ~0, we receive another market tick for XYZ.
  6. … and so on.

In other words, we now faced the potential issue of running the same pricing model for the same product on top of itself. Needless to say, our pricing models had not exactly been coded with reentrancy in mind, so this caused bugs. Besides, it was intuitively a bad thing: why would we want to be running 2 or 3 (or sometimes 10) instances of the pricing model for the same product at the same time?

“Version 3”

I put “Version 3” in quotes above because it’s only a hypothetical beast at the moment. In reality we’re at what might be called Version 2.1.1, which takes Version 2.1 and adds some boolean flags to make sure these methods aren’t walking on top of themselves.

What I’ve found myself itching for is a more general solution to this problem. One reason, which I don’t think is a bad one, is that I’d prefer not to require that a fellow developer working on a pricing model concern himself with issues of concurrency. Where the pricing model stands in the overall system design should be invisible to it.

In other words, let’s say this is an excerpt from the “fixed” pricing method:

void RunPricingModel() {
    if (_pricingModelIsRunning) { // point A
        _needToRunPricingModelAgain = true;
        return;
    }
    
    do {
        _needToRunPricingModelAgain = false; // point B
        _pricingModelIsRunning = true; // point C
        
        // body of method
        
    } while (_needToRunPricingModelAgain);
    
    _pricingModelIsRunning = false;
}

Look at that! None of that code has anything to do with pricing a product. Moreover, there’s a tough-to-spot threading bug in that code above. Note the comments indicating points “A”, “B”, and “C”. Let’s say there are two threads, X and Y. Now imagine the following scenario:

  1. Thread X starts at point A and thread Y starts at point C.
  2. Thread X checks _pricingModelIsRunning. It’s false! So it continues on to point B.
  3. Thread Y sets _pricingModelIsRunning to true and continues on.

Uh-oh. We tried to allow only one thread at a time into this method, but it didn’t work! Threads X and Y both got in.
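For the record, the race itself is fixable by putting the check and the set under a lock so they happen atomically. Here is a sketch of that fix (not our actual code):

```csharp
private readonly object _gate = new object();
private bool _pricingModelIsRunning;
private bool _needToRunPricingModelAgain;

void RunPricingModel() {
    lock (_gate) {
        // Check and set are now atomic, so two threads can no longer
        // interleave between points A and C as in the scenario above.
        if (_pricingModelIsRunning) {
            _needToRunPricingModelAgain = true;
            return;
        }
        _pricingModelIsRunning = true;
    }

    bool runAgain;
    do {
        // body of method

        lock (_gate) {
            runAgain = _needToRunPricingModelAgain;
            _needToRunPricingModelAgain = false;
            if (!runAgain)
                _pricingModelIsRunning = false;
        }
    } while (runAgain);
}
```

But notice that this makes things worse in the sense I care about: the pricing method is now even more cluttered with code that has nothing to do with pricing.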

OK, so I’m pretty sold on this idea of a general solution. Something like a ThreadedWorkerChain class where I could construct an object like this:

var chain = new ThreadedWorkerChain(RunPricingModel, RunQuotingEngine);
chain.StartWork();

This hypothetical class would provide the following behavior:

  1. When StartWork is called:
    1. if RunPricingModel isn’t running, it will start running now
    2. if it IS running, it will run again once it’s finished
  2. When RunPricingModel is finished:
    1. if RunQuotingEngine isn’t running, it will start running now
    2. if it IS running, it will run again once it’s finished
  3. And so on.
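To make that behavior concrete, here’s a rough sketch of how such a class might look. Everything here is hypothetical (the real implementation will differ); the idea is simply to pull the run-or-flag pattern out of the workers and into the chain, so each stage coalesces redundant requests and triggers the next stage when it finishes:

```csharp
// Hypothetical sketch of ThreadedWorkerChain; names and details are illustrative.
public class ThreadedWorkerChain {
    private readonly Action[] _workers;
    private readonly object[] _gates;
    private readonly bool[] _running;
    private readonly bool[] _pending;

    public ThreadedWorkerChain(params Action[] workers) {
        _workers = workers;
        _gates = new object[workers.Length];
        _running = new bool[workers.Length];
        _pending = new bool[workers.Length];
        for (int i = 0; i < workers.Length; i++)
            _gates[i] = new object();
    }

    public void StartWork() { Schedule(0); }

    private void Schedule(int i) {
        if (i >= _workers.Length) return;
        lock (_gates[i]) {
            if (_running[i]) {      // already running: run again once finished
                _pending[i] = true;
                return;
            }
            _running[i] = true;
        }
        ThreadPool.QueueUserWorkItem(_ => Run(i));
    }

    private void Run(int i) {
        bool again;
        do {
            _workers[i]();          // the worker itself knows nothing about threading
            Schedule(i + 1);        // each completion kicks the next stage,
                                    // coalesced by that stage's own flags
            lock (_gates[i]) {
                again = _pending[i];
                _pending[i] = false;
                if (!again)
                    _running[i] = false;
            }
        } while (again);
    }
}
```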

This is very much still something I’m working on. However, what I’ve put together so far seems to exhibit the behavior I want, so I am optimistic about it. For a look at the code, go to the C# code page.
