`
agapple
  • 浏览: 1583631 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

(业务层)异步并行加载ChangeLog

阅读更多

继上一篇:  (业务层)异步并行加载技术分析和设计

 

目前已经在google code上新建了一个project,也在逐步的完善和加强并行加载的功能,这里记录一下ChangeLog。

 

相关代码: https://github.com/agapple/asyncload , 有兴趣的同学可以一起参与,目前正在公司的应用中打算实施,逐步的在完善功能和解决一些兼容性的问题。

 

Change 1: (HandleMode模式修改)

AsyncLoadExecutor(并行加载的执行容器),修改了HandleMode模式,增加了CALLERUN,废弃了BLOCK。

HandleMode(针对并行加载队列满负荷时的一种处理模式):

  • CALLERRUN: 针对新的请求直接由调用者线程执行。即恢复为正常阻塞调用。
  • REJECT:针对新的请求直接抛出reject异常,比较暴力的处理方式。一般适用于需要进行资源控制,避免出现高并发
  • BLOCK: 针对新的请求直接进行阻塞,等待pool池中工作队列处理。 (已删除)

Change 2: (ThreadLocal支持)

为了异步并行加载,部分的代码块会由多个Thread执行,原先的ThreadLocal属性的模式已经不再有效。线程分为:caller thread(主线程,一般web应用为Jetty,Tomcat的Http工作线程) 和 runner thread(并行加载的执行线程)。

目前项目中支持在runner thread的可以正常获取ThreadLocal数据(caller thread运行中产生的ThreadLocal)。


简单设计:


ThreadLocal建议使用: 在异步加载中对ThreadLocal为只读,尽量不对其进行set操作。

伪代码:

try {
   threadLocal.put(object);
   serviceA.method(){ // 并行加载1 
        Object obj = threadLocal.get();
   }
   serviceB.method(){ // 并行加载2
        Object obj = threadLocal.get();
  }
} finally {
   threadLocal.remove(object);
}

说明:

1.  在所有的并行加载执行之前,完成threadLocal的设置,在最后完成threadLocal的清理。

2.  在各个并行加载容器中读取threadLocal信息。

 

针对一下的几种场景,使用ThreadLocal潜在风险:

1. serviceB依赖serviceA的ThreadLocal属性put。 因为A和B都是在并行的加载,所以很难确定执行的前后顺序。

2. caller线程依赖serviceA的ThreadLocal的属性put,因为A是一个并行加载,所以caller可能会优先于serviceA调用threadLocal

3. serviceA和serviceB都各自进行ThreadLocal属性的设置,因为执行顺序的不确定性和多Thread的执行,所以最后需要在caller线程进行合并,可能导致数据会丢失

 

Change 3 : (支持接口代理,解决final Service无法代理的问题)

1. 允许在构造代理工程时,设置targetClass。允许自定义cglib代理的目标class,而不是自动扫描对应service的class对象。

2. 使用template提交代码"闭包"的方式。

Change 4 : 处理超时时间<=0 等价于不进行超时控制

修改代码类:AsyncLoadResult

增加了对timeout<=0的逻辑判断,如果<=0,则调用future.get()进行处理,不进行超时控制。

 

Change 5 : 修改默认超时间,原先为3000毫秒改为0毫秒(意为不进行超时控制,保证对老业务处理兼容)

修改代码类:AsyncLoadConfig

原先默认超时时间为3000毫秒,但考虑在老系统上实施并行加载,程序员coding时意识不够,或者系统上线初期对timeout把握不够,所以这里考虑修改了默认的超时时间。

 

Change 6 : future针对timeout处理的bugfix

修改代码类:AsyncLoadResult

针对future.get(timeout, TimeUnit.MILLISECONDS); 针对出现TimeoutException时,需要进行cancel处理。

修改代码为:

try {
  // 使用cglib lazyLoader,避免每次调用future
  return future.get(timeout, TimeUnit.MILLISECONDS);
  }
 } catch (TimeoutException e) {
 future.cancel(true);
 throw e;
}

 

存在的注意点:

  因为透明的进行了多线程机制,原先的正常业务中并不会去处理Thread.interrupt() 和 InterruptException。

可能出现的一种现象:

  caller thread已经结束,但pool池中存在很多RUNNING的Thread线程。导致出现线程池不够用,该stop的没有stop。

解决方案:

   待定。目前暂未想到比较好的处理方案,因为需要对一个RUNNING的Thread线程执行一个stop操作,原先的Thread.stop(Throwable e)已经被@Deprecated,具体原因:http://download.oracle.com/javase/6/docs/technotes/guides/concurrency/threadPrimitiveDeprecation.html

Change 7 : 新增newInstance方法,支持对不带默认空构造函数(constructor)的支持

为了支持复杂的业务系统,尽可能自适应。这里提供了一个增强Class.newInstance()方法。实现方式比较简单:

 

public static Object newInstance(Class type) {
// 1. 首先查找默认的空构造函数
// 2. 查找其他的构造函数,默认选取第一个
// 2.1 获取构造函数的参数类型,产生默认的参数值,处理原型和数组,对象等。
}

Change 8 : 新增基于spring inteceptor的实现,方便无嵌入的使用

 

        <bean class="org.springframework.aop.framework.autoproxy.BeanNameAutoProxyCreator"> <!-- 自动配置代理 -->
   		<property name="optimize" value="false"/>
   		<property name="proxyTargetClass" value="false" />
        <property name="beanNames">
        	<list>
        		<value>asyncLoadTestServiceForInteceptor</value>
        	</list>
        </property>
        <property name="interceptorNames">
        	<list>
	            <value>asyncLoadInterceptor</value>
        	</list>
        </property>
     </bean>

 

 

Change 9 : 增加AsyncLoad相关接口定义,允许客户端提取内部状态

新增了两个接口类: 

 

  • AsyncLoadObject :  目前提供了_isNull(判断原始的model对象是否为空), _getStatus(获取并行加载的状态,时间点),_getOriginalClass(原始的class对象)
  • AsyncLoadService : 目前提供了_getOriginalClass(原始的class对象)
新增AsyncLoadUtils类,用于操作AsyncLoadObject和AsyncLoadService,一般不建议直接显示的直接使用,这样不方便以后的扩展。希望以后可以直接使用AsyncLoadUtils
 
--------------------------------------------------------------------------------------  
 
持续更新......

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics