
Using Semaphores in Delphi: The Connection Pool

March 26, 2008

Semaphores are like mutexes on steroids. Not only do they provide blocking thread synchronization, but they also permit two or more threads, up to a specified maximum, to work concurrently with a shared resource. This sets them apart from most other synchronization objects, which typically grant access to one thread at a time. For an introduction to the creation and use of semaphores, see the preceding article in this series on the Borland Developer Network site.

This unique feature of a semaphore, the ability to provide two or more threads with simultaneous access to a resource, lends itself very well to the implementation of objects that contain a fixed number of resources that need to be made available in a multithreaded environment. Two common examples of objects of this type are thread pools and database connection pools. This article discusses the use of a semaphore to implement a connection pool.
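The counting behavior described above can be sketched with the Win32 semaphore calls directly. This is a minimal console sketch, assuming Delphi on Windows; the TryAcquire helper is a hypothetical name introduced for illustration and is not part of the article's unit.

```pascal
program SemaphoreSketch;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils;

// Hypothetical helper: tries to take one lock without blocking
function TryAcquire(Sem: THandle): Boolean;
begin
  Result := WaitForSingleObject(Sem, 0) = WAIT_OBJECT_0;
end;

var
  Sem: THandle;
begin
  // Two locks available out of a maximum of two
  Sem := CreateSemaphore(nil, 2, 2, nil);
  if Sem = 0 then
    RaiseLastOSError;
  try
    Writeln(TryAcquire(Sem)); // TRUE: first lock taken
    Writeln(TryAcquire(Sem)); // TRUE: second lock taken
    Writeln(TryAcquire(Sem)); // FALSE: no lock left
    // Return one lock; the next attempt succeeds again
    ReleaseSemaphore(Sem, 1, nil);
    Writeln(TryAcquire(Sem)); // TRUE
  finally
    CloseHandle(Sem);
  end;
end.
```

A mutex would have refused the second acquisition from another thread; the semaphore refuses only the third.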

Connection Pool Overview

A connection pool is an object designed to provide thread-safe access to a limited number of database connections in a multithreaded environment. In most scenarios, a connection pool contains one or more active database connections that can be used by any thread needing to work with the database.

A connection pool provides two primary benefits. The first is related to performance. Making and dropping a connection to a database is a relatively time-consuming process. Depending on the environment, if each thread must make and then drop a connection each time data access is needed, a significant amount of processing power will be spent on the connection process. By comparison, a connection pool includes a fixed number of connections that can be shared among many threads. When a thread needs to work with a database, it obtains a handle to an existing connection that it uses temporarily. This reuse can dramatically reduce the number of times a connection is made and dropped, thereby producing an overall increase in performance.

The second benefit is related to connection counts. Many database licensing schemes permit only a limited number of simultaneous connections to the database. A connection pool, therefore, becomes a convenient mechanism for ensuring that no more than some fixed maximum number of connections can be active at a given moment. While a semaphore can be used outside a connection pool to manage the maximum number of simultaneous connections, a connection pool automatically provides this feature with the additional benefit of increased performance.

The FixedConnectionPool

The fixed connection pool unit, available for download from Code Central, provides an implementation of a connection pool. The purpose of this code is to demonstrate the use of a semaphore to provide multithreaded access to a limited resource (the connections in the pool). If you want to use this code in a production application, it is your responsibility to certify that the connection pool meets your application’s requirements. Note also that, as of this writing, this connection pool has not been tested on a multiprocessor system.

Four type declarations are used in this connection pool: one interface and three classes. The interface, named IConnection, declares what a pooled connection exposes to its callers, and also provides for the lifecycle management of individual connections.

The three classes consist of the connection pool object itself, named TFixedConnectionPool, as well as two helper classes. These helper classes are TConnectionModule, a data module descendant that is created once for each open connection, and TCleanupThread, a TThread descendant that drops connections that have not been used after some specified period of time.

IConnection

Let’s begin by considering the IConnection interface. This interface declaration is shown here:

IConnection = Interface(IInterface)
  //CHANGE
  //To use a connection of another type, change the
  //return type of the Connection function
  function Connection: TSQLConnection;
  function GetRefCount: Integer;
  function GetLastAccess: TDateTime;
  property LastAccess: TDateTime read GetLastAccess;
  property RefCount: Integer read GetRefCount;
end;

The Connection function returns a TSQLConnection reference which, when implemented, refers to a TSQLConnection object that is actively connected to a database. If you want to modify this connection pool to use a data access technology other than dbExpress, you need to change the return type of this Connection function to some other connection type, such as a TADOConnection (ADO), TIBDatabase (Interbase Express), or TSession (BDE).

The functions GetRefCount and GetLastAccess are the accessor methods for the RefCount and LastAccess properties, respectively. RefCount returns the number of current references to the IConnection implementing object. LastAccess returns the TDateTime of the last release of the IConnection implementing object. This value is used by the cleanup thread object to test whether the IConnection implementing object has gone unused for an extended period of time, and if so, to drop the unused connection.

TConnectionModule

The actual database connection is provided by the TConnectionModule class. This class is a TDataModule descendant, and the connection component appears on this data module, as shown in the following figure.

[Figure: the TConnectionModule data module with its SQLConnection1 component]

TConnectionModule implements the IConnection interface. The SQLConnection that appears on this data module is the reference that is returned when the Connection method is invoked. As mentioned in the preceding discussion of the IConnection interface, if you want to use a connection other than dbExpress, you will replace this SQLConnection component with one that provides a connection using an alternative data access mechanism.

The following is the declaration of the TConnectionModule class. This class is commented extensively, and I am leaving these comments in this code segment for clarity.

//This data module provides the implementation
//of the IConnection interface. To use a data access
//mechanism other than dbExpress, modify the components
//that appear on this data module, and change the class
//of the Connection function in the IConnection interface
//as well as in this class.
TConnectionModule = class(TDataModule, IConnection)
  SQLConnection1: TSQLConnection;
private
  { Private declarations }
protected
  FRefCount: Integer;
  FLastAccess: TDateTime;
  //When the data module is created the
  //connection pool that creates the data module
  //will assign its critical section to this field.
  //The data module will use this critical section
  //to synchronize access to its reference count.
  CriticalSection: TCriticalSection;
  //This semaphore points to the FixedConnectionPool's
  //semaphore. It will be used to call ReleaseSemaphore
  //from the _Release method of the TDataModule.
  Semaphore: THandle;
  //These two static methods are reintroduced
  //in order to implement lifecycle management
  //for the interface of this object.
  //Normally, unlike normal COM objects, Delphi
  //TComponent descendants are not lifecycle managed
  //when used in interface references.
  function _AddRef: Integer; stdcall;
  function _Release: Integer; stdcall;
  {IConnection methods}
  function GetLastAccess: TDateTime;
  function GetRefCount: Integer;
public
  { Public declarations }
  {IConnection method}
  //CHANGE
  //To use a connection of another type, change the
  //return type of the Connection function
  function Connection: TSQLConnection;
end;

The Connection, GetLastAccess, and GetRefCount methods are implemented in this class to satisfy the implementation of the IConnection interface. The _AddRef and _Release static methods are reintroduced in this class to implement lifecycle management on the interface. This is necessary since the _AddRef and _Release methods introduced in TComponent (a TDataModule ancestor), do not implement lifecycle management through reference counting for VCL (visual component library) objects. This lifecycle management plays a critical role in the management of the connection pool.

The TConnectionModule class also contains two member fields, CriticalSection and Semaphore. They refer to a TCriticalSection instance and a semaphore handle (a THandle), both of which are owned by the TFixedConnectionPool instance, which assigns them to these fields when it creates the connection module.

The critical section is used to synchronize access to the internal reference count of the TConnectionModule instances, and the semaphore controls access to instances of the TConnectionModule class. This is shown in the _AddRef and _Release method implementations, shown here.

function TConnectionModule._AddRef: Integer;
begin
  //increment the reference count
  CriticalSection.Enter;
  try
    Inc(FRefCount);
    Result := FRefCount;
  finally
    CriticalSection.Leave;
  end;
end;

function TConnectionModule._Release: Integer;
var
  CS: TCriticalSection;
begin
  //Keep a local reference to the critical section:
  //Self may be destroyed below, after which its
  //fields must not be read
  CS := CriticalSection;
  //decrement the reference count
  CS.Enter;
  try
    Dec(FRefCount);
    Result := FRefCount;
    //if no more references, call Destroy
    if Result = 0 then
      Destroy
    else
    begin
      Self.FLastAccess := Now;
      //Only the pool still holds a reference:
      //give the semaphore lock back
      if Result = 1 then
        ReleaseSemaphore(Semaphore, 1, nil);
    end;
  finally
    CS.Leave;
  end;
end;

Notice that the critical section is locked before the internal reference count is incremented or decremented. Also notice that ReleaseSemaphore is called when the reference count of a TConnectionModule instance drops to one. At that point, the thread that was using the connection has released it, and the only remaining reference to the connection module is held by the connection pool itself.
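The reference counts in play here can be observed in isolation. The following is a minimal single-threaded sketch, assuming Delphi on Windows: ICounted and TCountedComponent are hypothetical names used only for this illustration, and the critical section and semaphore are left out so that the counting itself stays visible.

```pascal
program ComponentRefCountSketch;
{$APPTYPE CONSOLE}
uses
  SysUtils, Classes;

type
  // Hypothetical interface, for this sketch only
  ICounted = interface(IInterface)
    function GetRefCount: Integer;
  end;

  TCountedComponent = class(TComponent, ICounted)
  private
    FRefCount: Integer;
  protected
    // Reintroduced so that ICounted references are counted
    function _AddRef: Integer; stdcall;
    function _Release: Integer; stdcall;
    function GetRefCount: Integer;
  public
    destructor Destroy; override;
  end;

var
  Destroyed: Boolean = False;

function TCountedComponent._AddRef: Integer;
begin
  Inc(FRefCount); // single-threaded sketch, so no lock here
  Result := FRefCount;
end;

function TCountedComponent._Release: Integer;
begin
  Dec(FRefCount);
  Result := FRefCount;
  if Result = 0 then
    Destroy; // nothing touches Self after this point
end;

function TCountedComponent.GetRefCount: Integer;
begin
  Result := FRefCount;
end;

destructor TCountedComponent.Destroy;
begin
  Destroyed := True;
  inherited;
end;

var
  PoolSlot, Client: ICounted;
begin
  PoolSlot := TCountedComponent.Create(nil); // the pool's reference
  Writeln(PoolSlot.GetRefCount);             // 1
  Client := PoolSlot;                        // a thread borrows it
  Writeln(PoolSlot.GetRefCount);             // 2
  Client := nil;                             // the thread is done
  Writeln(PoolSlot.GetRefCount);             // 1: only the pool remains
  PoolSlot := nil;                           // the pool lets go
  Writeln(Destroyed);                        // TRUE
end.
```

The drop from two to one is the moment at which the article's _Release hands a lock back to the semaphore; the drop from one to zero is what destroys the data module.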

TFixedConnectionPool

The connection pool is implemented by the TFixedConnectionPool class, shown in the following code segment. Once again, the extensive comments have been left in the code for clarity. 

//This is the class that manages the connection pool
TFixedConnectionPool = class(TObject)
private
  FPool: array of IConnection;
  FPoolSize: Integer;
  FTimeout: LargeInt;
  CleanupThread: TCleanupThread;
  //This semaphore is used to limit the number of
  //simultaneous connections. When the nth+1 connection
  //is requested, it will be blocked until a connection
  //becomes available.
  Semaphore: THandle;
  //This is the critical section that synchronizes
  //access to the connection module reference counts
  CriticalSection: TCriticalSection;
public
  //This overloaded constructor takes three optional
  //parameters. These parameters specify the size
  //of the connection pool, how long idle connections
  //in the connection pool will be kept, and how long
  //GetConnection waits before raising an exception.
  constructor Create(const PoolSize: Integer = 10;
    const CleanupDelayMinutes: Integer = 5;
    const Timeoutms: LargeInt = 10000); overload;
  destructor Destroy; override;
  //This function returns an object
  //that implements the IConnection interface.
  //This object can be a data module, as was
  //done in this example.
  function GetConnection: IConnection;
end;

One of the more notable elements of this declaration is the FPool member, which is a dynamic array of IConnection references. This array provides the list of pointers to the connections in the connection pool.

The connection pool is configured in the overloaded Create constructor (Create is static in TObject, the TFixedConnectionPool ancestor, which is why it is not overridden). The constructor is shown here.

constructor TFixedConnectionPool.Create(const PoolSize: Integer = 10;
  const CleanupDelayMinutes: Integer = 5;
  const Timeoutms: LargeInt = 10000);
begin
  FPoolSize := PoolSize;
  FTimeout := Timeoutms;
  Semaphore := CreateSemaphore(nil, PoolSize, PoolSize, '');
  CriticalSection := TCriticalSection.Create;
  //Set the length of the connection pool
  SetLength(FPool, PoolSize);
  //Create and start the cleanup thread
  CleanupThread := TCleanupThread.Create(True,
    CleanupDelayMinutes);
  with CleanupThread do
  begin
    FreeOnTerminate := True;
    Priority := tpLower;
    FixedConnectionPool := Self;
    Resume;
  end;
end;

As you can see from this code, the constructor stores the pool size and timeout values in member fields of the connection pool object (the cleanup delay is passed on to the cleanup thread). It then creates the semaphore, setting both the number of available locks and the maximum number of locks to the size of the pool. As a result, the connection pool is initially created with all connections available. The critical section, which as described earlier is used to synchronize access to TConnectionModule instance reference counts, is also created.

Next, the length of the dynamic array of IConnection references, FPool, is set to the number of available connections. This dynamic array is used to hold internal references to active connections (TConnectionModule instances), thereby preventing them from being released (until the TFixedConnectionPool instance is destroyed, or the cleanup thread determines that one or more of the connections have gone unused for an extended period of time).

Finally, an instance of the cleanup thread is created. This class is discussed in detail later in this article.

The most important method of the TFixedConnectionPool class is the GetConnection method. This method returns an IConnection (implemented by TConnectionModule), so long as one is available (as controlled by the semaphore). The implementation of this method is shown here:

function TFixedConnectionPool.GetConnection: IConnection;
var
  i: Integer;
  DM: TConnectionModule;
  WaitResult: Integer;
begin
  Result := nil;
  WaitResult := WaitForSingleObject(Semaphore, FTimeout);
  if WaitResult <> WAIT_OBJECT_0 then
    raise EConnPoolException.Create('Connection pool timeout. '+
      'Cannot obtain a connection');
  CriticalSection.Enter;
  try
    for i := Low(FPool) to High(FPool) do
    begin
      //If FPool[i] = nil, the IConnection has
      //not yet been created. Create it, initialize
      //it, and return it. If FPool[i] <> nil, then
      //check to see if its RefCount = 1 (only the pool
      //is referencing the object).
      if FPool[i] = nil then
      begin
        try
          DM := TConnectionModule.Create(nil);
          DM.CriticalSection := Self.CriticalSection;
          DM.Semaphore := Self.Semaphore;
          FPool[i] := DM;
          FPool[i].Connection.Connected := True;
        except
          //The connection could not be made. Drop the
          //module and give the semaphore lock back so
          //that it is not lost, then re-raise.
          FPool[i] := nil;
          ReleaseSemaphore(Semaphore, 1, nil);
          raise;
        end;
        Result := FPool[i];
        Exit;
      end;
      //if FPool[i].RefCount = 1 then
      //the connection is available. Return it.
      if FPool[i].RefCount = 1 then
      begin
        Result := FPool[i];
        Exit;
      end;
    end; //for
  finally
    CriticalSection.Leave;
  end;
end;

As you can see from this code, when GetConnection is invoked, WaitForSingleObject is called using the connection pool’s semaphore. If a connection is available, WaitForSingleObject returns immediately, and the available count on the semaphore is reduced by one. If a connection is not currently available, the thread on which GetConnection is invoked is blocked until either a connection becomes available, or the connection timeout expires. If the connection timeout expires, an exception is raised.
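The blocking and timeout behavior of WaitForSingleObject can be tried on its own. The sketch below assumes Delphi on Windows; TReleaser is a hypothetical helper thread that plays the part of another thread returning its connection 200 ms later, and the timings are illustrative only.

```pascal
program TimeoutSketch;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils, Classes;

type
  // Hypothetical helper thread: returns a lock after a short delay
  TReleaser = class(TThread)
  private
    FSem: THandle;
  protected
    procedure Execute; override;
  public
    constructor Create(ASem: THandle);
  end;

constructor TReleaser.Create(ASem: THandle);
begin
  FSem := ASem;            // set before the thread can start
  inherited Create(False);
end;

procedure TReleaser.Execute;
begin
  Sleep(200);
  ReleaseSemaphore(FSem, 1, nil);
end;

var
  Sem: THandle;
  Releaser: TReleaser;
begin
  Sem := CreateSemaphore(nil, 1, 1, nil);
  if Sem = 0 then
    RaiseLastOSError;
  try
    // Take the only lock, as if the single connection were in use
    WaitForSingleObject(Sem, INFINITE);
    Releaser := TReleaser.Create(Sem);
    try
      // The lock is still held, so a short wait runs out
      if WaitForSingleObject(Sem, 50) = WAIT_TIMEOUT then
        Writeln('timed out after 50 ms');
      // A longer wait blocks until the helper returns the lock
      if WaitForSingleObject(Sem, 2000) = WAIT_OBJECT_0 then
        Writeln('acquired once the lock was released');
      Releaser.WaitFor;
    finally
      Releaser.Free;
    end;
  finally
    CloseHandle(Sem);
  end;
end.
```

In GetConnection the first outcome corresponds to the EConnPoolException being raised, and the second to the method going on to search FPool.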

When WaitForSingleObject returns WAIT_OBJECT_0, the thread invoking GetConnection has been granted access to the connection pool. This means that either one or more connections have not yet been created, or a previously created connection is now available (its reference count is one).

This code then enters the critical section (since it might read the RefCount property of the connection module). It then iterates through the FPool dynamic array. It tests each element of this dynamic array for a value of nil, which signals that this connection has not yet been created. If the element of FPool is nil, an instance of TConnectionModule is created and assigned to this array element (which increments its reference count to one). It is during this process that the critical section and semaphore references of the connection pool object are passed to the connection module. Finally, the IConnection implementing object is returned (which increments its reference count to two).

Each time a given element of the dynamic array is found to be not nil, the loop next tests to see if the reference count of the current element is one. If it is one, that means the connection module was previously created and connected, but is not currently in use. In that case, that connection module is returned (which increments its reference count to two).

Once an available connection has been returned, the critical section is exited.

Let’s consider the semaphore for a moment. When GetConnection is called, one of the available locks on the semaphore is taken when a connection can be obtained. This lock on the semaphore will be released back to the semaphore when the connection is no longer used by an external object (one outside the connection pool). This is signaled when the reference count decrements to one. This code was shown earlier in the TConnectionModule’s _Release method.
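The take-a-lock, return-a-lock cycle can be reduced to a small stand-alone sketch. It assumes Delphi on Windows and swaps in a simpler technique than the article's: TSlotPool is a hypothetical class that hands out plain slot indexes with an explicit Release call, instead of reference-counted connection modules, so that only the interplay of semaphore and critical section remains.

```pascal
program SlotPoolSketch;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils, SyncObjs;

type
  // Hypothetical stand-in for the pool: slots are indexes,
  // not database connections
  TSlotPool = class
  private
    FInUse: array of Boolean;
    FSem: THandle;
    FLock: TCriticalSection;
  public
    constructor Create(Size: Integer);
    destructor Destroy; override;
    // Returns a slot index, or -1 if none frees up in time
    function Acquire(TimeoutMs: Cardinal): Integer;
    procedure Release(Slot: Integer);
  end;

constructor TSlotPool.Create(Size: Integer);
begin
  inherited Create;
  SetLength(FInUse, Size); // new elements start out False
  FLock := TCriticalSection.Create;
  FSem := CreateSemaphore(nil, Size, Size, nil);
  if FSem = 0 then
    RaiseLastOSError;
end;

destructor TSlotPool.Destroy;
begin
  if FSem <> 0 then
    CloseHandle(FSem);
  FLock.Free;
  inherited;
end;

function TSlotPool.Acquire(TimeoutMs: Cardinal): Integer;
var
  i: Integer;
begin
  Result := -1;
  // Once this wait succeeds, a free slot is guaranteed to exist
  if WaitForSingleObject(FSem, TimeoutMs) <> WAIT_OBJECT_0 then
    Exit;
  FLock.Enter;
  try
    for i := Low(FInUse) to High(FInUse) do
      if not FInUse[i] then
      begin
        FInUse[i] := True;
        Result := i;
        Break;
      end;
  finally
    FLock.Leave;
  end;
end;

procedure TSlotPool.Release(Slot: Integer);
begin
  FLock.Enter;
  try
    FInUse[Slot] := False;
  finally
    FLock.Leave;
  end;
  // Return the lock only after the slot is marked free
  ReleaseSemaphore(FSem, 1, nil);
end;

var
  Pool: TSlotPool;
  A, B: Integer;
begin
  Pool := TSlotPool.Create(2);
  try
    A := Pool.Acquire(0);
    B := Pool.Acquire(0);
    Writeln('slots: ', A, ' ', B);               // slots: 0 1
    Writeln('third: ', Pool.Acquire(0));         // third: -1
    Pool.Release(A);
    Writeln('after release: ', Pool.Acquire(0)); // after release: 0
  finally
    Pool.Free;
  end;
end.
```

The article's design folds Release into the interface's _Release method, so a caller cannot forget to give the connection back; the explicit call here is only for readability.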

TCleanupThread

Just about all relevant code concerning this connection pool has been discussed, with the exception of the TCleanupThread class. This class implements a low priority thread that periodically wakes up and inspects the LastAccess property of all instances of the TConnectionModule class. While this operation is not essential for a connection pool, it permits a connection pool to limit the resources it uses on a database server by releasing connections when they are no longer being used.

The following is the declaration of the TCleanupThread class:

//This thread class is used by the connection pool
//object to cleanup idle connections after a
//configurable period of time.
TCleanupThread = class(TThread)
private
  FCleanupDelay: Integer;
protected
  //When the thread is created, this critical section
  //field will be assigned the connection pool's
  //critical section. This critical section is
  //used to synchronize access to data module
  //reference counts.
  CriticalSection: TCriticalSection;
  FixedConnectionPool: TFixedConnectionPool;
  procedure Execute; override;
  constructor Create(CreateSuspended: Boolean;
    const CleanupDelayMinutes: Integer);
end;

As you can see, the TCleanupThread class contains TCriticalSection and TFixedConnectionPool member fields. In the TFixedConnectionPool constructor shown earlier in this article, the pool assigns a reference to itself to the FixedConnectionPool field. The Execute method shown later in this section reaches the pool's critical section through that reference.

The following code shows the initialization of the TCleanupThread class in its constructor:

constructor TCleanupThread.Create(CreateSuspended: Boolean;
  const CleanupDelayMinutes: Integer);
begin
  //Always create suspended, so that the field below
  //is set before the thread starts running
  inherited Create(True);
  FCleanupDelay := CleanupDelayMinutes;
  //Resume if not created suspended
  if not CreateSuspended then
    Resume;
end;

As you can see from this constructor, it begins by creating an instance of the thread as a suspended thread. It then stores the value of the CleanupDelayMinutes formal parameter in a member field. Then, if the thread was created with a CreateSuspended formal parameter value of False, it resumes the thread.

The code in a thread is executed in that thread’s Execute method. The following is the Execute method of this thread:

procedure TCleanupThread.Execute;
var
  i: Integer;
begin
  while True do
  begin
    if Terminated then Exit;
    //sleep for delay
    sleep(FCleanupDelay * 1000 * 60);
    if Terminated then Exit;
    FixedConnectionPool.CriticalSection.Enter;
    try
      for i := low(FixedConnectionPool.FPool) to
        High(FixedConnectionPool.FPool) do
        //if the connection exists, has no external reference,
        //and has not been used lately, release it
        if (FixedConnectionPool.FPool[i] <> nil) and
          (FixedConnectionPool.FPool[i].RefCount = 1) and
          (MinutesBetween(FixedConnectionPool.FPool[i].LastAccess, Now) >
          FCleanupDelay) then
          FixedConnectionPool.FPool[i] := nil;
    finally
      FixedConnectionPool.CriticalSection.Leave;
    end;//try
  end;//while
end;

When the thread executes, and after it verifies that it has not been terminated, it calls sleep for the number of minutes defined by the FCleanupDelay value. After the cleanup delay expires, the thread once again checks to see if it has been terminated, after which it enters the critical section and iterates through the array of IConnection references. For each connection that is not nil and whose reference count is one (only the pool references it), it checks to see if the connection has gone unused for longer than the cleanup delay interval. When an unused connection is found, the corresponding element of the IConnection dynamic array is set to nil, which causes the associated TConnectionModule object to be destroyed (through the interface reference count mechanism). This has the effect of closing the connection.
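One consequence of calling sleep for the whole delay is that a Terminate request is only noticed once the sleep ends. A possible alternative, not used by the article's code, is to wait on an event that can be signaled early. The sketch below assumes Delphi on Windows; TPeriodicThread, its Stop method and the Passes counter are hypothetical names, and the counter merely stands in for the idle-connection scan.

```pascal
program CleanupWaitSketch;
{$APPTYPE CONSOLE}
uses
  Windows, SysUtils, Classes, SyncObjs;

type
  // Hypothetical periodic thread that can be stopped promptly
  TPeriodicThread = class(TThread)
  private
    FWake: TEvent;
    FIntervalMs: Cardinal;
    FPasses: Integer;
  protected
    procedure Execute; override;
  public
    constructor Create(IntervalMs: Cardinal);
    destructor Destroy; override;
    procedure Stop;
    property Passes: Integer read FPasses;
  end;

constructor TPeriodicThread.Create(IntervalMs: Cardinal);
begin
  FIntervalMs := IntervalMs;
  // Manual-reset event, initially not signaled
  FWake := TEvent.Create(nil, True, False, '');
  inherited Create(False);
end;

destructor TPeriodicThread.Destroy;
begin
  inherited;  // the thread has finished before the event is freed
  FWake.Free;
end;

procedure TPeriodicThread.Stop;
begin
  Terminate;
  FWake.SetEvent; // wake the thread if it is waiting
end;

procedure TPeriodicThread.Execute;
begin
  while not Terminated do
  begin
    // Wait for the interval, or return early when Stop is called
    if FWake.WaitFor(FIntervalMs) = wrSignaled then
      Break;
    Inc(FPasses); // stand-in for the idle-connection scan
  end;
end;

var
  T: TPeriodicThread;
  Started: Cardinal;
begin
  T := TPeriodicThread.Create(60 * 1000); // one-minute interval
  try
    Sleep(100);
    Started := GetTickCount;
    T.Stop;
    T.WaitFor;
    Writeln('stopped after ', GetTickCount - Started,
      ' ms; passes: ', T.Passes);
  finally
    T.Free;
  end;
end.
```

With a plain sleep the same Stop request would not take effect until the full minute had passed.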

The TFixedConnectionPool Destructor

A number of resources are used by the TFixedConnectionPool class, including the semaphore and critical section. These need to be cleaned up when the fixed connection pool is destroyed. These tasks are performed by the overridden Destroy destructor of the TFixedConnectionPool class, as shown in the following implementation:

destructor TFixedConnectionPool.Destroy;
var
  i: Integer;
begin
  //Ask the cleanup thread to stop
  CleanupThread.Terminate;
  //Free any remaining connections
  CriticalSection.Enter;
  try
    for i := Low(FPool) to High(FPool) do
      FPool[i] := nil;
    SetLength(FPool,0);
  finally
    CriticalSection.Leave;
  end;
  CriticalSection.Free;
  //Release the semaphore
  CloseHandle(Semaphore);
  inherited;
end;

Here you see that the destructor begins by terminating the cleanup thread. Next, it enters the critical section and iterates through the connections in the connection pool, setting each to nil (thereby destroying the corresponding TConnectionModule instance). The IConnection dynamic array is then set to a size of zero. Finally, the critical section is exited and freed, after which the handle of the semaphore is closed.

Using the Fixed Connection Pool

Before you can use the TFixedConnectionPool class, you must invoke its constructor. This is shown in the following code fragment:

var
  ConnPool: TFixedConnectionPool;

begin
  ConnPool := TFixedConnectionPool.Create(10, 5, 20000);

Here the connection pool is created with a maximum of 10 connections. Unused connections may be cleaned up after five minutes, and an exception will be raised if a connection cannot be obtained within 20 seconds.

The GetConnection method of the connection pool is used to return a connection module, which you must assign to an IConnection interface reference. You then use the returned interfaced object to obtain the actual connection by calling its Connection function. This is shown in the following code fragment:


var
  SQLDataSet: TSQLDataSet;
  Conn: IConnection;
begin
  SQLDataSet := TSQLDataSet.Create(nil);
  try
    SQLDataSet.CommandText := 'select * from employee';
    Conn := ConnPool.GetConnection;
    SQLDataSet.SQLConnection := Conn.Connection;
    SQLDataSet.Open;
    //do something with the result
  finally
    SQLDataSet.Free;
  end;
  //The connection is released when Conn goes out of scope
end;

Original link: http://www.wenhq.com/article/view_136.html