OpenCV源码阅读一: threshold源码
最初实现for (auto i 0; i img.rows; i) { auto *ptr img.ptrcv::Vec3b(i); for (auto j 0; j img.cols; j) { auto pix ptr[j]; for (auto k 0; k img.channels(); k) { if (pix[k] n) pix[k] 255; else pix[k] 0; } } }Benchmark Time CPU IterationsBM_threshold/128 1180128 ns 1179980 ns 675BM_cvthreshold/128 110379 ns 106367 ns 6181我自己的函数较之opencv落后十倍更多,所以我打算开始看opencv源码threshold源码阅读threshold函数入口:int threshold(const uchar* src_data, size_t src_step, uchar* dst_data, size_t dst_step, int width, int height, int depth, int cn, double thresh, double maxValue, int thresholdType) { return threshold_range(0, height, src_data, src_step, dst_data, dst_step, width, depth, cn, thresh, maxValue, thresholdType); }对外入口本身不做具体计算它只是把整张图像[0, height)这一段交给threshold_range()。那接下来我们来看**threshold_range**它先做两件事把宽度乘上通道数width * cn;说明这里不是按“像素对象”处理而是按“连续元素流”处理,减少行列访问时的时间损耗,更像是在处理“一维数组”根据depth thresholdType选择threshold实现接下来是真正做阈值计算的是模板版thresholdhelper, type()templatetypename helper, int type, typename T typename helper::ElemType static inline int threshold(...)它的执行逻辑:准备一个全 0 向量auto zero helper::vmv(0, helper::setvlmax());很多阈值模式最终都要在0、maxValue、src、thresh之间选所以先准备一份全零向量后面直接拿来 merge。按行处理并拿到这一行的首地址,并强转为数组for (int i start; i end; i) { const T* src reinterpret_castconst T*(src_data i * src_step); //i为行号,通过行号*步长计算行数 T* dst reinterpret_castT*(dst_data i * dst_step);按段计算vl0 helper::setvl(width - j); vl1 helper::setvl(width - j - vl0); auto src0 helper::vload(src j, vl0); auto src1 helper::vload(src j vl0, vl1);使用rvv进行批量加载和计算使用rvv::vmerge进行向量计算,并且匹配各种格式优化代码static inline std::arrayuchar, 256 makeThresholdLUT(int thresh, uchar maxVal 255) { std::arrayuchar, 256 lut{}; thresh std::clamp(thresh, 0, 255); for (int i 0; i 256; i) { lut[i] (i thresh) ? maxVal : 0; } return lut; } void threshold_fast_parallel(const cv::Mat src, cv::Mat dst, int thresh, uchar maxVal 255) { const int channels src.channels(); dst.create(src.size(), src.type()); const auto lut makeThresholdLUT(thresh, maxVal); cv::parallel_for_(cv::Range(0, src.rows), [](const cv::Range range) { for (int r range.start; r range.end; r) { const uchar* s src.ptruchar(r); uchar* d dst.ptruchar(r); const int rowBytes src.cols * channels; int c 0; constexpr int kUnroll 16; for (; c rowBytes - kUnroll; c kUnroll) { d[c 0] lut[s[c 0]]; d[c 1] lut[s[c 1]]; d[c 2] lut[s[c 2]]; d[c 3] lut[s[c 3]]; d[c 4] lut[s[c 4]]; d[c 5] lut[s[c 5]]; d[c 6] lut[s[c 6]]; d[c 7] lut[s[c 7]]; d[c 8] lut[s[c 8]]; d[c 9] lut[s[c 9]]; d[c 10] lut[s[c 10]]; d[c 11] lut[s[c 11]]; d[c 12] lut[s[c 12]]; d[c 13] lut[s[c 13]]; d[c 14] lut[s[c 14]]; d[c 15] lut[s[c 15]]; } for (; c rowBytes; c) { d[c] lut[s[c]]; } } }); }Benchmark Time CPU IterationsBM_threshold_fast/128 39377 ns 36453 ns 21471BM_cvthreshold/128 11920 ns 11159 ns 72564设计思路这份实现的核心思路其实就是把一个最朴素的 thresholddst (src thresh) ? maxVal : 0;从“逐像素判断”的形式改造成“先建规则表再按表批量映射”的形式。如果直接写朴素版本内层循环里每处理一个字节都要做一次比较和一次条件分支。图像一大这种操作会重复很多很多次。这个实现不这么做而是先利用 8 位图像像素值只可能在0~255之间这一特点提前把 256 种输入的结果全算好做成一张 LUT。这样后面真正扫图的时候循环体就不再做阈值判断而是直接查表。原来是“边遍历边决策”现在变成了“先决策再遍历时只执行映射”。在这个基础上它又做了第二层优化并行。因为普通 threshold 每一行之间互不依赖所以天然适合按行切分给多个线程处理。于是整个设计就变成了两层第一层是 LUT把每个像素的判断逻辑压缩成常量查表第二层是parallel_for_把整张图按行分块让多个线程同时做查表映射。再加上一点循环展开减少内层循环控制开销这就构成了这份实现的整体设计。所以从本质上讲这份代码不是靠复杂指令取胜而是靠三个朴素但有效的原则消掉分支、保持线性内存访问、利用多核并行。实现流程整个实现流程其实很短只有一条主线。调用threshold_fast_parallel之后函数先读取输入图像的一些基本信息比如通道数然后为输出图像dst创建和输入相同大小、相同类型的内存空间。紧接着它会调用makeThresholdLUT根据当前的thresh和maxVal生成一张 256 项的查找表。到这里阈值规则其实已经完全确定了后面不再需要做任何“像素值和阈值比较”的判断。生成 LUT 之后函数进入并行阶段。cv::parallel_for_会把图像的行区间[0, rows)拆成若干段每个线程拿到其中一段行。每个线程在自己的行区间里逐行取出源图像和目标图像的首地址然后把这一行当成一段连续字节流处理。对于灰度图一行字节数就是cols对于三通道图一行字节数就是cols * 3。接着代码对这一行做查表映射把src里的每个字节读出来作为下标访问 LUT再把结果写到dst对应位置。为了让内层循环更紧凑它不是一次只处理 1 个字节而是一次展开处理 16 个字节。这样做的目的不是改变算法本身而是减少循环变量更新、边界判断这些额外指令的比例。等 16 个一组的大块处理完之后再用一个收尾循环把这一行剩下不足 16 个的元素补完。所有线程都做完自己负责的行区间后整张图的 threshold 就完成了。所以这条实现流程可以概括成一句话先生成 LUT再按行并行遍历整张图对每个字节执行dst lut[src]。makeThresholdLUT的函数结构这个函数虽然很短但内部其实也可以分成两个大段。第一大段是参数规范化。它先创建一个长度固定为 256 的数组然后用std::clamp把传进来的thresh限制在[0, 255]范围内。这样做是为了保证这个阈值一定和 8 位图像的像素范围匹配。因为 LUT 的下标一定是 0 到 255如果阈值本身跑到了这个范围外就会让映射规则失真所以要先修正。第二大段是构造映射表。它用一个从 0 到 255 的循环把每一个可能输入值的输出结果都提前算出来。对于每个i如果i thresh那对应输出就是maxVal否则就是 0。这一步完成后LUT 本身就已经等价于一个完整的二值化规则了。后面的图像处理不再关心 threshold 的数学含义它只关心“源值是多少就去表里取什么结果”。所以这个函数的作用非常纯粹它不是处理图像而是把 threshold 规则编译成一张静态映射表。threshold_fast_parallel的函数结构这个函数内部可以分成三大段来看。第一大段是准备阶段。它先从输入图像里取出channels因为后面每一行实际要处理多少个字节取决于通道数。然后调用dst.create(src.size(), src.type())确保目标图像有和源图一样的大小和数据类型。最后调用makeThresholdLUT生成查表。这一段的作用是把后续处理所需的上下文全部准备好也就是“输出缓冲区准备好、规则准备好”。第二大段是并行分工阶段。这一段由cv::parallel_for_驱动。它不会一次把整张图交给一个线程而是把总行数切成若干个Range。每个线程拿到一个range之后只需要处理自己负责的[range.start, range.end)这一段行即可。这里的关键思想是threshold 属于逐点独立运算没有跨行依赖所以按行切分不会产生同步问题。LUT 又是只读数据所以多个线程共享同一张表也不会有竞争。这就让并行非常自然。第三大段是单行处理阶段。这是 lambda 里面真正干活的地方。对于每一行它先用src.ptruchar(r)和dst.ptruchar(r)拿到这一行源数据和目标数据的首地址再根据src.cols * channels算出这一行要处理的总字节数。然后进入核心循环。核心循环分成两层前一层是按 16 个元素展开的大块处理连续执行 16 次d[c k] lut[s[c k]]后一层是处理余数也就是剩下不足 16 个元素的部分。这样一行结束后再进入下一行直到当前线程的所有行都处理完。所以这个函数的结构其实很工整前面做准备中间做并行切分里面对每一行做查表映射。