摘要
前面32节讲了ServerCnxn完成server的一些数据统计,以及33节讲了NIOServerCnxn用nio的方式完成了server和client的交互,这一节讲ServerCnxnFactory,作为ServerCnxn的工厂方法
属性
直接代码
Logger LOG = LoggerFactory.getLogger(ServerCnxnFactory.class);
/**
* The buffer will cause the connection to be close when we do a send.
*/
static final ByteBuffer closeConn = ByteBuffer.allocate(0);//代表关闭的请求
protected SaslServerCallbackHandler saslServerCallbackHandler;
public Login login;
protected ZooKeeperServer zkServer;//zk服务器
private final Map<ServerCnxn, ConnectionBean> connectionBeans
= new ConcurrentHashMap<ServerCnxn, ConnectionBean>();//每个ServerCnxn对应的jmx数据
protected final HashSet<ServerCnxn> cnxns = new HashSet<ServerCnxn>();
login 和 sasl相关的不展开
方法
工厂方法
有三个,只贴用到的一个,通过配置能定义nio方式还是netty方式,默认nio
static public ServerCnxnFactory createFactory() throws IOException {//创建工厂类
String serverCnxnFactoryName =
System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);
if (serverCnxnFactoryName == null) {
serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();//默认是NIOServerCnxnFactory
}
try {
return (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)
.newInstance();
} catch (Exception e) {
IOException ioe = new IOException("Couldn't instantiate "
+ serverCnxnFactoryName);
ioe.initCause(e);
throw ioe;
}
}
抽象函数
列表如下
函数 | 备注 |
---|---|
public abstract int getLocalPort(); | //本地端口 |
public abstract Iterable<ServerCnxn> getConnections(); | //所有ServerCnxn的迭代器 |
public abstract void closeSession(long sessionId); | //关闭某个session |
public abstract void configure(InetSocketAddress addr, int maxClientCnxns) throws IOException; | //配置地址,以及最大client连接数 |
public abstract int getMaxClientCnxnsPerHost(); | //获取每个host的最大连接数 |
public abstract void setMaxClientCnxnsPerHost(int max); | //设置每个host的最大连接数 |
public abstract void startup(ZooKeeperServer zkServer) | //单机版启动 |
public abstract void join() throws InterruptedException; | //等待线程结束 |
public abstract void shutdown(); | //关闭socket,线程 |
public abstract void start(); | //集群版启动 |
public abstract void closeAll(); | //关闭所有ServerCnxn |
public abstract InetSocketAddress getLocalAddress(); | 获取本地地址 |
其他实现函数
忽略掉sasl,get,set相关
public int getNumAliveConnections() {//ServerCnxn个数
synchronized(cnxns) {
return cnxns.size();
}
}
public void unregisterConnection(ServerCnxn serverCnxn) {//连接解绑
ConnectionBean jmxConnectionBean = connectionBeans.remove(serverCnxn);
if (jmxConnectionBean != null){
MBeanRegistry.getInstance().unregister(jmxConnectionBean);
}
}
public void registerConnection(ServerCnxn serverCnxn) {//连接绑定
if (zkServer != null) {
ConnectionBean jmxConnectionBean = new ConnectionBean(serverCnxn, zkServer);
try {
MBeanRegistry.getInstance().register(jmxConnectionBean, zkServer.jmxServerBean);
connectionBeans.put(serverCnxn, jmxConnectionBean);
} catch (JMException e) {
LOG.warn("Could not register connection", e);
}
}
}
思考
在zk集群中所处的步骤
image.png对应代码在QuorumPeer#start
loadDataBase();//从事务日志目录dataLogDir和数据快照目录dataDir中恢复出DataTree数据
cnxnFactory.start();//开启对客户端的连接端口,启动ServerCnxnFactory主线程
startLeaderElection();//创建出选举算法
super.start();//启动QuorumPeer线程,在该线程中进行服务器状态的检查
如何用netty的方式启动
ServerCnxnFactory#createFactory()
单机版启动和集群版启动两者在ServerCnxnFactory调用的区别
单机版用的NIOServerCnxnFactory#startup
集群版用的NIOServerCnxnFactory#start