1、NodeManager概述
NodeManager(NM)是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,包括与ResourceManger保持通信,监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务。
NodeManager整体架构:
2、NodeManager分析
接下来将按照启动NodeManager时代码执行的顺序为主线进行代码分析。
2.1 main函数
- 调用initAndStartNodeManager函数。
主要代码:
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
//打印Nodemangager启动和关闭时的日志信息
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
//创建NodeManager对象
NodeManager nodeManager = new NodeManager();
//加载配置文件初始化
Configuration conf = new YarnConfiguration();
//初始化并启动NodeManager
nodeManager.initAndStartNodeManager(conf, false);
}
2.2 initAndStartNodeManager函数
- 调用init()函数,进行初始化(init方法调用被重写的serviceInit方法进行初始化)
- 启动各项服务(start方法内部调用被重写的servicestart方法进行启动各项服务)
private void initAndStartNodeManager(Configuration conf, boolean hasToReboot) {
try {
// Remove the old hook if we are rebooting.
if (hasToReboot && null != nodeManagerShutdownHook) {
ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
}
//增加nodeManagerShutdownHook为了在NodeManager关闭或重启时关闭compositeService
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
SHUTDOWN_HOOK_PRIORITY);
//调用init()函数,进行初始化(init方法调用被重写的serviceInit方法进行初始化)
this.init(conf);
//启动各项服务(start方法内部调用被重写的servicestart方法进行启动各项服务)
this.start();
} catch (Throwable t) {
LOG.fatal("Error starting NodeManager", t);
System.exit(-1);
}
}
2.3init函数
(1)init方法是从Service接口,在AbstractService抽象类中得到实现。在AbstractService类中的init方法调用protected 类型的serviceInit。在其子类NodeMananger中重写了serviceInit方法。
AbstractService抽象类中init方法实现:
@Override
public void init(Configuration conf) {
if (conf == null) {
throw new ServiceStateException("Cannot initialize service "
+ getName() + ": null configuration");
}
if (isInState(STATE.INITED)) {
return;
}
synchronized (stateChangeLock) {
if (enterState(STATE.INITED) != STATE.INITED) {
setConfig(conf);
try {
serviceInit(config);
if (isInState(STATE.INITED)) {
//if the service ended up here during init,
//notify the listeners
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
(2)NodeManager类中serviceInit方法中是添加一些服务。具体如下:
- 创建并添加DeletionService、NodeHealthCheckerService到父类的一个ArrayList<Service>对象serviceList中
- 调用createNodeStatusUpdater()函数创建NodeStatusUpdater对象,再调用NodeManager的父类方法register将这个对象注册添加父类的ArrayList<Service>对象listeners中.
- 调用createContainerManager()、createWebServer()函数和createNodeResourceMonitor函数创建ContainerManagerImpl对象、Service对象和NodeResourceMonitor对象,并将其添加到serviceList
- 将ContainerManagerImpl注册到之前创建好的异步事件调度器AsyncDispatcher中,然后将将AsyncDispatcher调度器添加到服务队列serviceList中
- 最后添加NodeStatusUpdater对象到serviceList中,最后添加这个对象的原因是要在心跳操作启动必须在所有其他服务之后。
- 调用父类的init()函数,进行其他的配置初始化。
主要代码:
@Override
protected void serviceInit(Configuration conf) throws Exception {
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
NMContainerTokenSecretManager containerTokenSecretManager =
new NMContainerTokenSecretManager(conf);
NMTokenSecretManagerInNM nmTokenSecretManager =
new NMTokenSecretManagerInNM();
this.aclsManager = new ApplicationACLsManager(conf);
//始化ContainerExecutor,ContainerExecutor封装了nodeManager对Container操作的各种方法,
//包括启动container, 查询指定id的container是否活着,等操作. 根据配置yarn.nodemanager.container-executor.class
//决定ContainerExecutor的实例, 默认为DefaultContainerExecutor.
ContainerExecutor exec = ReflectionUtils.newInstance(
conf.getClass(YarnConfiguration.NM_CONTAINER_EXECUTOR,
DefaultContainerExecutor.class, ContainerExecutor.class), conf);
try {
exec.init();
} catch (IOException e) {
throw new YarnRuntimeException("Failed to initialize container executor", e);
}
DeletionService del = createDeletionService(exec);
addService(del);
// NodeManager level dispatcher 异步分发器
this.dispatcher = new AsyncDispatcher();
//可以通过此服务查询node是否健康, 当前node的健康状态包括nodeHealthScriptRunner.isHealthy和dirsHandler.areDisksHealthy
nodeHealthChecker = new NodeHealthCheckerService();
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager);
//创建NodeStatusUpdater线程, 负责向RM注册和发送心跳(更新状态).
//这里使用ResourceTracker协议向RM通信, 底层为YarnRPC. ResourceTracker接口提供了两个方法; 提供注册和心跳功能
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
// 监控node的资源(即资源是否可用, 四种状态, stopped, inited, notinited, started)
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
//创建ContainerManagerImpl服务, 管理container,使用ContainerManager协议, ContainerManager协议为APP向NodeManager通信的协议
containerManager =
createContainerManager(context, exec, del, nodeStatusUpdater,
this.aclsManager, dirsHandler);
addService(containerManager);
((NMContext) context).setContainerManager(containerManager);
// 创建webServer, 启动NodeManager的web服务. 通过yarn.nodemanagerwebapp.address设置地址, 默认端口为8042
WebServer webServer = createWebServer(context, containerManager
.getContainersMonitor(), this.aclsManager, dirsHandler);
addService(webServer);
((NMContext) context).setWebServer(webServer);
dispatcher.register(ContainerManagerEventType.class, containerManager);
dispatcher.register(NodeManagerEventType.class, this);
addService(dispatcher);
//初始化监控
DefaultMetricsSystem.initialize("NodeManager");
// StatusUpdater should be added last so that it get started last
// so that we make sure everything is up before registering with RM.
addService(nodeStatusUpdater);
super.serviceInit(conf);
// TODO add local dirs to del
}
2.4 start函数
- 调用父类的start()函数依次启动在NodeManager类serviceInit中所有添加好的服务。其中AsyncDispatcher负责事件的传送,NodeStatusUpdater负责产生心跳事件,ContainerManagerImpl负责提供Hadoop RPC需要的函数等等
AbstractService中start方法的具体实现:
@Override
public void start() {
if (isInState(STATE.STARTED)) {
return;
}
//enter the started state
synchronized (stateChangeLock) {
if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
try {
startTime = System.currentTimeMillis();
//会调用子类NN中重写的同名方法
serviceStart();
if (isInState(STATE.STARTED)) {
//if the service started (and isn't now in a later state), notify
if (LOG.isDebugEnabled()) {
LOG.debug("Service " + getName() + " is started");
}
notifyListeners();
}
} catch (Exception e) {
noteFailure(e);
ServiceOperations.stopQuietly(LOG, this);
throw ServiceStateException.convert(e);
}
}
}
}
NodeManager中重写的serviceStart方法的主要代码:
@Override
protected void serviceStart() throws Exception {
try {
doSecureLogin();
} catch (IOException e) {
throw new YarnRuntimeException("Failed NodeManager login", e);
}
super.serviceStart();
}
3、参考资料:
http://www.technology-mania.com/2014/05/an-insight-into-hadoop-yarn-nodemanager.html
http://www.cnblogs.com/biyeymyhjob/archive/2012/08/18/2645576.html
分享到:
相关推荐
文章详细介绍了如何安装weblogic集群,并使用nodeManager来启停weblogic节点服务
详细说明了weblogic 中的具体的结构和 nodemanager的具体的配置方法,其中,包括了nodemanager的内部机理,运行原理,几时有多种方法了启动和配置:wlst,图形界面等等
2021-05-11 16:51:52,925 FATAL org.apache.hadoop.yarn.server.nodemanager.NodeManager: Error starting NodeManager org.apache.hadoop.yarn.exceptions.YarnRuntimeException: Failed to initialize container ...
weblogic 11g nodemanager 实现方式 详细介绍。
主要介绍了linux开机启动nodemanager的步骤,大家参考使用吧
01.nodemanager资源总量配置问题--maven工程插件缺失问题.mp4
java运行依赖jar包
Hadoop技术-NodeManager架构.pptx
Arduino-NodeManager.zip,电池供电传感器快速开发插件传感器节点管理器,Arduino是一家开源软硬件公司和制造商社区。Arduino始于21世纪初,深受电子制造商的欢迎,Arduino通过开源系统提供了很多灵活性。
Hadoop技术NodeManager架构共9页.pdf.zip
Header describing the interface between userspace and the kernel for the ocfs2_nodemanager module.
java运行依赖jar包
MySensors...远程配置:无需进行物理配置即可远程配置节点对其进行访问-内置传感器:提供嵌入式代码,以便允许使用单行通用传感器对其进行配置源代码位于http://github.com/mysensors/NodeManager
项目:nodeManager 该项目出于开发目的,为“九个编年史”节点提供了一种快速的自动更新解决方案。 特征: 根据以下URL自动更新: : 自动下载快照 生成新的Docker容器作为节点 模拟账户 我已经创建并提供了一个...
java运行依赖jar包
<name>yarn.nodemanager.aux-services <value>mapreduce_shuffle </property> <name>yarn.resourcemanager.hostname <value>slave1 <name>yarn.log-aggregation-enable <value>true </property
SecondaryNameNode、ResourceManager、NodeManager 和 JobHistoryServer。 2. 在 Hadoop 集群主节点上搭建 MapReduce 开发环境 Eclipse。 3. 查看 Hadoop 自带的 MR-App 单词计数源代码 WordCount.java,在 Eclipse ...
通过测试,hdfs与yarn组件启动后有以下进程: node1 jps:4个进程 NameNode、JournalNode、DFSZKFailoverController、Jps node2 jps:7个进程 DFSZKFailoverController、NodeManager、JournalNode、QuorumPeerMain、...