Archive for the ‘Scalability’ category

Remote Datasources

February 18, 2010

Almost all applications today deal with persisted data. The only rare exceptions are usually not very complicated such as a calculator, unit conversion tool, or minor utility. Let us assume that any application we’re working on is going to have some sort of persistant state that must be kept between executions of the application. Typically the state is stored in either a file or a database. File access is useful for many desktop applications used by a single user as the format can be created very easily from the data model, the data is easy to transfer to another user via email or some other file transfer process, and backups involve just storing the file somewhere safe.

Many applications that involve multiple users, including web applications, will instead employ a database. Even single user desktop applications sometimes use a database. Database access is often performed by a section of the code converting objects or structures into database tupils and storing them in tables. When the data is required again, select statements are made that call the data and turn it back into a structure. Usually this storage and retrieval of data is done in the same code base as the rest of the application.

There are a few problems with this approach however. For one, the database code can sometimes become tightly coupled with the business logic of the application itself. The application knows about the data being stored in a database and will often be changed accordingly to facilitate database access. Switching to a different database system can involve very large rewrites of not only the database code, but also application code as it can rely on database specific functionality. Think of functions that pass along snippets of SQL in order to construct the correct objects. Often that SQL will be database specific.

Besides being tied to a single database, the same coupling also ties applications to databases in general. If your application is expecting the data to be stored in a relational database, assumptions in the business logic will be made regarding that. This inhibits your ability to transition to a file store or other NoSQL solution if required. Because of these concerns, my topic for today is about abstraction and remote data sources.

If you code your database logic in with your application code, coupling can form. However if you separate them completely your application doesn’t need to know where the data comes from or where it goes when its saved. All the application has to do is follow a simple generic interface for storing and retrieving data. This not only elements coupling, but has some other scalability benefits that I’ll discuss a bit later.

To really appreciate interfaced based design and loose coupling, try removing your data access code from your application code base completely. Instead, use a language neutral format such as JSON or XML to communicate between your application and your data source, itself a small application. The application will send a request along with perhaps some data if it needs something stored. The data source will then transform that data into a format that can be stored and place it either in a database, file, or any other storage medium that you desire. Later if you want to modify how your data is stored, maybe a different DB system is more suited or you’re moving from a file-based system to a database, you can just modify the implementation of the data source application while not changing a line of code in your actual application. The loose coupling means that as long as the data source maintains a solid interface that the application can interact with, the application can be developed independently of the data source.

If each of your interactions between your application and datasource are atomic and independent of each other, stateless, you can horizontally scale your datasource using replication. Each data source can access a different replicated read-only data store for request information. Meanwhile write access will all go to a master data store that can be replicated to the read-only slaves. Read requests from an application can be distributed to multiple data sources, especially useful if the data source does any CPU intensive filtering or processing.

The main benefit however remains that by separating your data source from your application as a remote service, you can ensure that your code is loosely coupled and that your business logic will not rely upon the underlying method which your data is stored. This will lead to greater flexibility in your code, and a more agile platform on which to build.

Advertisements

Client Side Map-Reduce with Javascript

January 29, 2010

I’ve been doing a lot of scalability research lately. The High Scalability website has been fairly valuable to this end. I’ve been thinking of alternate approaches to my application designs, mostly based on services. There was an interesting article about Amazon.com’s architecture that describes a little bit on how they put services together.

I started thinking about an application that I work on and how it would work if every section of the application was talking to each other through a web service or sockets passing JSON or Protocol Buffers rather than the current monolithic design that uses object method calls. I then had the thought that why limit your services to being deployed on a set of static machines. There’s only so much expandability in that, what if we harnessed all of the unused power of the client machines that visit the site.

Anyone who’s done any serious work with ECMAScript (aka Javascript) knows that you can do some pretty powerful things in that language. One of the more interesting features about it is the ability to evaluate plain text in JSON format into Javascript objects using eval(). Now eval() is dangerous because it will run any text given to it as if it were Javascript. However that also makes it powerful.

Imagine needing to do a fairly intensive series of computation on a large data set and you don’t have a cluster to help you. You could have clients connect to a web site that you’ve set up and have their browser download data in JSON format along with executable Javascript code. The basic Javascript library can be such that a work unit will be very generic and contain any set of data and functions to perform on that data along with a finishing AJAX call to upload the results. On the server side when you have a Map-Reduce operation that you need to perform, you can distribute work units that contain both a section of the data along with the code needed to execute on it to any connected clients that have this library and are sending AJAX polling requests asking for work.

A work unit gets placed into a JSON string and sent back as the AJAX reply. The client then evaluates the reply which calls a javascript function that processes the data (which is probably a map job). Once the data is process the javascript automatically makes another AJAX call to send the result data back, most likely with some sort of work unit ID to keep track of anything, and requests the next work unit. The server then just coordinates between the various clients, setting time outs for each work unit so that if someone closes their browser the job can be handed out to an active client or a back-end server.

This will work a lot better on CPU intensive processes than it will on memory intensive ones. For example, sorting a large data set requires some CPU, but a lot more memory because you need to keep each element in memory that you’re sorting. Sending entire large lists to clients to sort and receiving the results would be slow due to bandwidth and latency restraints. However, performing large computations on a smaller series of data such as what’s done with SETI or brute force cryptography circumvention where you can send heartbeats of partial results back, there could be a benefit.

The limits of course will be on how much memory you can allocate in your browser to JavaScript. Also, since this technique would focus on heavy computational functions, the user will probably notice a fairly large hit on browsing speed in non-multithreaded or multiprocessing browsers. Naturally from a non-scientific point of view, most people would be outraged if their computer was being taken advantage of for its computing resources without their knowledge. However for applications working on an internal network or applications that state their intentions up front, users might be interested in leaving a browser open to a grid service to add their power to the system. This would probably be easier if you make each work unit worth a set of points and give them a running score like a game.

People like games.

Breaking a monolithic applicaiton into Services

January 22, 2010

Today I shall be tackling Service Oriented Architecture. I think that that particular buzz phrase has annoyed me a lot in the past. CTOs and middle management talk about how the read in a magazine something about SoA or SaaS (Software As A Service) as being the next big thing and that we should switch all of our stuff to that! Meanwhile there isn’t a very good explanation of how or why to do such a thing. I had written it off mostly as I never saw how independent services could be pulled together into an application. Sure it sounded neat for some other project that wasn’t mine. But no way did that model fit in with the project I’m doing.

Recently however I’ve been tasked with rewriting the section of code that generates reports in our application. The entire application is just a series of Java classes nested together and packaged up as one WAR file that is deployed to a Tomcat server. To scale, we deploy the same war file to another server (ideally everything is stateless and share-nothing so there’s no sessions to worry about). The whole thing is behind a load balancer that uses a round-robin strategy to send requests to various machines. Seems like a pretty good way to scale horizontally I thought.

However the horizontal scaling of this system is fairly coarse grain. If we find that getting hit with a lot of requests to generate reports is slowing down a server, our option is to duplicate the entire application on another server to distribute the load. However that is distributing not only report generation but also every other aspect of the application. So now we have an additional front end, service, and database layer running just to speed up one area. It seems like a bit of a waste of resources.

So instead of scaling a monolithic application, how about we break the whole thing up into ‘services’. Instead of the Report system just being a set of APIs and implementations in the applications source code, we instead make an entire new application that just generates reports as its whole goal. It has a basic interface and API that takes in a request with certain criteria of what type of report you want and in what format and returns that report. It can be a completely stand-alone application that just sits and waits, listening on a port for a request to come in. When one does it processes the request, generates a report, then sends the information back over that port to the client and waits for the next request. Naturally we make it multi-threaded so that we can handle multiple requests at a time, putting a limit on that number and queuing any overflow.

The benefit of this is manyfold. For starters you gain fine-grain horizontal scalability. If you find that one service is too slow or receives a lot of requests you can just deploy that service on additional machines. You only deploy that service however rather than the whole application. The service can be done via RPC, direct sockets, web services, or whatever else you like to listen for your requests. A controller near the front end would just call each individual service to gather the end data up to pass back to the user. Put it behind some sort of load balancer with failover and you have fine-grain horizontal scalability.

Second, you gain faster deployment. Since each service is self-contained they can be deployed independent of the other services. Launching a new version does not have to replace the entire system, just the services that you’re upgrading. In addition, since each service can be running (and should be running) on multiple machines, you can upgrade them piecemeal and perform bucket tests. For instance, if you have a service that is running on 10 machines you can upgrade only 2 of them and monitor user interaction with those services. This way you can have 20% of your users actually utilize your new code, the rest will hit the old code. When you see that everything is going fine you can upgrade the other 8 servers to have the new version of the service.

Because each part of the program is just a small separate program itself, problems become easier to manage and bugs become easier to fix. You can write tests for just the service you’re working on and make sure that it meets its contract obligation. It helps manage the problem with cyclical dependencies and tightly coupled code. It also reinforces the object-oriented strategy of message passing between components.

Further more, try to avoid language specific messages such as Java Serialization for communicating between your services. Utilize a language agnostic format such as JSON or Google’s Protocol Buffers. I would avoid XML because it’s fairly verbose and slower to transmit over a network and parse than those others. The advantage of using a language agnostic format is that each service can be written in a different language depending on the needs of the service. If you have a service that really needs to churn through a lot of data and you need every ounce of speed out of it you can write it in C or even assembly. For those areas where it would be good to do something concurrently you might use Scala, Erlang, or Clojure. The main idea is each part of your program can be written in a language that is best for solving that services problem.

Profiling

January 4, 2010

I’m not talking about the type that is done at the airport, but more the type that lets you see how long or how many times each section of code is called. There is most likely a wide variety of tools for profiling your code in a variety of languages. I’m going to focus on Java in this post using a tool called YourKit, but I assume other tools will give the same basic info.

From time to time, especially when you’re having performance problems, it’s a good idea to run your code through a profiler and gain a readout of how long methods take to run and how many times they are called. A good profiler will break it down into a nice tree view that shows which methods took the longest and which methods inside of those methods took a long time based on the number of invocations. Looking at these numbers you can sometimes find places where your code has a nested loop that isn’t required or is calling an expensive method or function that returns the exact same data multiple times.

A developer is trained to trace code in his/her head while writing it, however sometimes you write a method that uses the functions of an already written piece of code. If you don’t know how efficient that code is, you may place it in a loop without realizing the performance penalty. When testing it locally it might seem to run fine because you’re a single user with very low load. A method that called the DB 10k times might return in seconds and seem to display fine for you, but under load it can slow down in production.

While a load test suite can find that a problem exists in the code, profiling can narrow it down to what’s really causing it. You can profile your code without a commercial or open source profiler by manually adding timestamp information before and after method calls. This isn’t very usable to do for every single call, but if you have an idea of what areas are causing a slow down and you want to narrow it down it can be very helpful.

Recently I ran a profiler on a section of a project I’m working on. I was able to find that to load a fairly simple page that there was a database query being called 22k times. That DB query was pulling in information related to an item I actually used, but was never used itself. While those queries return fairly quickly normally, having multiple users hit the page at the same time will multiply the number of queries. Also, if the latency between the server and the DB increases, we will definitely see a decrease in the responsiveness of the application. If we can eliminate these unneeded DB calls, we can speed up the application for the end users and also increase scalability.

Just because you don’t see an issue when you’re running your quick visual test doesn’t mean that the issue doesn’t exist. Performance is not something that should be tacked on at the last minute, and code needs to be analyzed regularly to see where inefficiencies are introduced.

Apache Pig Tips #1

December 2, 2009

Pig is a new and growing platform on top of Hadoop that makes writing jobs easier because you can avoid writing Map and Reduce functions in Java directly while still allowing you to do so if you choose. Instead it creates a bunch of basic functions such as COUNT, FILTER, FOREACH, and such that you would normally have to independently write for each data manipulation you want to perform. Unfortunately, the Pig documentation is fairly sparse and performing what you would think is a basic manipulation can become very difficult if there are no examples.

In this post, I’m going to provide some examples based on what I have learned about Pig in the last week. I labeled this as Apache Pig Tips #1 because I expect I may write more in the future as I uncover additional usages.

My problem domain includes a data set that has multiple IDs and a result field:

{tcid, tpid, tsid, date, result}

There are a few more fields but I’ll leave those out for brevity. A quick description of what each of those IDs are: the tcid is a Test Case id that a result was inserted for. The tpid is the Test Plan that the Test Case was a part of for this result. The tsid is the Test Suite that the Test Plan belongs to. The date is the date the result was added, and the result is the actual result (Pass, Fail, Postponed… etc).

Now a Test Plan can have multiple Test Cases in it, however it can only have each test case in it once. A Test Case can also be in multiple Test Plans (though again only once for each Plan). A Test Suite can have multiple Test Plans, but each Test Plan belongs to exactly one Test Suite. Results for a test case in a test plan can be inserted multiple times. Maybe the first time it was tested it failed so a Fail entry is added. At a later date it passes so a new entry is  made with a Pass result. We need to generate a report that shows how many Pass and Fail per test suite using only the latest result (ignoring previous ones).

The tab separated data is located on HDFS in a file named ‘allresults’. First we need to load the data into a variable:

A = LOAD 'allresults' USING PigStorage() AS (tcid:int, tpid:int, tsid:int, date:chararray, result:chararray);

Next we need to find all Test Case/Test Plan combinations and group by them. This will give us a list of items that has multiple results of different dates, but all for the same test case in a test plan.

B = GROUP A BY (tcid, tpid) PARALLEL 100;

The Pig documentation mentions that the GROUP keyword can be applied to a single alias and the BY can apply to a single item. What isn’t easily discovered in the documentation is that the item can be a tuple, which you can define in line by surrounding multiple fields with (). Normally your group by looks like: B = GROUP A BY tcid; However, to group on multiple fields so that each entry is a unique combination of those fields you can surrounded it with () to make it a tuple.

Currently the definition of B looks something like this:

{group {tpid:int, tcid:int}, A {tcid:int, tpid:int, tsid:int, date:chararray, result:chararray}};

Basically we have a Bag where each item in the bag has a Bag containing the unique tpid and tcid, along with a second bag that contains 1 or more result rows. We need to look at that second bag and remove all but the most recent result rows so that we have just the most recent result.

X = FOREACH B {
    X1 = ORDER A BY date;
    X2 = LIMIT X1 1;
   GENERATE FLATTEN(X2);
}

This will loop through all items that were grouped by tcid and tpid. For each one it will order the inner result bag by date (descending by default). We then take only the first item from each of those ordered bags (the most recent result). We export to X the flattened version of the limited bag. This produces just a Bag of tuples that have all the non-recent results removed.

After that we can split up all of the results into separate aliases by filtering on X multiple times:

pass = FILTER X BY result == 'Pass';
fail = FILTER X BY result == 'Fail';
postpone = FILTER X BY result == 'Postpone';
-- group by suite and count it
passbysuite = GROUP pass BY tsid PARALLEL 100;
failbysuite = GROUP fail BY tsid PARALLEL 100;
postponebysuite = GROUP postpone BY tsid PARALLEL 100;
-- generate counts
passcount = FOREACH passbysuite GENERATE group, COUNT(pass);
failcount = FOREACH failbysuite GENERATE group, COUNT(fail);
postponecount = FOREACH postponebysuite GENERATE group, COUNT(postpone);

We now have 3 Bags, each containing a group which represents the tsid, along with a number correlating to how many results of each type existed.

Pig Frustrations

November 30, 2009

My desires to implement better scalability through pre-processing reports via the Grid have lead me to Pig. Unfortunately, while Pig does remove some of the difficulties of writing for Hadoop (you no longer have to write all of the map-reduce jobs yourself in java), it has many limitations.

The biggest limitation I’ve found so far is just lack of documentation. There are a few tutorials, and a language reference, but not really all that much more. Even internally at Yahoo there is only a few more bits of documentation than found at the official apache site. I know that Pig is very new and hasn’t even hit a 1.0 release yet, however the documentation really has to improve in order for people to start using it more.

Even when looking at the language reference itself I find there are limitations. For instance, you cannot use a FOREACH inside of a FOREACH. There’s no nested looping like that. This makes working with complex data structures quite difficult. The system seems to be based off of doing simple sorting, filtering, and grouping. However if you want to group multiple times to create more advanced data structures: bags within bags within bags, and then sort the inner most bag, there isn’t a construct for that that I have found.

Also I’ve run into error messages that do not really explain what went wrong. Since there is little documentation and even users so far, it’s not easy to resolve some of these errors. A quick search will usually result in a snippet of code where the error was defined or a list of error codes on a web page with no real descriptions to speak of. Definitely no possible solutions.

Despite those flaws, it still seems beneficial to write in Pig rather than writing manual MapReduce jobs.

It’s also possible that some of these flaws can be fixed by writing extensions in Java to call from my Pig scripts. I haven’t gathered up all of the requirements to build and compile custom functions into a jar yet, and perhaps that will be the next step. It’s just unfortunate that some of these things which I feel should be fairly basic are not included in the base package.

I’m still researching however and its possible I just haven’t discovered the way to do what I need. If that changes I’ll be sure to update.

Dynamic Offline Reports

November 23, 2009

Many applications have the primary concern of storing and retrieving data. The raw data by itself is often not very useful, so an additional process is put into place to turn that data into useful information. Many applications generate these reports from the data quickly at the user’s request through either a narrow SQL select statement, in application data processing, or both. However, in larger applications where the data is too large to handle in memory at a time, processing is too heavy, and report customization is too varied, information generation needs to be pushed to its own scalable system.

A lot of data has various pieces of metadata associated with it naturally. Data such as what user added the record, the date it was added or modified (or both), maybe the size of the record, categories, tags, keywords, or other pieces of associate data that is used to break it up into more manageable chunks. This meta data is useful, but only if we can use it as an aggregate to generate specific information relating to the grouping of items based on this data.

Sometimes generating these more specific reports is as easy as adding additional WHERE or GROUP BY clauses in SQL. However, when more advanced business rules are taking place where there isn’t an easy or succinct way of extracting this information via a query, or if the query returns such a large amount of data as to cause memory issues, a different approach can be taken.

For instance; in an application I am currently working on we need to generate reports based on a table with about 5 million rows. The SQL queries we use can limit the amount of rows returned to perhaps a few hundred thousand for some of our larger reports. However, a lot of the data needs to be processed in application code rather than by the database itself due to some special business rules. Because of this, we end up creating a large number of objects in Java to hold these result rows. If multiple users are generating different reports we might end up holding too many of these objects in memory at a time and receive an OOM error. Also, the processing on this data can be intense enough that if the server is slammed with report requests that the entire system slows down, causing difficulties for people wanting to insert or modify data. This is the case I am in while I contemplate offline report generation.

The basic idea is that the main application should be concerned purely with manipulating the data of the system. That is basic CRUD stuff such as creating new records, updating them, the rare deletions, and showing single records to the user (so they can edit or delete it). We want that part of the application to remain fast, and not be effected by the purely read-only needs imposed by report generation. In order to nullify the impact, we move reporting to its own system that reads from a separate read-only replication of our production database.

When a report request comes in to our application, we send a request to the separate reporting system. This can be done either as a web service or maybe an RPC call. The reporting system uses its read-only copy of the data to generate the report numbers and send it back, causing no speed delay for insertion or regular operation of the main application.

This doesn’t solve our OOM issues however, as many drivers for our database (MySQL) return ResultSet objects with the entire contents of the results which might be too large to fit into memory. However, since we’re using a read-only list anyway we can convert the table or tables we use to process our results into flat files that can be read in on a line by line basis, perform some intermediate result processing, deallocate those lines and work on additional lines. Since our reports are mostly generating statistical data over a large data set, we can process results on that data set in parallel using multiple threads or possibly multiple computers using a Hadoop cluster.

By making report generation asynchronous to our applications general work flow we will free up the processing power and the connection pool that’s used to handle requests by asking users to either poll for when the result is finished or to notify the system when a report is finished and thereby avoid the instances where we use all of our available connections or resources processing reports. There is also the added possibility of continuously generating all possible report data on a separate machine or cluster to decrease access time by increasing storage requirements.

I’m currently researching the use of MapReduce in Hadoop for processing this flat file of all data for reports. In addition, I’m researching a few languages that are reported to be good at concurrent processing so that I can benefit from multiple cores when generating reports from our raw data. My current focus is on Scala, Erlang, and Clojure, but I do not have much to report on those areas yet. If anyone has any information those languages as far as report generation based on a largish data set (currently 5 million, but the rate of growth is fairly alarming), let me know.