当前位置:必发365电子游戏 > 编程 > 【必发365电子游戏】实际上用fs2来促成数据行调控恐怕会特别简便易行和直接
【必发365电子游戏】实际上用fs2来促成数据行调控恐怕会特别简便易行和直接
2019-12-19

    上节大家探究了经过scalaz-stream-fs2来驱动生龙活虎套数据管理流程,用fs2的Pipe类型来落实对数据流的逐行操作。本篇探讨策画在上节探究的根底上对数据流的流动和因素操作实行优化完备。如数据流动中增添诸如next、skip、eof成效、内容调整中追加对行元素的append、insert、update、remove等操作方法。不过通过生机勃勃番对fs2的重复解读,发掘这么些操作格局并不像自家所想像那么的法子,实际上用fs2来完结数据行调控也许会进一层简约和一向。那是因为与思想数码库行浏览形式不相同的是fs2是意气风发种拖式流(pull-model stream),它的数码行集结是风流倜傥种泛函不可变集结。每风华正茂行少年老成旦读取就等于直接消耗了断(consumed),所以只扶助生机勃勃种向前逐行读取形式。借使形象地陈述的话,大家习于旧贯的所谓数据集浏览或者是上边那样的情景:

读取豆蔻梢头行数据 >>> (使用或更新行字段值)>>> 向中游发送新的一站式数据。唯有结束发送动作才表示终止运算。完毕对上游的保有行数据读取并不表示终止操作,因为大家还是可以够穿梭向上游发送自定义发生的数据行。

大家用fs2模拟生机勃勃套数据流管道FDAPipeLine,管道中间有动乱数量的作业节点FDAWorkNode。作业方式包涵从管道中游截取二个数量成分、对其進展管理、然后采纳是不是向上游的管道接口(FDAPipeJoint)发送。上边是那套模拟的品种:fdapipes/package.scala

 1 package com.bayakala.funda {
 2 
 3   import fs2._
 4 
 5   package object fdapipes {
 6     //数据行类型
 7     trait FDAROW
 8 
 9     //数据处理管道
10     type FDAPipeLine[ROW] = Stream[Task, ROW]
11     //数据作业节点
12     type FDAWorkNode[ROW] = Pipe[Task, ROW, ROW]
13     //数据管道开关阀门,从此处获得管道内数据
14     type FDAValve[ROW] = Handle[Task, ROW]
15     //管道连接器
16     type FDAPipeJoint[ROW] = Pull[Task, ROW, Unit]
17 
18     //作业类型
19     type FDATask[ROW] = ROW => Option[List[ROW]]
20 
21   }
22 
23 }

留心那个FDAROW类型:那是生龙活虎种泛类型,因为在管道中流淌的数目只怕有多体系型,如数据行和QueryAction行。

流淌调整情势:FDAValves.scala

 1 package com.bayakala.funda.fdapipes
 2 import fs2._
 3 object FDAValves {  //流动控制方法
 4 //跳过本行(不向下游发送)
 5   def fda_skip[ROW] = Some(List[ROW]())
 6 //将本行发送至下游连接管道
 7   def fda_next[ROW](r: ROW) = Some(List[ROW](r))
 8 //终止流动
 9   def fda_break = None
10 
11 }

数量发送方法:FDAPipes.scala

1 package com.bayakala.funda.fdapipes
2 import fs2._
3 object FDAJoints {  //数据发送方法
4 //write rows down the pipeline
5   def fda_pushRow[ROW](row: ROW) = Pull.output1(row)
6   def fda_pushRows[ROW](rows: List[ROW]) = Pull.output(Chunk.seq(rows))
7 }

必发365电子游戏,学业节点职业章程:

 1 package com.bayakala.funda.fdapipes
 2 import FDAJoints._
 3 object FDANodes { //作业节点工作方法
 4  def fda_execUserTask[ROW](task: FDATask[ROW]): FDAWorkNode[ROW] = {
 5    def go: FDAValve[ROW] => FDAPipeJoint[ROW] = h => {
 6      h.receive1Option {
 7        case Some((r, h)) => task(r) match {
 8          case Some(xr) => xr match {
 9            case Nil => go(h)
10            case _ => fda_pushRows(xr) >> go(h)
11          }
12          case None => fda_halt
13        }
14        case None => fda_halt
15      }
16    }
17    in => in.pull(go)
18  }
19 
20 }

上边大家就示范那个工具库的切切实实应用办法:examples/Example1.scala
安装示范情状:

 1 package com.bayakala.funda.fdapipes.examples
 2 import fs2._
 3 import com.bayakala.funda.fdapipes._
 4 import FDANodes._
 5 import FDAValves._
 6 import Helpers._
 7 object Example1 extends App {
 8 
 9 
10   case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
11 // test data set
12   val r1 = Employee(1, "John", 23, 100.00)
13   val r2 = Employee(2, "Peter", 25,100.00)
14   val r3 = Employee(3, "Kay", 35,100.00)
15   val r4 = Employee(4, "Cain", 45,100.00)
16   val r5 = Employee(5, "Catty", 35,100.00)
17   val r6 = Employee(6, "Little", 19,80.00)

注意Employee是后生可畏种行类型,因为它extends FDAROW。

大家再写二个追踪突显当前流动数据行的函数:examples/Helpers.scala

1 package com.bayakala.funda.fdapipes.examples
2 import com.bayakala.funda.fdapipes._
3 import fs2.Task
4 object Helpers {
5   def log[ROW](prompt: String): FDAWorkNode[ROW] =
6     _.evalMap {row => Task.delay{ println(s"$prompt> $row"); row }}
7 }

下边我们就用多少个有例外须要的例证来演示流动调整和数量管理效果,那一个事例就是给最后顾客的专门的学业编制程序示范版本,然后由客户照版编写:

1、依照每条数据状态逐行举行拍卖:

 1 // 20 - 30岁加10%, 30岁> 加20%,其它加 5%
 2   def raisePay: FDATask[FDAROW] = row => {
 3     row match {
 4       case emp: Employee => {
 5         val cur = emp.age match {
 6           case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
 7           case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
 8           case _ => emp.copy(salary = emp.salary * 1.05)
 9         }
10         fda_next(cur)
11       }
12       case _ => fda_skip
13     }
14   }

客户提供的效率函数类型必得是FDATask[FDAROW]。类型参数FDAROW代表数据行通用途目。如若用户内定了FDATask[Employee]函数类型,那么必需保证管道中流动的数据行独有Employee风流倜傥种档期的顺序。完成对现阶段行数据的拍卖后用fda_next(emp卡塔尔把它发送到下生机勃勃节连接管道。大家用上面包车型客车三结合函数来张开演算:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪后>"))
    .run.unsafeRun
-----
运算结果:
加薪前>> Employee(1,John,23,100.0)
加薪后>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪后>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪后>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪后>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪后>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪后>> Employee(6,Little,19,84.000)

2、在风流倜傥组数据行内依照每条数据状态进行筛选:

  // 筛选40岁以上员工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
运算结果:
年龄>> Employee(1,John,23,100.0)
年龄>> Employee(2,Peter,25,100.0)
年龄>> Employee(3,Kay,35,100.0)
年龄>> Employee(4,Cain,45,100.0)
合格>> Employee(4,Cain,45,100.0)
年龄>> Employee(5,Catty,35,100.0)
年龄>> Employee(6,Little,19,80.0)
-

【必发365电子游戏】实际上用fs2来促成数据行调控恐怕会特别简便易行和直接。3、依据近日数据行状态终止作业:

 1   // 浏览至第一个30岁以上员工,跳出
 2   def stopOn30: FDATask[Employee] = emp => {
 3         if (emp.age > 30)
 4           fda_break
 5         else
 6           Some(List(emp))
 7   }
 8   println("---------")
 9   Stream(r1,r2,r3,r4,r5,r6)
10     .through(log("当前员工>"))
11     .through(fda_execUserTask[Employee](stopOn30))
12     .through(log("选入名单>"))
13     .run.unsafeRun
14 ---
15 运算结果:
16 当前员工>> Employee(1,John,23,100.0)
17 选入名单>> Employee(1,John,23,100.0)
18 当前员工>> Employee(2,Peter,25,100.0)
19 选入名单>> Employee(2,Peter,25,100.0)
20 当前员工>> Employee(3,Kay,35,100.0)

在此个例子里客户钦点了行类型统风度翩翩为Employee。

我们还是能够把八个效率串接起来。像上边那样把1和2八个职能连起来:

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun
---
运算结果:
加薪前>> Employee(1,John,23,100.0)
加薪后>> Employee(1,John,23,110.00)
年龄>> Employee(1,John,23,110.00)
加薪前>> Employee(2,Peter,25,100.0)
加薪后>> Employee(2,Peter,25,110.00)
年龄>> Employee(2,Peter,25,110.00)
加薪前>> Employee(3,Kay,35,100.0)
加薪后>> Employee(3,Kay,35,120.00)
年龄>> Employee(3,Kay,35,120.00)
加薪前>> Employee(4,Cain,45,100.0)
加薪后>> Employee(4,Cain,45,120.00)
年龄>> Employee(4,Cain,45,120.00)
合格>> Employee(4,Cain,45,120.00)
加薪前>> Employee(5,Catty,35,100.0)
加薪后>> Employee(5,Catty,35,120.00)
年龄>> Employee(5,Catty,35,120.00)
加薪前>> Employee(6,Little,19,80.0)
加薪后>> Employee(6,Little,19,84.000)
年龄>> Employee(6,Little,19,84.000)

下边小编把全体的亲自过问代码提必要大家:

package com.bayakala.funda.fdapipes.examples
import fs2._
import com.bayakala.funda.fdapipes._
import FDANodes._
import FDAValves._
import Helpers._
object Example1 extends App {


  case class Employee(id: Int, name: String, age: Int, salary: BigDecimal) extends FDAROW
// test data set
  val r1 = Employee(1, "John", 23, 100.00)
  val r2 = Employee(2, "Peter", 25,100.00)
  val r3 = Employee(3, "Kay", 35,100.00)
  val r4 = Employee(4, "Cain", 45,100.00)
  val r5 = Employee(5, "Catty", 35,100.00)
  val r6 = Employee(6, "Little", 19,80.00)



// 20 - 30岁加10%, 30岁> 加20%,其它加 5%
  def raisePay: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        val cur = emp.age match {
          case a if ((a >= 20) && (a < 30)) => emp.copy(salary = emp.salary * 1.10)
          case a if ((a >= 30)) => emp.copy(salary = emp.salary * 1.20)
          case _ => emp.copy(salary = emp.salary * 1.05)
        }
        fda_next(cur)
      }
      case _ => fda_skip
    }
  }

  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
      .through(fda_execUserTask[FDAROW](raisePay))
      .through(log("加薪后>"))
    .run.unsafeRun


  // 筛选40岁以上员工
  def filter40: FDATask[FDAROW] = row => {
    row match {
      case emp: Employee => {
        if (emp.age > 40)
          Some(List(emp))
        else fda_skip[Employee]
      }
      case _ => fda_break
    }
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

  // 浏览至第一个30岁以上员工,跳出
  def stopOn30: FDATask[Employee] = emp => {
        if (emp.age > 30)
          fda_break
        else
          Some(List(emp))
  }
  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("当前员工>"))
    .through(fda_execUserTask[Employee](stopOn30))
    .through(log("选入名单>"))
    .run.unsafeRun


  println("---------")
  Stream(r1,r2,r3,r4,r5,r6)
    .through(log("加薪前>"))
    .through(fda_execUserTask[FDAROW](raisePay))
    .through(log("加薪后>"))
    .through(log("年龄>"))
    .through(fda_execUserTask[FDAROW](filter40))
    .through(log("合格>"))
    .run.unsafeRun

}

 

 

 

 

 

 

 

 

上一篇:没有了
下一篇:没有了