在File-Engine中为了加速文件搜索的速度,加入了GPU加速的功能。通过显卡来进行并行运算,大幅提高搜索的速度。
由于显卡可以进行大量的并行运算,而每一个文件都是独立的,进行字符串匹配并不会相互干扰,因此GPU非常适合用来加速搜索。
具体的实现方法如下。
定义GPU加速接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
| package file.engine.dllInterface.gpu;
import java.util.function.BiConsumer; import java.util.function.Supplier;
/**
 * Contract for a GPU-backed string-search backend.
 * <p>
 * Implementations (currently CUDA and OpenCL) keep per-key caches of record
 * strings in device memory and match them in parallel against search keywords.
 * All implementations are managed through the {@code GPUAccelerator} wrapper.
 */
public interface IGPUAccelerator {

    /** Resets every cache's match/collect status flags before a new search starts. */
    void resetAllResultStatus();

    /**
     * Matches all cached records against the keywords and streams each hit into
     * {@code resultCollector} as (cacheKey, matchedRecord).
     *
     * @param searchCase        filter tokens (e.g. file/dir/full-match flags); may be null/empty
     * @param isIgnoreCase      when true, matching uses {@code keywordsLowerCase}
     * @param searchText        the raw search text
     * @param keywords          keywords split from the search text
     * @param keywordsLowerCase lower-cased copies of {@code keywords}, index-aligned
     * @param isKeywordPath     per-keyword flag: match against parent path instead of file name
     * @param maxResultNumber   stop collecting once this many results were produced
     * @param resultCollector   receives (cacheKey, matchedRecord) pairs
     */
    void match(String[] searchCase, boolean isIgnoreCase, String searchText, String[] keywords, String[] keywordsLowerCase, boolean[] isKeywordPath, int maxResultNumber, BiConsumer<String, String> resultCollector);

    /** @return true when the backend's runtime (driver + native library) is usable on this system. */
    boolean isGPUAvailableOnSystem();

    /** @return true when matching has finished for the cache identified by {@code key}. */
    boolean isMatchDone(String key);

    /** @return number of matches produced by the last search for the cache {@code key}. */
    int matchedNumber(String key);

    /** Signals the collector threads to stop gathering further results. */
    void stopCollectResults();

    /** @return true when at least one cache exists in device memory. */
    boolean hasCache();

    /** @return true when a cache with the given key exists. */
    boolean isCacheExist(String key);

    /**
     * Creates the device-memory cache for {@code key}; {@code recordSupplier} is
     * polled repeatedly for records until it returns null.
     */
    void initCache(String key, Supplier<String> recordSupplier);

    /** Appends records to an existing cache (uses the cache's reserved blank slots). */
    void addRecordsToCache(String key, Object[] records);

    /** Removes the given records from the cache identified by {@code key}. */
    void removeRecordsFromCache(String key, Object[] records);

    /** Frees the device memory held by the cache {@code key}. */
    void clearCache(String key);

    /** Frees every cache. */
    void clearAllCache();

    /** @return true when the cache is still consistent and safe to search. */
    boolean isCacheValid(String key);

    /** @return current GPU memory usage (used for the memory monitor). */
    int getGPUMemUsage();

    /** One-time backend initialization; call before {@link #setDevice(int)}. */
    void initialize();

    /** Releases all native/device resources held by the backend. */
    void release();

    /** @return human-readable names of the devices this backend can use. */
    String[] getDevices();

    /**
     * Binds the backend to the device with the given index.
     *
     * @return true on success
     */
    boolean setDevice(int deviceNum); }
|
该接口定义了GPU加速的基本方法,以后如果有新的GPU计算框架可以用于加速,只需要重新实现这个接口即可。
目前,该接口有两个实现,分别使用CUDA和OpenCL来进行加速。然后采用一个统一的包装类GPUAccelerator进行管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
| package file.engine.dllInterface.gpu;
import file.engine.configs.AllConfigs; import file.engine.event.handler.EventManagement; import file.engine.event.handler.impl.stop.RestartEvent; import file.engine.utils.RegexUtil;
import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Supplier;
public enum GPUAccelerator { INSTANCE; private static IGPUAccelerator gpuAccelerator; private static final CudaAccelerator cudaAccelerator = CudaAccelerator.INSTANCE; private static final OpenclAccelerator openclAccelerator = OpenclAccelerator.INSTANCE;
/**
 * Immutable snapshot of the "enable GPU acceleration" config flag, read once
 * from AllConfigs on first use so later config edits cannot flip the flag
 * mid-session.
 */
record IsEnabledWrapper(boolean isEnableGpuAccelerate) {
    // Lazily created singleton; volatile is required for the double-checked
    // locking below to publish the instance safely.
    private static volatile IsEnabledWrapper instance;

    /** Returns the lazily-initialized singleton (double-checked locking). */
    public static IsEnabledWrapper getInstance() {
        if (instance == null) {
            synchronized (IsEnabledWrapper.class) {
                if (instance == null) {
                    instance = new IsEnabledWrapper(AllConfigs.getInstance().getConfigEntity().isEnableGpuAccelerate());
                }
            }
        }
        return instance;
    } }
/**
 * GPU compute-framework tag, serialized as the lower-case prefix of the
 * "category;deviceId" device string.
 */
enum Category {
    CUDA("cuda"), OPENCL("opencl");

    // Lower-case wire name of this framework.
    final String category;

    Category(String category) {
        this.category = category;
    }

    @Override
    public String toString() {
        return this.category;
    }

    /**
     * Inverse of {@link #toString()}: maps "cuda"/"opencl" back to a constant.
     *
     * @return the matching constant, or null for any unknown value
     */
    static Category categoryFromString(String c) {
        if (c.equals(CUDA.category)) {
            return CUDA;
        }
        if (c.equals(OPENCL.category)) {
            return OPENCL;
        }
        return null;
    }
}
...... GPU加速方法调用
public Map<String, String> getDevices() { LinkedHashMap<String, String> deviceMap = new LinkedHashMap<>(); getDeviceToMap(cudaAccelerator, deviceMap, Category.CUDA); getDeviceToMap(openclAccelerator, deviceMap, Category.OPENCL); return deviceMap; }
private void getDeviceToMap(IGPUAccelerator igpuAccelerator, HashMap<String, String> deviceMap, Category category) { if (igpuAccelerator.isGPUAvailableOnSystem()) { var devices = igpuAccelerator.getDevices(); if (devices == null || devices.length == 0) { return; } for (int i = 0; i < devices.length; ++i) { var deviceName = devices[i]; if (deviceName.isBlank()) { continue; } try { if (!deviceMap.containsKey(deviceName)) { deviceMap.put(deviceName, category + ";" + i); } } catch (Exception e) { e.printStackTrace(); } } } }
public boolean setDevice(String deviceCategoryAndId) { if (gpuAccelerator != null) { return true; } if (!IsEnabledWrapper.getInstance().isEnableGpuAccelerate()) { return false; } if (deviceCategoryAndId.isEmpty()) { if (cudaAccelerator.isGPUAvailableOnSystem()) { cudaAccelerator.initialize(); if (cudaAccelerator.setDevice(0)) { gpuAccelerator = cudaAccelerator; return true; } } if (openclAccelerator.isGPUAvailableOnSystem()) { openclAccelerator.initialize(); if (openclAccelerator.setDevice(0)) { gpuAccelerator = openclAccelerator; return true; } } return false; } String[] info = RegexUtil.semicolon.split(deviceCategoryAndId); String deviceCategory = info[0]; int id = Integer.parseInt(info[1]); var category = Category.categoryFromString(deviceCategory); if (category != null) { switch (category) { case CUDA: if (cudaAccelerator.isGPUAvailableOnSystem()) { cudaAccelerator.initialize(); if (cudaAccelerator.setDevice(id)) { gpuAccelerator = cudaAccelerator; return true; } } case OPENCL: if (openclAccelerator.isGPUAvailableOnSystem()) { openclAccelerator.initialize(); if (openclAccelerator.setDevice(id)) { gpuAccelerator = openclAccelerator; return true; } } } } return false; }
@SuppressWarnings("unused") public static void sendRestartOnError0() { System.err.println("GPU缓存出错,自动重启"); EventManagement.getInstance().putEvent(new RestartEvent()); } }
|
下面介绍CUDA加速。
CUDA加速首先会加载cudaAccelerator.dll,如果加载成功,将会设置isCudaLoaded为true。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/**
 * CUDA implementation of IGPUAccelerator, backed by the native library
 * user/cudaAccelerator.dll. (Abridged: only the loader is shown here.)
 */
enum CudaAccelerator implements IGPUAccelerator {
    INSTANCE;

    // True only when the native DLL loaded successfully; checked before any
    // native call is attempted.
    private static boolean isCudaLoaded;

    static {
        try {
            // Load the JNI bridge from the application's "user" directory.
            System.load(Path.of("user/cudaAccelerator.dll").toAbsolutePath().toString());
            isCudaLoaded = true;
        } catch (UnsatisfiedLinkError | Exception e) {
            // Missing DLL or missing CUDA runtime: log and mark CUDA unusable
            // so the caller can fall back to another backend.
            e.printStackTrace();
            isCudaLoaded = false;
        }
    } }
|
初始化过程
加载完成后,将会调用setDevice(int deviceId)来选择GPU并进行初始化。
首先会调用initialize()方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
|
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_initialize (JNIEnv* env, jobject) { init_stop_signal(); init_cuda_search_memory(); init_str_convert(); set_using_device(current_using_device); if (env->GetJavaVM(&jvm) != JNI_OK) { env->ThrowNew(env->FindClass("java/lang/Exception"), "get JavaVM ptr failed."); return; } set_jvm_ptr_in_kernel(jvm); if (CreateDXGIFactory(__uuidof(IDXGIFactory), reinterpret_cast<void**>(&p_dxgi_factory)) != S_OK) { env->ThrowNew(env->FindClass("java/lang/Exception"), "create dxgi factory failed."); } IDXGIAdapter* p_adapter = nullptr; for (UINT i = 0; p_dxgi_factory->EnumAdapters(i, &p_adapter) != DXGI_ERROR_NOT_FOUND; ++i) { DXGI_ADAPTER_DESC adapter_desc; p_adapter->GetDesc(&adapter_desc); gpu_name_adapter_map.insert(std::make_pair(adapter_desc.Description, adapter_desc)); } }
|
首先初始化停止搜索信号:当结果数量达到maxResults限制时,停止信号将会被设置为true;在resetAllResultStatus()方法中会将停止信号重置为false。
随后将会初始化cuda相关内存,如存储搜索关键字,存储搜索过滤条件等。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| void init_cuda_search_memory() { gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_search_case), sizeof(int)), true, nullptr); gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_search_text), MAX_PATH_LENGTH * sizeof(char)), true, nullptr); gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_keywords_length), sizeof(size_t)), true, nullptr); gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_is_keyword_path), sizeof(bool) * MAX_KEYWORDS_NUMBER), true, nullptr); gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&dev_is_ignore_case), sizeof(bool)), true, nullptr); gpuErrchk( cudaMalloc(reinterpret_cast<void**>(&dev_keywords), static_cast<size_t>(MAX_PATH_LENGTH * MAX_KEYWORDS_NUMBER)), true, nullptr); gpuErrchk( cudaMalloc(reinterpret_cast<void**>(&dev_keywords_lower_case), static_cast<size_t>(MAX_PATH_LENGTH * MAX_KEYWORDS_NUMBER)), true, nullptr); }
|
最后初始化字符串转换器,字符串转换器可以在CPU中实现字符串从UTF-8编码转换到GB2312编码,然后实现从中文字符串转换到拼音,实现拼音搜索。
1 2 3 4 5 6 7
| void init_str_convert() { init_gbk2_utf16_2(); init_gbk2_utf16_3(); init_utf162_gbk(); }
|
初始化完成后将会获取jvm指针,用于在match方法开启cuda核函数后,使用多个线程收集处理结果,并将结果返回到Java虚拟机中。通过std::thread开启线程后用jvm指针使线程绑定到Java虚拟机从而调用Java方法。
最后初始化gpu_name_adapter_map,这里使用的是Windows中的DirectX(DXGI)API,在该项目中主要用于实现显存的监控,因此在此暂时不讨论。
到此初始化完成。
添加缓存步骤
接下来谈一下initCache方法初始化缓存。
首先看一下缓存的结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
|
// Per-cache string storage. CUDA packs all records back-to-back in one device
// buffer and addresses them indirectly, which saves memory compared to the
// OpenCL fixed-length layout (at the cost of extra device-side indirection).
using cache_data = struct cache_data
{
    char* dev_strs = nullptr;      // device: all record strings packed contiguously, NUL-separated
    size_t* dev_str_addr = nullptr; // device: address of each record inside dev_strs
    size_t* str_length = nullptr;  // host: byte length of each record in dev_strs
    std::atomic_uint64_t remain_blank_num; // free slots reserved for later addRecordsToCache calls
    std::atomic_uint64_t record_num;       // records currently stored
    std::mutex lock;
    // Hashes of stored records, used for membership/dedup checks.
    concurrency::concurrent_unordered_set<size_t> record_hash;
};
// One searchable cache: the strings plus per-record match output and the
// status flags driven by the kernel and the collector threads.
using list_cache = struct cache_struct
{
    cache_data str_data;
    char* dev_output = nullptr;    // device: one match-flag byte per record (1 = matched)
    size_t dev_output_bytes = 0;   // capacity of dev_output
    bool is_cache_valid = false;
    std::atomic_bool is_match_done; // set when the kernel finished this cache
    // 0 = output pending, 1 = a collector thread claimed it (via CAS), 2 = collected.
    std::atomic_int is_output_done;
    unsigned matched_number = 0;   // match count from the last search
};
|
缓存的添加首先会调用record_supplier方法,获取每一个记录,将它们全部读入std::vector中,然后将缓存保存进入显存。
CUDA和OpenCL保存的实现方式略有不同,由于OpenCL中不允许使用size_t,因此无法实现在GPU核函数中对显存寻址。所以OpenCL中每一个字符串都是定长,长度被定义在constant.h中的MAX_PATH_LENGTH宏中。
CUDA版本的保存方式会更加省显存空间。保存方式如下:
在将所有记录读出后,将会记录总共需要的字节数,然后分配三块内存(即cache_data中的dev_strs,dev_str_addr,str_length)
其中两块在显存中,一块为保存字符串的空间,另一块保存每一个字符串在第一块显存中的偏移量。第三块内存为主机内存,保存第一块显存中每个字符串的长度。
这样保存相比OpenCL的定长字符串省下了很多空间,但是由于需要在GPU核函数中进行两次寻址,因此效率相对比较低。
最后初始化标志位,缓存初始化完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
|
/**
 * JNI: builds the device-memory cache for one key.
 *
 * Drains the Java Supplier until it returns null, copies every record shorter
 * than MAX_PATH_LENGTH into device memory (strings packed contiguously, with a
 * second device array of per-string addresses and a host array of lengths),
 * then registers the cache in cache_map.
 */
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_initCache
(JNIEnv* env, jobject, jstring key_jstring, jobject record_supplier)
{
    const jclass supplier_class = env->GetObjectClass(record_supplier);
    const jmethodID get_function = env->GetMethodID(supplier_class, "get", "()Ljava/lang/Object;");
    std::vector<std::string> records_vec;
    unsigned record_count = 0;
    size_t total_bytes = 0;
    // Pull records from the Java side until the supplier is exhausted (null).
    while (true)
    {
        const jobject record_from_supplier = env->CallObjectMethod(record_supplier, get_function);
        if (record_from_supplier == nullptr)
        {
            break;
        }
        const auto jstring_val = reinterpret_cast<jstring>(record_from_supplier);
        // NOTE: GetStringUTFChars yields modified UTF-8; lengths below are in
        // those bytes, not Java chars.
        const auto record = env->GetStringUTFChars(jstring_val, nullptr);
        // Records at or over MAX_PATH_LENGTH are silently dropped.
        if (const auto record_len = strlen(record); record_len < MAX_PATH_LENGTH)
        {
            records_vec.emplace_back(record);
            total_bytes += record_len;
            ++total_bytes; // +1 for the NUL separator between packed strings
            ++record_count;
        }
        env->ReleaseStringUTFChars(jstring_val, record);
        env->DeleteLocalRef(record_from_supplier);
    }
    const auto _key = env->GetStringUTFChars(key_jstring, nullptr);
    std::string key(_key);
    auto cache = new list_cache;
    cache->str_data.record_num = record_count;
    cache->str_data.remain_blank_num = MAX_RECORD_ADD_COUNT;
    // Capacity includes MAX_RECORD_ADD_COUNT extra slots so records can be
    // appended later without reallocating.
    const size_t total_results_size = static_cast<size_t>(record_count) + MAX_RECORD_ADD_COUNT;
    // Extra slots are budgeted at the worst-case MAX_PATH_LENGTH bytes each.
    const auto alloc_bytes = total_bytes + MAX_RECORD_ADD_COUNT * MAX_PATH_LENGTH;
    gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->str_data.dev_strs), alloc_bytes), true, get_cache_info(key, cache).c_str());
    gpuErrchk(cudaMemset(cache->str_data.dev_strs, 0, alloc_bytes), true, nullptr);
    gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->str_data.dev_str_addr), total_results_size * sizeof(size_t)), true, nullptr);
    gpuErrchk(cudaMemset(cache->str_data.dev_str_addr, 0, total_results_size * sizeof(size_t)), true, nullptr);
    // Host-side lengths; the collector uses them to copy each string back.
    cache->str_data.str_length = new size_t[total_results_size];
    // One match-flag byte per record slot.
    gpuErrchk(cudaMalloc(reinterpret_cast<void**>(&cache->dev_output), total_results_size), true, get_cache_info(key, cache).c_str());
    cache->dev_output_bytes = total_results_size;
    cache->is_cache_valid = true;
    cache->is_match_done = false;
    cache->is_output_done = 0;
    // Copy all records asynchronously on one stream, then synchronize once.
    cudaStream_t stream;
    gpuErrchk(cudaStreamCreate(&stream), true, nullptr);
    auto target_addr = cache->str_data.dev_strs;
    auto save_str_addr_ptr = cache->str_data.dev_str_addr;
    unsigned i = 0;
    for (const std::string& record : records_vec)
    {
        const auto record_length = record.length();
        gpuErrchk(cudaMemcpyAsync(target_addr, record.c_str(), record_length, cudaMemcpyHostToDevice, stream), true, nullptr);
        // Store the device address of this string so the kernel can find it.
        const auto str_address = reinterpret_cast<size_t>(target_addr);
        gpuErrchk(cudaMemcpyAsync(save_str_addr_ptr, &str_address, sizeof(size_t), cudaMemcpyHostToDevice, stream), true, nullptr);
        cache->str_data.str_length[i] = record_length;
        target_addr += record_length;
        ++target_addr; // skip the NUL separator left by the earlier memset
        ++save_str_addr_ptr;
        ++i;
        cache->str_data.record_hash.insert(hasher(record));
    }
    gpuErrchk(cudaStreamSynchronize(stream), true, nullptr);
    gpuErrchk(cudaStreamDestroy(stream), true, nullptr);
    cache_map.insert(std::make_pair(key, cache));
    env->ReleaseStringUTFChars(key_jstring, _key);
    env->DeleteLocalRef(supplier_class);
}
|
字符串匹配及收集
再来谈一下match方法,该方法实现对缓存中数据的字符串匹配,并将匹配后的结果保存进入Java容器中
首先等待清理缓存完成,防止在搜索时缓存被清除导致程序崩溃。然后生成搜索关键字和搜索过滤条件,随后打开多个线程收集GPU匹配结果,最后开启GPU核函数进行匹配,等待所有收集线程退出,即搜索完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
|
/**
 * JNI: runs one search over every cache.
 *
 * Marshals the keyword arrays from Java, spawns COLLECT_RESULTS_THREADS
 * collector threads (each attached to the JVM via the saved jvm pointer),
 * launches the CUDA kernel, collects on the calling thread as well, and joins
 * everything before returning. Results reach Java through result_collector
 * (a BiConsumer<String, String> of cacheKey -> matchedRecord).
 */
JNIEXPORT void JNICALL Java_file_engine_dllInterface_gpu_CudaAccelerator_match
(JNIEnv* env, jobject, jobjectArray search_case, jboolean is_ignore_case, jstring search_text, jobjectArray keywords, jobjectArray keywords_lower, jbooleanArray is_keyword_path, jint max_results, jobject result_collector)
{
    if (cache_map.empty())
    {
        return;
    }
    // Wait for any in-flight cache clearing, then hold the cache lock so the
    // caches cannot be freed underneath the kernel and collectors.
    wait_for_clear_cache();
    std::lock_guard lock_guard(modify_cache_lock);
    std::vector<std::string> search_case_vec;
    if (search_case != nullptr)
    {
        generate_search_case(env, search_case_vec, search_case);
    }
    std::vector<std::string> keywords_vec;
    std::vector<std::string> keywords_lower_vec;
    const auto keywords_length = env->GetArrayLength(keywords);
    if (keywords_length > MAX_KEYWORDS_NUMBER)
    {
        fprintf(stderr, "too many keywords.\n");
        return;
    }
    bool is_keyword_path_ptr[MAX_KEYWORDS_NUMBER]{ false };
    const auto is_keyword_path_ptr_bool_array = env->GetBooleanArrayElements(is_keyword_path, nullptr);
    // Copy keywords + lower-cased keywords + path flags out of the JVM;
    // every local ref is released inside the loop.
    for (jsize i = 0; i < keywords_length; ++i)
    {
        auto tmp_keywords_str = reinterpret_cast<jstring>(env->GetObjectArrayElement(keywords, i));
        auto keywords_chars = env->GetStringUTFChars(tmp_keywords_str, nullptr);
#ifdef DEBUG_OUTPUT
        std::cout << "keywords: " << keywords_chars << std::endl;
#endif
        keywords_vec.emplace_back(keywords_chars);
        env->ReleaseStringUTFChars(tmp_keywords_str, keywords_chars);
        env->DeleteLocalRef(tmp_keywords_str);
        tmp_keywords_str = reinterpret_cast<jstring>(env->GetObjectArrayElement(keywords_lower, i));
        keywords_chars = env->GetStringUTFChars(tmp_keywords_str, nullptr);
        keywords_lower_vec.emplace_back(keywords_chars);
        env->ReleaseStringUTFChars(tmp_keywords_str, keywords_chars);
        env->DeleteLocalRef(tmp_keywords_str);
#ifdef DEBUG_OUTPUT
        std::cout << "is keyword path: " << static_cast<bool>(is_keyword_path_ptr_bool_array[i]) << std::endl;
#endif
        is_keyword_path_ptr[i] = is_keyword_path_ptr_bool_array[i];
    }
    env->ReleaseBooleanArrayElements(is_keyword_path, is_keyword_path_ptr_bool_array, JNI_ABORT);
    const auto search_text_chars = env->GetStringUTFChars(search_text, nullptr);
    std::atomic_uint result_counter = 0;
    // Start the collector threads BEFORE the kernel so they can drain caches
    // as soon as each one's is_match_done flag flips.
    std::vector<std::thread> collect_threads_vec;
    collect_threads_vec.reserve(COLLECT_RESULTS_THREADS);
    for (int i = 0; i < COLLECT_RESULTS_THREADS; ++i)
    {
        collect_threads_vec.emplace_back([&]
        {
            // A native thread must attach to the JVM to get its own JNIEnv;
            // env of the calling thread must not be shared across threads.
            JNIEnv* thread_env = nullptr;
            JavaVMAttachArgs args{ JNI_VERSION_10, nullptr, nullptr };
            if (jvm->AttachCurrentThread(reinterpret_cast<void**>(&thread_env), &args) != JNI_OK)
            {
                fprintf(stderr, "get thread JNIEnv ptr failed");
                return;
            }
            collect_results(thread_env, result_collector, result_counter, max_results, search_case_vec);
            jvm->DetachCurrentThread();
        });
    }
    start_kernel(cache_map, search_case_vec, is_ignore_case, search_text_chars, keywords_vec, keywords_lower_vec, is_keyword_path_ptr);
    // The calling thread also collects, using its own env directly.
    collect_results(env, result_collector, result_counter, max_results, search_case_vec);
    for (auto&& each_thread : collect_threads_vec)
    {
        if (each_thread.joinable())
        {
            each_thread.join();
        }
    }
    // Force every cache to the "collected" state so a cache skipped due to an
    // early stop does not stay claimed.
    for (auto& [_, cache_val] : cache_map)
    {
        if (cache_val->is_output_done.load() != 2)
        {
            cache_val->is_output_done = 2;
        }
    }
    env->ReleaseStringUTFChars(search_text, search_text_chars);
}
|
GPU核函数
首先通过核函数线程id获取对应的字符串,然后对字符串进行匹配,如果匹配完成将output设置为1(即缓存结构体中的dev_output),如果匹配失败则设置为0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58
|
__global__ void check(const size_t* str_address_records, const size_t* total_num, const int* search_case, const bool* is_ignore_case, char* search_text, char* keywords, char* keywords_lower_case, const size_t* keywords_length, const bool* is_keyword_path, char* output, const bool* is_stop_collect_var) { const size_t thread_id = GET_TID(); if (thread_id >= *total_num) { return; } if (*is_stop_collect_var) { output[thread_id] = 0; return; } const auto path = reinterpret_cast<const char*>(str_address_records[thread_id]); if (path == nullptr || !path[0]) { output[thread_id] = 0; return; } if (not_matched(path, *is_ignore_case, keywords, keywords_lower_case, static_cast<int>(*keywords_length), is_keyword_path)) { output[thread_id] = 0; return; } if (*search_case == 0) { output[thread_id] = 1; return; } if (*search_case & 4) { strlwr_cuda(search_text); char file_name[MAX_PATH_LENGTH]{ 0 }; get_file_name(path, file_name); strlwr_cuda(file_name); if (strcmp_cuda(search_text, file_name) != 0) { output[thread_id] = 0; return; } } output[thread_id] = 1; }
|
在not_matched()方法中,将会对字符串进行匹配,并尝试进行拼音匹配
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| __device__ bool not_matched(const char* path, const bool is_ignore_case, char* keywords, char* keywords_lower_case, const int keywords_length, const bool* is_keyword_path) { for (int i = 0; i < keywords_length; ++i) { const bool is_keyword_path_val = is_keyword_path[i]; char match_str[MAX_PATH_LENGTH]{ 0 }; if (is_keyword_path_val) { get_parent_path(path, match_str); } else { get_file_name(path, match_str); } char* each_keyword; if (is_ignore_case) { each_keyword = keywords_lower_case + i * static_cast<unsigned long long>(MAX_PATH_LENGTH); strlwr_cuda(match_str); } else { each_keyword = keywords + i * static_cast<unsigned long long>(MAX_PATH_LENGTH); } if (!each_keyword[0]) { continue; } if (strstr_cuda(match_str, each_keyword) == nullptr) { if (is_keyword_path_val || !is_str_contains_chinese(match_str)) { return true; } char gbk_buffer[MAX_PATH_LENGTH * 2]{ 0 }; char* gbk_buffer_ptr = gbk_buffer; utf8_to_gbk(match_str, static_cast<unsigned>(strlen_cuda(match_str)), &gbk_buffer_ptr, nullptr); char converted_pinyin[MAX_PATH_LENGTH * 6]{ 0 }; char converted_pinyin_initials[MAX_PATH_LENGTH]{ 0 }; convert_to_pinyin(gbk_buffer, converted_pinyin, converted_pinyin_initials); if (strstr_cuda(converted_pinyin, each_keyword) == nullptr && strstr_cuda(converted_pinyin_initials, each_keyword) == nullptr) { return true; } } } return false; }
|
结果收集
每个缓存中拥有一个is_output_done字段,初始为0,每个线程将会通过cas尝试将缓存的is_output_done设置为1,表示正在收集,当收集完成is_output_done会被设置为2表示收集完成。
当线程抢到某个缓存的收集权后将会开始进行收集。
GPU核函数匹配成功后将会把dev_output中对应的标志设置为1,通过dev_output可以拿到对应的字符串地址,然后取出字符串。
由于GPU核函数无法访问操作系统函数,因此在这里还需要过滤是否为文件夹,或是否为文件,过滤完成后将会调用_collect_func()调用match()方法的回调方法,将结果存入Java容器中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
/**
 * Collector loop, run concurrently by several JVM-attached threads.
 *
 * Each thread claims a finished cache via CAS on is_output_done (0 -> 1),
 * copies its match flags off the device, re-reads each matched string, applies
 * the file/dir filters ("f"/"d" in search_case_vec) that the kernel cannot do
 * (no OS calls on the GPU), and forwards survivors to the Java BiConsumer.
 * Loops until every cache is drained or the stop condition fires.
 */
void collect_results(JNIEnv* thread_env, jobject result_collector, std::atomic_uint& result_counter, const unsigned max_results, const std::vector<std::string>& search_case_vec)
{
    const jclass biconsumer_class = thread_env->GetObjectClass(result_collector);
    const jmethodID collector = thread_env->GetMethodID(biconsumer_class, "accept", "(Ljava/lang/Object;Ljava/lang/Object;)V");
    bool all_complete;
    // Stop when cancelled externally or the global result limit is reached.
    const auto stop_func = [&]
    {
        return is_stop() || result_counter.load() >= max_results;
    };
    // Pushes one (key, record) pair into the Java collector and bumps counters.
    auto _collect_func = [&](const std::string& _key, char _matched_record_str[MAX_PATH_LENGTH], unsigned* matched_number)
    {
        if (++result_counter >= max_results)
        {
            is_results_number_exceed = true;
        }
        auto record_jstring = thread_env->NewStringUTF(_matched_record_str);
        auto key_jstring = thread_env->NewStringUTF(_key.c_str());
        thread_env->CallVoidMethod(result_collector, collector, key_jstring, record_jstring);
        thread_env->DeleteLocalRef(record_jstring);
        thread_env->DeleteLocalRef(key_jstring);
        ++* matched_number;
    };
    do
    {
        all_complete = true;
        for (const auto& [key, val] : cache_map)
        {
            if (stop_func())
            {
                break;
            }
            if (!val->is_cache_valid)
            {
                continue;
            }
            // Kernel not done with this cache yet: revisit on the next pass.
            if (!val->is_match_done.load())
            {
                all_complete = false;
                continue;
            }
            // CAS 0 -> 1 claims exclusive collection rights for this cache;
            // losers (or already-collected caches, state 2) skip it.
            if (int expected = 0; !val->is_output_done.compare_exchange_strong(expected, 1))
            {
                continue;
            }
            unsigned matched_number = 0;
            // Only record_num bytes are meaningful, though the buffer is sized
            // to dev_output_bytes.
            const auto output_ptr = new char[val->dev_output_bytes];
            gpuErrchk(cudaMemcpy(output_ptr, val->dev_output, val->str_data.record_num, cudaMemcpyDeviceToHost), false, "collect results failed");
            for (size_t i = 0; i < val->str_data.record_num.load(); ++i)
            {
                if (stop_func())
                {
                    break;
                }
                if (static_cast<bool>(output_ptr[i]))
                {
                    char matched_record_str[MAX_PATH_LENGTH]{ 0 };
                    // Fetch the device address of record i, then the record
                    // bytes themselves (length tracked host-side).
                    char* str_address;
                    gpuErrchk(
                        cudaMemcpy(&str_address, val->str_data.dev_str_addr + i, sizeof(size_t), cudaMemcpyDeviceToHost
                        ), false, nullptr);
                    gpuErrchk(cudaMemcpy(matched_record_str, str_address, val->str_data.str_length[i], cudaMemcpyDeviceToHost), false, "collect results failed");
                    // File/dir filtering must happen on the CPU: the kernel
                    // has no access to OS filesystem calls.
                    if (search_case_vec.empty())
                    {
                        if (is_file_exist(matched_record_str))
                        {
                            _collect_func(key, matched_record_str, &matched_number);
                        }
                    }
                    else
                    {
                        if (std::find(search_case_vec.begin(), search_case_vec.end(), "f") != search_case_vec.end())
                        {
                            // "f": files only.
                            if (is_dir_or_file(matched_record_str) == 1)
                            {
                                _collect_func(key, matched_record_str, &matched_number);
                            }
                        }
                        else if (std::find(search_case_vec.begin(), search_case_vec.end(), "d") != search_case_vec.end())
                        {
                            // "d": directories only.
                            if (is_dir_or_file(matched_record_str) == 0)
                            {
                                _collect_func(key, matched_record_str, &matched_number);
                            }
                        }
                        else
                        {
                            if (is_file_exist(matched_record_str))
                            {
                                _collect_func(key, matched_record_str, &matched_number);
                            }
                        }
                    }
                }
            }
            val->matched_number = matched_number;
            val->is_output_done = 2;
            delete[] output_ptr;
        }
    }
    while (!all_complete && !stop_func());
    thread_env->DeleteLocalRef(biconsumer_class);
}
|
至此,GPU加速的核心方法基本介绍完成。OpenCL的版本除了字符串存储方式不同,其他方法几乎相同,只是使用了不同的框架进行实现,因此不再赘述。