您好,欢迎来到花图问答。
搜索
您的当前位置:首页Thrift RPC实战(四) thrift连接池的实现

Thrift RPC实战(四) thrift连接池的实现

来源:花图问答

对象池是一种很实用的技术,经典的例子就是数据库连接池。本篇直接在apache开源项目commons-pool的基础上开发。

步骤:

一、定义对象工厂


package com.yangyang.thrift.connection;

import 
import 
import 
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

/**
 * 定义对象工厂
 * Created by chenshunyang on 16/8/15.
 */
public class TProtocolFactory extends BasePooledObjectFactory<TProtocol> {

    private String host;
    private  int port;
    private  boolean keepAlive =true;

    public TProtocolFactory(String host, int port, boolean keepAlive) {
        this.host = host;
        this.port = port;
        this.keepAlive = keepAlive;
    }

    @Override
    public TProtocol create() throws Exception {
        TSocket socket = new TSocket(host,port);
        TTransport tTransport = new TFastFramedTransport(socket);
        tTransport.open();
        return new TCompactProtocol(tTransport);
    }

    @Override
    public PooledObject<TProtocol> wrap(TProtocol protocol) {
        return new DefaultPooledObject<TProtocol>(protocol);
    }

    /**
     *  对象钝化(即:从激活状态转入非激活状态,returnObject时触发)
     * @param pooledObject
     * @throws Exception
     */
    @Override
    public void passivateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
        if (keepAlive){
            pooledObject.getObject().getTransport().flush();
            pooledObject.getObject().getTransport().close();
        }
    }

    /**
     * 对象激活(borrowObject时触发)
     * @param pooledObject
     * @throws TTransportException
     */
    public void activateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
        if (!pooledObject.getObject().getTransport().isOpen()){
            pooledObject.getObject().getTransport().open();
        }
    }

    /**
     * 对象销毁(clear时会触发)
     * @param pooledObject
     * @throws TTransportException
     */
    public void destroyObject(PooledObject<TProtocol> pooledObject) throws TTransportException{
        passivateObject(pooledObject);
        pooledObject.markAbandoned();
    }

    /**
     * 验证对象有效性
     * @param pooledObject
     * @return
     */
    public boolean validateObject(PooledObject<TProtocol> pooledObject){
        if (pooledObject.getObject() != null){
            if (pooledObject.getObject().getTransport().isOpen()){
                return true;
            }
            try {
                pooledObject.getObject().getTransport().open();
                return  true;
            } catch (TTransportException e) {
                e.printStackTrace();
            }
        }
        return false;
    }
}

有二个关键的方法,需要重写:activateObject(对象激活) 及 passivateObject(对象钝化)

二、定义对象池


package com.yangyang.thrift.connection;

import 
import 
import 

/**
 * 定义对象池,GenericObjectPool为对象池的默认实现
 * Created by chenshunyang on 16/8/15.
 */
public class AutoClearGenericObjectPool<T> extends GenericObjectPool<T>{

    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory) {
        super(factory);
    }

    public AutoClearGenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {
        super(factory, config);
    }

    @Override
    public void returnObject(T obj) {
        super.returnObject(obj);
        //空闲数>=激活数时,清理掉空闲连接
        if (getNumIdle() >= getNumActive()) {
            clear();
        }
    }
}

common-pools提供了对象池的默认实现:GenericObjectPool 但是该对象池中,对于处于空闲的对象,需要手动调用clear来释放空闲对象,如果希望改变这一行为,可以自己派生自己的子类,重写returnObject方法,上面的代码中,每次归还对象时,如果空闲的对象比激活的对象还要多(即:一半以上的对象都在打酱油),则调用clear方法。

三、使用示例:



package com.yangyang.thrift.connection;

import 
import 
import org.apache.thrift.protocol.TProtocol;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/**
 * Created by chenshunyang on 16/8/15.
 */
public class ProtocolPoolTest {
    public static void main(String[] args) throws Exception{
        GenericObjectPoolConfig config = new GenericObjectPoolConfig();
        config.setMaxIdle(10);
        config.setMinIdle(1);
        config.setTestOnBorrow(true);

        ObjectPool<TProtocol> pool = new AutoClearGenericObjectPool<TProtocol>(
                new TProtocolFactory("127.0.0.1", 2181, true), config);

        List<TProtocol> list = new ArrayList<TProtocol>();
        for (int i = 1; i <= 10; i++) {
            TProtocol protocol = pool.borrowObject();
            System.out.println(protocol.toString());
            if (i % 2 == 0) {
                //10个连接中,将偶数归还
                pool.returnObject(protocol);
            } else {
                list.add(protocol);
            }
        }

        Random rnd = new Random();
        while (true) {
            System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));
            Thread.sleep(5000);
            //每次还一个
            if (list.size() > 0) {
                int i = rnd.nextInt(list.size());
                pool.returnObject(list.get(i));
                list.remove(i);
            }

            //直到全部还完
            if (pool.getNumActive() <= 0) {
                break;
            }
        }

        System.out.println("clear all------------------------");

        list.clear();
        //连接池为空,测试是否能重新创建新连接
        for (int i = 1; i <= 10; i++) {
            TProtocol protocol = pool.borrowObject();
            System.out.println(protocol.toString());
            if (i % 2 == 0) {
                pool.returnObject(protocol);
            } else {
                list.add(protocol);
            }
        }

        while (true) {
            System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));
            Thread.sleep(5000);
            if (list.size() > 0) {
                int i = rnd.nextInt(list.size());
                pool.returnObject(list.get(i));
                list.remove(i);
            }

            if (pool.getNumActive() <= 0) {
                pool.close();
                break;
            }
        }


    }
}

注:需要从对象池取一个对象时,调用borrowObject(背后会调用activeObject激活对象),类似的,对象使用完之后,需要调用returnObject将对象放回对象池(背后会调用passivateObject使对象钝化)

输出:


org.apache.thrift.protocol.TCompactProtocol@6ae40994

org.apache.thrift.protocol.TCompactProtocol@1a93a7ca

org.apache.thrift.protocol.TCompactProtocol@4dcbadb4

org.apache.thrift.protocol.TCompactProtocol@4e515669

org.apache.thrift.protocol.TCompactProtocol@4e515669

org.apache.thrift.protocol.TCompactProtocol@17d10166

org.apache.thrift.protocol.TCompactProtocol@17d10166

org.apache.thrift.protocol.TCompactProtocol@1b9e1916

org.apache.thrift.protocol.TCompactProtocol@1b9e1916

org.apache.thrift.protocol.TCompactProtocol@ba8a1dc

active:5,idea:1

active:4,idea:2

active:3,idea:0

active:2,idea:1

active:1,idea:0

clear all------------------------

org.apache.thrift.protocol.TCompactProtocol@1b701da1

org.apache.thrift.protocol.TCompactProtocol@726f3b58

org.apache.thrift.protocol.TCompactProtocol@442d9b6e

org.apache.thrift.protocol.TCompactProtocol@ee7d9f1

org.apache.thrift.protocol.TCompactProtocol@ee7d9f1

org.apache.thrift.protocol.TCompactProtocol@15615099

org.apache.thrift.protocol.TCompactProtocol@15615099

org.apache.thrift.protocol.TCompactProtocol@1edf1c96

org.apache.thrift.protocol.TCompactProtocol@1edf1c96

org.apache.thrift.protocol.TCompactProtocol@368102c8

active:5,idea:1

active:4,idea:2

active:3,idea:0

active:2,idea:1

active:1,idea:0

从输出上看,归还对象后,再次取出时,并没有创建新对象,而是直接使用了对象池中已经空闲的对象。当对象池中的所有对象都归还变成空闲并被clear后,再次从对象池中借对象时,会重新创建对象。

参考文章:

欢迎大家扫码关注我的微信公众号,与大家一起分享技术与成长中的故事。


我的微信公众号.jpg

Copyright © 2019- huatuowenda.com 版权所有 湘ICP备2023022495号-1

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务