純粋関数型技術メモ

内容は副作用を含みます

Functional Reactive Programming でテトリスを書いてみる(2) [Haskell]

前回は基本的なデータ型を定義し,テトリスのフィールドを表示させた.

入力

キー入力によって動作が変わるようにしたい.
一旦テトリスを離れる.

newtype Input = Get Char

inputEvent :: IO (Event Input)

echo :: Event Input -> Event Output
echo i = (\(Get c) -> Put c) <$> i

main = do
    hSetEcho stdin False
    input <- inputEvent
    run $ echo input

インターフェースとしては,inputEventから得た入力イベントを出力イベントに変換する形にする.

import Data.Time

run :: Event Output -> IO ()
run (Event o) = do
    utct <- getCurrentTime
    mapM_ (\(t', o) -> do
        t <- getTime utct
        threadDelay . truncate $ (t' - t) * 1000
        case o of
            Put c -> do
                putChar c
                hFlush stdout
            Clr -> clearScreen
        ) o

inputEvent :: IO (Event Input)
inputEvent = do
    utct <- getCurrentTime
    let go = do
            c <- getChar
            t <- getTime utct
            if c == '\n' then return []
                         else ((t, Get c):) <$> go
    Event <$> go

getTime :: UTCTime -> IO Time
getTime t = (* 1000) . realToFrac . flip diffUTCTime t <$> getCurrentTime

inputEventは一文字ずつ入力を受けて,時間とセットでイベントとする.
run関数も少し修正.

abc

入力をそのまま表示しエンターキーで終了させたい.
しかしIOが遅延しないために,入力が全て終わってから出力が始まるようになってしまう.

ここで黒魔術を使う.

import System.IO.Unsafe

inputEvent :: IO (Event Input)
inputEvent = do
    utct <- getCurrentTime
    let go = unsafeInterleaveIO $ do
            c <- getChar
            t <- getTime utct
            if c == '\n' then return []
                         else ((t, Get c):) <$> go
    Event <$> go

unsafeInterleaveIOは,IOの中身を遅延させることができる(もちろんunsafe).
これによりEventの計算が遅延されるので,入力を受けて順次出力されるようになった.

ここで一つ問題がある.

echo :: Event Input -> Event Output
echo i = (\(Get c) -> Put c) <$> i

putA :: Event Output
putA = puts $ const "A" <$> atTimes [0,100..]

main = do
    hSetEcho stdin False
    input <- inputEvent
    run $ echo input `mappend` putA
AAAaAAAAAbAAAAcAAAAA...

入力をそのまま出力するechoイベントと 0.1秒おきにAと出力するputAイベントを合成しているが,echoイベントのタイミングでputAイベントの出力がまとめて表示される.合成を行うmappend関数ではイベント配列を時間で比較して並べ替えているために,片方の最新のイベントまで全体の評価が遅延してしまう.

[(0, 'A'), (100, 'A'), (200, 'A'), (300, 'A'), (400, 'A'), (500, 'A'), ...
`mappend`
[                (150, 'a'),                   (400, 'b'), ...

シンプルな実装方法では限界が見えてきた.

スレッドを用いた実装

Control.Concurrent.Chan は,並行処理のためのFIFOライブラリである.
ここからは各イベントは軽量スレッドとして並行実行し,キューでストリームを表現する.

Event

{-# LANGUAGE DeriveFunctor #-}

type Time = Double

newtype Event a = Event (Queue a -> IO ())

type Queue a = Chan (EventItem a)

data EventItem a = Item a
                 | End
    deriving Functor

newtype Input = Get Char

data Output = Put Char
            | Clr

run :: Event Output -> IO ()
run (Event f) = do
    ch <- newChan
    forkIO $ f ch
    loop ch
    where
        loop :: Queue Output -> IO ()
        loop ch = do
            e <- readChan ch
            case e of
                End    -> mempty
                Item o -> do
                    case o of
                        Put c -> do
                            putChar c
                            hFlush stdout
                        Clr -> clearScreen
                    loop ch

atTimes :: [Time] -> Event ()
atTimes ts = Event $ \ch -> do
    utct <- getCurrentTime
    forM_ ts (\t' -> do
        t <- getTime utct
        threadDelay . truncate $ (t' - t) * 1000
        writeChan ch $ Item ())
    writeChan ch End

getTime :: UTCTime -> IO Time
getTime t = (* 1000) . realToFrac . flip diffUTCTime t <$> getCurrentTime

main = run $ const (Put 'a') <$> atTimes [0, 500..]
aaa...

Eventは,出力先のChanを受け取るとEventItemのストリームを流す.
Chanを通してEventや関数を繋いでいくことでストリームを形成する.

isEnd :: EventItem a -> Bool
isEnd End = True
isEnd _   = False

instance Functor Event where
    f `fmap` (Event g) = Event $ \ch -> do
        ch' <- newChan
        forkIO $ g ch'
        let loop = do
             e <- readChan ch'
             writeChan ch $ f <$> e
             unless (isEnd e) loop
        loop

instance Applicative Event where
    pure = return
    (<*>) = ap

instance Monad Event where
    return a = Event $ \ch -> do
        writeChan ch $ Item a
        writeChan ch End
    (Event f) >>= g = Event $ \ch -> do
        ch' <- newChan
        count <- newMVar (1 :: Int)
        forkIO $ f ch'
        let loop = do
             e <- readChan ch'
             modifyMVar_ count $ return . (+ 1)
             case e of
                 End -> return ()
                 Item a -> do
                     let Event h = g a
                     forkIO $ recieve h
                     loop
            recieve h = do
             c <- newChan
             forkIO $ h c
             let loop' = do
                  e <- readChan c
                  case e of
                      End -> do
                          modifyMVar_ count $ return . subtract 1
                          i <- readMVar count
                          if i == 0 then writeChan ch End
                                    else loop'
                      Item _ -> do
                          writeChan ch e
                          loop'
             loop'
        loop
        modifyMVar_ count $ return . subtract 1

instance Monoid (Event a) where
    mempty = Event $ \ch -> writeChan ch End
    a `mappend` b = join $ listE [(0, a), (0, b)]

inputEvent :: IO (Event Input)
inputEvent = do
    ch <- newChan
    forkIO $ loop ch
    return $ Event $ \ch' -> do
        c <- dupChan ch
        joinQueue c ch'
    where
        loop :: Queue Input -> IO ()
        loop ch = do
            c <- getChar
            if c == '\n'
            then writeChan ch End
            else do
                writeChan ch $ Item (Get c)
                loop ch

joinQueue :: Queue a -> Queue a -> IO ()
joinQueue ca cb = do
    e <- readChan ca
    writeChan cb e
    unless (isEnd e) $ joinQueue ca cb

delay :: Time -> Event a -> Event a
delay t (Event f) = Event $ \ch -> do
    ch' <- newChan
    forkIO $ f ch'
    let loop = do
            e <- readChan ch'
            forkIO $ do
                threadDelay . truncate $ t * 1000
                writeChan ch e
            unless (isEnd e) loop
    loop

echo :: Event Input -> Event Output
echo = fmap $ \(Get c) -> Put c

main = do
    hSetEcho stdin False
    input <- inputEvent
    run $ (const (Put 'i') <$> input) `mappend`
          delay 2500 (echo input)     `mappend`
          const (Put 'A') <$> atTimes [0, 1000..]
AAAAAiaAAiAbAAiicAideAAiAaAAA...

入力を0.5秒遅れで出力・入力があった時にiと出力・1秒ごとにAと出力 の3つのEventを合成できた.

fmapやmappendでは,forkIOで並行にEventを生成しストリームをつなげている.
ストリームは最終的な出力を根とする木構造になるので,末端となるinputEventでは入力が重複しないようChanを複製している.

Behavior

newtype Behavior a = Behavior (IO (IO a))
    deriving Functor

instance Applicative Behavior where
    pure = Behavior . pure . pure
    Behavior f <*> Behavior b = Behavior $ do
        fi <- f
        bi <- b
        return $ fi <*> bi

time :: Behavior Time
time = Behavior $ do
    utct <- getCurrentTime
    return $ getTime utct

Behaviorは単に値を返すIO型として実装した.初期化時と取得時の二段階のIOになっている.

snapshot :: Behavior b -> Event a -> Event (a, b)
snapshot (Behavior bh) (Event f) = Event $ \ch -> do
    ch' <- newChan
    forkIO $ f ch'
    bi <- bh
    let loop = do
         e <- readChan ch'
         b <- bi
         writeChan ch $ (, b) <$> e
         unless (isEnd e) loop
    loop

snapshot :: Behavior b -> Event a -> Event (a, b)
snapshot (Behavior bh) (Event f) = Event $ \ch -> do
    ch' <- newChan
    forkIO $ f ch'
    bi <- bh
    let loop = do
         e <- readChan ch'
         b <- bi
         writeChan ch $ (, b) <$> e
         unless (isEnd e) loop
    loop

stepper :: a -> Event a -> Behavior a
stepper a (Event f) = Behavior $ do
    ch <- newChan
    forkIO $ f ch
    var <- newMVar a
    let loop = do
         e <- readChan ch
         case e of
             End -> return ()
             Item a -> do
                 swapMVar var a
                 loop
    forkIO loop
    return $ readMVar var

main = do
    hSetEcho stdin False
    input <- inputEvent
    run $ puts (show . snd <$> snapshot time input) `mappend`
          delay 200 (echo input)                    `mappend`
          puts (const "A\n" <$> atTimes [0, 500..])
A
A
532.432aA
A
A
2399.504A
bA
A
3652.25cA
4151.362dA
A

...

狙い通り動くようになった.

今回はここまで.

前回(1) <

今回のソース