2012年12月3日星期一

Twitter Finagle-Scala语言的RPC框架

简介

Finagle是Twitter的一个开源scala框架,支持Java语言和Scala语言,支持的协议包括Apache Thrift, Kestrel, Memcached, http, redis, mysql等。由于国内的scala应用程度不高,而Twitter已经逐渐发展成为了Scala的重度使用用户。Finagle是基于NIO和netty的,主体的结构如下图:
本文的目的是实践Finagle框架的thrift协议,并在底层通过JPA,Hibernate与mysql进行交互,同时利用Spring framework管理Bean。

scrooge

scrooge也是twitter开源的一个项目,项目是用scala编写的一个thrift代码生成器。我们直接下载zip包使用scrooge脚本生成代码即可。
假设我们的thrift IDL代码如下:
struct User{
        1:i32 id,
        2:string name,
        3:i32 createdAt,
        4:i32 updated
}

service UserService{
        User getUser(1:i32 id),
        User saveUser(1:User user),
        User updateUser(1:User user)
}
使用下面的命令生成scala代码:
> scrooge -finagle demo.thrift
会生成一个thrift目录,scala代码就在目录下。

maven

通过eclipse新建一个scala项目,选择scala-archetype-simple,创建一个带maven支持的scala项目。
记得enable maven支持的同时,选中scala菜单的添加scala特性,否则不会编译。
需要调整pom中的scala版本和本地scala版本一致。

Finagle

拷贝scrooge生成的代码到工程中,并新建ThriftServer和ThriftClient两个scalaObject。
代码ThriftServer.scala
object ThriftServer {
  def main(args: Array[String]) {
   
    val protocol = new TBinaryProtocol.Factory()
    val ctx = new ClassPathXmlApplicationContext("applicationContext.xml")
    val userService = ctx.getBean("userService").asInstanceOf[UserService.FutureIface]
    val server = new UserService.FinagledService(userService, protocol)
    val address = new InetSocketAddress("localhost", 9000)
    var builder = ServerBuilder()
      .codec(ThriftServerFramedCodec())
      .name("UserService")
      .bindTo(address)
      .build(server)
  }
}
代码ThriftClient.scala:
object ThriftClient {
  def main(args: Array[String]) {
    val service = ClientBuilder()
      .hosts(new InetSocketAddress("localhost", 9000))
      .codec(ThriftClientFramedCodec())
      .hostConnectionLimit(100)
      .build()
    val client = new UserService.FinagledClient(service)
    val user = client.getUser(10)
    println("getUser-->"+user.apply())
    val user1 = client.saveUser(user.apply());
    println("save db-->"+user1.apply())
  }
}
ThriftServer中使用了spring管理了userService。
新建一个实现类,并调用了封装的dao层代码:
class UserServiceImpl extends FutureIface {
  
  @BeanProperty
  var dao:Dao = null
  
  def getUser(`id`: Int): Future[User] = {
    val currentTime = (System.currentTimeMillis() / 1000).asInstanceOf[Int];
    var user = if (id == 0) null else User.apply(id, "wenfeng", currentTime, currentTime)
    Future.value(user)
  }
  def saveUser(`user`: User): Future[User] = {
    var u = new com.scala.core.User(user._1, user._2, user._3, user._4)
    try{
     val id = dao.save(u)
     u.setId(id.asInstanceOf[Long]);
    }catch{
      case ex:DalException => ex.printStackTrace()
    }
    println("save user-->"+user)
    val res = User.apply(u.getId().asInstanceOf[Int], 
        u.getName(), (u.getCreatedAt() / 1000).asInstanceOf[Int], 
        (u.getUpdated() / 1000).asInstanceOf[Int])
    Future.value(res)
  }
  def updateUser(`user`: User): Future[User] = Future.value(user)
}
Spring的配置文件:
<beans xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:sca="http://www.springframework.org/schema/sca" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemalocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
            http://www.springframework.org/schema/sca http://www.osoa.org/xmlns/sca/1.0/spring-sca.xsd">
 <context:component-scan base-package="com.scala">
 <bean class="com.scala.dao.impl.DaoImpl" id="dao">
 <bean class="com.scala.service.impl.UserServiceImpl" id="userService">
  <property name="dao" ref="dao">
 </property></bean>
</bean></context:component-scan></beans>
新建一个JPA的User类,使用scala的@BeanProperty 生成getter和setters
@Entity
@Table(name="user")
class User extends java.io.Serializable{
 @Id
 @GeneratedValue(strategy = GenerationType.AUTO)
 @BeanProperty 
 var id:Long = _
 
 @Column(name="name")
 @BeanProperty 
 var name:String = _
 
 @Column(name="created_at")
 @BeanProperty 
 var createdAt:Long = _
 
 @Column(name="updated")
 @BeanProperty 
 var updated:Long = _
 
 def this(id:Long, name:String, createdAt:Long, update:Long) = {
   this()
   this.id = id
   this.name = name
   this.createdAt = createdAt
   this.updated = updated
 }
 override def toString() = "User["+id+","+name+","+createdAt+","+updated+"]"
测试插入一条用户数据,查看log和DB后,发现数据已经成功写入DB。