暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

ES学习-02节点启动代码流程

小怂读书 2021-10-10
881

ES6.3.2

1、Elasticsearch源代码目录结构

packagedescription
benchmarks该目录包含Elasticsearch的一些基准,基于JMH
client访问ES的客户端API方法,包括REST和Transport
distribution打包工具,打包各类压缩发布包
serverES功能实现核心
plugins插件
x-packES License
buildSrcEs构建相关的代码
modules核心代码外的扩展模块,包括聚合


2、Elasticsearch基本模块


3、ES节点启动代码流程梳理

1)debug模式启动:Elasticsearch.java

/**
* Main entry point for starting elasticsearch
*/
public static void main(final String[] args) throws Exception {

// 注册状态记录器错误的侦听器
LogConfigurator.registerErrorListener();
final Elasticsearch elasticsearch = new Elasticsearch();
// 进入static main方法,调用elasticsearch.main(args, terminal)
int status = main(args, elasticsearch, Terminal.DEFAULT);
if (status != ExitCodes.OK) {
exit(status);
}
}


2)调用执行Command.java中的main方法

/** Parses options for this command from args and executes it. */
public final int main(String[] args, Terminal terminal) throws Exception {
//… 启动shutdown处理的钩子线程,es进程关闭时执行回调处理
//… 执行命令,抛出执行过程中的异常
try {
mainWithoutErrorHandling(args, terminal);
} catch (OptionException e) {
printHelp(terminal);
terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
return ExitCodes.USAGE;
} catch (UserException e) {
if (e.exitCode == ExitCodes.USAGE) {
printHelp(terminal);
}
terminal.println(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
return e.exitCode;
}
return ExitCodes.OK;
}


3)mainWithoutErrorHandling函数中执行execute


进入EnvironmentAwareCommand.java 中的execute方法

这里主要是检查ES启动需要的配置是否存在,如果不存在,从系统属性中获取进行赋值:




4)执行Elasticsearch.java execute方法

@Override
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
// ...
final boolean daemonize = options.has(daemonizeOption);
final Path pidFile = pidfileOption.value(options);
final boolean quiet = options.has(quietOption);
    // ...
try {
init(daemonize, pidFile, quiet, env);
} catch (NodeValidationException e) {
throw new UserException(ExitCodes.CONFIG, e.getMessage());
}
}

进入Bootstrap.init方法

void init(final boolean daemonize, final Path pidFile, final boolean quiet, Environment initialEnv)
throws NodeValidationException, UserException {
try {
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
} catch (BootstrapException | RuntimeException e) {
// format exceptions to the console in a special way
// to avoid 2MB stacktraces from guice, etc.
throw new StartupException(e);
}
}


5)Bootstrap.init

static void init(
final boolean foreground,
final Path pidFile,
final boolean quiet,
final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
// … 实例化Bootstrap
INSTANCE = new Bootstrap();
// 加载elasticsearch.keystore文件,重新创建env环境变量
final SecureSettings keystore = loadSecureSettings(initialEnv);
final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
// ...
final boolean closeStandardStreams = (foreground == false) || quiet;
try {
if (closeStandardStreams) {
final Logger rootLogger = ESLoggerFactory.getRootLogger();
final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
if (maybeConsoleAppender != null) {
Loggers.removeAppender(rootLogger, maybeConsoleAppender);
}
closeSystOut();
}
// 检查Lucene版本
checkLucene();
// install the default uncaught exception handler; must be done before security is
// initialized as we do not want to grant the runtime permission
// setDefaultUncaughtExceptionHandler
Thread.setDefaultUncaughtExceptionHandler(
new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
INSTANCE.setup(true, environment);
try {
// any secure settings must be read during node construction
IOUtils.close(keystore);
} catch (IOException e) {
throw new BootstrapException(e);
}
        // 实例启动
INSTANCE.start();
if (closeStandardStreams) {
closeSysError();
}
} catch (NodeValidationException | RuntimeException e) {
// ...
}
}


6、执行setup

private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
Settings settings = environment.settings();
try {
spawner.spawnNativeControllers(environment);
} catch (IOException e) {
throw new BootstrapException(e);
}
// 初始化本地资源,此时
initializeNatives(
environment.tmpFile(),
BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
BootstrapSettings.CTRLHANDLER_SETTING.get(settings));
// 在安装security manager之前初始化探针
initializeProbes();
if (addShutdownHook) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
IOUtils.close(node, spawner);
LoggerContext context = (LoggerContext) LogManager.getContext(false);
Configurator.shutdown(context);
} catch (IOException ex) {
throw new ElasticsearchException("failed to stop node", ex);
}
}
});
}
try {
// look for jar hell
final Logger logger = ESLoggerFactory.getLogger(JarHell.class);
JarHell.checkJarHell(logger::debug);
} catch (IOException | URISyntaxException e) {
throw new BootstrapException(e);
}
// Log ifconfig output before SecurityManager is installed
IfConfig.logIfNecessary();
// install SM after natives, shutdown hooks, etc.
try {
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
} catch (IOException | NoSuchAlgorithmException e) {
throw new BootstrapException(e);
}
    // 初始化Node节点
node = new Node(environment) {
@Override
protected void validateNodeBeforeAcceptingRequests(
final BootstrapContext context,
final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
BootstrapChecks.check(context, boundTransportAddress, checks);
}
};
}

初始化之前的INSTANCE实例状态:


初始化完成后,集群基本配置信息、环境变量等已初始化完成。



7、INSTANCE.start();

private void start() throws NodeValidationException {
node.start();
keepAliveThread.start();
// 启动keepAlive线程,线程不执行具体的操作,是唯一的用户线程,作用是保持进程的运行(Java程序中至少要有一个用户进程,用户进程为0时退出进程,注:是用户进程,不是守护进程)
}

Node.start()方法中主要进行ES外部环境的检查BootstrapChecks,检查包括堆大小检查、文件描述符检查、内存锁定检查、最大线程数检查、最大虚拟内存检查等等。

nodeConnectionsService.start();
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();

初始化内部数据、创建线程池等操作。


8)节点启动相关日志

[2021-10-10T15:57:09,118][INFO ][o.e.n.Node               ] [50mY7pf] initialized
[2021-10-10T15:59:03,774][INFO ][o.e.n.Node ] [50mY7pf] starting ...
[2021-10-10T15:59:59,585][INFO ][o.e.t.TransportService ] [50mY7pf] publish_address {127.0.0.1:9300}, bound_addresses {[::1]:9300}, {127.0.0.1:9300}
[2021-10-10T16:07:44,642][WARN ][o.e.b.BootstrapChecks ] [50mY7pf] initial heap size [268435456] not equal to maximum heap size [4294967296]; this can cause resize pauses and prevents mlockall from locking the entire heap
[2021-10-10T16:07:51,817][INFO ][o.e.c.s.MasterService ] [50mY7pf] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: new_master {50mY7pf}{50mY7pfYSja9aXpQfjZsHQ}{qXQYrlCXSwq75wtkUvr6ng}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=17179869184, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}
[2021-10-10T16:07:51,825][INFO ][o.e.c.s.ClusterApplierService] [50mY7pf] new_master {50mY7pf}{50mY7pfYSja9aXpQfjZsHQ}{qXQYrlCXSwq75wtkUvr6ng}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=17179869184, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, reason: apply cluster state (from master [master {50mY7pf}{50mY7pfYSja9aXpQfjZsHQ}{qXQYrlCXSwq75wtkUvr6ng}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=17179869184, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]])
[2021-10-10T16:09:19,308][WARN ][o.e.c.s.MasterService ] [50mY7pf] cluster state update task [zen-disco-elected-as-master ([0] nodes joined)[, ]] took [1.4m] above the warn threshold of 30s
[2021-10-10T16:09:20,203][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [50mY7pf] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2021-10-10T16:09:21,959][INFO ][o.e.n.Node ] [50mY7pf] started
[2021-10-10T16:09:45,316][WARN ][o.e.x.s.a.s.m.NativeRoleMappingStore] [50mY7pf] Failed to clear cache for realms [[]]
[2021-10-10T16:10:08,195][INFO ][o.e.l.LicenseService ] [50mY7pf] license [0cbc53f2-c8df-4cea-9209-ba7d8144faef] mode [basic] - valid
[2021-10-10T16:10:17,619][INFO ][o.e.g.GatewayService ] [50mY7pf] recovered [4] indices into cluster_state
[2021-10-10T16:10:18,927][WARN ][o.e.c.s.MasterService ] [50mY7pf] cluster state update task [local-gateway-elected-state] took [58.1s] above the warn threshold of 30s
[2021-10-10T16:10:20,950][WARN ][o.e.c.s.ClusterApplierService] [50mY7pf] cluster state applier task [apply cluster state (from master [master {50mY7pf}{50mY7pfYSja9aXpQfjZsHQ}{qXQYrlCXSwq75wtkUvr6ng}{127.0.0.1}{127.0.0.1:9300}{ml.machine_memory=17179869184, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [2] source [local-gateway-elected-state]])] took [55.8s] above the warn threshold of 30s
[2021-10-10T16:10:47,473][INFO ][o.e.c.r.a.AllocationService] [50mY7pf] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[test123][4], [test123][1], [test123][3], [test123][0]] ...]).


9)启动完成访问

文章转载自小怂读书,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论