EDN Admin
Greetings,
This is a very general CS theory question, so I hope I've chosen the right forum for it. For the last year or so I've been developing a multi-threaded futures trading application. The exchange disseminates its market data as a series of "channels": each channel has its own IP address and transmits a certain group of products. As an example (this isn't how it's actually done), let's say ChannelA transmits all of the market data for stocks that begin with the letter A, ChannelB for stocks beginning with B, and so on.
I have a Channel class that connects to each of these (multicast/UDP) channels. Depending on how many stocks we want to listen to, I might create many instances of the Channel class (one for each channel of data we want to listen to).
The portion of the application that will be *using* this data (we'll call it the Listener class) doesn't care about, or want to know about, the channel structure of the market data. It just wants to request the data for "AAPL" or "BP" without having to know that they might come from different channels.
So my Listener class has a shared Queue object and a shared WaitHandle, and each Channel object gets a reference to both. The Listener class (of which there will be only one) runs the *consumer* thread. Each Channel object (of which there will be many) runs a *producer* thread, and each one holds a reference to the consumer's Queue and WaitHandle.
Whenever a Channel (producer) gets a new piece of data, it:
1) first evaluates whether the Listener (consumer) cares about this particular stock
2) if so, locks the Queue's SyncRoot (to gain exclusive write access)
3) adds the new piece(s) of data to the Queue
4) signals the Listener's WaitHandle that a new item exists
5) releases the SyncRoot lock
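The five producer steps above can be sketched as follows. This is a minimal, hypothetical C# sketch, not the original code: `gate` stands in for `m_Queue.SyncRoot`, `dataReady` (an `AutoResetEvent`) for the Listener's shared WaitHandle, `subscribed` for the set of symbols the Listener wants, and `OnUpdate` for whatever a Channel calls per received update. It runs on one thread so the effect of each step is easy to check.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical names: 'gate' stands in for m_Queue.SyncRoot, 'dataReady' for the
// Listener's shared WaitHandle, 'subscribed' for the symbols the Listener wants.
var gate = new object();
var queue = new Queue<(string Symbol, decimal Price)>();
var dataReady = new AutoResetEvent(false);
var subscribed = new HashSet<string> { "AAPL", "BP" };

// Runs on a Channel's (producer's) own thread for every update it receives.
// Assumes 'subscribed' is not modified while producers are reading it.
bool OnUpdate(string symbol, decimal price)
{
    // 1) Filter first, without the lock: ignored symbols never touch the queue.
    if (!subscribed.Contains(symbol)) return false;

    // 2) Take exclusive write access to the shared queue.
    lock (gate)
    {
        queue.Enqueue((symbol, price)); // 3) add the update
        dataReady.Set();                // 4) wake the Listener
    }                                   // 5) lock released on leaving the block
    return true;
}

OnUpdate("AAPL", 190.50m);
OnUpdate("BAC", 35.10m); // not subscribed: filtered out before the lock
OnUpdate("BP", 38.20m);
Console.WriteLine(queue.Count); // 2
```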
The Listener (consumer) thread is basically an infinite loop:
1) wait until one of the Channels (producers) signals my WaitHandle that a new item exists
2) lock the Queue's SyncRoot
3) process the new piece of market data
4) release the SyncRoot lock
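The consumer loop can be sketched the same way. The names are again hypothetical, and the sketch assumes the `!= 1` test means a second, "stop" handle sits at index 1 of the array passed to `WaitHandle.WaitAny`. As in step 3, updates are processed while the lock is held, so producers block for as long as processing takes.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical shared state. WaitAny returns the index of the handle that fired:
// index 0 = "data ready", index 1 = "stop" (one plausible reading of '!= 1').
var gate = new object();
var queue = new Queue<string>();
var dataReady = new AutoResetEvent(false);
var stop = new ManualResetEvent(false);
var handles = new WaitHandle[] { dataReady, stop };
var processed = new List<string>();

void ConsumerLoop()
{
    // 1) Block until a producer signals data (0) or shutdown is requested (1).
    //    If both are signaled, WaitAny reports the lowest index, so data drains first.
    while (WaitHandle.WaitAny(handles) != 1)
    {
        lock (gate)                               // 2) lock the queue
        {
            while (queue.Count > 0)
                processed.Add(queue.Dequeue());   // 3) "process" each update
        }                                         // 4) release the lock
    }
}

var consumer = new Thread(ConsumerLoop);
consumer.Start();

foreach (var symbol in new[] { "AAPL", "BP", "ABT" })
{
    lock (gate) { queue.Enqueue(symbol); dataReady.Set(); } // producer side
}

stop.Set();
consumer.Join();
Console.WriteLine(string.Join(",", processed)); // AAPL,BP,ABT
```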
So my first question is very general: is this the right approach for this type of application? This market data can sometimes arrive VERY fast (tens of thousands of updates per second). What I'm beginning to see, however, is that if ChannelB (which only carries updates for symbols starting with B) has very heavy traffic, but my Listener (consumer) doesn't care about most of those stocks, it still seems to hurt the performance of the Channels that *do* carry many stocks we care about. To put it more simply: if my Listener is interested in 100 stocks that begin with the letter A and only 2 that begin with B, I would expect ChannelB not to affect ChannelA's performance (even if ChannelB has VERY heavy message volume), because only a small fraction of ChannelB's updates are relevant to my Listener, and the rest never touch or lock the Queue. If 95% of the updates that come through ChannelB are never passed to the Listener, and that filtering happens on a separate thread, I would expect it to have no adverse impact on the performance of the other channels.
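One way to test the assumption that ChannelB's ignored updates never reach the shared lock is to count, per channel, how many updates arrive and how many are forwarded. A minimal sketch with hypothetical names (`OnChannelBUpdate`, `received`, `forwarded`), simulating a channel where only 1 update in 20 is relevant:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical instrumentation for one Channel (ChannelB). Interlocked keeps the
// counters correct even if a monitoring thread reads them concurrently.
var subscribed = new HashSet<string> { "BP", "BAC" };
long received = 0;
long forwarded = 0;

void OnChannelBUpdate(string symbol)
{
    Interlocked.Increment(ref received);
    if (!subscribed.Contains(symbol)) return; // filtered: no lock taken, no signal sent
    Interlocked.Increment(ref forwarded);
    // ...lock the shared queue, enqueue, and signal the Listener here...
}

// Simulate heavy ChannelB traffic where only 1 update in 20 is relevant.
for (int i = 0; i < 19; i++) OnChannelBUpdate("BX");
OnChannelBUpdate("BP");

Console.WriteLine($"{Interlocked.Read(ref forwarded)} of {Interlocked.Read(ref received)} updates reached the lock");
// 1 of 20 updates reached the lock
```

If real counts confirm that only a few percent of ChannelB's traffic is forwarded, contention on the Queue lock is unlikely to explain the slowdown on its own; the cost may instead lie in the work each Channel thread does before the filter (receiving and parsing every packet), which still competes for CPU with the other Channel threads. That is a hypothesis to confirm with a profiler, not a diagnosis.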
One caveat to my approach (and something I've been questioning for some time now) is the locking method in my Listener (consumer) class. It looks like this:
```csharp
// Assumption: m_Handles is a WaitHandle[] with index 0 = items added, index 1 = stop
// (WaitHandle.WaitAny requires an array argument).
while (WaitHandle.WaitAny(m_Handles) != 1) { // wait until a producer adds 1 or more items to the queue
    ArrayList newMessages = new ArrayList();
    lock (m_Queue.SyncRoot) { // lock the shared Queue's SyncRoot member
        newMessages.AddRange(m_Queue);
        m_Queue.Clear();
    } // unlock the Queue

    // So we've only locked the Queue long enough to copy all of the elements. Now
    // we can take our time processing the messages, because the producer(s)
    // are free to add to the Queue (since it's unlocked).

    for (int i = 0; i < newMessages.Count; i++) {
        ProccessMessage(newMessages[i]);
    }
}
```
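The copy-then-process loop above can also be written so that nothing is copied: under the lock, the consumer swaps the producers' queue for an empty one and then processes the full one at leisure. This sketch assumes generic `Queue<string>` instances and a private `gate` object in place of the non-generic `Queue` and its `SyncRoot`; `Produce` and `DrainAndProcess` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names. Producers must always go through 'gate' and read 'incoming'
// fresh each time (never cache the reference), because the consumer swaps it.
var gate = new object();
var incoming = new Queue<string>(); // producers enqueue here, under 'gate'
var spare = new Queue<string>();    // consumer-owned; empty between drains

void Produce(string message)
{
    lock (gate) { incoming.Enqueue(message); }
}

// Called on the single consumer thread after the wait handle fires.
void DrainAndProcess(Action<string> process)
{
    Queue<string> batch;
    lock (gate)
    {
        batch = incoming;  // take everything queued so far...
        incoming = spare;  // ...and hand producers an empty queue in O(1)
    }
    // Lock released: producers keep enqueuing while this batch is processed.
    while (batch.Count > 0) process(batch.Dequeue());
    spare = batch;         // now empty; reused on the next swap
}

var seen = new List<string>();
Produce("AAPL");
Produce("ABT");
DrainAndProcess(seen.Add);
Produce("BP");
DrainAndProcess(seen.Add);
Console.WriteLine(string.Join(",", seen)); // AAPL,ABT,BP
```

The lock is held for two reference assignments regardless of how many messages arrived, and no new `ArrayList` is allocated per wake-up. The trade-off is the rule in the first comment: a producer that cached the old queue reference would enqueue into a queue the consumer is no longer watching.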
So what I'm wondering is: should I lock the Queue long enough to grab ALL of the new objects in it, or only long enough to grab ONE element? In other words, instead of the code above, should it look like this:
```csharp
// Assumption: m_Handles is a WaitHandle[] with index 0 = items added, index 1 = stop
// (WaitHandle.WaitAny requires an array argument).
while (WaitHandle.WaitAny(m_Handles) != 1) { // wait until a producer adds 1 or more items to the queue
    while (m_Queue.Count > 0) {
        object newMessage = null;
        lock (m_Queue.SyncRoot) { // lock the shared Queue's SyncRoot member
            newMessage = m_Queue.Dequeue();
        }
        ProccessMessage(newMessage);
    }
}
```
I'm not sure whether accessing m_Queue.Count outside a lock() statement is safe, but is this second approach a better way to implement a producer/consumer model?
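On the `Count` question: the `Queue` documentation states that instance members are not guaranteed to be thread safe and that multiple concurrent readers are supported only while the collection is not being modified, so reading `Count` while a producer may be enqueuing relies on behavior the documentation does not promise. A safer shape for the one-at-a-time loop checks `Count` and dequeues under the same lock. The sketch below uses a hypothetical `TryTake` helper, with `gate` standing in for `m_Queue.SyncRoot`:

```csharp
using System;
using System.Collections.Generic;

var gate = new object();          // stands in for m_Queue.SyncRoot
var queue = new Queue<string>();  // generic stand-in for m_Queue

// Check Count and Dequeue under the same lock, so Count is never read unlocked.
bool TryTake(out string? item)
{
    lock (gate)
    {
        if (queue.Count > 0)
        {
            item = queue.Dequeue();
            return true;
        }
    }
    item = null;
    return false;
}

// Producers would enqueue under 'gate'; done directly here to keep the sketch single-threaded.
queue.Enqueue("AAPL");
queue.Enqueue("BP");

var processed = new List<string>();
while (TryTake(out var message))
    processed.Add(message!);      // processing happens outside the lock
Console.WriteLine(processed.Count); // 2
```

Both shapes hold the lock only briefly; the per-item version acquires it once per message and the batch version once per wake-up, so under bursts of tens of thousands of updates per second the batch version can take the lock many fewer times. Which is faster for this workload is worth measuring rather than assuming. Since .NET 4.0, `System.Collections.Concurrent` also provides `ConcurrentQueue<T>` and `BlockingCollection<T>`, which implement this producer/consumer pattern without hand-written locks.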