November 2007





Stateful compression

Cats: Code

Fri - 30 Nov 2007 - 12:46 PM

As I said earlier, we had gotten to the point with our logging mechanism where we could generate SQL batches and ship them off to a separate server. For data integrity we had built a failover mechanism that saved failed SQL batches to disk. At first that was adequate, but we soon ran into problems. The nastiest was that if a series of batches failed, was re-submitted, and part of it failed again, there was no clean way to tell which batches had gone through and which hadn't. Additionally, the SQL was very verbose, which produced very large batch files during extended outages. Finally, we wanted the ability to send log messages to a staging area first, before shipping them off to SQL. This would essentially create an out-of-process buffer between the source application servers and the SQL database, smoothing out spikes in load or in DBMS performance.

To support this, we decided to use .NET serialization, since we already had a discrete object that represented a log message with all the trimmings. And since we were going to the trouble of implementing a more robust serialization mechanism, we decided to also take a stab at compressing the data stream, without getting into hard-core gzip and the like.

.NET has several standard approaches to serialization. The most rudimentary is to simply mark your class with the Serializable attribute and then use a BinaryFormatter to serialize it to a stream. The formatter walks the object graph and writes it out to the stream, using assembly-qualified type names to identify non-primitive types. While this is quite acceptable in and of itself, we realized that a very large amount of redundant data would be coming across the stream. The apps generate millions of messages per day, but there are only a handful of distinct message types (fewer than 20 in our specific case). Since all of our messages are parameterized, we decided that we could save a lot of space by sending just a numeric message id and the full set of defining parameters across the stream, and letting the receiving end reconstruct the message text as necessary.

There are two basic ways to assign a unique (or unique-enough) id to a message. One is to do it at compile time: manually mark each message with an attribute that defines its id. The problem there is that you then have to make sure that both the sending and the receiving end have the same messages and ids defined. Worse, you have to manually ensure id uniqueness across your entire set of messages. Painful. The other is to assign ids lazily, giving each new message an id the first time it is encountered. This approach is a lot simpler on the end user, but requires a bit of trickery up front.

Let's assume for a second that we have two classes - a Reader and a Writer (I'll talk about this aspect in another post). The writer establishes a persistent connection with a reader, and begins serializing messages to it one by one. The persistent aspect is key. If we know that our writer is the only writer sending data to our reader, we can define a way for our two halves to assign ids to messages that they both have seen, and not worry about these ids colliding with the ids of other such writer/reader combinations.
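To make the lazy scheme concrete, here is a minimal sketch of a per-connection id table, assuming one instance lives on each end of a single persistent writer/reader connection. The class and method names (ConnectionIdTable, FindOrAdd, Add, Get) are hypothetical and only loosely mirror the interface described below. The writer hands out sequential ids the first time it sees a string; the reader learns each mapping from the first message that carries the full text. Because a TCP stream delivers bytes in order, the reader always sees the text before any later message that refers to it by id alone.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: one instance per end of ONE persistent connection.
// Ids are only meaningful on that connection, so no global coordination
// (and no compile-time id attributes) is needed.
public sealed class ConnectionIdTable
{
    private readonly Dictionary<string, int> idsByText = new Dictionary<string, int>();
    private readonly Dictionary<int, string> textsById = new Dictionary<int, string>();

    // Writer side: returns true if the text was already sent on this
    // connection (only the id needs to go out). Otherwise assigns the next
    // sequential id and returns false (both text and id must go out).
    public bool FindOrAdd(string text, out int id)
    {
        if (idsByText.TryGetValue(text, out id))
            return true;
        id = idsByText.Count;
        idsByText.Add(text, id);
        return false;
    }

    // Reader side: remember the mapping the first time the text arrives.
    public void Add(string text, int id)
    {
        textsById[id] = text;
    }

    // Reader side: resolve an id that arrived without its text.
    public string Get(int id)
    {
        string text;
        if (!textsById.TryGetValue(id, out text))
            throw new InvalidOperationException("Unknown id " + id + " on this connection");
        return text;
    }
}
```

Both tables are simply discarded when the connection goes away, which is what keeps ids from one writer/reader pair from colliding with those of another.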

With that in mind, we can start looking at specifics. What we need is a way to control how individual fields of our Message class get serialized. The simplest way to do that is to implement the ISerializable interface. This gives us two control points: a GetObjectData(SerializationInfo, StreamingContext) method that's invoked when the object is serialized, and a constructor taking (SerializationInfo, StreamingContext) that the formatter invokes when deserializing. The SerializationInfo object is essentially a glorified hashtable that you can put strongly-typed name/value pairs into. When serializing, we decide which fields we want persisted; when deserializing, we read them back out and interpret them to populate the newly created object:

 
// serializing the fields we want to send as-is
// (excerpt from GetObjectData(SerializationInfo info, StreamingContext context))
info.AddValue("a", args);
info.AddValue("w", sessionID);
info.AddValue("u", userID);
info.AddValue("ex", exceptionText);

// deserializing the fields we passed along untouched
// (excerpt from the RuntimeEvent(SerializationInfo info, StreamingContext context) constructor)
args = (string[])info.GetValue("a", typeof(string[]));
sessionID = info.GetString("w");
userID = info.GetString("u");
exceptionText = info.GetString("ex");

Alright, that's done. But what about the conditional part, namely assigning message ids? That's the next step. First and foremost, we have to answer one question: how do we communicate with the outside world, specifically the reader and writer classes? The problem is that our code doesn't call the two serialization methods. The formatter calls them, so we can't pass extra arguments to them directly. To answer that question, we need to look at the second parameter passed into both methods, the StreamingContext struct. It has an interesting property, Context, of type object. It turns out that this property is there just for tricky end users like us who want to pass along some additional information, so we can use it to pass a helper into the serialization methods. The cleanest way to do this, without tying our message objects to the specifics of who is serializing them, is through an interface.

 
namespace WorkflowServer.Foundation.Diagnostics.Logging
{
    /// Manages serialization of message text. This interface defines methods for
    /// keeping track of which messages have already been serialized, so that
    /// their text can be replaced with remote identifiers.
    interface IMessageSerializationManager
    {
        /// Attempts to find the remote handle for a given message in a given
        /// instance. If the message hasn't been serialized yet, an id is
        /// generated for it and false is returned.
        /// Returns true if the message has already been serialized, false otherwise.
        bool FindOrAdd(InstanceInfo instance, string message, out int remoteId);     
 
        /// Adds the message to the manager, under the given id.
        void AddMessage(InstanceInfo instance, string message, int remoteId);     
 
        /// Gets the message referenced by the message id.
        string GetMessage(InstanceInfo instance, int messageId);
    }
}
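The post doesn't show how the helper gets into StreamingContext.Context in the first place. As a sketch, assuming the classic BinaryFormatter, the writer would construct the formatter with new BinaryFormatter(null, new StreamingContext(StreamingContextStates.All, manager)), and the formatter then passes that context to every GetObjectData call and deserialization constructor it invokes. BinaryFormatter is obsolete and disabled by default on recent .NET versions, so the runnable sketch below calls GetObjectData and the deserialization constructor by hand, the way a formatter would. The Note class and ISerializationHelper interface are hypothetical stand-ins for RuntimeEvent and IMessageSerializationManager.

```csharp
using System;
using System.Runtime.Serialization;

// Hypothetical helper interface passed in through StreamingContext.Context.
public interface ISerializationHelper
{
    string Describe(string value);
}

public sealed class UpperCaseHelper : ISerializationHelper
{
    public string Describe(string value) { return value.ToUpperInvariant(); }
}

[Serializable]
public class Note : ISerializable
{
    public string Text;
    public string Seen; // filled in from the helper during deserialization

    public Note(string text) { Text = text; }

    public void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        info.AddValue("t", Text);
    }

    protected Note(SerializationInfo info, StreamingContext context)
    {
        Text = info.GetString("t");
        // Whatever object the caller put into the context arrives here untouched.
        ISerializationHelper helper = context.Context as ISerializationHelper;
        if (helper == null)
            throw new SerializationException("No ISerializationHelper in StreamingContext.Context");
        Seen = helper.Describe(Text);
    }

    // Mimics a formatter: fill a SerializationInfo via GetObjectData,
    // then rebuild the object through the deserialization constructor.
    public static Note RoundTrip(Note source, StreamingContext context)
    {
        SerializationInfo info = new SerializationInfo(typeof(Note), new FormatterConverter());
        source.GetObjectData(info, context);
        return new Note(info, context);
    }
}
```

Failing loudly when the context is missing is a design choice: a message that silently deserializes without its id table would otherwise produce null text much later, far from the cause.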

Now all we have to do is implement the relevant halves of this interface on our writer (FindOrAdd) and our reader (AddMessage and GetMessage), and call it from the message's serialization methods:

public override void GetObjectData(SerializationInfo info, StreamingContext context)
{
    base.GetObjectData(info, context);

    // the writer supplies the manager through StreamingContext.Context
    IMessageSerializationManager msm = context.Context as IMessageSerializationManager;
    if (msm == null)
        throw new SerializationException("StreamingContext.Context must supply an IMessageSerializationManager");

    int messageId;

    // namespace: the text goes out only the first time, the id every time
    if (!msm.FindOrAdd(Instance, codeNS, out messageId))
        info.AddValue("n", codeNS);
    info.AddValue("ni", messageId);

    // name
    if (!msm.FindOrAdd(Instance, codeName, out messageId))
        info.AddValue("na", codeName);
    info.AddValue("n1", messageId);

    // format string
    if (!msm.FindOrAdd(Instance, formatString, out messageId))
        info.AddValue("f", formatString);
    info.AddValue("fi", messageId);

    // per-message values are always sent as-is
    info.AddValue("a", args);
    info.AddValue("w", sessionID);
    info.AddValue("u", userID);
    info.AddValue("ex", exceptionText);
}
 
protected RuntimeEvent(SerializationInfo info, StreamingContext context)
    : base(info, context)
{
    // SerializationInfo has no clean way to check whether it contains
    // a particular member, so we collect all the names into a list we can search
    List<string> keys = new List<string>();
    foreach (SerializationEntry se in info)
        keys.Add(se.Name);

    args = (string[])info.GetValue("a", typeof(string[]));
    sessionID = info.GetString("w");
    userID = info.GetString("u");
    exceptionText = info.GetString("ex");

    // the reader supplies the manager through StreamingContext.Context
    IMessageSerializationManager msm = context.Context as IMessageSerializationManager;
    if (msm == null)
        throw new SerializationException("StreamingContext.Context must supply an IMessageSerializationManager");

    // text absent: sent earlier on this connection, so look it up by id;
    // text present: first occurrence, so remember it under the id that came with it
    if (!keys.Contains("n"))
        codeNS = msm.GetMessage(Instance, info.GetInt32("ni"));
    else
    {
        codeNS = info.GetString("n");
        msm.AddMessage(Instance, codeNS, info.GetInt32("ni"));
    }

    if (!keys.Contains("na"))
        codeName = msm.GetMessage(Instance, info.GetInt32("n1"));
    else
    {
        codeName = info.GetString("na");
        msm.AddMessage(Instance, codeName, info.GetInt32("n1"));
    }

    if (!keys.Contains("f"))
        formatString = msm.GetMessage(Instance, info.GetInt32("fi"));
    else
    {
        formatString = info.GetString("f");
        msm.AddMessage(Instance, formatString, info.GetInt32("fi"));
    }

    // compute the full message text
    message = String.Format(formatString, args) + exceptionText;
}

That's it. What we've done is define a stateful compression protocol. Every time a new Reader/Writer pair is set up, the message ids are recalculated, but only once per message type. Given the usage pattern of high repetition, this is a compromise we're quite happy with.



Application logging

Cats: Code

Thu - 29 Nov 2007 - 01:17 PM

One of the neat features we built into workflow server is the logging mechanism. The concept is pretty straightforward: for every message you want logged, you define an enum value and mark it with an attribute:

 
    enum Messages
    {
        [DefaultMessage("Hello {0}")]
        HelloWorld
    }

Then, when you need to log that message, you call the Report method on an instance of the workflow application object, supplying the enum and substitution parameters:

WSApplication.Application.Report(Messages.HelloWorld, "World");

There are a few distinct advantages to using this mechanism over straight text logging:

  • Each message is uniquely identifiable. The logging mechanism receives an enum value, which it uses to extract the declaring type information. This lets us set up logging rules: route log messages to different targets by namespace, or suppress entire namespaces altogether.
  • Since the text of the log message isn't supplied directly to the log method, we can localize log messages by providing alternate text for a given message.
  • The parameters used to form the log message are passed to the logging mechanism as discrete values. This has the very interesting side effect of letting us store those parameters separately when the target medium isn't a flat file.
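Here is a minimal sketch of how an enum value can carry both an identity and a default format string, assuming a DefaultMessage attribute shaped like the one above. DefaultMessageAttribute, MessageCatalog and the Demo.Logging namespace are hypothetical; the actual workflow server implementation isn't shown in the post. The declaring type's namespace is what logging rules would match on, and GetFormat is the natural hook for substituting localized text.

```csharp
using System;
using System.Reflection;

// Hypothetical attribute; the real workflow server attribute may differ.
[AttributeUsage(AttributeTargets.Field)]
public sealed class DefaultMessageAttribute : Attribute
{
    public string Format { get; private set; }
    public DefaultMessageAttribute(string format) { Format = format; }
}

namespace Demo.Logging
{
    public enum Messages
    {
        [DefaultMessage("Hello {0}")]
        HelloWorld
    }
}

public static class MessageCatalog
{
    // Fully-qualified id: declaring type's full name plus the enum member name.
    public static string GetId(Enum value)
    {
        return value.GetType().FullName + "." + value;
    }

    // Namespace of the declaring enum: what routing/suppression rules match on.
    public static string GetNamespace(Enum value)
    {
        return value.GetType().Namespace;
    }

    // Reads [DefaultMessage] from the enum field; a localized catalog could be
    // consulted here first. Falls back to the member name if no attribute exists.
    public static string GetFormat(Enum value)
    {
        FieldInfo field = value.GetType().GetField(value.ToString());
        DefaultMessageAttribute attr = field == null ? null
            : (DefaultMessageAttribute)Attribute.GetCustomAttribute(field, typeof(DefaultMessageAttribute));
        return attr != null ? attr.Format : value.ToString();
    }

    // Parameters stay discrete until this point, so a non-flat target can store them separately.
    public static string Render(Enum value, params object[] args)
    {
        return string.Format(GetFormat(value), args);
    }
}
```

With this, MessageCatalog.Render(Messages.HelloWorld, "World") produces "Hello World", and the id Demo.Logging.Messages.HelloWorld identifies the message without any hand-assigned numbers.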

With this in place, we implemented several things. First, we developed a database logging target: instead of being written out as text, each message is written to a database. This database has two key tables, a master and a detail. The master table holds general information about the message (the origin, the timestamp, the user logged into the system if applicable, the severity, etc.), along with the message id (the fully-qualified enum value name) and the formatted message text. The detail table has a row for every parameter of a given message. This lets us collect logging data in an easily searchable and reportable way. A simple example: whenever workflow server makes a database call, it logs the call along with the execution time and SQL parameters. What we get in the database is a master table entry with a message id of WorkflowServer.Foundation.PersistenceManagement.ADO.ADOCommand.CommandDuration, and a series of detail records: one with the execution time, one with the SQL, and a set of rows for the actual parameters (additional columns are omitted for clarity):

master:

Command ran for 0 s ('{call s_user_client_keys @user_id = 210211308, @client_id = 85 }')

detail: (columns are parameter index and parameter value)

0 0
1 {call s_user_client_keys @user_id = 210211308, @client_id = 85 }
2 s_user_client_keys
3 210211308
4 85
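Here is a sketch of how one logged message fans out into a master row and one detail row per parameter, assuming simplified row types (MasterRow, DetailRow and LogRows are hypothetical, and the real tables carry more columns). The format string used in the check below, Command ran for {0} s ('{1}'), is also an assumption, chosen so that it reproduces the master text above; the remaining parameters are stored as detail rows even though the format doesn't use them.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical row shapes; the real tables also hold origin, timestamp, user, severity, etc.
public sealed class MasterRow
{
    public string MessageId;
    public string Text;
}

public sealed class DetailRow
{
    public int Index;
    public string Value;
}

public static class LogRows
{
    // One master row with the formatted text, plus one detail row per parameter,
    // so each parameter remains individually queryable.
    public static MasterRow Build(string messageId, string format, object[] args, out List<DetailRow> details)
    {
        details = new List<DetailRow>();
        for (int i = 0; i < args.Length; i++)
        {
            details.Add(new DetailRow
            {
                Index = i,
                Value = Convert.ToString(args[i], CultureInfo.InvariantCulture)
            });
        }

        // string.Format ignores arguments the format string doesn't reference.
        return new MasterRow
        {
            MessageId = messageId,
            Text = string.Format(CultureInfo.InvariantCulture, format, args)
        };
    }
}
```

Storing values as invariant-culture strings keeps the detail table queryable with plain string comparisons; a real schema might type the value column differently.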

We can then query against this data - as an example, when we run our load tests, we can compare two subsequent database call reports to see how two versions of the application change with respect to database access. Also, this becomes very useful in a clustered environment. This mechanism automatically aggregates data from separate physical environments into a central point, and lets us query it without touching the actual source machines.

With just this, we had a rather powerful logging structure. However, we ran into a couple of problems. First, with a heavy enough volume of data (our SQL Server logging database was growing by about 7GB per day), long-running queries or a high volume of concurrent writes slowed the database down. That caused log messages to back up in the source applications, which led to all sorts of fun problems. Second, persistence became an issue. When writing to a local disk, about the only thing that can go wrong mid-write is that the disk fills up; the chances of anything else causing problems are slim to none. Once you introduce process and network boundaries into the mix, suddenly a lot more can break. I'll talk about the solutions to these over the next few days.



Crashing the TCP Stack

Cats: Code

Tue - 27 Nov 2007 - 08:26 AM

On the way to writing the distributed logging mechanism for workflow server, I needed to write my own formatter, as the default BinaryFormatter was too verbose for what I was trying to do (more on that later). Once it was written and unit-tested locally, I ran a final series of tests with remote clients, i.e. actually writing onto the wire rather than just the loopback. This should have been a quick, routine test. As you can imagine, it didn't turn out that way. The first time through, my machine became paralyzed: I lost my Exchange connection, open remote directories stopped responding, and finally Windows Explorer crashed. Thinking this was some freak occurrence, I rebooted the machine and tried again... with the same effect. Time for full-on debugging.

Symptoms: all network connectivity is lost when attempting to run the app. Works fine over loopback. Works fine with a more verbose formatter (tested serializing through a SoapFormatter). Doesn't work with my formatter or with the default BinaryFormatter. Works if I step through with the debugger. Stops working the second I stop stepping and let it run. Fun. This short list actually took days to gather, because every failed attempt meant a complete reboot of my workstation, a process that takes up to 10 minutes and one I don't always have the time for.

Once I saw that I could readily reproduce the issue, and that it seemed to be data-specific (why else would changing the formatter have any effect?), I tried to siphon off the data being sent. I decorated the network stream that I passed to the formatter so that it wrote each chunk of data, and the size of each chunk, out to a pair of files. That way I could take the entire workflow stack out of the picture and try to replicate the problem on its own.

 
using System.IO;

// Decorates a stream: everything written to it is also recorded to
// c:\data.out, and the size of each Write call to c:\indeces.out.
class StreamWrapper : Stream
{
    private Stream stream;
    private BinaryWriter data;
    private BinaryWriter indeces;

    public StreamWrapper(Stream stream)
    {
        this.stream = stream;

        data = new BinaryWriter(new FileStream("c:\\data.out", FileMode.Create));
        indeces = new BinaryWriter(new FileStream("c:\\indeces.out", FileMode.Create));
    }

    public override bool CanRead { get { return stream.CanRead; } }
    public override bool CanSeek { get { return stream.CanSeek; } }
    public override bool CanWrite { get { return stream.CanWrite; } }
    public override long Length { get { return stream.Length; } }

    public override long Position
    {
        get { return stream.Position; }
        set { stream.Position = value; }
    }

    public override void Flush()
    {
        data.Flush();
        indeces.Flush();
        stream.Flush();
    }

    public override int Read(byte[] buffer, int offset, int count) { return stream.Read(buffer, offset, count); }
    public override long Seek(long offset, SeekOrigin origin) { return stream.Seek(offset, origin); }
    public override void SetLength(long value) { stream.SetLength(value); }

    public override void Write(byte[] buffer, int offset, int count)
    {
        data.Write(buffer, offset, count);
        // count is already the number of bytes in this chunk;
        // it must not be adjusted by offset
        indeces.Write(count);

        stream.Write(buffer, offset, count);
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // close the capture files so their contents actually reach the disk
            data.Close();
            indeces.Close();
            stream.Close();
        }
        base.Dispose(disposing);
    }
}

Sure enough, playing the data back through a trivial app had the same effect. It then dawned on me that I was seeing lots of small chunks of data being written directly to the network stream, so I tried wrapping the network stream in a BufferedStream. Eureka! That did the trick, both for my test data set and for the app itself.
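Here is a minimal sketch of what the buffered stream changes, assuming a counting decorator in place of the NetworkStream (CountingStream and BufferingDemo are hypothetical names). It doesn't reproduce the crash; it only counts how many Write calls reach the underlying stream with and without a BufferedStream in front of it. In the app itself, the equivalent change is handing the formatter new BufferedStream(client.GetStream()) instead of the raw network stream, and flushing it whenever pending data needs to go out.

```csharp
using System;
using System.IO;

// Counts the Write calls that reach the wrapped stream (stand-in for a NetworkStream).
public sealed class CountingStream : Stream
{
    private readonly Stream inner;
    public int WriteCalls { get; private set; }

    public CountingStream(Stream inner) { this.inner = inner; }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { inner.Flush(); }
    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }

    public override void Write(byte[] buffer, int offset, int count)
    {
        WriteCalls++;
        inner.Write(buffer, offset, count);
    }
}

public static class BufferingDemo
{
    // Writes `chunks` 4-byte chunks, optionally through an 8192-byte BufferedStream,
    // and returns how many writes reached the underlying stream.
    public static int CountWrites(int chunks, bool buffered, MemoryStream sink)
    {
        CountingStream counter = new CountingStream(sink);
        Stream target = buffered ? new BufferedStream(counter, 8192) : (Stream)counter;
        byte[] chunk = { 1, 2, 3, 4 };
        for (int i = 0; i < chunks; i++)
            target.Write(chunk, 0, chunk.Length);
        target.Flush();
        // Deliberately not disposed: that would also close `sink`, which the caller inspects.
        return counter.WriteCalls;
    }
}
```

With 1,000 four-byte writes, every write reaches the underlying stream when unbuffered; with the buffer, all 4,000 bytes fit in one 8192-byte buffer, so they go out as a single write at Flush.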

Now, granted, I should have been using a buffered stream from the get-go for performance reasons, and I probably would have on the second pass through the code. But an all-out crash is a long way from poor performance. The question I have yet to answer is whether the symptoms were specific to my environment (WinXP SP2 in a corporate environment with a whole host of A/V and monitoring goodies), or whether this would happen on any box. I'm attaching the sample data that crashed my box: data.out is the data source, and indeces.out holds the block lengths. I'd be curious to see whether people can reproduce this on their setups...

For those interested, here's the code to quickly play back the data/indeces files. Just create a console app with this code and start an instance on each of two different hosts: one as the receiver, and one as the sender pointed at the receiver's host.

 
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;

namespace NetworkTest
{
    class Program
    {
        const int chunkSize = 8000;

        static void Main(string[] args)
        {
            Console.Write("sender/receiver? :");
            switch (Console.ReadLine().Trim().ToLower())
            {
                case "s":
                    StartSender();
                    break;
                default:
                    StartReceiver();
                    break;
            }
        }

        private static void StartReceiver()
        {
            TcpListener listener = new TcpListener(IPAddress.Any, 4444);
            listener.Start();

            using (TcpClient client = listener.AcceptTcpClient())
            {
                NetworkStream stream = client.GetStream();
                byte[] buffer = new byte[chunkSize];

                while (true)
                {
                    int read = stream.Read(buffer, 0, chunkSize);
                    if (read == 0)
                        break; // the sender closed the connection
                    for (int i = 0; i < read; i++)
                        Console.Write(Convert.ToString(buffer[i], 16));
                }
            }

            listener.Stop();
        }

        private static void StartSender()
        {
            Console.Write("host? :");
            string host = Console.ReadLine();

            Console.Write("use indeces? :");
            bool useIndeces = Console.ReadLine().Trim().ToLower() == "y";

            using (BinaryReader indeces = new BinaryReader(new FileStream("c:\\indeces.out", FileMode.Open)))
            using (BinaryReader data = new BinaryReader(new FileStream("c:\\data.out", FileMode.Open)))
            using (TcpClient client = new TcpClient(host, 4444))
            {
                NetworkStream stream = client.GetStream();

                // replay either the recorded chunk sizes (the original pattern of
                // many small writes) or fixed chunkSize-byte chunks
                while (useIndeces
                    ? indeces.BaseStream.Position < indeces.BaseStream.Length
                    : data.BaseStream.Position < data.BaseStream.Length)
                {
                    int length = useIndeces
                        ? indeces.ReadInt32()
                        : (int)Math.Min(data.BaseStream.Length - data.BaseStream.Position, chunkSize);

                    // ReadBytes allocates exactly what is needed, so recorded
                    // chunks larger than chunkSize are handled too
                    byte[] chunk = data.ReadBytes(length);
                    stream.Write(chunk, 0, chunk.Length);
                }
            }
        }
    }
}

Data file: data.out
Index file: indeces.out



Asynchronous session access

Cats: Code

Mon - 12 Nov 2007 - 05:51 PM

After spending two days of my life tracking down a classic multithreading issue, I'd like to see whether this is a more widespread problem than just me. Scenario: an application written in ASP.NET 2.0 crashes randomly. By crashes I mean the worker process recycles. Since we're using InProc sessions, we get the expected result of every user getting logged out of the application at once. There doesn't seem to be any rhyme or reason to the crash itself. Our logging mechanisms don't show anything. The event log shows a cryptic entry:

EventType clr20r3, P1 w3wp.exe, P2 6.0.3790.1830, P3 42435be1, P4 mscorlib, P5 2.0.0.0, P6 4333ab80, P7 1546, P8 21, P9 system.argumentoutofrange, P10 NIL.

After digging around on the web, we drop in Peter Bromberg's suggestion (http://www.eggheadcafe.com/articles/20060305.asp). The stack trace we got pointed to AspCompatApplicationStep.AnyStaObjectsInSessionState.

Time for Reflector. Disassembling AspCompatApplicationStep.AnyStaObjectsInSessionState shows the following code:

 
internal static bool AnyStaObjectsInSessionState(HttpSessionState session)
{
	if (session != null)
	{
		int count = session.Count;
		for (int i = 0; i < count; i++)
		{
			object obj2 = session[i];
			if (((obj2 != null) && (obj2.GetType().FullName == "System.__ComObject"))
			   	&& (UnsafeNativeMethods.AspCompatIsApartmentComponent(obj2) != 0))
			{
				return true;
			}
		}
	}
	return false;
}
 

The only place in this snippet that can throw an ArgumentOutOfRangeException is the session indexer, session[i]. What we figured out was that we were using the old-school way of catching a session timeout: an expiring web cache entry. In the timeout handler, we were removing something from the session. What appears to have been happening is that, when the load on the system was just right, garbage collection kicked in around the same time our cache entry was timing out (we had it set to time out 1 minute after the session should have). From there on, standard multithreading issues kick in: an entry gets deleted from the session while another thread is iterating over it, so the count captured at the top of the loop no longer matches the collection.
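The failure mode can be reproduced deterministically without ASP.NET. This sketch uses a plain List&lt;object&gt; as a stand-in for the session, assuming its indexer fails the same way the session indexer evidently did here (the event log reports an ArgumentOutOfRangeException). The method captures the count once, exactly like AnyStaObjectsInSessionState, and a callback simulates the cache-expiry handler removing an item from another thread partway through the loop. SnapshotRaceDemo and IterateWithRemoval are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

public static class SnapshotRaceDemo
{
    // Same shape as AnyStaObjectsInSessionState: read Count once, then index.
    // `concurrentRemoval` stands in for the cache-expiry handler running on
    // another thread; here it is invoked at a fixed point to make the
    // interleaving deterministic.
    public static void IterateWithRemoval(List<object> items, Action concurrentRemoval)
    {
        int count = items.Count;
        for (int i = 0; i < count; i++)
        {
            if (i == 1 && concurrentRemoval != null)
                concurrentRemoval();      // "other thread" removes an entry mid-loop
            object item = items[i];       // may now be past the end of the list
            GC.KeepAlive(item);           // touch the element, as the real loop does
        }
    }
}
```

With three items and a removal during the second pass, the loop throws on its last iteration, because index 2 no longer exists.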

"Fun" debugging experience. Bottom line - be careful with that handler.
