0%

GPU搜索加速原理

在File-Engine中为了加速文件搜索的速度,加入了GPU加速的功能。通过显卡来进行并行运算,大幅提高搜索的速度。
由于显卡可以进行大量的并行运算,而每一个文件都是独立的,进行字符串匹配并不会相互干扰,因此GPU非常适合用来加速搜索。

具体的实现方法如下。

定义GPU加速接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package file.engine.dllInterface.gpu;

import java.util.function.BiConsumer;
import java.util.function.Supplier;

public interface IGPUAccelerator {

/**
* 重置搜索的标志
*/
void resetAllResultStatus();

/**
* 进行搜索,每当有一个结果匹配后调用resultCollector
* @param searchCase:有D F Full Case四种,分别代表只搜索文件夹(Directory),只搜索文件(File),全字匹配(Full),大小写敏感(Case Sensitive)
* @param isIgnoreCase 是否忽略大小写,当searchCase中存在case时该字段为true
* @param searchText 原始搜索关键字
* @param keywords 搜索关键字,将searchText用";"切割得到
* @param keywordsLowerCase 搜索关键字,全小写字母,内容与keywords相同,但字母为全小写
* @param isKeywordPath 保存keywords中的关键字是路径判断还是文件名判断
* @param maxResultNumber 最大匹配结果数量限制
* @param resultCollector 有一个结果匹配后的回调方法
*/
void match(String[] searchCase,
boolean isIgnoreCase,
String searchText,
String[] keywords,
String[] keywordsLowerCase,
boolean[] isKeywordPath,
int maxResultNumber,
BiConsumer<String, String> resultCollector);

/**
* 判断GPU加速是否可用
* @return true如果可以进行GPU加速
*/
boolean isGPUAvailableOnSystem();

/**
* 判断某一个缓存是否搜索完成
* @param key 缓存key
* @return true如果全部完成
*/
boolean isMatchDone(String key);

/**
* 获取某一个缓存搜索完成后匹配的结果数量
* @param key 缓存key
* @return 匹配的结果数量
*/
int matchedNumber(String key);

/**
* 停止搜索
*/
void stopCollectResults();

/**
* 判断缓存是否存在
* @return true如果至少保存了一个缓存
*/
boolean hasCache();

/**
* 判断某一个缓存是否存在
* @param key 缓存key
* @return true如果缓存名为key的缓存存在
*/
boolean isCacheExist(String key);

/**
* 添加缓存到GPU显存
* @param key 缓存key
* @param recordSupplier 字符串supplier,由GPU加速dll通过jni进行调用,防止字符串过多导致OOM
*/
void initCache(String key, Supplier<String> recordSupplier);

/**
* 向某个缓存添加数据
* @param key 缓存key
* @param records 待添加的数据
*/
void addRecordsToCache(String key, Object[] records);

/**
* 删除某一个缓存中的数据
* @param key 缓存key
* @param records 待删除的数据
*/
void removeRecordsFromCache(String key, Object[] records);

/**
* 删除某一个缓存
* @param key 缓存key
*/
void clearCache(String key);

/**
* 删除所有缓存
*/
void clearAllCache();

/**
* 缓存是否有效
* @param key 缓存key
* @return true如果缓存仍然有效,当addRecordsToCache()失败,表示当前缓存的空位已经不足,出现了数据丢失,缓存被标记为无效。
* @see #addRecordsToCache(String, Object[])
*/
boolean isCacheValid(String key);

/**
* 获取系统显存占用,用于在用户使用过多显存时释放缓存。
* @return 内存占用百分比,数值从0-100
*/
int getGPUMemUsage();

/**
* 初始化
*/
void initialize();

/**
* 释放所有资源
*/
void release();

/**
* 获取GPU设备名
* @return GPU设备名
*/
String[] getDevices();

/**
* 设置使用的GPU设备,deviceNum为getDevices()返回的数组的下标
* @param deviceNum GPU设备id
* @return true如果成功使用设备并初始化
*/
boolean setDevice(int deviceNum);
}

该接口定义了GPU加速的基本方法,以后如果有新的GPU计算框架可以用于加速,只需要重新实现这个接口即可。
目前,该接口有两个实现,分别使用CUDA和OpenCL来进行加速。然后采用一个统一的包装类GPUAccelerator进行管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package file.engine.dllInterface.gpu;

import file.engine.configs.AllConfigs;
import file.engine.event.handler.EventManagement;
import file.engine.event.handler.impl.stop.RestartEvent;
import file.engine.utils.RegexUtil;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

public enum GPUAccelerator {
INSTANCE;
private static IGPUAccelerator gpuAccelerator;
private static final CudaAccelerator cudaAccelerator = CudaAccelerator.INSTANCE;
private static final OpenclAccelerator openclAccelerator = OpenclAccelerator.INSTANCE;

/**
* 之所以采用双重检验锁机制,是由于要实现懒加载,并且不能在加载类的时候进行加载
* 由于在事务管理器扫描@EventRegister@EventListener的阶段将会尝试加载所有类,此时配置中心还不可用
* 因此采用getInstance()方法来实现懒加载,在BootSystem事件发出后再进行初始化。
* @param isEnableGpuAccelerate
*/
record IsEnabledWrapper(boolean isEnableGpuAccelerate) {
private static volatile IsEnabledWrapper instance;

public static IsEnabledWrapper getInstance() {
if (instance == null) {
synchronized (IsEnabledWrapper.class) {
if (instance == null) {
instance = new IsEnabledWrapper(AllConfigs.getInstance().getConfigEntity().isEnableGpuAccelerate());
}
}
}
return instance;
}
}

enum Category {
CUDA("cuda"), OPENCL("opencl");
final String category;

Category(String category) {
this.category = category;
}

@Override
public String toString() {
return this.category;
}

static Category categoryFromString(String c) {
return switch (c) {
case "cuda" -> CUDA;
case "opencl" -> OPENCL;
default -> null;
};
}
}

...... GPU加速方法调用

/**
* key: 设备名
* value: [设备种类(cuda, opencl)];[设备id]
*
* @return map
*/
public Map<String, String> getDevices() {
LinkedHashMap<String, String> deviceMap = new LinkedHashMap<>();
getDeviceToMap(cudaAccelerator, deviceMap, Category.CUDA);
getDeviceToMap(openclAccelerator, deviceMap, Category.OPENCL);
return deviceMap;
}

private void getDeviceToMap(IGPUAccelerator igpuAccelerator, HashMap<String, String> deviceMap, Category category) {
if (igpuAccelerator.isGPUAvailableOnSystem()) {
var devices = igpuAccelerator.getDevices();
if (devices == null || devices.length == 0) {
return;
}
for (int i = 0; i < devices.length; ++i) {
var deviceName = devices[i];
if (deviceName.isBlank()) {
continue;
}
try {
if (!deviceMap.containsKey(deviceName)) {
deviceMap.put(deviceName, category + ";" + i);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

public boolean setDevice(String deviceCategoryAndId) {
if (gpuAccelerator != null) {
// 切换GPU设备重启生效,运行中不允许切换
return true;
}
if (!IsEnabledWrapper.getInstance().isEnableGpuAccelerate()) {
return false;
}
if (deviceCategoryAndId.isEmpty()) {
if (cudaAccelerator.isGPUAvailableOnSystem()) {
cudaAccelerator.initialize();
if (cudaAccelerator.setDevice(0)) {
gpuAccelerator = cudaAccelerator;
return true;
}
}
if (openclAccelerator.isGPUAvailableOnSystem()) {
openclAccelerator.initialize();
if (openclAccelerator.setDevice(0)) {
gpuAccelerator = openclAccelerator;
return true;
}
}
return false;
}
String[] info = RegexUtil.semicolon.split(deviceCategoryAndId);
String deviceCategory = info[0];
int id = Integer.parseInt(info[1]);
var category = Category.categoryFromString(deviceCategory);
if (category != null) {
switch (category) {
case CUDA:
if (cudaAccelerator.isGPUAvailableOnSystem()) {
cudaAccelerator.initialize();
if (cudaAccelerator.setDevice(id)) {
gpuAccelerator = cudaAccelerator;
return true;
}
}
case OPENCL:
if (openclAccelerator.isGPUAvailableOnSystem()) {
openclAccelerator.initialize();
if (openclAccelerator.setDevice(id)) {
gpuAccelerator = openclAccelerator;
return true;
}
}
}
}
return false;
}

@SuppressWarnings("unused")
public static void sendRestartOnError0() {
System.err.println("GPU缓存出错,自动重启");
EventManagement.getInstance().putEvent(new RestartEvent());
}
}

下面介绍CUDA加速。

CUDA加速首先会加载cudaAccelerator.dll,如果加载成功,将会设置isCudaLoaded为true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
enum CudaAccelerator implements IGPUAccelerator {
INSTANCE;

private static boolean isCudaLoaded;

static {
try {
System.load(Path.of("user/cudaAccelerator.dll").toAbsolutePath().toString());
isCudaLoaded = true;
} catch (UnsatisfiedLinkError | Exception e) {
e.printStackTrace();
isCudaLoaded = false;
}
}
}

初始化过程

加载完成后,将会调用setDevice(int deviceId)来选择GPU并进行初始化。
首先会调用initialize()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/*
* Class: file_engine_dllInterface_gpu_CudaAccelerator
* Method: initialize
* Signature: ()V
*/
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_initialize
(JNIEnv* env, jobject)
{
init_stop_signal();
init_cuda_search_memory();
init_str_convert();
//默认使用第一个设备,current_using_device=0
set_using_device(current_using_device);
if (env->GetJavaVM(&jvm) != JNI_OK)
{
env->ThrowNew(env->FindClass("java/lang/Exception"), "get JavaVM ptr failed.");
return;
}
set_jvm_ptr_in_kernel(jvm);
if (CreateDXGIFactory(__uuidof(IDXGIFactory), reinterpret_cast<void**>(&p_dxgi_factory)) != S_OK)
{
env->ThrowNew(env->FindClass("java/lang/Exception"), "create dxgi factory failed.");
}
IDXGIAdapter* p_adapter = nullptr;
for (UINT i = 0;
p_dxgi_factory->EnumAdapters(i, &p_adapter) != DXGI_ERROR_NOT_FOUND;
++i)
{
DXGI_ADAPTER_DESC adapter_desc;
p_adapter->GetDesc(&adapter_desc);
gpu_name_adapter_map.insert(std::make_pair(adapter_desc.Description, adapter_desc));
}
}

首先初始化停止搜索信号,当结果数量达到maxResults限制,停止信号将会被设置为true,在resetAllResultStatus()方法将会把停止信号重置为false。

随后将会初始化cuda相关内存,如存储搜索关键字,存储搜索过滤条件等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void init_cuda_search_memory()
{
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_search_case), sizeof(int)), true, nullptr);
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_search_text), MAX_PATH_LENGTH * sizeof(char)), true, nullptr);
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_keywords_length), sizeof(size_t)), true, nullptr);
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_is_keyword_path), sizeof(bool) * MAX_KEYWORDS_NUMBER), true,
nullptr);
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_is_ignore_case), sizeof(bool)), true, nullptr);
gpuErrchk(
cudaMalloc(reinterpret_cast<void**>(&dev_keywords), static_cast<size_t>(MAX_PATH_LENGTH * MAX_KEYWORDS_NUMBER)),
true, nullptr);
gpuErrchk(
cudaMalloc(reinterpret_cast<void**>(&dev_keywords_lower_case), static_cast<size_t>(MAX_PATH_LENGTH *
MAX_KEYWORDS_NUMBER)), true, nullptr);
}

最后初始化字符串转换器,字符串转换器可以在CPU中实现字符串从UTF-8编码转换到GB2312编码,然后实现从中文字符串转换到拼音,实现拼音搜索。

1
2
3
4
5
6
7
void init_str_convert()
{
init_gbk2_utf16_2();
init_gbk2_utf16_3();
// init_gbk2_utf16();
init_utf162_gbk();
}

初始化完成后将会获取jvm指针,用于在match方法开启cuda核函数后,使用多个线程收集处理结果,并将结果返回到Java虚拟机中。通过std::thread开启线程后用jvm指针使线程绑定到Java虚拟机从而调用Java方法。

最后初始化gpu_name_adapter,这是Windows中的directX API,在该项目中主要用于实现显存的监控,因此在此暂时不讨论。

到此初始化完成。

添加缓存步骤

接下来谈一下initCache方法初始化缓存。

首先看一下缓存的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* \brief 存储数据结构
* remain_blank_num:当前dev_cache_str有多少空闲空间
* record_num:当前有多少个record
* record_hash:每个record的hash,用于判断重复
*/
using cache_data = struct cache_data
{
char* dev_strs = nullptr;
size_t* dev_str_addr = nullptr;
size_t* str_length = nullptr;
std::atomic_uint64_t remain_blank_num;
std::atomic_uint64_t record_num;
std::mutex lock;
concurrency::concurrent_unordered_set<size_t> record_hash;
};

/**
* \brief 缓存struct
* str_data:数据struct
* dev_output:字符串匹配后输出位置,下标与cache_data中一一对应,dev_output中数据为1代表匹配成功
* is_cache_valid:数据是否有效
* is_match_done:是否匹配全部完成
* is_output_done:是否已经存入容器 0 代表没有开始 1 代表正在收集 2代表完成
*/
using list_cache = struct cache_struct
{
cache_data str_data;
char* dev_output = nullptr;
size_t dev_output_bytes = 0;
bool is_cache_valid = false;
std::atomic_bool is_match_done;
std::atomic_int is_output_done;
unsigned matched_number = 0;
};

缓存的添加首先会调用record_supplier方法,获取每一个记录,将他们全部读入std::vector中,然后将缓存保存进入显存。

CUDA和OpenCL保存的实现方式略有不同,由于OpenCL中不允许使用size_t,因此无法实现在GPU核函数中对显存寻址。所以OpenCL中每一个字符串的都是定长,长度被定义在constant.h中的MAX_PATH_LENGTH宏中。

CUDA版本的保存方式会更加省显存空间。保存方式如下:
在将所有记录读出后,将会记录总共需要的字节数,然后分配三块内存(即cache_data中的dev_strs,dev_str_addr,str_length)
其中两块在显存中,一块为保存字符串的空间,另一块保存每一个字符串在第一块显存中的偏移量。第三块内存为主机内存,保存第一块显存中每个字符串的长度。

这样保存相比OpenCL的定长字符串省下了很多空间,但是由于需要在GPU核函数中进行两次寻址,因此效率相对比较低。
最后初始化标志位,缓存初始化完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/*
* Class: file_engine_dllInterface_gpu_CudaAccelerator
* Method: initCache
* Signature: (Ljava/lang/String;Ljava/util/function/Supplier;)V
*/
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_initCache
(JNIEnv* env, jobject, jstring key_jstring, jobject record_supplier)
{
const jclass supplier_class = env->GetObjectClass(record_supplier);
const jmethodID get_function = env->GetMethodID(supplier_class, "get", "()Ljava/lang/Object;");
std::vector<std::string> records_vec;
unsigned record_count = 0;
size_t total_bytes = 0;
while (true)
{
const jobject record_from_supplier = env->CallObjectMethod(record_supplier, get_function);
if (record_from_supplier == nullptr)
{
break;
}
const auto jstring_val = reinterpret_cast<jstring>(record_from_supplier);
const auto record = env->GetStringUTFChars(jstring_val, nullptr);
if (const auto record_len = strlen(record); record_len < MAX_PATH_LENGTH)
{
records_vec.emplace_back(record);
total_bytes += record_len;
++total_bytes; // 每个字符串结尾 '\0'
++record_count;
}
env->ReleaseStringUTFChars(jstring_val, record);
env->DeleteLocalRef(record_from_supplier);
}
const auto _key = env->GetStringUTFChars(key_jstring, nullptr);
std::string key(_key);
auto cache = new list_cache;
cache->str_data.record_num = record_count;
cache->str_data.remain_blank_num = MAX_RECORD_ADD_COUNT;

const size_t total_results_size = static_cast<size_t>(record_count) + MAX_RECORD_ADD_COUNT;

const auto alloc_bytes = total_bytes + MAX_RECORD_ADD_COUNT * MAX_PATH_LENGTH;
gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->str_data.dev_strs), alloc_bytes), true,
get_cache_info(key, cache).c_str());
gpuErrchk(cudaMemset(cache->str_data.dev_strs, 0, alloc_bytes), true, nullptr);

gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->str_data.dev_str_addr), total_results_size * sizeof(size_t)),
true, nullptr);
gpuErrchk(cudaMemset(cache->str_data.dev_str_addr, 0, total_results_size * sizeof(size_t)), true, nullptr);

cache->str_data.str_length = new size_t[total_results_size];

gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->dev_output), total_results_size), true,
get_cache_info(key, cache).c_str());
cache->dev_output_bytes = total_results_size;
cache->is_cache_valid = true;
cache->is_match_done = false;
cache->is_output_done = 0;

cudaStream_t stream;
gpuErrchk(cudaStreamCreate(&stream), true, nullptr);
auto target_addr = cache->str_data.dev_strs;
auto save_str_addr_ptr = cache->str_data.dev_str_addr;
unsigned i = 0;
for (const std::string& record : records_vec)
{
const auto record_length = record.length();
gpuErrchk(cudaMemcpyAsync(target_addr, record.c_str(), record_length, cudaMemcpyHostToDevice, stream), true,
nullptr);
const auto str_address = reinterpret_cast<size_t>(target_addr);
//保存字符串在显存上的地址
gpuErrchk(cudaMemcpyAsync(save_str_addr_ptr, &str_address, sizeof(size_t), cudaMemcpyHostToDevice, stream),
true, nullptr);
cache->str_data.str_length[i] = record_length;
target_addr += record_length;
++target_addr;
++save_str_addr_ptr;
++i;
cache->str_data.record_hash.insert(hasher(record));
}
gpuErrchk(cudaStreamSynchronize(stream), true, nullptr);
gpuErrchk(cudaStreamDestroy(stream), true, nullptr);
cache_map.insert(std::make_pair(key, cache));
env->ReleaseStringUTFChars(key_jstring, _key);
env->DeleteLocalRef(supplier_class);
}

字符串匹配及收集

再来谈一下match方法,该方法实现对缓存中数据的字符串匹配,并将匹配后的结果保存进入Java容器中

首先等待清理缓存完成,防止在搜索时缓存被清除导致程序崩溃。然后生成搜索关键字和搜索过滤条件,随后打开多个线程收集GPU匹配结果,最后开启GPU核函数进行匹配,等待所有收集线程退出,即搜索完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
/*
* Class: file_engine_dllInterface_gpu_CudaAccelerator
* Method: match
* Signature: ([Ljava/lang/String;ZLjava/lang/String;[Ljava/lang/String;[Ljava/lang/String;[ZILjava/util/function/BiConsumer;)V
*/
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_match
(JNIEnv* env, jobject, jobjectArray search_case, jboolean is_ignore_case, jstring search_text,
jobjectArray keywords, jobjectArray keywords_lower, jbooleanArray is_keyword_path, jint max_results,
jobject result_collector)
{
if (cache_map.empty())
{
return;
}
wait_for_clear_cache();
std::lock_guard lock_guard(modify_cache_lock);
//生成搜索条件 search_case_vec
std::vector<std::string> search_case_vec;
if (search_case != nullptr)
{
generate_search_case(env, search_case_vec, search_case);
}
//生成搜索关键字 keywords_vec keywords_lower_vec is_keyword_path_ptr
std::vector<std::string> keywords_vec;
std::vector<std::string> keywords_lower_vec;
const auto keywords_length = env->GetArrayLength(keywords);
if (keywords_length > MAX_KEYWORDS_NUMBER)
{
fprintf(stderr, "too many keywords.\n");
return;
}
bool is_keyword_path_ptr[MAX_KEYWORDS_NUMBER]{ false };
const auto is_keyword_path_ptr_bool_array = env->GetBooleanArrayElements(is_keyword_path, nullptr);
for (jsize i = 0; i < keywords_length; ++i)
{
auto tmp_keywords_str = reinterpret_cast<jstring>(env->GetObjectArrayElement(keywords, i));
auto keywords_chars = env->GetStringUTFChars(tmp_keywords_str, nullptr);
#ifdef DEBUG_OUTPUT
std::cout << "keywords: " << keywords_chars << std::endl;
#endif
keywords_vec.emplace_back(keywords_chars);
env->ReleaseStringUTFChars(tmp_keywords_str, keywords_chars);
env->DeleteLocalRef(tmp_keywords_str);

tmp_keywords_str = reinterpret_cast<jstring>(env->GetObjectArrayElement(keywords_lower, i));
keywords_chars = env->GetStringUTFChars(tmp_keywords_str, nullptr);
keywords_lower_vec.emplace_back(keywords_chars);
env->ReleaseStringUTFChars(tmp_keywords_str, keywords_chars);
env->DeleteLocalRef(tmp_keywords_str);

#ifdef DEBUG_OUTPUT
std::cout << "is keyword path: " << static_cast<bool>(is_keyword_path_ptr_bool_array[i]) << std::endl;
#endif
is_keyword_path_ptr[i] = is_keyword_path_ptr_bool_array[i];
}
env->ReleaseBooleanArrayElements(is_keyword_path, is_keyword_path_ptr_bool_array, JNI_ABORT);
//复制全字匹配字符串 search_text
const auto search_text_chars = env->GetStringUTFChars(search_text, nullptr);
std::atomic_uint result_counter = 0;
std::vector<std::thread> collect_threads_vec;
collect_threads_vec.reserve(COLLECT_RESULTS_THREADS);
for (int i = 0; i < COLLECT_RESULTS_THREADS; ++i)
{
collect_threads_vec.emplace_back([&]
{
JNIEnv* thread_env = nullptr;
JavaVMAttachArgs args{ JNI_VERSION_10, nullptr, nullptr };
if (jvm->AttachCurrentThread(reinterpret_cast<void**>(&thread_env), &args) != JNI_OK)
{
fprintf(stderr, "get thread JNIEnv ptr failed");
return;
}
collect_results(thread_env, result_collector, result_counter, max_results, search_case_vec);
jvm->DetachCurrentThread();
});
}
//GPU并行计算
start_kernel(cache_map, search_case_vec, is_ignore_case, search_text_chars,
keywords_vec, keywords_lower_vec, is_keyword_path_ptr);
collect_results(env, result_collector, result_counter, max_results, search_case_vec);
for (auto&& each_thread : collect_threads_vec)
{
if (each_thread.joinable())
{
each_thread.join();
}
}
for (auto& [_, cache_val] : cache_map)
{
if (cache_val->is_output_done.load() != 2)
{
cache_val->is_output_done = 2;
}
}
env->ReleaseStringUTFChars(search_text, search_text_chars);
}

GPU核函数

首先通过核函数线程id获取对应的字符串,然后对字符串进行匹配,如果匹配完成将output设置为1(即缓存结构体中的dev_output),如果匹配失败则设置为0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* TODO 核函数并未对参数做检查,所以如果数据库中包含不是文件路径的记录将会导致崩溃。
* 如 D: C: 这样的记录将会导致核函数失败
*/
__global__ void check(const size_t* str_address_records,
const size_t* total_num,
const int* search_case,
const bool* is_ignore_case,
char* search_text,
char* keywords,
char* keywords_lower_case,
const size_t* keywords_length,
const bool* is_keyword_path,
char* output,
const bool* is_stop_collect_var)
{
const size_t thread_id = GET_TID();
if (thread_id >= *total_num)
{
return;
}
if (*is_stop_collect_var)
{
output[thread_id] = 0;
return;
}
const auto path = reinterpret_cast<const char*>(str_address_records[thread_id]);
if (path == nullptr || !path[0])
{
output[thread_id] = 0;
return;
}
if (not_matched(path, *is_ignore_case, keywords, keywords_lower_case, static_cast<int>(*keywords_length),
is_keyword_path))
{
output[thread_id] = 0;
return;
}
if (*search_case == 0)
{
output[thread_id] = 1;
return;
}
if (*search_case & 4)
{
// 全字匹配
strlwr_cuda(search_text);
char file_name[MAX_PATH_LENGTH]{ 0 };
get_file_name(path, file_name);
strlwr_cuda(file_name);
if (strcmp_cuda(search_text, file_name) != 0)
{
output[thread_id] = 0;
return;
}
}
output[thread_id] = 1;
}

在not_matched()方法中,将会对字符串进行匹配,并尝试进行拼音匹配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
__device__ bool not_matched(const char* path,
const bool is_ignore_case,
char* keywords,
char* keywords_lower_case,
const int keywords_length,
const bool* is_keyword_path)
{
for (int i = 0; i < keywords_length; ++i)
{
const bool is_keyword_path_val = is_keyword_path[i];
char match_str[MAX_PATH_LENGTH]{ 0 };
if (is_keyword_path_val)
{
get_parent_path(path, match_str);
}
else
{
get_file_name(path, match_str);
}
char* each_keyword;
if (is_ignore_case)
{
each_keyword = keywords_lower_case + i * static_cast<unsigned long long>(MAX_PATH_LENGTH);
strlwr_cuda(match_str);
}
else
{
each_keyword = keywords + i * static_cast<unsigned long long>(MAX_PATH_LENGTH);
}
if (!each_keyword[0])
{
continue;
}
if (strstr_cuda(match_str, each_keyword) == nullptr)
{
if (is_keyword_path_val || !is_str_contains_chinese(match_str))
{
return true;
}
char gbk_buffer[MAX_PATH_LENGTH * 2]{ 0 };
char* gbk_buffer_ptr = gbk_buffer;
// utf-8编码转换gbk
utf8_to_gbk(match_str, static_cast<unsigned>(strlen_cuda(match_str)), &gbk_buffer_ptr, nullptr);
char converted_pinyin[MAX_PATH_LENGTH * 6]{ 0 };
char converted_pinyin_initials[MAX_PATH_LENGTH]{ 0 };
convert_to_pinyin(gbk_buffer, converted_pinyin, converted_pinyin_initials);
if (strstr_cuda(converted_pinyin, each_keyword) == nullptr &&
strstr_cuda(converted_pinyin_initials, each_keyword) == nullptr)
{
return true;
}
}
}
return false;
}

结果收集

每个缓存中拥有一个is_output_done字段,初始为0,每个线程将会通过cas尝试将缓存的is_output_done设置为1,表示正在收集,当收集完成is_output_done会被设置为2表示收集完成。

当线程抢到某个缓存的收集权后将会开始进行收集。
GPU核函数匹配成功后将会把dev_output中对应的标志设置为1,通过dev_output可以拿到对应的字符串地址,然后取出字符串。

由于GPU核函数无法访问操作系统函数,因此在这里还需要过滤是否为文件夹,或是否为文件,过滤完成后将会调用_collect_func()调用match()方法的回调方法,将结果存入Java容器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
void collect_results(JNIEnv* thread_env, jobject result_collector, std::atomic_uint& result_counter,
const unsigned max_results, const std::vector<std::string>& search_case_vec)
{
const jclass biconsumer_class = thread_env->GetObjectClass(result_collector);
const jmethodID collector = thread_env->GetMethodID(biconsumer_class, "accept",
"(Ljava/lang/Object;Ljava/lang/Object;)V");
bool all_complete;
const auto stop_func = [&]
{
return is_stop() || result_counter.load() >= max_results;
};
auto _collect_func = [&](const std::string& _key, char _matched_record_str[MAX_PATH_LENGTH],
unsigned* matched_number)
{
if (++result_counter >= max_results)
{
is_results_number_exceed = true;
}
auto record_jstring = thread_env->NewStringUTF(_matched_record_str);
auto key_jstring = thread_env->NewStringUTF(_key.c_str());
thread_env->CallVoidMethod(result_collector, collector, key_jstring, record_jstring);
thread_env->DeleteLocalRef(record_jstring);
thread_env->DeleteLocalRef(key_jstring);
++* matched_number;
};
do
{
//尝试退出
all_complete = true;
for (const auto& [key, val] : cache_map)
{
if (stop_func())
{
break;
}
if (!val->is_cache_valid)
{
continue;
}
if (!val->is_match_done.load())
{
//发现仍然有结果未计算完,设置退出标志为false,跳到下一个计算结果
all_complete = false;
continue;
}
if (int expected = 0; !val->is_output_done.compare_exchange_strong(expected, 1))
{
continue;
}
unsigned matched_number = 0;
//复制结果数组到host,dev_output下标对应dev_cache中的下标,若dev_output[i]中的值为1,则对应dev_cache[i]字符串匹配成功
const auto output_ptr = new char[val->dev_output_bytes];
//将dev_output拷贝到output_ptr
gpuErrchk(cudaMemcpy(output_ptr, val->dev_output, val->str_data.record_num, cudaMemcpyDeviceToHost), false,
"collect results failed");
for (size_t i = 0; i < val->str_data.record_num.load(); ++i)
{
if (stop_func())
{
break;
}
//dev_cache[i]字符串匹配成功
if (static_cast<bool>(output_ptr[i]))
{
char matched_record_str[MAX_PATH_LENGTH]{ 0 };
char* str_address;
gpuErrchk(
cudaMemcpy(&str_address, val->str_data.dev_str_addr + i, sizeof(size_t), cudaMemcpyDeviceToHost
), false, nullptr);
//拷贝GPU中的字符串到host
gpuErrchk(cudaMemcpy(matched_record_str, str_address, val->str_data.str_length[i],
cudaMemcpyDeviceToHost), false, "collect results failed");
// 判断文件和文件夹
if (search_case_vec.empty())
{
if (is_file_exist(matched_record_str))
{
_collect_func(key, matched_record_str, &matched_number);
}
}
else
{
if (std::find(search_case_vec.begin(), search_case_vec.end(), "f") != search_case_vec.end())
{
if (is_dir_or_file(matched_record_str) == 1)
{
_collect_func(key, matched_record_str, &matched_number);
}
}
else if (std::find(search_case_vec.begin(), search_case_vec.end(), "d") != search_case_vec.
end())
{
if (is_dir_or_file(matched_record_str) == 0)
{
_collect_func(key, matched_record_str, &matched_number);
}
}
else
{
if (is_file_exist(matched_record_str))
{
_collect_func(key, matched_record_str, &matched_number);
}
}
}
}
}
val->matched_number = matched_number;
val->is_output_done = 2;
delete[] output_ptr;
}
} while (!all_complete && !stop_func());
thread_env->DeleteLocalRef(biconsumer_class);
}

至此,GPU加速的核心方法基本介绍完成。OpenCL的版本除了字符串存储方式不同,其他方法几乎相同,只是使用了不同的框架进行实现,因此不再赘述。