Multi-threading made easier than ever...

Coordinator
Apr 20, 2010 at 9:13 PM
Edited Apr 26, 2010 at 4:28 AM

Introduction

If you search for 'multi-threading', you will find several million articles on the topic. However, most of them cover only the fundamentals, such as how to (a) initialize a few threads and (b) start them.  They then put a Console.ReadLine() after the loop to wait for all of the threads to finish.  Having made that point, they stop, as if they had given you all of the skills needed to begin using multi-threading in your application.

This article is different.  Multi-threading with crudwork takes a different, easier approach.  I will show you enough to get started, and then I will show you what is really going on behind the scenes.

Getting Started with Multi-threading using crudwork

Say, for example, we have a task to write an application that gives all employees an enormous raise of 20%.  To do this, we will retrieve a list of employee IDs from the EMPLOYEE table and use two stored procedures to get and update the pay rate.  This can be done with a foreach loop, and it works fine on a single thread.

private static void GiveRaise()
{
	var df = new DataFactory(DatabaseProvider.SqlClient, "data source=(local); integrated security=true; initial catalog=MyCompany");
	var dt = df.FillTable("select ID from Employee");

	foreach (DataRow dr in dt.Rows)
	{
		int employeeId = Convert.ToInt32(dr["ID"]);

		// get the current pay rate
		decimal payrate = (decimal)df.ExecuteScalar("dbo.GetPayRate",
					df.GetParameterIn("@EmployeeID", DbType.Int32, 0, employeeId));

		// apply the 20% raise (new rate = old rate * 1.20)
		payrate *= 1.20M;

		// save the new pay rate
		df.ExecuteProcedure("dbo.UpdatePayRate", new DbParameter[] {
					df.GetParameterIn("@EmployeeID", DbType.Int32, 0, employeeId),
					df.GetParameterIn("@Payrate", DbType.Decimal, 0, payrate),
		});
	}
}

Since this article is about multi-threading, we will convert this snippet to run on multiple threads.

Before we begin, let's go over the steps needed to increase an employee's salary.  First, we retrieve a list of employee IDs. This list is then used by the foreach loop. Inside the loop, we call a stored procedure to get the pay rate. Then we increase it by 20%, which means multiplying the pay rate by 1.20. Finally, we call another stored procedure to update the pay rate.
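The raise step is easy to get wrong, so here is a minimal sketch of the arithmetic. The RaiseMath class and ApplyRaise method are hypothetical names used only for illustration; they are not part of crudwork.

```csharp
using System;

public static class RaiseMath
{
    // Returns the pay rate after applying a fractional raise (0.20m means 20%).
    public static decimal ApplyRaise(decimal payrate, decimal raiseFraction)
    {
        // new rate = old rate + (old rate * fraction) = old rate * (1 + fraction)
        return payrate * (1m + raiseFraction);
    }
}
```

For example, RaiseMath.ApplyRaise(50000m, 0.20m) yields 60000, whereas multiplying 50000 by 0.20 alone yields 10000, which is the size of the raise rather than the new pay rate.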

It is important to keep in mind that the above code reuses the same 'df' instance of the DataFactory class throughout the procedure. In a multi-threaded environment, this is a bad practice to avoid.  We should not rely on a single shared instance, because several threads may use it at once and its state at any given moment is unknown.  Worse, its state may change without notice, causing deadlocks and other unexpected behavior. To overcome this, we will create a new DataFactory instance inside the scope that uses it.
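The idea of giving each worker its own instance can be sketched as follows. FakeDataAccess is a hypothetical stand-in for a stateful object such as DataFactory (it is an assumption, not crudwork code), and PerWorkerInstanceDemo is likewise made up for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for a stateful helper that is not thread-safe.
public class FakeDataAccess
{
    private int callCount; // unsynchronized state, safe only when used by one thread

    public int CallCount { get { return callCount; } }

    public void Execute() { callCount++; }
}

public static class PerWorkerInstanceDemo
{
    // Starts workerCount threads; each one creates its own FakeDataAccess
    // and calls it callsPerWorker times. Returns the call count seen by each worker.
    public static int[] Run(int workerCount, int callsPerWorker)
    {
        var counts = new int[workerCount];
        var threads = new List<Thread>();

        for (int i = 0; i < workerCount; i++)
        {
            int slot = i; // copy the loop variable so each lambda captures its own value
            var worker = new Thread(() =>
            {
                var da = new FakeDataAccess(); // created inside the worker, never shared
                for (int n = 0; n < callsPerWorker; n++) da.Execute();
                counts[slot] = da.CallCount;   // each thread writes only to its own slot
            });
            threads.Add(worker);
            worker.Start();
        }

        foreach (Thread th in threads) th.Join(); // wait for every worker to finish
        return counts;
    }
}
```

Because no instance is visible to more than one thread, every entry in the returned array equals callsPerWorker. If a single FakeDataAccess were shared instead, the unsynchronized increments could be lost.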

Step 1 – Set Up the Multi-threading Container

First, we create a new class; let's call it MyContainer.  This class inherits from the multi-purpose base class MultiThreadingBase<T,U>, where the generic T is the item type of the input List<T> and the generic U is the item type of the output List<U>.  In our example, the input is a list of DataRow and we have no meaningful output to return; therefore, we declare T as DataRow and U as int, which gives us an output List<int>.  We will go into more detail later.

Next, we override the ProcessInput method, which will be called on each thread, and put our code into it.  Finally, as discussed, we create a private DataFactory instance at the container level, in the constructor.

private class MyContainer : MultiThreadingBase<DataRow, int>
{
	// each container owns its own DataFactory instead of sharing one
	private DataFactory df;

	public MyContainer(string connectionString)
	{
		df = new DataFactory(DatabaseProvider.SqlClient, connectionString);
	}

	protected override void ProcessInput(MultiThreadEventArgs<DataRow, int> e)
	{
		var dr = e.Input;
		e.Result = 1; /* return a bogus result to the List<int> */

		int employeeId = Convert.ToInt32(dr["ID"]);
		decimal payrate = (decimal)df.ExecuteScalar("dbo.GetPayRate",
			df.GetParameterIn("@EmployeeID", DbType.Int32, 0, employeeId));

		// apply the 20% raise (new rate = old rate * 1.20)
		payrate *= 1.20M;

		e.Result = df.ExecuteProcedure("dbo.UpdatePayRate", new DbParameter[] {
			df.GetParameterIn("@EmployeeID", DbType.Int32, 0, employeeId),
			df.GetParameterIn("@Payrate", DbType.Decimal, 0, payrate),
		});
	}
}
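To make the roles of T and U concrete, here is a minimal sketch of how such a base class could be shaped. WorkItemArgs, WorkerBase, and LengthWorker are hypothetical names; this is not crudwork's actual implementation, only an illustration of one input of type T going in and one result of type U coming out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical event-args type: carries one input (T) and receives one result (U).
public class WorkItemArgs<T, U>
{
    public T Input { get; set; }
    public U Result { get; set; }
}

// Hypothetical base class: subclasses only supply the per-item logic.
public abstract class WorkerBase<T, U>
{
    protected abstract void ProcessInput(WorkItemArgs<T, U> e);

    // Processes every input in order and collects one result per input.
    public List<U> ProcessAll(List<T> inputs)
    {
        var results = new List<U>();
        foreach (T input in inputs)
        {
            var e = new WorkItemArgs<T, U> { Input = input };
            ProcessInput(e);
            results.Add(e.Result);
        }
        return results;
    }
}

// Example subclass: T is string (input), U is int (result).
public class LengthWorker : WorkerBase<string, int>
{
    protected override void ProcessInput(WorkItemArgs<string, int> e)
    {
        e.Result = e.Input.Length;
    }
}
```

With this shape, new LengthWorker().ProcessAll(new List<string> { "ab", "cde" }) returns a List<int> holding 2 and 3, in the same way that MyContainer turns a list of DataRow into a list of int.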

Step 2 – Set Up the Multi-threading Code

We will update the original method to support multi-threading.  First, create a List<DataRow> to use as the input and populate it using the DataTable.Select() method. Then start the multi-threading process via the MT.Start() method.  Notice that the connectionString passed to MT.Start() is intended for the MyContainer constructor.

private static void GiveRaiseMT()
{
	var connectionString = "data source=(local); integrated security=true; initial catalog=MyCompany";
	var df = new DataFactory(DatabaseProvider.SqlClient, connectionString);
	var dt = df.FillTable("select ID from Employee");

	List<DataRow> mainInput = new List<DataRow>();
	mainInput.AddRange(dt.Select());

	// start mt using 16 threads
	var mainOutput = MT.Start(mainInput, 16, typeof(MyContainer), connectionString);
}

Multi-threading behind the scenes

The MT.Start() method does the following for you behind the scenes:

  1. Creates an instance of the MultiThreadingManager with the same generic <T> and <U> as the container.
  2. Creates the requested number of instances of the container class.
  3. Attaches the input list.
  4. Distributes the input list evenly among the threads.  (For example, if there are 100 entries and 10 threads, each thread is assigned 10 entries.)
  5. Starts all threads.
  6. Waits for all threads to complete.
  7. Merges all result lists into a single list.
  8. Returns the list to the caller.
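The distribute, start, wait, and merge steps above can be sketched roughly as follows. MiniManager is a hypothetical class written for this illustration; it takes a delegate instead of a container type, and it is an assumption about the general technique rather than the code inside MT.Start().

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class MiniManager
{
    // Splits input into threadCount contiguous chunks whose sizes differ by at most one.
    public static List<List<T>> Distribute<T>(List<T> input, int threadCount)
    {
        if (threadCount < 1) throw new ArgumentOutOfRangeException("threadCount");

        var chunks = new List<List<T>>();
        int baseSize = input.Count / threadCount;
        int remainder = input.Count % threadCount;
        int offset = 0;
        for (int i = 0; i < threadCount; i++)
        {
            // the first 'remainder' chunks take one extra item
            int size = baseSize + (i < remainder ? 1 : 0);
            chunks.Add(input.GetRange(offset, size));
            offset += size;
        }
        return chunks;
    }

    // Runs work over every input on threadCount threads and returns the merged results.
    public static List<U> Start<T, U>(List<T> input, int threadCount, Func<T, U> work)
    {
        var chunks = Distribute(input, threadCount);
        var partials = new List<U>[threadCount];
        var threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++)
        {
            int slot = i; // copy the loop variable so each lambda captures its own value
            threads[slot] = new Thread(() =>
            {
                var local = new List<U>(); // each thread fills its own list
                foreach (T item in chunks[slot]) local.Add(work(item));
                partials[slot] = local;
            });
            threads[slot].Start();
        }

        foreach (Thread t in threads) t.Join(); // wait for all threads to complete

        var merged = new List<U>(); // merge the per-thread lists in chunk order
        foreach (List<U> part in partials) merged.AddRange(part);
        return merged;
    }
}
```

Because the chunks are contiguous and merged in chunk order, the results come back in the same order as the input. With 100 entries and 10 threads, Distribute produces 10 chunks of 10 entries each; with 7 entries and 3 threads, the chunk sizes are 3, 2, and 2.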

 

The MT.Start() and MT.StartDialog() methods

More to come

The MultiThreadingManager<T,U> Class

More to come

The MultiThreadingBase<T,U> Class

More to come

DistributionType

More to come

Pros and Cons

More to come