开发ElasticSearch插件就是这么简单
目前es应用非常广泛,作为搜索、数据分析引擎,功能非常强大。其中插件功能是非常重要的一块,本文以自己开发的插件为切入点,简单阐述es插件原理及如何开发。
背景:
前几年工作中用es来作为站内app的搜索,顺便开发了一个动态更新同义词的插件: elasticsearch-analysis-dynamic-synonym, es本身是支持同义词配置(Synonym token filter),但是不允许动态更新同义词词库,每次更新后需要重启每个es节点,比较麻烦。注意:阅读本文需要对es有一定的了解。
插件类型:
站点插件(site plugins):
es把站点插件当成被内置的HTTP服务器处理的文件集,处于/_plugin/plugin_name/URL下面。站点插件可以图形化的界面查看、监控并管理es集群、索引等信息。 elasticsearch-head 和cerebro 就是非常好用的站点插件。 不过es5.0之后就不支持站点插件,主要是因为es并不是设计成web server,还有就是5.0之后es运行在Java Security Manager之下。现在所有的站点插件一般都是外置web server,然后访问es提供的restful接口来获取相关信息的。
具体参考这篇博客: running-site-plugins-with-elasticsearch-5-0
java插件:
java插件的种类非常丰富,包括分词插件(Analysis Plugins)、集群发现插件(Discovery Plugins)、脚本插件(Script Plugins)等。当我们需要某个功能的时候可以先看看官方,或第三方是否已经有相应的插件提供改功能。如果没有可以尝试自己开发相应的插件。一般接触比较多的就是分词插件,比如:中文分词插件:elasticsearch-analysis-ik、中文拼音插件:elasticsearch-analysis-pinyin 。
java插件主要是包含JAR文件,以及相关的脚本和配置文件的zip包。如果要安装一个java插件,集群中的每个节点(node)都必须安装,安装成功之后需要重启每个节点。
其中zip包中至少要包含插件描述文件plugin-descirptor.properties
和插件相关代码jar包。plugin-descriptor.properties
中最关键的两个配置是elasticsearch.version
和classname
。其中elasticsearch.version声明了插件兼容的es版本,这个版本必须与es版本一致,即当es每发布一个release版本的时候,你所开发的插件也需要发布同一个版本号的release版本,当es加载插件的时候会检查版本号是否跟es版本一致。classname是插件的入口类,要求完整路径。下图是一个插件的结构:
动态同义词插件
源码:elasticsearch-analysis-dynamic-synonym
解决的问题:
es提供了同义词过滤器的默认实现 SynonymFilter,以及在新版本中的SynonymGraphFilter。但是这两者不能动态的去更新同义词词库,每次修改之后需要重启每个Node才能生效,所以开发了这个插件。
elasticsearch-analysis-dynamic-synonym 项目代码结构如下:
其中:
DynamicSynonymPlugin类是插件入口类,继承了Plugin类、并实现了AnalysisPlugin接口。
plugin-descriptor.properties是插件描述文件。
plugin-security.policy是安全策略文件,声明插件会用到的java安全项。因为该插件允许配置远程同义词词库文件,需要配置网络权限。
plugin.xml是maven打包配置文件,es要求插件需要打包成zip文件,项目中用的是maven,所以用了maven assembly插件。
elasticsearch-analysis-dynamic-synonym 实现原理:
主要是在es自带synonym token filter的基础上,实现定时更新同义词词库功能,支持远程和本地同义词词库。
{
"index" : {
"analysis" : {
"analyzer" : {
"synonym" : {
"tokenizer" : "whitespace",
"filter" : ["remote_synonym"]
}
},
"filter" : {
"remote_synonym" : {
"type" : "dynamic_synonym",
"synonyms_path" : "http://host:port/synonym.txt",
"interval": 30
},
"local_synonym" : {
"type" : "dynamic_synonym",
"synonyms_path" : "synonym.txt"
}
}
}
}
}
远程同义词词库文件更新机制:synonyms_path是个url,该 http 请求需要返回两个头部(header),一个是 Last-Modified
,一个是 ETag
,这两者都是字符串类型,只要有一个发生变化,该插件就会去抓取新的同义词进而更新。
本地同义词词库文件更新机制:synonyms_path是一个相对es config目录的相对路径,会检查该文件的修改时间,如果有修改则会读取新的同义词进而更新。
核心更新代码如下:
// DynamicSynonymTokenFilterFactory
/**
* Static id generator
*/
private static final AtomicInteger id = new AtomicInteger(1);
private static final ScheduledExecutorService pool = Executors.newScheduledThreadPool(1, r -> {
Thread thread = new Thread(r);
thread.setName("monitor-synonym-Thread-" + id.getAndAdd(1));
return thread;
});
private volatile ScheduledFuture<?> scheduledFuture;
SynonymFile getSynonymFile(Analyzer analyzer) {
try {
SynonymFile synonymFile;
if (location.startsWith("http://") || location.startsWith("https://")) { // 远程文件
synonymFile = new RemoteSynonymFile(
environment, analyzer, expand, lenient, format, location);
} else { // 本地文件
synonymFile = new LocalSynonymFile(
environment, analyzer, expand, lenient, format, location);
}
if (scheduledFuture == null) {
scheduledFuture = pool.scheduleAtFixedRate(new Monitor(synonymFile),
interval, interval, TimeUnit.SECONDS); // 启动定时器定时更新同义词
}
return synonymFile;
} catch (Exception e) {
logger.error("failed to get synonyms: " + location, e);
throw new IllegalArgumentException("failed to get synonyms : " + location, e);
}
}
public class Monitor implements Runnable {
private SynonymFile synonymFile;
Monitor(SynonymFile synonymFile) {
this.synonymFile = synonymFile;
}
@Override
public void run() {
if (synonymFile.isNeedReloadSynonymMap()) { // 判断同义词词库文件是否有更新
synonymMap = synonymFile.reloadSynonymMap(); // 更新同义词表
for (AbsSynonymFilter dynamicSynonymFilter : dynamicSynonymFilters.keySet()) {
dynamicSynonymFilter.update(synonymMap);
logger.info("success reload synonym");
}
}
}
}
可以从代码中看到,就是启动了一个定时线程池,定时去更新同义词词库。安装插件之后,启动es可以看到插件是否被加载:
可以看到官方自带的插件在内部称为module:The builtin modules, which are plugins, but cannot be installed or removed。加载的这些module对应es的modules目录下面的module。
插件实现机制:
-
自己实现插件的话,入口类要继承插件抽象基类,并实现特定类型插件接口:
-
es插件机制主要通过自定义
ClassLoader
来实现。插件的目标就是利用类加载器实现类隔离:各种插件之间的类隔离、es应用和插件之间的类隔离。插件的加载主要是通过PluginsService类,和自定义类加载器ExtendedPluginsClassLoader来实现的,ExtendedPluginsClassLoader直接继承ClassLoader,并重写了loadClass。 -
plugins目录下的插件是被PluginsService类加载,加载时机是启动节点Node实例化的时候。
// Node.java, Node构造方法里面
this.pluginsService = new PluginsService(tmpSettings, initialEnvironment.configFile(), initialEnvironment.modulesFile(), initialEnvironment.pluginsFile(), classpathPlugins);
final Settings settings = pluginsService.updatedSettings();
PluginsService构造方法
public PluginsService(
Settings settings, // es节点启动的配置
Path configPath, // es的config目录
Path modulesDirectory, // es的modules目录, 官方自带的
Path pluginsDirectory, // es的plugins目录,第三方插件所在的目录
Collection<Class<? extends Plugin>> classpathPlugins // classpath路径中的插件
)
构造方法里面具体执行流程:
- 初始化并加载classpathPlugins中的插件
- 初始化modules目录下面的module,并分别封装成Bundle对象,放到seenBundles集合中。
- 初始化plugins目录下面的plugin,同样分别封装成Bundle对象,放到seenBundles集合中。
- 统一加载seenBundles中的插件,加载成功的插件会放到pluginsLoaded
- 最后检查必须加载的插件是否被加载。