scala - How do you write to and read from an external process using scalaz streams


I want to send data from a scalaz stream to an external program and get the result for each item back about 100 ms later. I am able to do this with the code below, by zipping the output stream (a Sink) with the input stream (a Process) and then throwing away the Sink's side effect, but I feel this solution may be brittle.

If the external program has an error on one of the input items, everything after it will be out of sync. I feel the best bet would be to send some sort of incremental ID to the external program, which it can echo back out, so that if an error occurs we can resync.
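
The incremental-ID idea can be sketched in plain Scala, independent of scalaz-stream. This is only an illustration under stated assumptions: the `Correlation` object and its `tag`, `encode`, `decode`, and `correlate` helpers are hypothetical names, and the wire format (`<id>`, a tab, then the payload) assumes the external program echoes the ID prefix back unchanged.

```scala
// Hypothetical helpers; none of these names come from scalaz-stream.
object Correlation {
  // Attach a monotonically increasing id to each payload.
  def tag(payloads: Seq[String]): Seq[(Long, String)] =
    payloads.zipWithIndex.map { case (p, i) => (i.toLong, p) }

  // Assumed wire format: "<id>\t<payload>", one item per line.
  def encode(id: Long, payload: String): String = s"$id\t$payload"

  // Parse an echoed line back into (id, result); None if it is malformed.
  def decode(line: String): Option[(Long, String)] =
    line.split("\t", 2) match {
      case Array(id, rest) => scala.util.Try(id.toLong).toOption.map(_ -> rest)
      case _               => None
    }

  // Pair each sent payload with its reply by id, not by position.
  // Payloads whose reply is missing or unparseable map to None.
  def correlate(sent: Seq[(Long, String)],
                replies: Seq[String]): Seq[(String, Option[String])] = {
    val byId = replies.flatMap(decode).toMap
    sent.map { case (id, payload) => payload -> byId.get(id) }
  }
}
```

Because matching is done by ID, a dropped or failed reply leaves only its own item unanswered; later items still line up with their results.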

The main trouble I am having is joining the result of sending data to the external program, a Process[Task, Unit], with the output of the program, a Process[Task, String]. I feel I should be using something from wye, but I'm not sure.

    import java.io.PrintStream
    import scalaz._
    import scalaz.concurrent.Task
    import scalaz.stream.Process._
    import scalaz.stream._

    object Main extends App {
    /*
      # echo.sh prints to stdout what it gets on stdin
      while read line; do
        sleep 0.1
        echo $line
      done
    */
      val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

      // Emit the current timestamp once per second.
      val source: Process[Task, String] = Process.repeatEval(Task {
        Thread.sleep(1000)
        System.currentTimeMillis().toString
      })

      // Lines read from the external program's stdout.
      val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
      // Sink that writes lines to the external program's stdin.
      val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

      val in: Process[Task, Unit] = source to printLines

      // Pair each write with the next line read, then discard the Unit side.
      val zip: Process[Task, (Unit, String)] = in.zip(linesR)
      val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
      out.run.run
    }

After delving a little deeper into the more advanced types, it looks like Exchange does what I want.

    import java.io.PrintStream

    import scalaz._
    import scalaz.concurrent.Task
    import scalaz.stream._
    import scalaz.stream.io._

    object Main extends App {
    /*
      # echo.sh prints to stdout what it gets on stdin
      while read line; do
        sleep 0.1
        echo $line
      done
    */
      val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")

      // Emit the current timestamp every 100 ms.
      val source: Process[Task, String] = Process.repeatEval(Task {
        Thread.sleep(100)
        System.currentTimeMillis().toString
      })

      // Read side: lines from the program's stdout.
      val read: stream.Process[Task, String] = linesR(program.getInputStream)
      // Write side: lines sent to the program's stdin.
      val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
      val exchange: Exchange[String, String] = Exchange(read, write)
      // Feed the source through the exchange and collect the first 10 replies.
      println(exchange.run(source).take(10).runLog.run)
    }
