请问下大家有没有线程池的示例(操作socket的相关程序创建的线程池类)

xiehui888 2008-05-27 12:19:44
求一个操作的相关线程池类的源码,希望大家给个示例,小弟我感谢你们!!!!!!!!!!!!!
...全文
552 25 打赏 收藏 转发到动态 举报
写回复
用AI写文章
25 条回复
切换为时间正序
请发表友善的回复…
发表回复
dbgchen 2011-06-30
  • 打赏
  • 举报
回复
mark
xiehui888 2008-12-15
  • 打赏
  • 举报
回复
僵哥我给你全部的分啦
xiehui888 2008-09-01
  • 打赏
  • 举报
回复
僵哥完成端口就是采用线程池的模型吗?
xiehui888 2008-06-02
  • 打赏
  • 举报
回复
谢谢僵哥
chenyq2008 2008-06-02
  • 打赏
  • 举报
回复
mark
dd_zhouqian 2008-06-02
  • 打赏
  • 举报
回复
不错!
laowang2 2008-06-02
  • 打赏
  • 举报
回复
僵哥 2008-05-27
  • 打赏
  • 举报
回复
//续上 unit IOCPUnit;

const
SHUTDOWN_FLAG = $FFFFFFFF; // Posted to the completion port when shutting down
(******************************************************************************)
(* *)
(* <<以下为实现部分>> *)
(* *)
(******************************************************************************)
(* 模 块 名: 完成端口封装 *)
(* 作 者: Unsigned(僵哥) *)
(* 说 明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时, *)
(* 通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况, *)
(* 由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足, *)
(* 线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程 *)
(* *)
(* 采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生 *)
(* 备 注: *)
(******************************************************************************)
implementation

class function TIOCP.Attach(lpCompletion: TIOCP): TIOCP;
var
Management : Integer;
begin
Result := nil;
if Not Assigned(lpCompletion) then
Exit;
Management := lpCompletion.Attach;
if Management = 1 then
begin
lpCompletion.Detach;
Exit;
end;
Result := lpCompletion;
end;

function TIOCP.CreateIoCompletionPort( IOHandle : Cardinal
; IOContextKey : Cardinal
; Concurrent : Cardinal
): THandle;
begin
Result := Windows.CreateIoCompletionPort( IOHandle
, FHandle
, IOContextKey
, Concurrent
);
end;

function TIOCP.AssociateWith(IOHandle: Cardinal; IOContextKey: Cardinal):BOOL;
begin
result := (CreateIoCompletionPort(IOHandle,IOContextKey)=self.FHandle);
end;

function TIOCP.GetQueuedCompletionStatus( var BytesTransferred : Cardinal
; var IOContextKey : Cardinal
; var Overlapped : POverlapped
; dwMilliseconds : Cardinal
): BOOL;
begin
InterlockedIncrement(FQueueThreadSize);
Result := Windows.GetQueuedCompletionStatus( FHandle
, BytesTransferred
, IOContextKey
, Overlapped
, dwMilliseconds
);
InterlockedDecrement(FQueueThreadSize);
end;

function TIOCP.PostQueuedCompletionStatus( lpBytesTransferred : Cardinal
; dwCompletionKey : Cardinal
; lpOverlapped : POverlapped
): BOOL;
begin
if FIsShutingDown then
begin
Result := False;
Exit;
end;
Result := Windows.PostQueuedCompletionStatus( FHandle
, lpBytesTransferred
, dwCompletionKey
, lpOverlapped
);
end;

function TIOCP.PostThreadQuitFlag: BOOL;
begin
Result := Windows.PostQueuedCompletionStatus( FHandle, 0
, 0
, POverLapped(SHUTDOWN_FLAG)
);
end;

function TIOCP.Attach : Integer;
begin
Result := InterlockedIncrement(FManagementCount);
end;

function TIOCP.Detach;
begin
Result := InterlockedDecrement(FManagementCount);
end;

procedure TIOCP.Free;
begin
if not (self <> nil) then
Exit;
if Detach > 0 then
Exit;
Inherited Free;
end;

constructor TIOCP.Create;
begin
Inherited;
FIsShutingDown := False;
FHandle := INVALID_HANDLE_VALUE;
FQueueThreadSize := 0;
FManagementCount := 0;
FHandle := Windows.CreateIoCompletionPort( INVALID_HANDLE_VALUE
, 0
, 0
, 0
);
InterlockedIncrement(FManagementCount);

end;

procedure TIOCP.ShutDownAll;
var
I : Integer;
begin

I := FQueueThreadSize;
while I > 0 do
begin
PostThreadQuitFlag;
Dec(I);
end;
//while FQueueThreadSize > 0 do Sleep(1);
end;

procedure TIOCP.ShutDownAllEx;
begin
FIsShutingDown := True;
ShutDownAll;
end;

Destructor TIOCP.Destroy;
begin
ShutDownAll;
if FHandle <> INVALID_HANDLE_VALUE then
CloseHandle(FHandle);
FHandle := INVALID_HANDLE_VALUE;
end;
end.
僵哥 2008-05-27
  • 打赏
  • 举报
回复
(******************************************************************************)
(* 模 块 名: IOCPUnit.Pas *)
(* 别 名: 完成端口封装 *)
(* 作 者: Unsigned(僵哥) *)
(* 说 明: 提供线程队列计数,以方便性能检验机制的执行,当线程队列计数较小时, *)
(* 通常有两种可能性,各线程的作业处理时间过长,或者中间产生死锁情况, *)
(* 由此可以再根据线程处理过程的其它计数,判断是否线程数量饱和或不足, *)
(* 线程数量饱和通常的表现为资源占用较高,否则可以适当增加处理线程 *)
(* *)
(* 采用引用计数机制以保证队列的正常释放并防止内存访问违例事件的发生 *)
(* *)
(******************************************************************************)
unit IOCPUnit;

interface
uses
Windows;
type
TIOCP=class(TObject)
strict private
FHandle : THandle; //完成端口句柄
FQueueThreadSize : Integer; //线程队列
FManagementCount : Integer; //引用计数
FIsShutingDown : Boolean;

public
property QueueThreads : Integer read FQueueThreadSize; //线程队列
property ManagementCount : Integer read FManagementCount; //引用计数

public
(*====================================================================*)
(*** IO句柄与完成端口关联 ***)
(*--------------------------------------------------------------------*)
(* 传入参数: *)
(* IOHandle : IO句柄 (文件句柄,管道句柄,Socket等句柄类IO资源) *)
(* IOContextKey : IO上下文关键字 (描述 IOHandle 相关的上下文数据, *)
(* 结构体或者类指针等标识性信息 ) *)
(* Concurrent : 并发线程数,通常为0 *)
(* 返 回 值: 完成端口句柄 *)
(*--------------------------------------------------------------------*)
function CreateIoCompletionPort( IOHandle : THandle
; IOContextKey : DWORD
; Concurrent : DWORD = 0
): THandle;
function AssociateWith ( IOHandle : THandle
; IOContextKey : DWORD
) : BOOL;

(*====================================================================*)
(*** 完成队列检索 ***)
(*--------------------------------------------------------------------*)
(* 传入参数: *)
(* dwMilliseconds : 等待时间(单位:毫秒),同WaitForSingleObject *)
(* 输出参数: *)
(* BytesTransferred : 传输完成字节数,0表示IO关闭,或者出现异常 *)
(* IOContextKey : IO上下文关键字 *)
(* Overlapped : 重叠IO结构指针 *)
(* 返 回 值: *)
(* 成功:True,失败/出错:False(请使用GetLastError取得具体错误) *)
(*--------------------------------------------------------------------*)
function GetQueuedCompletionStatus( var BytesTransferred : DWORD
; var IOContextKey : DWORD
; var Overlapped : POverlapped
; dwMilliseconds : DWORD
): BOOL;

(*====================================================================*)
(*** 发送完成事件 ***)
(*--------------------------------------------------------------------*)
(* 传入参数: *)
(* lpBytesTransferred : 与 GetQueuedCompletionStatus 相对应 *)
(* dwCompletionKey : 与 GetQueuedCompletionStatus 相对应 *)
(* lpOverlapped : 与 GetQueuedCompletionStatus 相对应 *)
(* *)
(* 返回值: 成功:True,失败/出错:False(使用GetLastError取得具体错误) *)
(*--------------------------------------------------------------------*)
function PostQueuedCompletionStatus( lpBytesTransferred : DWORD
; dwCompletionKey : DWORD
; lpOverlapped : POverlapped
): BOOL;

(*====================================================================*)
(*** 发送一个特定的完成事件 ***)
(*--------------------------------------------------------------------*)
(* 返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误) *)
(* 备 注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG) *)
(*--------------------------------------------------------------------*)
function PostThreadQuitFlag : BOOL;

(*====================================================================*)
(*** 增加引用计数 ***)
(*--------------------------------------------------------------------*)
(* 返 回 值: 当前引用计数 *)
(*--------------------------------------------------------------------*)
function Attach : Integer; overload;

(*====================================================================*)
(*** 减少引用计数 ***)
(*--------------------------------------------------------------------*)
(* 返 回 值: 剩余引用计数 *)
(* 备 注: 减少引用计数,但不理会引用计数是否为0,请使用 Free 代替 *)
(*--------------------------------------------------------------------*)
function Detach : Integer;

(*====================================================================*)
(*** 为每对列当中的每一个线程发送一个特定的完成事件 ***)
(*--------------------------------------------------------------------*)
(* 返 回 值: 成功:True,失败/出错:False(使用GetLastError取得具体错误) *)
(* 备 注: PostQueuedCompletionStatus(0,0,SHUTDOWN_FLAG) *)
(*--------------------------------------------------------------------*)
procedure ShutDownAll;
procedure ShutDownAllEx;

(*====================================================================*)
(*** 减少引用计数 ***)
(*--------------------------------------------------------------------*)
(* 备 注: 当引用计数减少为0时,对象被自动释放 *)
(*--------------------------------------------------------------------*)
procedure Free;

public
constructor Create;
destructor Destroy; override;
public
class function Attach(lpCompletion: TIOCP): TIOCP; Overload;
end;
僵哥 2008-05-27
  • 打赏
  • 举报
回复
//续上 unit UnitSyncProc;
procedure TSyncProcPool.Reduce;
var
I : Integer;
lpSyncProc : TSyncProc;
begin
with FMemoryList.LockList do
try
for I := Count downto 1 do
begin
lpSyncProc := Items[I-1];
if Not lpSyncProc^.Using then
begin
Dec(FLastMember);
delete(I-1);
InternalFreeSyncProc(lpSyncProc);

end;
end;
finally
FMemoryList.UnlockList;
end;
end;

function TSyncProcPool.CreateSyncProc:TSyncProc;
var
lpSyncProc : TSyncProc;
begin
Result := nil;
if Not Assigned(FMemoryList) then
Exit;

with FMemoryList.LockList do
try
if (FLastMember<>FFirstMember) and (FLastUsing<>FLastMember) then
begin
Inc(FLastUsing);
lpSyncProc := Items[FLastUsing];
Result := lpSyncProc;
end;
if Result = nil then
begin
lpSyncProc := CreateNewSyncProc;
if lpSyncProc = nil then
Exit;

lpSyncProc^.WhereXY := Add(lpSyncProc);
Result := lpSyncProc;
Inc(FLastMember);
FLastUsing := FLastMember;
end;

Result^.Using := true;
self.Attach;
finally
FMemoryList.UnlockList;
end;

end;

procedure TSyncProcPool.FreeSyncProc(lpSyncProc: TSyncProc);
var
lpTempSyncProc : TSyncProc;
begin
if Not (lpSyncProc <> nil) then
Exit;
if lpSyncProc^.Owner=nil then
Exit;
if lpSyncProc^.Owner<>self then
begin
lpSyncProc^.Owner.FreeSyncProc(lpSyncProc);
Exit;
end;
with FMemoryList.LockList do
try

lpSyncProc^.Using := false;
if FLastUsing >= 0 then
begin
lpTempSyncProc := Items[FLastUsing];
if lpTempSyncProc <> lpSyncProc then
begin
lpTempSyncProc^.WhereXY := lpSyncProc^.WhereXY;
lpSyncProc^.WhereXY := FLastUsing;
Items[FLastUsing] := lpSyncProc;
Items[lpTempSyncProc^.WhereXY] := lpTempSyncProc;
end;
Dec(FLastUsing);

end;
finally
FMemoryList.UnlockList;
self.Free;
end;
end;



procedure TSyncProcPool.RemoveSyncProcNode(lpSyncProc: TSyncProc);
var
I : Integer;
begin
if Not (lpSyncProc <> nil) then
Exit;
if lpSyncProc^.Owner=nil then
Exit;
if lpSyncProc^.Owner<>self then
begin
lpSyncProc^.Owner.RemoveSyncProcNode(lpSyncProc);
Exit;
end;
with FMemoryList.LockList do
try
for I := Count - 1 downto 0 do
begin
if lpSyncProc = Items[I] then
begin
Delete(I);
InternalFreeSyncProc(lpSyncProc);
Dec(FLastMember);
break;
end;
end;
finally
FMemoryList.UnlockList;
end;
end;

function TSyncProcPool.Attach:Integer;
begin
Result := InterlockedIncrement(FManagementCount);
end;

function TSyncProcPool.Detach:Integer;
begin
Result := InterlockedDecrement(FManagementCount);
end;

constructor TSyncProcPool.Create;
begin
Inherited;
FManagementCount := 1;
FLastMember := FFirstMember;
FLastUsing := FFirstMember;
FMemoryList := TThreadList.Create;
end;

destructor TSyncProcPool.Destroy;
var
I : Integer;
begin
with FMemoryList.LockList do
try
for I := 0 to Count - 1 do
begin
InternalFreeSyncProc(Items[I]);
end;
finally
FMemoryList.UnlockList;
FMemoryList.Free;
end;
end;

procedure TSyncProcPool.Free;
begin
if Detach>0 then
Exit;
Inherited Free;
end;

initialization
SyncProcPool := TSyncProcPool.Create;
finalization
SyncProcPool.Free;
end.
僵哥 2008-05-27
  • 打赏
  • 举报
回复
unit UnitSyncProc;

interface

uses
Windows
, Classes
, DateUtils
, SysUtils;
type
TObjectProc = procedure of Object;
TObjectProc1 = procedure (Param0:Pointer) of Object;
TObjectProc2 = procedure (Param0:Pointer;Param1:Pointer) of Object;
TObjectProc3 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer) of Object;
TObjectProc4 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer) of Object;
TObjectProc5 = procedure (Param0:Pointer;Param1:Pointer;Param2:Pointer;Param3:Pointer;Param4:Pointer) of Object;

TTimeredType = ( tmNone
, tmOnce
, tmPerSecond
, tmPerMinute
, tmPerHour
, tmPerDay
, tmPerWeek
, tmPerMonth
, tmOften
);

TSyncProcPool = class;
TSyncProc = ^_SyncProc;
_SyncProc = packed Record
//--Used by Pool Manager
Owner : TSyncProcPool;
WhereXY : Integer;
Using : Boolean;
//----------------------
TimeredType : TTimeredType;
ParamCount : Byte;
IsSyncCall : Boolean;
//定时器任务

InitDay : Integer;//yyyymmdd
InitTime : Integer;//hhnnss

ExecTimes : Integer;
ExecTime : TDateTime;
TimeOutTime : TDateTime;

Param0 : Pointer;
Param1 : Pointer;
Param2 : Pointer;
Param3 : Pointer;
Param4 : Pointer;
ObjectedProc : TObjectProc;
function CheckRun:Boolean;
procedure Run;
end;

TSyncProcPool = class(TObject)
const FFirstMember : Integer = -1;
strict private
FMemoryList : TThreadList; //线程安全的内存切点列表

FLastUsing : Integer; //最后使用指针
FLastMember : Integer; //最后指针
FManagementCount : Integer; //内存池对象引用计数


strict private


//分配一个dwBytes大小的内存节点
function CreateNewSyncProc : TSyncProc;
//
procedure InternalFreeSyncProc( lpSyncProc : TSyncProc );
protected
procedure RemoveSyncProcNode ( lpSyncProc : TSyncProc );

public
function CreateSyncProc : TSyncProc; overload;
procedure FreeSyncProc ( lpSyncProc : TSyncProc );
procedure Reduce;
procedure FreeAllSyncProcs;
public
Property ManagementCount : Integer read FManagementCount;
public
procedure Free;
function Attach : Integer; overload;
function Detach : Integer;
procedure Init( const newPoolSize : Integer = 0);
public
constructor Create;
destructor Destroy;override;
public
class function Attach(lpSyncProcPool:TSyncProcPool): TSyncProcPool; overload;
end;
var
SyncProcPool : TSyncProcPool=nil;
implementation
//日志
procedure WriteLog( const s: string);
begin
end;

function HeapAllocEx(dwBytes: DWORD): Pointer;
begin
Result := HeapAlloc(GetProcessHeap, HEAP_ZERO_MEMORY, dwBytes);
end;

function HeapReallocEx(lpMem:Pointer;dwBytes: DWORD): Pointer;
begin
Result := HeapRealloc(GetProcessHeap, HEAP_ZERO_MEMORY, lpMem, dwBytes);
end;

function HeapFreeEx(lpMem:Pointer):BOOL;
begin
Result := HeapFree( GetProcessHeap, 0, lpMem);
end;



function _SyncProc.CheckRun:Boolean;
var
TimeStr:String;
begin
Result := false;
case self.TimeredType of
tmNone: Result := True;
tmOnce:
begin
TimeStr := FormatDatetime('yyyymmddhhnnss',Now);
if (TimeStr >= FormatDatetime('yyyymmddhhnnss',ExecTime)) then
begin
if (TimeOutTime > 0.000000001) and (TimeStr > FormatDatetime('yyyymmddhhnnss',TimeOutTime)) then
Exit;
Result := True;
end;

end;
tmPerSecond:
begin
if SecondOf(Now)<> (InitTime mod 100) then
Result := True;
end;
tmPerMinute:
begin
if MinuteOf(Now)<> ((InitTime div 100) mod 100) then
Result := True;
end;
tmPerHour:
begin
if HourOf(Now)<> ((InitTime div 10000) mod 100) then
Result := True;
end;
tmPerDay:
begin
//if SecondOf(Now)<> (InitTime mod 100) then
// Result := True;
end;
tmPerWeek:
begin
//if SecondOf(Now)<> (InitTime mod 100) then
// Result := True;
end;
tmPerMonth:
begin
//if SecondOf(Now)<> (InitTime mod 100) then
// Result := True;
end;
tmOften: Result := True;
end;
end;
procedure _SyncProc.Run;
begin
if Not Assigned(ObjectedProc) then
Exit;
case ParamCount of
1:TObjectProc1(ObjectedProc)(Param0);
2:TObjectProc2(ObjectedProc)(Param0,Param1);
3:TObjectProc3(ObjectedProc)(Param0,Param1,Param2);
4:TObjectProc4(ObjectedProc)(Param0,Param1,Param2,Param3);
5:TObjectProc5(ObjectedProc)(Param0,Param1,Param2,Param3,Param4);
else
ObjectedProc;
end;
end;

class function TSyncProcPool.Attach(lpSyncProcPool: TSyncProcPool): TSyncProcPool;
var
ManagementCount : Integer;
begin
Result := nil;

if Not Assigned(lpSyncProcPool) then
Exit;

ManagementCount := lpSyncProcPool.Attach ;
if ManagementCount = 1 then
begin
lpSyncProcPool.Detach;
Exit;
end;

Result := lpSyncProcPool;
end;


procedure TSyncProcPool.InternalFreeSyncProc( lpSyncProc : TSyncProc);
begin
if Not (lpSyncProc <> nil) then
Exit;
HeapFreeEx(lpSyncProc);
end;

function TSyncProcPool.CreateNewSyncProc : TSyncProc;
begin
Result := TSyncProc(HeapAllocEx(sizeof(_SyncProc)));
if Result<>nil then
Result^.Owner:=self;
end;

procedure TSyncProcPool.Init(const newPoolSize: Integer );
var
I : Integer;
lpSyncProc : TSyncProc;
begin
with FMemoryList.LockList do
try
I := Count;
for I := I to newPoolSize do
begin
lpSyncProc := CreateNewSyncProc;
if lpSyncProc = nil then
break;
lpSyncProc^.Using := false;
lpSyncProc^.WhereXY := Add(lpSyncProc);
Inc(FLastMember);
end;
finally
FMemoryList.UnlockList;
end;
end;

procedure TSyncProcPool.FreeAllSyncProcs;
var
I:Integer;
begin
try
with FMemoryList.LockList do
try
for I := 0 to Count - 1 do
begin
if TSyncProc(Items[I])^.Using then
begin
FreeSyncProc(Items[I]);
end;
end;
finally
FMemoryList.UnlockList;
end;
Except
on E: Exception do
begin
WriteLog('Exception: TActionPool.FreeAllActions; '+ E.Message);
end;
end;
end;

僵哥 2008-05-27
  • 打赏
  • 举报
回复
//续上 unit HSSyncThread;
function UserSyncProc( ObjectProc : TObjectProc4
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 4;
CurrentSyncProc^.Param0 := Param0;
CurrentSyncProc^.Param1 := Param1;
CurrentSyncProc^.Param2 := Param2;
CurrentSyncProc^.Param3 := Param3;
CurrentSyncProc^.Param4 := Nil;
//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

function UserSyncProc( ObjectProc : TObjectProc5
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
; Param4 : Pointer
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 5;
CurrentSyncProc^.Param0 := Param0;
CurrentSyncProc^.Param1 := Param1;
CurrentSyncProc^.Param2 := Param2;
CurrentSyncProc^.Param3 := Param3;
CurrentSyncProc^.Param4 := Param4;
//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

function TSyncThread.InQueue(SyncProc:TSyncProc):BOOL;
begin
Result := False;
if Not Assigned(SyncProc) then
Exit;
Result := FIOCP.PostQueuedCompletionStatus(DWORD(SyncProc),0,Nil);
end;

procedure TSyncThread.RunCurrentProc;
begin
FCurrentProcObject.Run;
end;

procedure TSyncThread.Execute;
var
dwResult:Boolean;
CurrentProc:TSyncProc;
Param0,Param1:DWORD;
Overlapped:POVERLAPPED;
begin
//Toggle Comment if Needed
//CoInitializeEx(nil, COINIT_MULTITHREADED);
//try
while Not Terminated do
begin
dwResult := FIOCP.GetQueuedCompletionStatus(Param0,Param1,Overlapped,INFINITE );
if Not dwResult then
break;
if DWORD(Overlapped) = SHUTDOWN_FLAG then
break;
if Param0 = 0 then
continue;
CurrentProc := TSyncProc(Param0);
try
if CurrentProc.IsSyncCall then
begin
try
FCurrentProcObject := CurrentProc;
Synchronize(RunCurrentProc);
FCurrentProcObject := Nil;
except

end;
end
else
begin
try
CurrentProc.Run;
except

end;
end;
finally
CurrentProc.Owner.FreeSyncProc(CurrentProc);
end;
end;
//Toggle Comment if Needed
//finally
// CoUninitialize;
//end;
end;

procedure TSyncThread.Shutdown;
begin

FIOCP.ShutDownAll;
Terminate;
end;
procedure TSyncThread.ShutdownEx;
begin

FIOCP.ShutDownAllEx;
Terminate;
end;

constructor TSyncThread.Create(CreateSuspended: Boolean);
begin
Inherited Create(true);
FIOCP := TIOCP.Create;
if Not CreateSuspended then
begin
Resume;
end;
end;

destructor TSyncThread.Destroy;
begin
FIOCP.ShutDownAll;
FIOCP.Free;
Inherited;
end;

Initialization
SyncThread := TSyncThread.Create(false);
SyncThread.FreeOnTerminate := True;
finalization
LSyncThread := SyncThread;
SyncThread := Nil;
if LSyncThread<>Nil then

LSyncThread.ShutdownEx;

end.
僵哥 2008-05-27
  • 打赏
  • 举报
回复
最简单的得用GetQueuedCompletionStatus等待来做任务线程池。
所有线程元素调用GetQueuedCompletionStatus等待任务,有任务时使用PostQueuedCompletionStatus加入任务,其中一个线程便会得到任务,然后加以处理。

unit HSSyncThread;

interface
uses
Windows,Classes,SysUtils,iocpunit,UnitSyncProc,ExtCtrls;
type
TSyncThread = class(TThread)
private
FIOCP:TIOCP;
FCurrentProcObject:TSyncProc;
protected
procedure Execute;override;
procedure RunCurrentProc;
public
constructor Create(CreateSuspended: Boolean);
destructor Destroy;override;
procedure Shutdown;
procedure ShutdownEx;
function InQueue(SyncProc:TSyncProc):BOOL;
end;


function UserSyncProc( ObjectProc : TObjectProc
; IsSyncCall : Boolean
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc1
; IsSyncCall : Boolean
; Param0 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc2
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc3
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc4
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
): BOOL; overload;
function UserSyncProc( ObjectProc : TObjectProc5
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
; Param3 : Pointer
; Param4 : Pointer
): BOOL; overload;

var
SyncThread:TSyncThread = Nil;

implementation

//uses
// ActiveX;

var
LSyncThread:TSyncThread = Nil;
function UserSyncProc( ObjectProc : TObjectProc
; IsSyncCall : Boolean
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 0;
CurrentSyncProc^.Param0 := Nil;
CurrentSyncProc^.Param1 := Nil;
CurrentSyncProc^.Param2 := Nil;
CurrentSyncProc^.Param3 := Nil;
CurrentSyncProc^.Param4 := Nil;
//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

function UserSyncProc( ObjectProc : TObjectProc1
; IsSyncCall : Boolean
; Param0 : Pointer
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 1;
CurrentSyncProc^.Param0 := Param0;
CurrentSyncProc^.Param1 := Nil;
CurrentSyncProc^.Param2 := Nil;
CurrentSyncProc^.Param3 := Nil;
CurrentSyncProc^.Param4 := Nil;

//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

function UserSyncProc( ObjectProc : TObjectProc2
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 2;
CurrentSyncProc^.Param0 := Param0;
CurrentSyncProc^.Param1 := Param1;
CurrentSyncProc^.Param2 := Nil;
CurrentSyncProc^.Param3 := Nil;
CurrentSyncProc^.Param4 := Nil;
//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

function UserSyncProc( ObjectProc : TObjectProc3
; IsSyncCall : Boolean
; Param0 : Pointer
; Param1 : Pointer
; Param2 : Pointer
): BOOL;
var
CurrentSyncProc:TSyncProc;
begin
Result := False;
if Not Assigned(ObjectProc) then
Exit;
if Not Assigned(SyncThread) then
Exit;
if Not Assigned(SyncProcPool) then
Exit;
CurrentSyncProc := SyncProcPool.CreateSyncProc;
if Not Assigned(CurrentSyncProc) then
Exit;

CurrentSyncProc^.TimeredType := tmNone;

CurrentSyncProc^.IsSyncCall := IsSyncCall;
CurrentSyncProc^.InitDay := 0;
CurrentSyncProc^.InitTime := 0;
CurrentSyncProc^.ExecTimes := 1;
CurrentSyncProc^.ExecTime := 0;
CurrentSyncProc^.TimeOutTime := 0;

//Modify It!
CurrentSyncProc^.ParamCount := 3;
CurrentSyncProc^.Param0 := Param0;
CurrentSyncProc^.Param1 := Param1;
CurrentSyncProc^.Param2 := Param2;
CurrentSyncProc^.Param3 := Nil;
CurrentSyncProc^.Param4 := Nil;
//Modify It!

CurrentSyncProc^.ObjectedProc := TObjectProc(ObjectProc);

result := SyncThread.InQueue(CurrentSyncProc);
if Not Result then
CurrentSyncProc.Owner.FreeSyncProc(CurrentSyncProc);
end;

liangpei2008 2008-05-27
  • 打赏
  • 举报
回复
谢谢僵哥
badgirlxiaoxiao 2008-05-27
  • 打赏
  • 举报
回复
僵哥的强项
僵哥 2008-05-27
  • 打赏
  • 举报
回复
对象池跟线程池是两个概念。关键看如何理解,其实都是用完以后收回到池当中进行闲置处理,需用时再次使用。\

关于Broker,Agent,Adapter的比较,下面是个人的理解:
Adapter:主要是将后面复杂多样的接口采用统一的形式进行传递,相当于接口翻译,不影响任何业务。
Agent:代理,可提供特定的接口协议。通常在一些Client无法直接到达或者无法直接访问或者不允许直接访问Server的一些服务时,通过一个Agent转发甚至是过滤,而进行访问,作为Agent,可以对相应的请求及服务进行数据上的篡改,以适应业务需求。对服务提供者通常是特定的或者由调用者指定。
Broker:一个全权的代理,可以针对服务提供者进行决策性选择,并且可以不告知调用者(与调用者之间的可信度相当高)。比如均衡网关。单纯的Broker不做非必要性的数据篡改(比如UDP转发之后,源地址等不得不篡改)。
liangpei2008 2008-05-27
  • 打赏
  • 举报
回复
另外想请教僵哥对Service Broker的理解
liangpei2008 2008-05-27
  • 打赏
  • 举报
回复
在一个3层的结构中,做对象池是不是和其原理一样?
liangpei2008 2008-05-27
  • 打赏
  • 举报
回复
僵哥
能不能讲详细一点~
僵哥 2008-05-27
  • 打赏
  • 举报
回复
线程池的原理就是使用有限的线程迭代执行际定的或者随机调入的任务(干活)。
加载更多回复(4)

829

社区成员

发帖
与我相关
我的任务
社区描述
Delphi 非技术区
社区管理员
  • 非技术区社区
加入社区
  • 近7日
  • 近30日
  • 至今
社区公告
暂无公告

试试用AI创作助手写篇文章吧