liangbch 发表于 2010-12-14 22:53:56

求一个数组中元素的最大值和最小值

求一个数组最大值最小值的问题是一个复杂度为O(n)的问题,从算法角度上讲,很难有实质性的改进。本文试图使用几种方法来优化算法,使得查找速度尽可能快,并给出源代码和运行结果。也欢迎大家给出你们自己的算法。难得的一手数据及结论,极具参考价值。
现在,我已经编写了6个函数。
函数getMaxMin1使用最普通的算法查找最大最小值。
函数getMaxMin_SSE4 则使用SSE4指令优化查找算法,主要使用了SSE4 指令PMINUD和PMAXUD,一次可求4个DWORD的最大或者最小值,内存读则使用movdqa指令,一次读取4个DWORD。
函数getMaxMin_MT2_v1是getMaxMin1 的2线程版本。
函数getMaxMin_MT2_v4是getMaxMin1 的4线程版本。
函数getMaxMin_MT2_v2是getMaxMin_SSE4 的2线程版本。
函数getMaxMin_MT4_v2是getMaxMin_SSE4 的4线程版本。


测试结果:
采用32M的测试样本,运行在Intel Core 2 Duo E8500 双核CPU,同时测试这6个函数。每个函数均取5次测试结果的最小值,结果如下:
getMaxMin1:       0.0517秒
getMaxMin_SSE4:   0.0205秒
getMaxMin_MT2_v1: 0.0250秒
getMaxMin_MT4_v10.0293秒
getMaxMin_MT2_v2: 0.0200秒
getMaxMin_MT4_v2: 0.0201秒

结果分析:
[*]getMaxMin_SSE4的速度是普通C版的函数的250%,但此函数采用双线程后,速度并没有提高。
[*]getMaxMin_MT2_v1这个2线程的版本速度为单线程版本的200%,比较合乎逻辑
[*]getMaxMin_MT4_v1这个4线程的版本速度比2线程版本更慢。这也合乎逻辑,在双核CPU上,4线程并不能提高速度。

liangbch 发表于 2010-12-14 22:55:44

findMaxMin.h 文件定义]#if !defined(AFX_FINDMAXMINMT_H__11572878_BB5B_4BB3_A31E_16ED572B10FC__INCLUDED_)
#define AFX_FINDMAXMINMT_H__11572878_BB5B_4BB3_A31E_16ED572B10FC__INCLUDED_

#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

#include <windows.h>
#include "defs.h"

#define TF_FIND_MAX_MIN_ALU   1
#define TF_FIND_MAX_MIN_SSE4    2

typedef struct _thread_function_ST
{
    DWORD fun_ID;
    DWORD *pdata;
    int   len;
}THREAD_FUNCTION_ST;

typedef struct _search_sort_ST
{
    DWORD fun_ID;
    DWORD *pdata;
    int   len;
    DWORD min;
    DWORD max;
}SEARCH_SORT_ST;

extern DWORD WINAPI ThreadFunc(LPVOID n);

extern void getMaxMin1(DWORD *pData,int len,DWORD *min,DWORD *max);
extern void getMaxMin_asm(DWORD *pData,int len,DWORD *min,DWORD *max);

extern void getMaxMin_SSE4(DWORD *pData,int len,DWORD *min,DWORD *max);
extern void getMaxMin_MT2_v1(DWORD *pData,int len,DWORD *min,DWORD *max);
extern void getMaxMin_MT4_v1(DWORD *pData,int len,DWORD *min,DWORD *max);
extern void getMaxMin_MT2_v2(DWORD *pData,int len,DWORD *min,DWORD *max);
extern void getMaxMin_MT4_v2(DWORD *pData,int len,DWORD *min,DWORD *max);



#endif // !defined(AFX_FINDMAXMINMT_H__11572878_BB5B_4BB3_A31E_16ED572B10FC__INCLUDED_)

liangbch 发表于 2010-12-14 22:58:26

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <windows.h>

#include "defs.h"
#include "findMaxMin.h"
#include "MTVERIFY.h"


DWORD WINAPI ThreadFunc(LPVOID n)
{
    THREAD_FUNCTION_ST *pTF=(THREAD_FUNCTION_ST *)n;
    SEARCH_SORT_ST *p;

    switch (pTF->fun_ID)
    {
    case TF_FIND_MAX_MIN_ALU:
      p=(SEARCH_SORT_ST *)n;
      getMaxMin1(p->pdata,p->len,&(p->min),&(p->max));
      break;
    case TF_FIND_MAX_MIN_SSE4:
      p=(SEARCH_SORT_ST *)n;
      getMaxMin_SSE4(p->pdata,p->len,&(p->min),&(p->max));
      break;
    default:
      printf("Invalid thread function ID in %d line\n",__LINE__);

    }
    return 0;
}


void getMaxMin1(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    int i;
    *min=~0;
    *max=0;
    for (i=0; i<len; i++)
    {
      if ( pData[ i ] > *max)
            *max=pData[ i ];
      if (pData[ i ]< *min)
            *min=pData[ i ];
    }
}

void getMaxMin_asm(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    DWORD _min, _max;
    __asm
    {
      mov esi, pData
      mov ecx, len
      lea edi,

      mov eax, 0xffffffff      //min
      mov edx, 0                //max

loop_start:
      cmp esi, edi
      jge next10

      cmp , eax
      cmovb eax,

      cmp , edx
      cmova edx,

      add esi, 4
      jmp loop_start
next10:
      mov _min, eax
      mov _max, edx
    }

    *min=_min;
    *max=_max;

}

_declspec(naked)
void getMaxMin_SSE4(DWORD *pData,int len,DWORD *min,DWORD *max)

// 使用SSE4.1 指令 计算数组的最大最小值
// 寄存器的使用
// eax: 存放最小值
// edx: 存放最大值
// xmm0: 当前取得的4个整数
// xmm1: 存放最小值
// xmm2: 存放最大值

/* PMINUD and PMAXUD are SSE4.1 instruciton
The usage for PMINUD
Compares packed unsigned dword integers in the destination operand (first operand)
and the source operand (second operand), and returns the minimum for each packed
value in the destination operand.

Operation
IF (DEST < SRC)
THEN DEST &#1048773; DEST;
ELSE DEST &#1048773; SRC; FI;

IF (DEST < SRC)
THEN DEST &#1048773; DEST;
ELSE DEST &#1048773; SRC; FI;

IF (DEST < SRC)
THEN DEST &#1048773; DEST;
ELSE DEST &#1048773; SRC; FI;

IF (DEST < SRC)
THEN DEST &#1048773; DEST;
ELSE DEST &#1048773; SRC; FI;
*/

#define MIN_128REG      xmm1
#define MAX_128REG      xmm2
#define MIN_32REG      eax
#define MAX_32REG      edx
#define PT_REG          esi
#define END_REG          edi

#define PAR_PDATA   4
#define PAR_LEN       8
#define PAR_MIN       12
#define PAR_MAX       16

{
    __asm
    {
      push      esi
      push      edi

      mov            MIN_32REG,0xffffffff                // min=0xffffffff
      xor            MAX_32REG,    MAX_32REG                // max=0

//phase1:
      mov         ecx, dword ptr         // len
      mov         PT_REG, dword ptr   // pData
      lea            END_REG,                // edi=pData + len
      jmp            cmp10

loop1_start:
      cmp , MIN_32REG
      cmovb MIN_32REG,

      cmp , MAX_32REG
      cmova MAX_32REG,

      add PT_REG, 4
cmp10:
      test PT_REG, 0x0f
      jz phase2

      cmp PT_REG, END_REG
      jb loop1_start

phase2:
      mov         ecx, dword ptr      // len
      mov         END_REG, dword ptr // pData
      lea            END_REG,             // edi=pData + len
      and            END_REG, 0xfffffff0                  // clear bit0-bit3 and make edi % 16==0

      movd      MIN_128REG,    MIN_32REG
      pshufd      MIN_128REG, MIN_128REG, 00000000b       // xmm1 = R0:R0:R0:R0
      movd      MAX_128REG,    MAX_32REG
      pshufd      MAX_128REG, MAX_128REG, 00000000b       // xmm2 = R0:R0:R0:R0
      jmp            cmp20

loop2_start:
      movdqa      xmm0, xmmword ptr
      PMINUD      MIN_128REG, xmm0
      PMAXUD      MAX_128REG, xmm0

      add            PT_REG,16
cmp20:
      cmp            PT_REG, END_REG
      jb            loop2_start


phase3:
      movd      ecx, MIN_128REG
      cmp            ecx, MIN_32REG
      cmovb      MIN_32REG,ecx            //MIN_32REG=min(min_32REG,bit0-31(MIN_128REG))

      psrldq      MIN_128REG, 4            //MIN_128REG >>=4
      movd      ecx, MIN_128REG
      cmp            ecx, MIN_32REG
      cmovb      MIN_32REG,ecx            //MIN_32REG=min(min_32REG,bit32-63(MIN_128REG))

      psrldq      MIN_128REG, 4            //MIN_128REG >>=4
      movd      ecx, MIN_128REG
      cmp            ecx, MIN_32REG
      cmovb      MIN_32REG,ecx            //MIN_32REG=min(min_32REG,bit64-95(MIN_128REG))

      psrldq      MIN_128REG, 4            //MIN_128REG >>=4
      movd      ecx, MIN_128REG
      cmp            ecx, MIN_32REG
      cmovb      MIN_32REG,ecx            //MIN_32REG=min(min_32REG,bit96-127(MIN_128REG)    )

      //-----------------------------------------------
      movd      ecx, MAX_128REG
      cmp            ecx, MAX_32REG
      cmova      MAX_32REG,ecx            //MAX_32REG=max(max_32REG,bit0-31(MAX_128REG))

      psrldq      MAX_128REG, 4            //MAX_128REG >>=4
      movd      ecx, MAX_128REG
      cmp            ecx, MAX_32REG
      cmova      MAX_32REG,ecx            //MAX_32REG=max(max_32REG,bit32-63(MAX_128REG))


      psrldq      MAX_128REG, 4            //MAX_128REG >>=4
      movd      ecx, MAX_128REG
      cmp            ecx, MAX_32REG
      cmova      MAX_32REG,ecx            //MAX_32REG=max(max_32REG,bit64-95(MAX_128REG))

      psrldq      MAX_128REG, 4            //MAX_128REG >>=4
      movd      ecx, MAX_128REG
      cmp            ecx, MAX_32REG
      cmova      MAX_32REG,ecx            //MIN_32REG=max(max_32REG,bit96-127(MIN_128REG))

      mov         END_REG, dword ptr    // pData
      mov         ecx, dword ptr      // len
      lea            END_REG,             // edi=pData + len
      jmp            cmp30

loop3_start:
      cmp , MIN_32REG
      cmovb MIN_32REG,

      cmp , MAX_32REG
      cmova MAX_32REG,

      add PT_REG, 4
cmp30:
      cmp PT_REG, END_REG
      jb loop3_start

RET_VALUE:
      mov      ecx, dword ptr      //*min
      mov      dword ptr ,MIN_32REG

      mov      ecx, dword ptr      //*max
      mov      dword ptr ,MAX_32REG

      emms
      pop      edi
      pop      esi

      ret
    }
}



void getMaxMin_MT_n(int threadCount,int version, DWORD *pData,int len,DWORD *min,DWORD *max)
{
    int slot,width;
    SEARCH_SORT_ST thrdPara;
    HANDLEhThrds;
    DWORD   threadId;

    assert(threadCount<=16);
    width= (len + threadCount-1)/threadCount;

    for (slot=0;slot<threadCount;slot++)
    {
      if (version==1)
            thrdPara.fun_ID=TF_FIND_MAX_MIN_ALU;
      else
            thrdPara.fun_ID=TF_FIND_MAX_MIN_SSE4;

      thrdPara.pdata=pData+slot*width;
      thrdPara.len=width;
    }

    if (len % width !=0)
    {
      thrdPara.len=len % width;
    }

    for (slot=0;slot<threadCount;slot++)
    {
      MTVERIFY( hThrds = CreateThread(NULL,
            0,
            ThreadFunc,
            (LPVOID)(thrdPara+slot),
            0,
            &threadId ) );
    }

    for (slot=0; slot<threadCount; slot++)
    {
      WaitForSingleObject(hThrds, INFINITE);
      MTVERIFY( CloseHandle(hThrds) );
    }

    *min=thrdPara.min;
    *max=thrdPara.max;
    for (slot=1;slot<threadCount;slot++)
    {
      if (thrdPara.max > *max)
            *max=thrdPara.max;
      if (thrdPara.min < *min)
            *min=thrdPara.min;
    }
}

void getMaxMin_MT2_v1(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    getMaxMin_MT_n(2,1,pData,len,min,max);
}

void getMaxMin_MT4_v1(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    getMaxMin_MT_n(4,1,pData,len,min,max);
}


void getMaxMin_MT2_v2(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    getMaxMin_MT_n(2,2,pData,len,min,max);
}

void getMaxMin_MT4_v2(DWORD *pData,int len,DWORD *min,DWORD *max)
{
    getMaxMin_MT_n(4,2,pData,len,min,max);
}

gxqcn 发表于 2010-12-15 07:42:13

注意,在排版代码时,应先在 UEStudio 之类的编辑器中将^t(Tab符)替换成四个空格。
因为论坛里,会将Tab符自动替换成8个空格,使代码缩进过大,不利于阅读。

liangbch 发表于 2010-12-15 12:43:39

好久没贴代码了,下次注意。

无心人 发表于 2010-12-15 14:41:41

多线程对这个问题没帮助

多核在提高到某种程度后,会内存读速度

G-Spider 发表于 2010-12-15 15:09:30

觉得此问题最后仍归结到存取问题.....mark,n天后继续。

liangbch 发表于 2010-12-15 18:56:32

32M个整数共32*4=128M,最快用时0.02秒,故内存读取速度为128M/0.02秒=6400MB/s=6.4GB/s
这台电脑的内存为DDR2-800, 其内存带宽为800M/s * 64bit / (8bit/B)= 6.4GB/s。
哈哈,我的 那个求最大值最小值函数的SSE4版本 用尽了内存带宽,故内存成为瓶颈,多线程不起作用。

liangbch 发表于 2010-12-15 19:38:39

贴上我的电脑的内存性能报告,使用 EVEREST 5.01测试。

gxqcn 发表于 2010-12-15 20:18:57

奇怪,为什么 Memory 的 read 尚不及 write 快?
页: [1] 2 3 4
查看完整版本: 求一个数组中元素的最大值和最小值