「ヒープ (heap)」は「半順序木 (partial ordered tree)」と呼ばれる木構造の一種で、普通は二分木を使った二分ヒープのことを指します。ヒープを利用すると、最小値をすぐに見つけることができ、新しくデータを挿入する場合も、高々要素の個数 (n) の対数 (log2 n) に比例する程度の時間で済みます。
ヒープは配列を使って簡単に実装することができます。また、二分木を使ったヒープの実装では Leftist Heap と Skew Heap というアルゴリズムがあります。
Haskell の場合、配列の操作は副作用を伴うので、木構造である Leftist Heap や Skew Heap の方が扱いやすいと思います。今回は配列によるヒープの実装を説明し、Leftist Heap と Skew Heap は次回以降に説明します。
一般的な二分木では、親よりも左側の子のほうが小さく、親よりも右側の子が大きい、という関係を満たすように作ります。「半順序木」の場合、親は子より小さいか等しい、という関係を満たすように作ります。
このとき、葉はすべて同じ高さになるか、そうでなければ、葉は左から右へ順番に埋めていきます。このような二分木は配列で表すことができます。ヒープの場合、木の根を配列の添字 0 とすると、0 番目には必ず最小値のデータが格納されます。
下図にヒープと配列の関係を示します。
0 1 2 3 4 5 6
TABLE [10 20 30 40 50 60 70]
(root)
10 (0)
/ \ 親の添字を k とすると
/ \ その子は 2*k+1, 2*k+2 になる。
20 (1) 30 (2) 子の添字を k とすると
/ \ / \ その親は (k - 1) / 2 になる。
40 50 60 70 親の値 <= 子の値 の関係を満たす。
(3) (4) (5) (6)
図 : ヒープと配列の対応関係
ヒープは、下図の手順で作ることができます。まず、データを最後尾に追加します。そして、このデータがヒープの条件を満たしているかチェックします。もしも、条件を満たしていなければ、親と子を入れ換えて、次の親をチェックします。
これを木のルート方向 (添字 0 の方向) に向かって繰り返します。条件を満たすか、木のルート (添字 0) まで到達すれば、処理を終了します。これをデータの個数だけ繰り返します。
TABLE [* * * * * * * * * *] 最初は空
[80 * * * * * * * * *] 最初のデータをセット
[80 10 * * * * * * * *] 次のデータをセットし親と比較
親 子 親の位置 0 = (1 - 1)/2
[10 80 * * * * * * * *] 順序が違っていたら交換
[10 80 60 * * * * * * *] データをセットし比較
親 子 親の位置 0 = (2 - 1)/2
[10 80 60 20 * * * * * *] データをセットし比較
親 子 親の位置 1 = (3 - 1)/2
[10 20 60 80 * * * * * *] 交換する
・・・・データがなくなるまで繰り返す・・・・
図 : ヒープの構築 (1)
このアルゴリズムを Haskell でプログラムすると、次のようになります。
リスト : ヒープの構築 (1)
-- 要素の比較
compItem :: Ord a => IOArray Int a -> Int -> Int -> IO Ordering
compItem buff i j = liftM2 (compare) (readArray buff i) (readArray buff j)
-- 要素の交換
swapItem :: IOArray Int a -> Int -> Int -> IO ()
swapItem buff i j = do
a <- readArray buff i
b <- readArray buff j
writeArray buff i b
writeArray buff j a
-- ルート方向に向かってヒープを構築
upheap :: Ord a => IOArray Int a -> Int -> IO ()
upheap buff n = do
when (n > 0) $ do
let p = (n - 1) `div` 2
t <- compItem buff p n
when (t == GT) $ do
swapItem buff p n
upheap buff p
関数 compItem は配列の要素を比較します。関数 swapItem は配列の要素を交換します。Ordering は大小関係を表すデータ型です。
data Ordering = LT | EQ | GT
関数 compare x y は x < y であれば LT を、x == y であれば EQ を、x > y であれば GT を返します。Ordering は大小関係 LT < EQ < GT が定義されているので、たとえば compare x y の返り値を t とすると、x <= y は t <= EQ で調べることができます。
関数 upheap はヒープを満たすように n 番目の要素をルート方向に向かって移動させます。0 から n - 1 番目までの要素はヒープの条件を満たしているとします。n が 0 の場合、ルートまでたどったので処理を終了します。
n の親を p とすると、p は (n - 1) / 2 で求めることができます。そして、親 p が子 n よりも大きい場合、ヒープの条件を満たさないので p 番目と n 番目の要素を swapItem で交換し、upheap を再帰呼び出しして次の親子関係をチェックします。そうでなければ、ヒープの条件を満たしているので処理を終了します。
実際にヒープを構築する場合は、配列の最後尾にデータを追加して、upheap を呼び出せばいいわけです。また、データが格納されている配列でも、upheap を適用してヒープを構築することができます。簡単な例を示します。
ghci> a <- newListArray (0,9) [5,6,4,7,3,8,2,9,1,0] :: IO (IOArray Int Int) ghci> mapM_ (upheap a) [1..9] ghci> getElems a [0,1,3,4,2,8,5,9,7,6]
ただし、この方法はデータ数を n とすると upheap を n - 1 回呼び出すため、それほど速い方法ではありません。もう少し高速な方法はあとで説明することにしましょう。
次は、最小値を取り出したあとで新しいデータを追加し、ヒープを再構築する手順を説明します。
TABLE [10 20 30 40 50 60 70 80 90 100] ヒープを満たしている
[* 20 30 40 50 60 70 80 90 100] 最小値を取り出す
[66 20 30 40 50 60 70 80 90 100] 新しい値をセット
[66 20 30 40 50 60 70 80 90 100] 小さい子と比較する
^ ^ (2*0+1) < (2*0+2)
親 子 子
[20 66 30 40 50 60 70 80 90 100] 交換して次の子と比較
^ ^ (2*1+1) < (2*1+2)
親 子 子
[20 40 30 66 50 60 70 80 90 100] 交換して次の子と比較
^ ^ (2*3+1) < (2*3+2)
親 子 子 親が小さいから終了
図 : ヒープの再構築
最初に、ヒープの最小値である添字 0 の位置にあるデータを取り出します。次に、その位置に新しいデータをセットし、ヒープの条件を満たしているかチェックします。ヒープの構築とは逆に、葉の方向 (添字の大きい方向) に向かってチェックしていきます。
まず、2 つの子の中で小さい方の子を選び、それと挿入したデータを比較します。もしも、ヒープの条件を満たしていなければ、親と子を交換し、その次の子供と比較します。これを、条件を満たすか、子供がなくなるまで繰り返します。
このアルゴリズムを Haskell でプログラムすると次のようになります。
リスト : ヒープの再構築
downheap :: Ord a => IOArray Int a -> Int -> Int -> IO ()
downheap buff n h = iter n h
where
selectChild c1 h = do
let c2 = c1 + 1
if c2 > h
then return c1
else do
t <- compItem buff c1 c2
if t == GT
then return c2
else return c1
iter n h = do
let c1 = 2 * n + 1
when (c1 <= h) $ do
c <- selectChild c1 h
t <- compItem buff n c
when (t == GT) $ do
swapItem buff n c
iter c h
関数 downheap はヒープを満たすように n 番目の要素を葉の方向へ移動させます。n + 1 番目から最後までの要素はヒープを満たしているとします。引数 h は最後の要素の位置を表します。実際の処理は局所関数 iter で行います。
最初に、n の子 c1 を求めます。これが h よりも大きければ処理を終了します。そして、もう一つの子 (c + 1) がある場合は、小さい子を選択します。この処理を局所関数 selectChild で行っています。もう一つの子を c2 とすると、c2 が h より大きければ c1 を返します。そうでなければ、compItem で c1 と c2 を比較して小さな子を選びます。
次に、selectChild で選んだ子 c と親 n を比較し、親が大きい場合は swapItem で親と子を交換します。それから、iter を再帰呼び出しして次の親子関係をチェックします。親が子以下の場合はヒープの条件を満たしているので処理を終了します。
最小値を取り出したあと新しいデータを挿入しない場合は、新しいデータの代わりに配列 buff の最後尾のデータを buff の 0 番目にセットしてヒープを再構築します。上図の例でいえば、100 を buff[0] にセットして、ヒープを再構築すればいいわけです。この場合、ヒープに格納されているデータの個数は一つ減ることになります。
ところで、n 個のデータをヒープに構築する場合、n - 1 回 upheap を呼び出さなければいけません。ところが、すべてのデータを配列に格納したあと、ヒープを構築するうまい方法があります。次の図を見てください。
TABLE [100 90 80 70 60|50 40 30 20 10] 後ろ半分が葉に相当
[100 90 80 70|60 50 40 30 20 10] 60 を挿入する
^
[100 90 80 70|60 50 40 30 20 10] 子供と比較する
^ ^ (2*4+1), (2*4+2)
親 子
[100 90 80 70|10 50 40 30 20 60] 交換する
・・・ 70 80 90 を順番に挿入し修正する ・・・
[100|10 40 20 60 50 80 30 70 90] 90 を挿入し修正した
[100 10 40 20 60 50 80 30 70 90] 100 を挿入、比較
^ ^ ^ (2*0+1), (2*0+2)
親 子 子
[10 100 40 20 60 50 80 30 70 90] 小さい子と交換し比較
^ ^ ^ (2*1+1), (2*1+2)
親 子 子
[10 20 40 100 60 50 80 30 70 90] 小さい子と交換し比較
^ ^ ^ (2*3+1), (2*3+2)
親 子 子
[10 20 40 30 60 50 80 100 70 90] 交換して終了
図 : ヒープの構築 (2)
配列を前半と後半の 2 つに分けると、後半部分はこれより下にはデータが繋がっていない葉の部分になります。つまり、後半部分の要素は互いに関係がなく、前半部分の枝にあたる要素と関係しているだけでなのです。したがって、後半部分だけを見れば、それはヒープを満たしていると考えることができます。
あとは、前半部分の要素に対して、葉の方向に向かってヒープの関係を満たすよう修正していけば、配列全体がヒープを満たすことになります。この処理は関数 downheap を使うと次のように簡単にプログラムできます。
ghci> a <- newListArray (0,9) [5,6,4,7,3,8,2,9,1,0] :: IO (IOArray Int Int) ghci> mapM_ (\x -> downheap a x 9) [4,3,2,1,0] ghci> getElems a [0,1,2,5,3,8,4,9,7,6]
後ろからヒープを再構築していくと考えるとわかりやすいでしょう。この方法の場合、要素 n の配列に対して、n / 2 個の要素の修正を行えばよいので、最初に説明したヒープの構築方法よりも速くなります。
それでは、ヒープを使って「優先度つき待ち行列 (priority queue)」を作ってみましょう。一般に、キューは先入れ先出し (FIFO : first-in, first-out) のデータ構造です。キューからデータを取り出すときは、先に挿入されたデータから取り出されます。これに対し、優先度つき待ち行列は、データに優先度をつけておいて、優先度の高いデータから取り出していきます。
優先度つき待ち行列は、優先度を基準にヒープを構築することで実現できます。最初に作成する関数を示します。
プログラムは次のようになります。
リスト : 優先度つき待ち行列
-- データ型の定義
data Heap a = Heap Int (IORef Int) (IOArray Int a)
-- ヒープの生成
makeHeap :: Int -> IO (Heap a)
makeHeap n = do
a <- newArray_ (0, n - 1)
b <- newIORef 0
return (Heap n b a)
-- リストからヒープを生成する
fromList :: Ord a => [a] -> IO (Heap a)
fromList xs = do
let n = length xs
m = n `div` 2 - 1
a <- newListArray (0, n - 1) xs
b <- newIORef n
mapM_ (\x -> downheap a x (n - 1)) [m, m-1 .. 0]
return (Heap n b a)
データ型は Heap a とし、データ構築子を Heap としました。第 1 引数が配列 (ヒープ) の大きさ (Int)、第 2 引数が要素の個数 (IORef Int)、第 3 引数が配列を表します。関数 makeHeap n は大きさが n の空のヒープを生成します。配列と IORef を生成して Heap に格納して返すだけです。
関数 fromList はリストからヒープを生成します。リスト xs の要素数を length で求めて変数 n にセットし、n / 2 - 1 の値を変数 m にセットします。配列本体は newListArray で生成し、mapM_ で m 番目から 0 番目の要素に downheap を適用してヒープを構築します。
次はデータを追加する関数 insert を作ります。
リスト : データの追加
insert :: Ord a => Heap a -> a -> IO ()
insert (Heap size cnt buff) x = do
c <- readIORef cnt
if c >= size
then error "Full Heap"
else do
writeArray buff c x
writeIORef cnt (c + 1)
upheap buff c
最初にヒープに格納されているデータ数を求めて変数 c にセットします。c がヒープの大きさ size 以上の場合、ヒープは満杯なのでエラーを送出します。そうでなければ、配列の c 番目に x を挿入し、upheap でヒープを再構築します。データの個数 cnt を +1 することをお忘れなく。
次は最小値を取り出す関数 deleteMin を作ります。
リスト : 最小値の取り出し
deleteMin :: Ord a => Heap a -> IO a
deleteMin (Heap _ cnt buff) = do
c <- readIORef cnt
if c <= 0
then error "Empty Heap"
else do
x <- readArray buff 0
let c1 = c - 1
writeIORef cnt c1
when (c1 > 0) $ do
swapItem buff 0 c1
downheap buff 0 (c1 - 1)
return x
最初にデータの個数を求めて変数 c にセットします。c が 0 以下の場合、ヒープは空なのでエラーを送出します。そうでなければ、配列 buff の 0 番目の要素を取り出して変数 x にセットします。次に、データの個数を -1 します。データが残ってる場合じはヒープを再構築します。最後尾の要素と 0 番目の要素を swapItem で交換し、downheap でヒープを再構築します。最後に x を return で IO に格納して返します。
あとのプログラムは簡単なので説明は割愛いたします。詳細はプログラムリストをお読みください。
それでは実際に実行してみましょう。
ghci> a <- makeHeap 8 :: IO (Heap Int) ghci> isEmpty a True ghci> isFull a False ghci> mapM_ (insert a) [7, 6 .. 0] ghci> deleteMin a 0 ghci> deleteMin a 1 ghci> deleteMin a 2 ghci> deleteMin a 3 ghci> deleteMin a 4 ghci> deleteMin a 5 ghci> deleteMin a 6 ghci> deleteMin a 7 ghci> isEmpty a True ghci> a <- fromList [5,6,4,7,3,8,2,9,1,0] ghci> isFull a True ghci> isEmpty a False ghci> deleteMin a 0 ghci> deleteMin a 1 ghci> deleteMin a 2 ghci> deleteMin a 3 ghci> deleteMin a 4 ghci> deleteMin a 5 ghci> deleteMin a 6 ghci> deleteMin a 7 ghci> deleteMin a 8 ghci> deleteMin a 9 ghci> isEmpty a True
正常に動作していますね。
--
-- heap.hs : 配列を使ったヒープの実装
--
-- Copyright (C) 2013-2021 Makoto Hiroi
--
import Data.Array.IO
import Data.IORef
import Control.Monad
-- データ型の定義
data Heap a = Heap Int (IORef Int) (IOArray Int a)
-- 要素の比較
compItem :: Ord a => IOArray Int a -> Int -> Int -> IO Ordering
compItem buff i j = liftM2 (compare) (readArray buff i) (readArray buff j)
-- 要素の交換
swapItem :: IOArray Int a -> Int -> Int -> IO ()
swapItem buff i j = do
a <- readArray buff i
b <- readArray buff j
writeArray buff i b
writeArray buff j a
-- ルート方向に向かってヒープを構築
upheap :: Ord a => IOArray Int a -> Int -> IO ()
upheap buff n = do
when (n > 0) $ do
let p = (n - 1) `div` 2
t <- compItem buff p n
when (t == GT) $ do
swapItem buff p n
upheap buff p
-- 葉の方向に向かってヒープを構築
downheap :: Ord a => IOArray Int a -> Int -> Int -> IO ()
downheap buff n h = iter n h
where
selectChild c1 h = do
let c2 = c1 + 1
if c2 > h
then return c1
else do
t <- compItem buff c1 c2
if t == GT
then return c2
else return c1
iter n h = do
let c1 = 2 * n + 1
when (c1 <= h) $ do
c <- selectChild c1 h
t <- compItem buff n c
when (t == GT) $ do
swapItem buff n c
iter c h
-- ヒープの生成
makeHeap :: Int -> IO (Heap a)
makeHeap n = do
a <- newArray_ (0, n - 1)
b <- newIORef 0
return (Heap n b a)
-- リストからヒープを生成する
fromList :: Ord a => [a] -> IO (Heap a)
fromList xs = do
let n = length xs
m = n `div` 2 - 1
a <- newListArray (0, n - 1) xs
b <- newIORef n
mapM_ (\x -> downheap a x (n - 1)) [m, m-1 .. 0]
return (Heap n b a)
-- データの追加
insert :: Ord a => Heap a -> a -> IO ()
insert (Heap size cnt buff) x = do
c <- readIORef cnt
if c >= size
then error "Full Heap"
else do
writeArray buff c x
writeIORef cnt (c + 1)
upheap buff c
-- 最小値を取り出す
deleteMin :: Ord a => Heap a -> IO a
deleteMin (Heap _ cnt buff) = do
c <- readIORef cnt
if c <= 0
then error "Empty Heap"
else do
x <- readArray buff 0
let c1 = c - 1
writeIORef cnt c1
when (c1 > 0) $ do
swapItem buff 0 c1
downheap buff 0 (c1 - 1)
return x
-- 最小値を求める
findMin :: Ord a => Heap a -> IO a
findMin (Heap _ cnt buff) = do
c <- readIORef cnt
if c <= 0
then error "Empty Heap"
else readArray buff 0
-- ヒープは空か
isEmpty :: Heap a -> IO Bool
isEmpty (Heap _ cnt _) = do
c <- readIORef cnt
return (c == 0)
-- ヒープは満杯か
isFull :: Heap a -> IO Bool
isFull (Heap size cnt _) = do
c <- readIORef cnt
return (c == size)