scala - How do you write to and read from an external process using scalaz streams
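I am able to send data from a scalaz stream to an external program and get the result for each item back about 100ms in the future. Although I am able to do this with the code below, by zipping the output stream (a Sink) with the input stream (a Process) and then throwing away the Sink side effect, I feel this solution may be brittle.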
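If the external program has an error on one of the input items, everything after it will be out of sync. I feel the best bet would be to send some sort of incremental ID to the external program, which it can echo back out in the future, so that if an error occurs we can resync.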
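A minimal sketch of the incremental ID idea, independent of scalaz-stream and using only the Scala standard library. The names `tag`, `untag`, and `resync` are hypothetical, and the tab-separated `id<TAB>payload` line format is an assumption; it only works if the external program echoes the ID prefix back unchanged.

```scala
object IdResync {
  // Prefix a request with its incremental ID so the external program can echo it back.
  // Assumption: lines are formatted as "<id>\t<payload>".
  def tag(id: Long, payload: String): String = s"$id\t$payload"

  // Parse an echoed line back into (id, payload); None if the line is malformed.
  def untag(line: String): Option[(Long, String)] =
    line.split("\t", 2) match {
      case Array(id, payload) => scala.util.Try(id.toLong).toOption.map(_ -> payload)
      case _                  => None
    }

  // Pair every sent ID with its response, leaving None for IDs that never came back.
  def resync(sent: Seq[Long], received: Seq[String]): Seq[(Long, Option[String])] = {
    val byId = received.flatMap(untag).toMap
    sent.map(id => id -> byId.get(id))
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq("a", "b", "c")
    val sentLines = inputs.zipWithIndex.map { case (s, i) => tag(i.toLong, s) }
    // Simulate the external program failing on item 1: its echo is missing.
    val received = sentLines.filterNot(_.startsWith("1\t"))
    println(resync(Seq(0L, 1L, 2L), received))
  }
}
```

With IDs attached, a missing or failed item shows up as a gap for that ID instead of shifting every later response by one position, which is what happens when inputs and outputs are paired purely by order.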
The main trouble I am having is joining the result of sending data to the external program, a Process[Task, Unit], with the output of the program, a Process[Task, String]. I feel like I should be using something from wye, but I am not sure.
    import java.io.PrintStream

    import scalaz._
    import scalaz.concurrent.Task
    import scalaz.stream.Process._
    import scalaz.stream._

    object Main extends App {
      /*
        # echo.sh just prints to stdout what it gets on stdin
        while read line; do
          sleep 0.1
          echo $line
        done
      */
      val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

      // Emits the current timestamp as a string once per second.
      val source: Process[Task, String] = Process.repeatEval(Task {
        Thread.sleep(1000)
        System.currentTimeMillis().toString
      })

      // Lines read from the external program's stdout.
      val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)
      // Sink that writes lines to the external program's stdin.
      val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream))

      val in: Process[Task, Unit] = source to printLines

      // Pair each write with one line of output, then discard the Unit from the write.
      val zip: Process[Task, (Unit, String)] = in.zip(linesR)
      val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines
      out.run.run
    }
After delving a little deeper into the more advanced types, it looks like Exchange does what I want.
    import java.io.PrintStream

    import scalaz._
    import scalaz.concurrent.Task
    import scalaz.stream._
    import scalaz.stream.io._

    object Main extends App {
      /*
        # echo.sh just prints to stdout what it gets on stdin
        while read line; do
          sleep 0.1
          echo $line
        done
      */
      val program: java.lang.Process = Runtime.getRuntime.exec("./echo.sh")

      // Emits the current timestamp as a string every 100ms.
      val source: Process[Task, String] = Process.repeatEval(Task {
        Thread.sleep(100)
        System.currentTimeMillis().toString
      })

      // Read side: lines from the program's stdout. Write side: lines to its stdin.
      val read: stream.Process[Task, String] = linesR(program.getInputStream)
      val write: Sink[Task, String] = printLines(new PrintStream(program.getOutputStream))
      val exchange: Exchange[String, String] = Exchange(read, write)

      // Feed the source into the exchange and collect the first 10 lines it returns.
      println(exchange.run(source).take(10).runLog.run)
    }