Stateful compression
As I said earlier, we had gotten to the point with our logging mechanism where we could generate SQL batches and ship them off to a separate server. For data integrity purposes we had built a failover mechanism that would take failed SQL batches and save them off to disk. At first that was adequate, but then we realized that this crude mechanism had one nasty drawback - if a series of batches failed, were re-submitted, and some of them failed again, there was no clean way to identify which had worked and which hadn't. Additionally, the SQL was very verbose, creating very large batch files during extended outages. Finally, we decided that we wanted the ability to send log messages to a staging area first, before shipping them off to SQL, essentially creating an out-of-process buffer between the source application servers and the SQL database to smooth out any spikes in load or dips in DBMS performance.
To support this, we decided to use the .NET serialization mechanism, as we already had a discrete object that represented a log message with all the trimmings. Additionally, we decided that since we were going to go through the trouble of implementing a more robust serialization mechanism, we would also take a stab at compressing the data stream, without getting into hard-core gzip and the like.
.NET has several standard approaches to serialization. The most rudimentary is to simply mark your class with a Serializable attribute, and then use a BinaryFormatter to serialize it to a stream. This essentially walks the object graph and spits it out to the stream, using assembly-qualified type names to identify non-primitive datatypes. While in and of itself this is quite acceptable, we realized that we would have a very large amount of redundant data coming across the stream. While the apps generate millions of messages per day, there are only a handful (fewer than 20 in our specific example) of distinct message types. Since all of our messages are parameterized, we decided that we could save a lot of space by shipping just a numeric message id and the full set of defining parameters across the stream, and letting the receiving end reconstruct the message as necessary.
There are two basic ways to do this - to assign a unique (or unique-enough) id to a message. One is to do so at compile time - manually mark each message with an attribute that defines its id. The problem there is that you then have to make sure that both the sending and receiving end have the same messages and ids defined. What's even worse is that you have to manually ensure id uniqueness for your entire set of messages. Painful. The other is to lazy-assign these ids - assign an id as each new message is encountered. This approach is a lot simpler on the end user, but requires a bit of trickery up-front.
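To make the lazy approach concrete, here is a minimal sketch of a table that hands out ids in first-seen order. The LazyIdTable class and the sample message strings are made up for illustration and are not part of our logging code; the point is only that an id springs into existence the first time a given text shows up.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not from the original code: hands out ids lazily,
// in first-seen order, so nobody has to maintain an id table by hand.
public sealed class LazyIdTable
{
    private readonly Dictionary<string, int> ids = new Dictionary<string, int>();

    // Returns true if the text already had an id; otherwise assigns the next
    // unused id and returns false.
    public bool FindOrAdd(string text, out int id)
    {
        if (ids.TryGetValue(text, out id))
            return true;
        id = ids.Count;      // ids run 0, 1, 2, ... in first-seen order
        ids.Add(text, id);
        return false;
    }
}

public static class LazyIdDemo
{
    public static void Main()
    {
        var table = new LazyIdTable();
        int id;

        bool seen = table.FindOrAdd("User {0} logged in", out id);
        Console.WriteLine("{0} {1}", seen, id);   // False 0

        seen = table.FindOrAdd("User {0} logged in", out id);
        Console.WriteLine("{0} {1}", seen, id);   // True 0

        seen = table.FindOrAdd("Order {0} failed", out id);
        Console.WriteLine("{0} {1}", seen, id);   // False 1
    }
}
```

The ids only mean something to whoever holds the table, which is why the other end has to build a matching table as the stream goes by.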
Let's assume for a second that we have two classes - a Reader and a Writer (I'll talk about this aspect in another post). The writer establishes a persistent connection with a reader, and begins serializing messages to it one by one. The persistent aspect is key. If we know that our writer is the only writer sending data to our reader, we can define a way for our two halves to assign ids to messages that they both have seen, and not worry about these ids colliding with the ids of other such writer/reader combinations.
With that in mind, we can start looking at specifics. What we need is a way to control how individual fields of our Message class get serialized. The simplest way to do that is to implement the ISerializable interface. This gives us two control points - a GetObjectData(SerializationInfo, StreamingContext) method that's invoked on object serialization, and a .ctor(SerializationInfo, StreamingContext) constructor that the formatter class will invoke when de-serializing. The way these two work is that the SerializationInfo object is a glorified hashtable that you can throw strongly-typed name/value pairs into. When serializing, we determine which fields we want persisted, and when de-serializing, we read these back out, and interpret them to populate our newly-created object:

    // serializing fields we want to take as is
    // (excerpt from GetObjectData(SerializationInfo info, StreamingContext context))
    info.AddValue("a", args);
    info.AddValue("w", sessionID);
    info.AddValue("u", userID);
    info.AddValue("ex", exceptionText);

    // de-serializing fields we passed along untouched
    // (excerpt from .ctor(SerializationInfo info, StreamingContext context))
    args = (string[])info.GetValue("a", typeof(string[]));
    sessionID = info.GetString("w");
    userID = info.GetString("u");
    exceptionText = info.GetString("ex");
Alright, that's done. But what about the conditional stuff, what about assigning message ids? Well, that's the next step. First and foremost, we have to answer one question - how do we communicate with the outside world, specifically the reader/writer classes? The problem is that the two serialization methods we have aren't called by our code. They're called from within the formatter class, so we have no direct control over them. To answer that question, we need to look at the second parameter being passed into both of these methods - the StreamingContext struct. This struct has an interesting property on it - Context, of type object. Turns out that this property is there just for us tricky end-users who are trying to pass along some additional information. What we can then do is use this property to pass a helper into the serialization methods. The cleanest way to do this, without tying our message objects to the specifics of who's trying to serialize them, is through an interface.
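Here is a small sketch of that hand-off, under a few assumptions. DemoHelper and ContextDemo are hypothetical names invented for the example, and Describe stands in for the body of GetObjectData or the de-serialization constructor. On the .NET Framework the context would normally be given to the formatter, for example through the BinaryFormatter(ISurrogateSelector, StreamingContext) constructor; the sketch skips the formatter and only shows the object going in and coming back out. Newer .NET versions flag these serialization types as obsolete, so expect compiler warnings there.

```csharp
using System;
using System.Runtime.Serialization;

// Hypothetical stand-in for whatever helper the reader or writer exposes.
public sealed class DemoHelper
{
    public string Name = "writer-side helper";
}

public static class ContextDemo
{
    // Mimics what a serialization method sees: only the StreamingContext,
    // never the code that kicked off serialization.
    public static string Describe(StreamingContext context)
    {
        var helper = context.Context as DemoHelper;
        return helper == null ? "no helper supplied" : helper.Name;
    }

    public static void Main()
    {
        // The second constructor argument is the extra object that comes
        // back out of the Context property.
        var withHelper = new StreamingContext(StreamingContextStates.All, new DemoHelper());
        Console.WriteLine(Describe(withHelper));   // writer-side helper

        // Without the extra object, Context is null and the cast yields null.
        var bare = new StreamingContext(StreamingContextStates.All);
        Console.WriteLine(Describe(bare));         // no helper supplied
    }
}
```

Note that the `as` cast quietly produces null when nobody supplied a helper, so real serialization code should decide what to do in that case rather than dereference it blindly.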
    namespace WorkflowServer.Foundation.Diagnostics.Logging
    {
        /// Manages serialization of message text. This interface defines methods for
        /// keeping track of which messages have been previously serialized, and
        /// can be replaced with remote identifiers.
        interface IMessageSerializationManager
        {
            /// Attempts to find the remote handle for a given message in a given
            /// instance. If the message hasn't been serialized yet, an id is
            /// generated, and false is returned
            /// returns true if the message has already been serialized, false otherwise
            bool FindOrAdd(InstanceInfo instance, string message, out int remoteId);

            /// Adds the message to the manager, under the given id.
            void AddMessage(InstanceInfo instance, string message, int remoteId);

            /// Gets the message referenced by the message id.
            string GetMessage(InstanceInfo instance, int messageId);
        }
    }
Now, all we have to do is implement parts of this interface on both our reader and our writer, and we're set:
    public override void GetObjectData(SerializationInfo info, StreamingContext context)
    {
        base.GetObjectData(info, context);
        IMessageSerializationManager msm = context.Context as IMessageSerializationManager;
        int messageId;

        // namespace
        if (!msm.FindOrAdd(Instance, codeNS, out messageId))
            info.AddValue("n", codeNS);
        info.AddValue("ni", messageId);

        // name
        if (!msm.FindOrAdd(Instance, codeName, out messageId))
            info.AddValue("na", codeName);
        info.AddValue("n1", messageId);

        // format
        if (!msm.FindOrAdd(Instance, formatString, out messageId))
            info.AddValue("f", formatString);
        info.AddValue("fi", messageId);

        info.AddValue("a", args);
        info.AddValue("w", sessionID);
        info.AddValue("u", userID);
        info.AddValue("ex", exceptionText);
    }

    protected RuntimeEvent(SerializationInfo info, StreamingContext context)
        : base(info, context)
    {
        // the SerializationInfo class doesn't have a way to
        // cleanly check if it contains a particular member,
        // so we pull them all out into a list we can search
        List<string> keys = new List<string>();
        foreach (SerializationEntry se in info)
            keys.Add(se.Name);

        args = (string[])info.GetValue("a", typeof(string[]));
        sessionID = info.GetString("w");
        userID = info.GetString("u");
        exceptionText = info.GetString("ex");

        IMessageSerializationManager msm = context.Context as IMessageSerializationManager;

        if (!keys.Contains("n"))
            codeNS = msm.GetMessage(Instance, info.GetInt32("ni"));
        else
        {
            codeNS = info.GetString("n");
            msm.AddMessage(Instance, codeNS, info.GetInt32("ni"));
        }

        if (!keys.Contains("na"))
            codeName = msm.GetMessage(Instance, info.GetInt32("n1"));
        else
        {
            codeName = info.GetString("na");
            msm.AddMessage(Instance, codeName, info.GetInt32("n1"));
        }

        if (!keys.Contains("f"))
            formatString = msm.GetMessage(Instance, info.GetInt32("fi"));
        else
        {
            formatString = info.GetString("f");
            msm.AddMessage(Instance, formatString, info.GetInt32("fi"));
        }

        // compute the full message text
        message = String.Format(formatString, args) + exceptionText;
    }
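For completeness, here is a rough sketch of what the two halves of the manager might look like. None of this is our production code: InstanceInfo is a hypothetical stand-in (the real type isn't shown in this post), the WriterMessageTable and ReaderMessageTable names are invented, and keying the tables by instance name is an assumption. The interface is repeated only so the sketch compiles on its own.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the real InstanceInfo type.
public sealed class InstanceInfo
{
    public readonly string Name;
    public InstanceInfo(string name) { Name = name; }
}

public interface IMessageSerializationManager
{
    bool FindOrAdd(InstanceInfo instance, string message, out int remoteId);
    void AddMessage(InstanceInfo instance, string message, int remoteId);
    string GetMessage(InstanceInfo instance, int messageId);
}

// Writer half: remembers which strings it has already sent, per instance.
public sealed class WriterMessageTable : IMessageSerializationManager
{
    private readonly Dictionary<string, Dictionary<string, int>> sent =
        new Dictionary<string, Dictionary<string, int>>();

    public bool FindOrAdd(InstanceInfo instance, string message, out int remoteId)
    {
        Dictionary<string, int> table;
        if (!sent.TryGetValue(instance.Name, out table))
            sent[instance.Name] = table = new Dictionary<string, int>();
        if (table.TryGetValue(message, out remoteId))
            return true;
        remoteId = table.Count;   // next unused id for this instance
        table.Add(message, remoteId);
        return false;
    }

    // The writer never needs the reader-side operations.
    public void AddMessage(InstanceInfo instance, string message, int remoteId)
    { throw new NotSupportedException(); }
    public string GetMessage(InstanceInfo instance, int messageId)
    { throw new NotSupportedException(); }
}

// Reader half: records id-to-text mappings as they arrive on the stream.
public sealed class ReaderMessageTable : IMessageSerializationManager
{
    private readonly Dictionary<string, Dictionary<int, string>> received =
        new Dictionary<string, Dictionary<int, string>>();

    public void AddMessage(InstanceInfo instance, string message, int remoteId)
    {
        Dictionary<int, string> table;
        if (!received.TryGetValue(instance.Name, out table))
            received[instance.Name] = table = new Dictionary<int, string>();
        table[remoteId] = message;
    }

    public string GetMessage(InstanceInfo instance, int messageId)
    {
        return received[instance.Name][messageId];
    }

    // The reader never assigns ids itself.
    public bool FindOrAdd(InstanceInfo instance, string message, out int remoteId)
    { throw new NotSupportedException(); }
}

public static class ManagerDemo
{
    public static void Main()
    {
        var instance = new InstanceInfo("app-server-1");
        IMessageSerializationManager writer = new WriterMessageTable();
        IMessageSerializationManager reader = new ReaderMessageTable();
        int id;

        // First occurrence: the full text travels along with its new id.
        if (!writer.FindOrAdd(instance, "User {0} logged in", out id))
            reader.AddMessage(instance, "User {0} logged in", id);

        // Second occurrence: only the id would travel.
        bool known = writer.FindOrAdd(instance, "User {0} logged in", out id);
        Console.WriteLine(known);                            // True
        Console.WriteLine(reader.GetMessage(instance, id));  // User {0} logged in
    }
}
```

Each side only implements the part of the interface it actually uses, and the tables live exactly as long as the connection between the two.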
That's it. What we've done is define a stateful compression protocol. Every time a new Reader/Writer pair is established, the message ids are re-calculated, but only once per message type. Given the usage pattern of high repetition, this is a compromise we're quite happy with.