M.Hiroi's Home Page

Algorithms with Python

ブロックソート (BlockSorting) [1]

[ PrevPage | Python | NextPage ]

はじめに

昔の話ですが、ファイルの圧縮ツールといえば日本では LHA が定番でした。現在、Windows では zip が標準ツールとして使用されています。Unix 系の OS では、複数のファイルを tar (テープアーカイバ) で一つのファイルにまとめ、それを圧縮ツール gzip で圧縮することが一般的でした。最近では、gzip のかわりに bzip2 や xz (XZ Utils) を使うことが多くなってきたようです。一般的なファイルであれば、bzip2 や xz のほうが LHA, zip, gzip よりも高い圧縮率になるようです。

bzip2 の圧縮率が高いのは「ブロックソート (BlockSorting)」という方法を使っているからです。ブロックソート法は 1994 年に M. Burrows 氏と D. J. Wheeler 氏が提案した方法で、Burrows-Wheeler Transform (BWT) と呼ばれることもあります。ブロックソート法はデータを圧縮するのではなく、データを圧縮しやすい形に変換するだけです。そして、ほかの方法でいっきに圧縮するわけです。

ブロックソート法を採用した圧縮ツールは bzip2 以外にもありますが、日本では鶴田真一氏が作成された GCA, DGCA が素晴らしい性能を発揮しているようです。今回はブロックソートを使ってファイルを圧縮してみましょう。

●ブロックソートの符号化

最初に、ブロックソートについて簡単に説明します。ブロックソートというと、なにやら難しいソートを使うのではないか、と思われた方もいるでしょう。M.Hiroi も最初はそのように思いました。ところが、ブロックソートという特別なソートがあるわけではありません。ブロックソートの動作はとても簡単で、特別なソートを使わなくても実現できます。ただし、ソートは時間がとてもかかる処理なので、実用的なツールを作成する場合、時間を短縮するための工夫が必要になります。

それでは、記号列 "aeadacab" をブロックソートしてみましょう。まず、記号列を 1 記号ずつシフトして、新しい記号列を生成します。これを記号列が 1 回転するまで続けます。生成された記号列は、次のようになります。

図 (1) に示すように、生成された記号列は元のデータを含めて 8 個になります。次に、これらの記号列をソートします。その結果は (2) のようになります。最後に、(2) から元のデータの「位置」と、各記号列の「最後の記号」を順番に取り出して出力します。これでブロックソートは終了です。記号列は 1 回転しているので、最後の記号を順番に取り出すことで記号の並びは変わりますが、記号列の中身(a が 4 個、b, c, d, e が 1 個)に変わりはありません。ようするに、ブロックソートは記号列 "aeadacab" を "cdebaaaa" に変換しているだけなのです。

ここで、変換後の記号列 "cdebaaaa" に注目してください。同じ記号 a が並んでいることがわかりますね。これがブロックソートの効果です。この例では記号は 8 個しかありませんが、もっと長い記号列をブロックソートすれば、同じ記号をたくさん並べることができます。たとえば、the を多数含むテキストをブロックソートすると、"e ..... th" や "he ..... t" といった記号列が生成されます。その結果、変換後の記号列には多数の h や t が並ぶことになるわけです。

このように、ブロックソートはデータを並べ替えただけなので、記号の出現頻度はまったく変わっていないことに注意してください。したがって、このデータをそのままハフマン符号やレンジコーダで圧縮しても、効果はまったくありません。それでは、連長圧縮 (ランレングス) を使ってみよう、と思われた方もいるでしょう。確かにランレングスでもある程度は圧縮できるのですが、それよりも記号の出現頻度を偏った方向へ変換できれば、高い圧縮率を達成することができるはずです。

一番簡単な方法は、ブロックソートのあとに「MTF (Move-To-Front) 法」を適用することです。MTF 法でデータを変換すると、ハフマン符号やレンジコーダでも効率よく圧縮することができます。MTF 法については、あとで詳しく説明します。

●ブロックソートの復号

次はブロックソートの復号について説明します。記号列 "aeadacab" をブロックソートで変換すると、"cdebaaaa" と元のデータの位置 3 を出力しました。この情報から元のデータを復号します。ブロックソートの復号は、パズルを解くみたいで実に面白い方法です。次の図を見てください。

最初に、記号列 "cdebaaaa" をソートします。すると、"aaaabcde" になりますね。ブロックソートは、最後の記号を順番に取り出して出力しています。したがって、出力された記号列をソートすれば、先頭の記号を求めることができます。

次に記号の繋がりを求めます。ブロックソートは記号列を 1 つずつシフトすることで、新しい記号列を生成しています。"aeadacab" を 1 つシフトすると "eadacaba" になりますね。このように、先頭の記号は末尾へ移動するので、最後の記号と先頭の記号は繋がっていることがわかります。つまり、記号 b の次は記号 a になります。同じ記号 (a) が複数あるので、位置で表した方がわかりやすいでしょう。記号 b の位置は [3] ですね。[3] の次は位置 [7] の記号 a で、[7] の次は [2] の記号 e というように、記号の繋がりを求めることができます。

記号の繋がりがわかっても、どこから始めたらいいかわかりません。そのために、元のデータの位置を出力しているわけです。この場合、元データの位置は 3 ですから、3 番目の記号列 "a ...... b" を復号します。この場合、記号 a の位置 [7] から記号の繋がりをたどっていけば、元の記号列 "aeadacab" を復号することができます。

ブロックソートの場合、符号化にはソートが必要になるため時間がかかりますが、復号はとても簡単に行うことができるのです。

●ブロックソートのプログラム

それでは実際に簡単なテストプログラムを作ってみましょう。ブロックソートをプログラムする場合、記号列を回転させる処理が必要になります。単純に考えると、M 個の記号列を回転させるには M * M バイトの領域が必要になりますが、下図に示すように同じ記号列を後ろにコピーすることで、簡単に実現することができます。

ブロックソートでいちばん時間がかかる処理がソートです。データ数が多くなると「クイックソート」や「マージソート」でも時間がかかるのです。実用的なツールでは高速にソートするための工夫が必要になります。時間を短縮するため、先頭 2 記号で分布数えソートを行い、そのあとでマージソートやクイックソートで仕上げるなど、いろいろな工夫が考えられますが、もっと高速にソートできる方法があります。

巨大なテキストデータを高速に検索するためのデータ構造に接尾辞は列 (suffix array) があります。接尾辞配列の構築にはブロックソートと同様にデータのソートが必要になるため、今までに高速なアルゴリズムが研究・開発されています。これらのアルゴリズムをブロックソートに適用することで、巨大なデータでも実用的な時間でソートすることができます。興味のある方は拙作のページ 接尾辞配列 (suffix array) をお読みください。

ここでは簡単なテストプログラムということで、とりあえず Python のメソッド sort() を使うことにします。プログラムは次のようになります。

リスト : ブロックソート

import functools

# 比較関数
def compare(x, y, size):
    for n in range(size):
        r = ord(buff[x + n]) - ord(buff[y + n])
        if r != 0: return r
    return 0

# 符号化
def bs_encode(data):
    global buff
    size = len(data)
    buff = data * 2
    idx = list(range(size))
    idx.sort(key = functools.cmp_to_key(lambda x, y: compare(x, y, size)))
    work = []
    for x in range(size):
        p = idx[x]
        if p == 0: top = x
        work.append(buff[p + size - 1])
    return top, work

# 復号
def bs_decode(top, data):
    size = len(data)
    idx = list(range(size))
    idx.sort(key = lambda x: ord(data[x]))
    buff = []
    p = idx[top]
    for _ in range(size):
        buff.append(data[p])
        p = idx[p]
    return ''.join(buff)

# test
if __name__ == '__main__':
    a = 'abacadaeafagahaiajakalaman'
    print(a)
    top, buff = bs_encode(a)
    print(top, buff)
    print(bs_decode(top, buff))

符号化は関数 bs_encode() で行います。引数 data はブロックソートするデータ (文字列) です。data の長さを size にセットします。そして、グローバル変数 buff に data を 2 倍した文字列をセットします。buff に格納されたデータはソートできないので、配列 idx に buff への位置をセットして、idx をソートすることにします。idx の要素は 0 から size - 1 に初期化します。

次に、メソッド sort() でソートします。モジュール functools の関数 cmp_to_key() を使って比較関数 compare() をラップして、sort() のキーワード引数 key に渡します。データの大きさを compare() の引数に渡すため lambda 式を使っています。compare() の引数 x, y には配列 idx の要素が渡されるので、その位置から buff の要素を比較していくだけです。buff には data + data がセットされているので、データの終端をチェックする必要はありません。

最後にデータを出力します。idx から buff への位置を取り出して変数 p にセットします。そして、最後の文字 buff[p + size - 1] をバッファ work に書き込みます。もし、p が 0 の場合は、その位置 x を変数 top にセットしておきます。そして、最後に top と work を返します。

復号を行う関数 bs_decode() も簡単です。引数 top がデータの先頭位置、data が復号するデータです。最初に、文字単位でデータをソートします。今回はテストプログラムということでメソッド sort() を使いましたが、分布数えソートを使った方が高速になると思います。

あとはブロックソートの復号で説明したことをそのままプログラムするだけです。データの先頭位置は引数 top に渡されるので、p = idx[top] で文字の位置を取り出して、文字 data[p] をバッファ buff に書き込みます。そして、次の文字の位置を p = idx[p] で求めます。最後に、buff のデータをメソッド join() で連結して返します。

これでプログラムは完成です。それでは実際に試してみましょう。

abacadaeafagahaiajakalaman

0 ['n', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 
'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a']

abacadaeafagahaiajakalaman

正しく動作していますね。このように、ブロックソートすることで同じ記号を並べることができます。

●MTF (Move-To-Front) 法

次は MTF (Move-To-Front) 法について説明します。MTF は「同じ記号がいくつ前に現れたか」を符号にする方法で、最近現れた記号ほど小さな値に変換することができます。MTF は記号の出現表を作ることで簡単に実現できます。たとえば、記号の種類が a, b, c, d の 4 つしかない場合で、記号列 "abccddddd" を MTF で符号化してみましょう。次の図を見てください。

まず、表を [a, b, c, d] に初期化します。MTF は、この表に現れる記号の位置を符号にします。最初の記号は b ですね。b の位置は 1 番目なので 1 を出力します。ここで、記号 b を表の先頭へ移動します。このように、記号を表の先頭へ移動することから "Move To Front" と呼ばれています。この結果、頻繁に現れる記号は表の先頭付近に集まるので、それらの記号は小さな値に変換されるわけです。

この値を γ符号やδ符号 で符号化すれば、データを圧縮することは可能です。γ符号とδ符号は P. Elias 氏が開発した正整数を表すための符号です。小さな正整数ほど短い符号が割り当てられているため、MTF で変換したデータを圧縮することができるわけです。MTF とγ符号やδ符号の組み合わせは、簡単に符号化と復号を行うことができるのですが、残念ながらこれだけでは高い圧縮率を達成することができません。

MTF にはもうひとつ特徴があります。記号 d を変換するところを見てください。最初の d は表の 3 番目にあるので 3 を出力します。そして、d を表の先頭へ移動するので、残りの d は 3 つすべて 0 に変換されます。このように、MTF では同じ記号が連続していれば、それを 0 に変換することができます。この特徴がブロックソートに適しているのです。

ブロックソートしたあとのデータでは、同じ記号がたくさん並びます。このデータを MTF で変換すれば、同じ記号の並びを 0 に変換することができます。つまり、t が並んでいるところも、h が並んでいるところも 0 に変換されるのです。したがって、データ全体の中で 0 の割合が著しく増加し、記号の出現頻度を偏らせることができる、圧縮しやすいデータに変換される、というわけです。

●MTF 法のプログラム

それでは MTF のプログラムを作りましょう。MTF はブロックソートと同様にとても簡単にプログラムできます。次のリストを見てください。

リスト : Move To Front 法

# 符号化
def mtf_encode(data):
    table = list(range(256))
    buff = []
    for x in data:
        c = ord(x)
        j = table.index(c)
        if j > 0:
            del table[j]
            table.insert(0, c)
        buff.append(j)
    return buff

# 復号
def mtf_decode(data):
    table = list(range(256))
    buff = []
    for x in data:
        c = table[x]
        if x > 0:
            del table[x]
            table.insert(0, c)
        buff.append(chr(c))
    return buff

# test
if __name__ == '__main__':
    a = 'abacadaeafagahaiajakalaman'
    print(a)
    top, buff = bs_encode(a)
    print(top, buff)
    buff1 = mtf_encode(buff)
    print(buff1)
    buff2 = mtf_decode(buff1)
    print(buff2)
    print(bs_decode(top, buff2))

関数 mtf_encode() で符号化、関数 mtf_decode() で復号を行います。引数 data が入力データです。最初に、配列 table を 0 から 255 までの記号で初期化します。符号化の場合、data から記号 c を読み込み、table を線形探索します。見つけた位置 j が MTF の符号になります。復号の場合、data から符号 j を読み込み、table[j] の値が記号 c になります。そして、j が 0 よりも大きければ、記号 c を table の先頭へ移動させます。

実行結果は次のようになりました。

abacadaeafagahaiajakalaman

0 ['n', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 
'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a']

[110, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 110,
 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

['n', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 
'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a', 'a']

abacadaeafagahaiajakalaman

ブロックソートしたあと同じ記号が並びますが、MTF によりそれらの記号が 0 に変換され、データ全体の中で 0 の割合が著しく増加していることがわかります。このように、MTF により記号の出現頻度を偏らせることができるわけです。

●MTF 法の改良

このように、ブロックソートのあと MTF 法を適用すると、多くの記号が 0 に変換されるため、記号の出現頻度を著しく偏らせることができます。このあと、レンジコーダを適用すると、高い圧縮率を実現することができますが、MTF 法で 0 に変換される記号の割合をもっと多くすることができれば、圧縮率もさらに向上すると思われます。MTF 法は単純なアルゴリズムなので、いろいろなバリエーションが考えられると思いますが、ここでは簡単で効果的な改良方法を紹介します。

●2 番目に移動する方法

MTF 法はその名前が示すように、現れた記号を表の先頭へ移動します。ここで、この移動位置を 2 番目に変更します。そして、先頭へ移動できるのは、2 番目にある記号の場合だけに限定します。つまり、いきなり先頭へ移動するのではなく、まず 2 番目に移動しておいて、それから先頭へ移動するのです。

このアルゴリズムは 1999 年に Balkenhol 氏, Kurtz 氏, Shtarkov 氏が提案したもので MTF-1 と呼ばれています。このドキュメントでも MTF-1 と表記することにします。なお、MTF-1 は Stephan T. Lavavej 氏のページ bwtzip - nuwen.net で説明されています。Stephan T. Lavavej 氏に感謝いたします。

●MTF-1 の説明

たとえば、記号が a, b, c の 3 種類で、記号列 "aaaacbaaaa" を MTF で変換してみましょう。単純な MTF は次のように変換されます。

このように、a の途中で b と c が入っている場合、b と c を先頭へ移動すると "0000222000" に変換されます。それでは、b と c を 2 番目に移動させてみましょう。次の図を見てください。

b と c を 2 番目に移動すると、"aaaacbaaaa" は "0000220000" に変換されます。0 がひとつ増えていますね。これが MTF-1 の効果です。長い記号列をブロックソートすると同じ記号がたくさん並びますが、連続した記号の間に他の記号が入ることも多くあるはずです。したがって、MTF-1 で変換すれば、0 の個数を増やすことができると思われます。もちろん、データによっては 0 の個数が減少することもあるでしょう。ですが、大きなデータには効果を期待してもよさそうです。

●MTF-1 のプログラム

プログラムはとても簡単です。次のリストを見てください。

リスト : MTF-1

# 符号化
def mtf1_encode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        c = buff[x]
        j = table.index(c)
        if j == 1:
            table[1] = table[0]
            table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = j

# 復号
def mtf1_decode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        j = buff[x]
        c = table[j]
        if j == 1:
            table[1] = table[0]
            table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = c

符号化を行う関数が mtf1_encode() で、復号を行う関数が mtf1_decode() です。入力バッファ buff を直接書き換えていることに注意してください。どちらの関数でも、記号の位置 j が 1 よりも大きければ、その記号を table[1] へ移動します。そして、j が 1 の場合にのみ、その記号を先頭へ移動します。とても簡単ですね。

●さらなる改良

ところで、MTF-1 にも弱点があります。次の図を見てください。

記号が a, b, c の 3 種類で、記号列 "aaaabcaaaa" を MTF-1 で変換してみましょう。この場合、a が続いたあとの記号 b が 2 番目にあるため、b を先頭へ移動します。そして、c を 2 番目に移動するので、a の位置は 3 番目になります。このあと a が続くことになると、a は 2 番目に移動してから先頭に戻るので、結果は "0000122100" になります。これでは 0 の個数が減ってしまいますね。

そこで、直前に出力した記号が 0 以外の場合にのみ、1 番目の記号を先頭へ移動することにします。つまり、0 を出力した直後は、1 番目の記号を移動しないのです。この方法で "aaaabcaaaa" を変換すると、次のようになります。

このように、"aaaabcaaaa" は "0000120000" に変換されます。MTF-1 よりも 0 の個数を増やすことができましたね。このアルゴリズムは 1999 年に Balkenhol 氏と Shtarkov 氏が提案したもので MTF-2 と呼ばれています。以降このドキュメントでも MTF-2 と表記することにします。

なお、MTF-2 は Stephan T. Lavavej 氏のページ bwtzip - nuwen.net で説明されています。Stephan T. Lavavej 氏に感謝します。

●MTF-2 のプログラム

プログラムはとても簡単です。次のリストを見てください。

リスト : MTF-2

# 符号化
def mtf2_encode(buff):
    table = list(range(256))
    prev = 1
    for x in range(len(buff)):
        c = buff[x]
        j = table.index(c)
        if j == 1:
            if prev != 0:
                table[1] = table[0]
                table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = j
        prev = j

# 復号
def mtf2_decode(buff):
    table = list(range(256))
    prev = 1
    for x in range(len(buff)):
        j = buff[x]
        c = table[j]
        if j == 1:
            if prev != 0:
                table[1] = table[0]
                table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = c
        prev = j

符号化を行う関数が mtf2_encode() で、復号を行う関数が mtf2_decode() です。どちらの関数も直前に出力した記号を変数 prev に格納します。このプログラムでは prev を 1 に初期化しています。この値は 0 でもかまいませんが、mtf2_encode() と mtf2_decode() の prev は同じ値で初期化してください。あとは、j が 1 のときに prev をチェックして、0 以外の値ならば記号を先頭へ、0 ならば移動しません。とても簡単ですね。

●評価結果

それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus で試してみましょう。ブロックソートと MTF 法で変換したあと、記号 0 の個数を数えてみます。そして、適応型レンジコーダで圧縮してみましょう。

ブロックソートは拙作のページで作成した接尾辞配列の構築アルゴリズム 三分割法改良版 [1] を使っています。プログラムの説明は割愛いたしますので、詳細は プログラムリスト をお読みください。結果は次にようになりました。

                        表 : ブロックソートの結果
                    (BlockSorting + MTF + RangeCoder)

                                 MTF              MTF-1             MTF-2
  ファイル名      サイズ  0 の個数  サイズ  0 の個数  サイズ  0 の個数  サイズ
  -----------------------------------------------------------------------------
  alice29.txt    152,089    85,187  47,142    84,717  47,067    85,986  46,720
  asyoulik.txt   125,179    62,815  42,930    62,819  42,798    63,601  42,573
  cp.html         24,603    15,405   8,504    14,218   8,864    14,238   8,855
  fields.c        11,150     7,741   3,192     7,102   3,399     6,964   3,436
  grammar.lsp      3,721     2,376   1,203     2,109   1,378     2,103   1,378
  kennedy.xls  1,029,744   794,903 156,091   798,091 154,758   809,174 151,892
  lcet10.txt     426,754   261,044 117,767   259,027 117,746   262,183 116,775
  plrabn12.txt   481,861   238,303 156,182   243,019 154,467   247,337 153,364
  ptt5           513,216   448,526  57,551   454,484  53,576   455,466  52,947
  sum             38,240    24,980  13,814    23,090  14,280    22,902  14,318
  xargs.1          4,227     2,220   1,762     1,907   1,831     1,901   1,838
  -----------------------------------------------------------------------------
  合計         2,810,784 1,943,500 606,237 1,950,583 600,164 1,971,855 594,096

結果を見ればおわかりのように、ブロックソートのあと MTF 法でデータを変換すると、記号 0 の個数が極めて多くなります。MTF-1 と MTF-2 は MTF よりも 0 の個数が増えているので、MTF を改良した効果は十分に出ていると思います。

大きなファイルの場合、MTF-1 よりも MTF-2 の方が効果が高いようです。このあと、適応型レンジコーダで圧縮すると、MTF-2 が一番高い圧縮率になりました。このほかにも、MTF 法はいろいろなバリエーションが考えられると思います。興味のある方は、MTF 法の改良に挑戦してみてください。

ところで、適応型レンジコーダの代わりに バイナリレンジコーダ を使うと、圧縮率はさらに高くなります。結果は次のようになります。

                表 : ブロックソートの結果
          (BlockSroting + MTF-2 + BinaryRangeCoder)

  ファイル名      サイズ   Alpha    Gamma    Binary    LHA     bzip2
  --------------------------------------------------------------------
  alice29.txt    152,089   43,729   43,611   43,745   59,117   43,202
  asyoulik.txt   125,179   40,087   39,966   40,061   52,341   39,569
  cp.html         24,603    8,404    8,329    8,408    8,384    7,624
  fields.c        11,150    3,309    3,272    3,294    3,170    3,039
  grammar.lsp      3,721    1,337    1,309    1,323    1,271    1,283
  kennedy.xls  1,029,744  136,047  129,415  125,891  198,342  130,280
  lcet10.txt     426,754  108,926  108,660  109,141  159,558  107,706
  plrabn12.txt   481,861  144,888  144,516  144,979  210,045  145,577
  ptt5           513,216   49,544   49,291   50,406   52,305   49,759
  sum             38,240   13,707   13,514   13,563   13,993   12,909
  xargs.1          4,227    1,800    1,769    1,787    1,778    1,762
  --------------------------------------------------------------------
  合計         2,810,784  551,778  543,652  542,598  760,304  542,710

ブロックソート + MTF 法により記号 0 の個数が極めて多くなるので、バイナリレンジコーダのαモデル (Alpha) やγモデル (Gamma) で符号化すると、LHA を超えて bzip2 にせまる高い圧縮率を達成することができます。バイナリモデル (Binary) でも同程度の圧縮率になりました。

ただし、バイナリレンジコーダは符号化・復号処理に時間がとてもかかります。次の表を見てください。

            表 : ブロックソートの実行時間
  (BlockSroting + MTF-2 + RnageCoder, BinaryRangeCoder)

                            RangeCoder        Binary
  ファイル名      サイズ   符号化  復号    符号化  復号
  -------------------------------------------------------
  alice29.txt    152,089   0.896   0.559   2.132   1.471
  asyoulik.txt   125,179   0.750   0.461   1.751   1.252
  cp.html         24,603   0.205   0.103   0.405   0.249
  fields.c        11,150   0.125   0.040   0.237   0.163
  grammar.lsp      3,721   0.085   0.015   0.116   0.046
  kennedy.xls  1,029,744   6.066   3.569  14.615  10.118
  lcet10.txt     426,754   2.485   1.518   5.949   4.200
  plrabn12.txt   481,861   2.751   1.780   6.519   4.868
  ptt5           513,216   2.063   1.621   6.435   5.026
  sum             38,240   0.439   0.154   0.727   0.431
  xargs.1          4,227   0.089   0.018   0.123   0.048
  -------------------------------------------------------
  合計         2,810,784  15.954   9.838  39.009  27.872

符号化と復号の単位 : 秒
実行環境 : Python 3.8.10, Ubuntu 20.04 LTS (WSL1), Intel Core i5-6200U 2.30GHz

バイナリレンジコーダの場合、適応型レンジコーダの 2 倍以上の時間がかかります。バイナリレンジコーダは優れた圧縮性能を持っていますが、処理時間がかかるのが欠点です。もちろん、適応型レンジコーダを使う場合でも、工夫次第でブロックソートの圧縮率を向上させることができます。

一番簡単な方法は MTF 法のあとに「ランレングス」を適用することです。また、ブロックソート法に適した「情報源モデル」を作成すると圧縮率を高くすることができます。次回は、ランレングスとブロックソート法に適した情報源モデルについて説明します。

なお、実行時間の結果は M.Hiroi のコーディング、実行したマシン、プログラミング言語 [*2] などの環境に大きく依存しています。また、これらの環境だけではなく、データの種類によっても実行時間はかなり左右されます。興味のある方は、いろいろなデータをご自分の環境で試してみてください。

-- note --------
[*1] 接尾辞配列の構築アルゴリズムはいろいろありますが、その中で決定版ともいえる方法に SA-IS 法があります。2009 年に Nong, Ge 氏らによって発表された SA-IS 法は、データ数を N とすると O(N) で接尾辞配列を構築できる優秀な方法です。
[*2] PyPy を使うとプログラムを修正せずに高速化することができます。
                 RangeCoder        Binary
  ファイル名    符号化  復号    符号化  復号
  --------------------------------------------
  alice29.txt   0.734   0.204   1.047   0.234
  asyoulik.txt  0.486   0.191   0.794   0.209
  cp.html       0.270   0.132   0.522   0.112
  fields.c      0.231   0.101   0.283   0.089
  grammar.lsp   0.165   0.079   0.147   0.072
  kennedy.xls   1.089   0.513   3.320   0.853
  lcet10.txt    0.827   0.322   1.716   0.457
  plrabn12.txt  0.948   0.376   1.868   0.520
  ptt5          0.547   0.308   1.265   0.466
  sum           0.297   0.122   0.482   0.138
  xargs.1       0.153   0.097   0.214   0.074
  --------------------------------------------
  合計          5.747   2.445  11.658   3.224

符号化と復号の単位 : 秒
実行環境 : PyPy 7.3.1, Ubuntu 20.04 LTS (WSL1), Intel Core i5-6200U 2.30GHz

●参考文献

  1. 広井誠, 『高性能圧縮ツール bsrc の理論と実装 (前編, 後編)』, Interface 2003 年 12 月号, 2004 年 1 月号, CQ出版社
  2. 広井誠, 『高性能圧縮ツール bsrc の改良 bsrc2 (前編, 後編), Interface 2004 年 10 月号, 11 月号, CQ出版社

●プログラムリスト

#
# bwt.py : ブロックソート法
#          (接尾辞配列 [5] の三分割法を改造)
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#
from array import *

# 定数
SIZE = 256 * 256
LIMIT = 10

# ソート用比較関数
def compare(x, y, n):
    for i in range(n, data_size - n):
        r = buff[x + i] - buff[y + i]
        if r != 0: return r
    return 0

# 挿入ソート
def insert_sort(low, high, n):
    for i in range(low + 1, high + 1):
        temp = idx[i]
        j = i - 1
        while j >= low and compare(temp, idx[j], n) < 0:
            idx[j + 1] = idx[j]
            j -= 1
        idx[j + 1] = temp

# 枢軸の選択
def select_pivot(low, high, n):
    m = (high - low) // 4
    a = buff[idx[low + m] + n]
    b = buff[idx[low + m * 2] + n]
    c = buff[idx[low + m * 3] + n]
    if a > b:
        tmp = a
        a = b
        b = tmp
    if b > c:
        b = c
        if a > b: b = a
    return b

# マルチキークイックソート
def mqsort(low, high, n = 0):
    while True:
        if high - low <= LIMIT:
            insert_sort(low, high, n)
            return
        # 枢軸
        p = select_pivot(low, high, n)
        # 4 分割
        i = m1 = low
        j = m2 = high
        while True:
            while i <= j:
                k = buff[idx[i] + n] - p
                if k > 0: break
                if k == 0:
                    idx[i], idx[m1] = idx[m1], idx[i]
                    m1 += 1
                i += 1
            while i <= j:
                k = buff[idx[j] + n] - p
                if k < 0: break
                if k == 0:
                    idx[j], idx[m2] = idx[m2], idx[j]
                    m2 -= 1
                j -= 1
            if i > j: break
            idx[i], idx[j] = idx[j], idx[i]
            i += 1
            j -= 1
        # 等しいデータを中央に集める
        for k in range(min(m1 - low, i - m1)):
            idx[low + k], idx[j - k] = idx[j - k], idx[low + k]
        m1 = low + (i - m1)
        for k in range(min(high - m2, m2 - j)):
            idx[i + k], idx[high - k] = idx[high - k], idx[i + k]
        m2 = high - (m2 - j) + 1
        mqsort(low, m1 - 1, n)
        mqsort(m2, high, n)
        if m1 >= m2: break
        low = m1
        high = m2 - 1
        n += 1

# Type C をソート
def sort_type_c():
    # Type C を Type D, E を分離して Type D をソート
    for x in range(0, 256):
        for y in range(x + 1, 256):
            c = (x << 8) + y
            low = count_sum[c]
            high = count_sum[c + 1]
            m = low
            for n in range(low, high):
                i = idx[n]
                if buff[i + 1] >= buff[i + 2]:
                    idx[n] = idx[m]
                    idx[m] = i
                    m += 1
            if m - low > 1:
                mqsort(low, m - 1, 2)
    #
    # Type E のセット
    #
    # 累積度数表をコピー
    end = count_sum[:]
    # 記号 0, 255 の区間はスキャンする必要なし
    for x in range(254, 0, -1):
        for y in range(255, x, -1):
            c = (x << 8) + y
            for n in range(count_sum[c + 1] - 1, count_sum[c] - 1, -1):
                i = idx[n]
                if i == 0: i = data_size
                if buff[i - 1] < buff[i] < buff[i + 1]:
                    # Type E をセット
                    c1 = (buff[i - 1] << 8) + buff[i] + 1
                    end[c1] -= 1
                    idx[end[c1]] = i - 1

# Type A, B をセット
def set_type_ab():
    start = [0] * 256
    end = [0] * 256
    for i in range(0, 256):
        for j in range(i, 256):
            start[j] = count_sum[(j << 8) + i]
            end[j] = count_sum[(j << 8) + i + 1] - 1
        # 前から後ろへ
        j = count_sum[i << 8]
        while j < start[i]:
            x = idx[j]
            if x == 0: x = data_size
            if buff[x - 1] >= buff[x]:
                c = buff[x - 1]
                idx[start[c]] = x - 1
                start[c] += 1
            j += 1
        # 後ろから前へ
        j = count_sum[(i + 1) << 8] - 1
        while j > end[i]:
            x = idx[j]
            if x == 0: x = data_size
            if buff[x - 1] >= buff[x]:
                c = buff[x - 1]
                idx[end[c]] = x - 1
                end[c] -= 1
            j -= 1

# 三分割法改良版
def suffix_sort(data, size):
    global count_sum, buff, data_size, idx
    buff = data
    data_size = size
    idx = array('I')
    for _ in range(data_size): idx.append(0)
    # 分布数えソート
    count = [0] * SIZE
    count_sum = [0] * (SIZE + 1)
    for x in range(data_size):
        count[(buff[x] << 8) + buff[x + 1]] += 1
    for x in range(1, SIZE + 1):
        count_sum[x] = count[x - 1] + count_sum[x - 1]
    for x in range(1, SIZE):
        count[x] += count[x - 1]
    for x in range(data_size - 1, -1, -1):
        c = (buff[x] << 8) + buff[x + 1]
        count[c] -= 1
        idx[count[c]] = x
    # Type C をソート
    sort_type_c()
    # Type A, B をセット
    set_type_ab()
    # 出力
    work = array('B')
    for x in range(size):
        i = idx[x]
        if i == 0: top = x
        work.append(buff[i + size - 1])
    return work, top
#
# freq.py : 適応型レンジコーダ用の出現頻度表
#           (有限文脈モデル: freq.py)
#
#           Copyright (C) 2007-2022 Makoto Hiroi
#
from rangecoder import *

# 定数
GR = 16

# 出現頻度表
class Freq:
    def __init__(self, size, inc = 1, limit = MIN_RANGE):
        self.size = size
        self.inc = inc
        self.limit = limit
        self.count = [1] * size
        if size % GR == 0:
            self.count_group = [GR] * (size // GR)
        else:
            self.count_group = [GR] * (size // GR + 1)
        self.sum_ = size

    # 出現頻度表の更新
    def update(self, c):
        self.count[c] += self.inc
        self.count_group[c // GR] += self.inc
        self.sum_ += self.inc
        if self.sum_ >= self.limit:
            n = 0
            for x in range(len(self.count_group)):
                self.count_group[x] = 0
            for x in range(self.size):
                self.count[x] = (self.count[x] >> 1) | 1
                self.count_group[x // GR] += self.count[x]
                n += self.count[x]
            self.sum_ = n

    # 記号の累積度数を求める
    def cumul(self, c):
        n = 0
        for x in range(c // GR): n += self.count_group[x]
        for x in range((c // GR) * GR, c): n += self.count[x]
        return n

    # 符号化
    def encode(self, rc, c):
        temp = rc.range_ // self.sum_
        rc.low += self.cumul(c) * temp
        rc.range_ = self.count[c] * temp
        rc.encode_normalize()
        self.update(c)

    # 復号
    def decode(self, rc):
        # 記号の探索
        def search_code(value):
            n = 0
            for x in range(len(self.count_group)):
                if value < n + self.count_group[x]: break
                n += self.count_group[x]
            for c in range(x*GR, self.size):
                if value < n + self.count[c]: break
                n += self.count[c]
            return c, n
        #
        temp = rc.range_ // self.sum_
        c, num = search_code(rc.low // temp)
        rc.low -= temp * num
        rc.range_ = temp * self.count[c]
        rc.decode_normalize()
        self.update(c)
        return c
#
# rangecoder.py : レンジコーダ
#                 (レンジコーダ [1]: rangecoder.py)
#
#                 Copyright (C) 2007-2022 Makoto Hiroi
#
from byteio import *    # 連長圧縮 の byteio.py を使用する

# 定数
ENCODE = "encode"
DECODE = "decode"
MAX_RANGE = 0xffffffff
MIN_RANGE = 0x1000000
MASK      = 0xffffffff
SHIFT     = 24

# レンジコーダー
class RangeCoder:
    def __init__(self, s, mode):
        self.stream = s
        self.range_ = MAX_RANGE
        self.buff = 0
        self.cnt = 0
        if mode == ENCODE:
            self.low = 0
        elif mode == DECODE:
            # buff の初期値 (0) を読み捨てる
            self.stream.getc()
            # 4 byte read
            self.low = self.stream.getc()
            self.low = (self.low << 8) + self.stream.getc()
            self.low = (self.low << 8) + self.stream.getc()
            self.low = (self.low << 8) + self.stream.getc()
        else:
            raise "RangeCoder mode error"

    # 符号化の正規化
    def encode_normalize(self):
        if self.low > MAX_RANGE:
            # 桁上がり
            self.buff += 1
            self.low &= MASK
            if self.cnt > 0:
                self.stream.putc(self.buff)
                for _ in range(self.cnt - 1): self.stream.putc(0)
                self.buff = 0
                self.cnt = 0
        while self.range_ < MIN_RANGE:
            if self.low < (0xff << SHIFT):
                self.stream.putc(self.buff)
                for _ in range(self.cnt): self.stream.putc(0xff)
                self.buff = (self.low >> SHIFT) & 0xff
                self.cnt = 0
            else:
                self.cnt += 1
            self.low = (self.low << 8) & MASK
            self.range_ <<= 8

    # 復号の正規化
    def decode_normalize(self):
        while self.range_ < MIN_RANGE:
            self.range_ <<= 8
            self.low = ((self.low << 8) + self.stream.getc()) & MASK

    # 終了
    def finish(self):
        c = 0xff
        if self.low > MAX_RANGE:
            # 桁上がり
            self.buff += 1
            c = 0
        self.stream.putc(self.buff)
        for _ in range(self.cnt): self.stream.putc(c)
        #
        self.stream.putc((self.low >> 24) & 0xff)
        self.stream.putc((self.low >> 16) & 0xff)
        self.stream.putc((self.low >> 8) & 0xff)
        self.stream.putc(self.low & 0xff)
#
# rangecoder2.py : バイナリレンジコーダ
#                  (バイナリレンジコーダ [1]: rangecoder2.py)
#
#                  Copyright (C) 2007-2022 Makoto Hiroi
#
from rangecoder import *

B_INC = 4
B_LIMIT = 0x200

# コンテキスト
class Context0:
    def __init__(self):
        self.c0 = 1
        self.c1 = 1

    # 更新
    def update(self, bit):
        if bit > 0:
            self.c1 += B_INC
        else:
            self.c0 += B_INC
        if self.c0 + self.c1 >= B_LIMIT:
            self.c0 = (self.c0 >> 1) | 1
            self.c1 = (self.c1 >> 1) | 1

    # 符号化
    def encode(self, rc, bit):
        temp = rc.range_ // (self.c0 + self.c1)
        if bit > 0:
            rc.low += temp * self.c0
            rc.range_ = temp * self.c1
        else:
            rc.range_ = temp * self.c0
        rc.encode_normalize()
        self.update(bit)

    # 復号
    def decode(self, rc):
        temp = rc.range_ // (self.c0 + self.c1)
        if rc.low // temp < self.c0:
            bit = 0
            rc.range_ = temp * self.c0
        else:
            bit = 1
            rc.low -= temp * self.c0
            rc.range_ = temp * self.c1
        rc.decode_normalize()
        self.update(bit)
        return bit

# 改良版 (こちらを使用する)
class Context:
    def __init__(self):
        self.sum_ = 2
        self.c0 = 1

    # 更新
    def update(self, bit):
        self.sum_ += B_INC
        if bit == 0: self.c0 += B_INC
        if self.sum_ >= B_LIMIT:
            self.c0 = (self.c0 >> 1) | 1
            self.sum_ = (self.sum_ >> 1) | 1
            if self.sum_ <= self.c0: self.sum_ = self.c0 + 1

    # ビットの符号化
    def encode(self, rc, bit):
        temp = rc.range_ // self.sum_
        if bit > 0:
            rc.low += temp * self.c0
            rc.range_ -= temp * self.c0
        else:
            rc.range_ = temp * self.c0
        rc.encode_normalize()
        self.update(bit)

    # ビットの復号
    def decode(self, rc):
        temp = rc.range_ // self.sum_
        if rc.low // temp < self.c0:
            bit = 0
            rc.range_ = temp * self.c0
        else:
            bit = 1
            rc.low -= temp * self.c0
            rc.range_ -= temp * self.c0
        rc.decode_normalize()
        self.update(bit)
        return bit

##### バイナリモデル #####

class Freq2:
    def __init__(self, size):
        self.size = size
        self.context = [Context() for _ in range(size - 1)]

    # 符号化
    def encode(self, rc, code):
        def encode_sub(node):
            if node > 0:
                p = (node - 1) // 2
                encode_sub(p)
                # 奇数は左の子 (1), 偶数は右の子 (0)
                bit = node & 1
                self.context[p].encode(rc, bit)
        #
        encode_sub(code + self.size - 1)

    # 復号
    def decode(self, rc):
        node = 0
        node_size = self.size - 1
        while node < node_size:
            bit = self.context[node].decode(rc)
            if bit > 0:
                # 1 は左の子
                node = 2 * node + 1
            else:
                # 0 は右の子
                node = 2 * node + 2
        return node - node_size

##### αモデル #####

class AlphaFreq:
    def __init__(self, size):
        self.size = size - 1
        self.context = [Context() for _ in range(size - 1)]

    # 符号化
    def encode(self, rc, c):
        for x in range(self.size):
            if x < c:
                bit = 0
            else:
                bit = 1
            self.context[x].encode(rc, bit)
            if bit: break

    # 復号
    def decode(self, rc):
        c = 0
        while c < self.size:
            bit = self.context[c].decode(rc)
            if bit: break
            c += 1
        return c

##### γモデル #####

# ビット列モデル
class BitsFreq:
    def __init__(self, size):
        self.size = size
        self.context = [Context() for _ in range(size)]

    # 符号化
    def encode(self, rc, c):
        for x in range(self.size):
            bit = (c >> x) & 1
            self.context[x].encode(rc, bit)

    # 復号
    def decode(self, rc):
        c = 0
        for x in range(self.size):
            bit = self.context[x].decode(rc)
            if bit: c |= bit << x
        return c

# γモデル
class GammaFreq:
    def __init__(self, size):
        n2 = size >> 1
        n1 = 0
        while n2 > 0:
            n1 += 1
            n2 >>= 1
        self.size = n1
        self.context1 = AlphaFreq(n1 + 1)
        self.context2 = [None] * (n1 + 1)
        for x in range(1, n1 + 1):
            self.context2[x] = BitsFreq(x)

    # 符号化
    def encode(self, rc, n):
        n1 = 0
        n2 = (n + 1) >> 1
        while n2 > 0:
            n1 += 1
            n2 >>= 1
        self.context1.encode(rc, n1)
        if n1 > 0:
            self.context2[n1].encode(rc, n + 1)

    # 復号
    def decode(self, rc):
        n1 = self.context1.decode(rc)
        if n1 > 0:
            n2 = self.context2[n1].decode(rc)
            n1 = (1 << n1) + n2 - 1
        return n1
#
# mtf.py : Move To Front 法
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#

# 符号化
def mtf_encode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        c = buff[x]
        j = table.index(c)
        if j > 0:
            del table[j]
            table.insert(0, c)
        buff[x] = j

# 復号
def mtf_decode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        j = buff[x]
        c = table[j]
        if j > 0:
            del table[j]
            table.insert(0, c)
        buff[x] = c


##### MTF-1 #####

# 符号化
def mtf1_encode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        c = buff[x]
        j = table.index(c)
        if j == 1:
            table[1] = table[0]
            table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = j

# 復号
def mtf1_decode(buff):
    table = list(range(256))
    for x in range(len(buff)):
        j = buff[x]
        c = table[j]
        if j == 1:
            table[1] = table[0]
            table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = c

##### MTF-2 #####

# 符号化
def mtf2_encode(buff):
    table = list(range(256))
    prev = 1
    for x in range(len(buff)):
        c = buff[x]
        j = table.index(c)
        if j == 1:
            if prev != 0:
                table[1] = table[0]
                table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = j
        prev = j

# 復号
def mtf2_decode(buff):
    table = list(range(256))
    prev = 1
    for x in range(len(buff)):
        j = buff[x]
        c = table[j]
        if j == 1:
            if prev != 0:
                table[1] = table[0]
                table[0] = c
        elif j > 1:
            del table[j]
            table.insert(1, c)
        buff[x] = c
        prev = j
#
# bsrc.py : ブロックソート (BlockSorting) によるファイルの圧縮
#
#           Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from array import *
from bwt import *
from mtf import *

# バイナリレンジコーダを使うときは freq のかわりに rangecoder2 を import する
from freq import *

# 定数
LIMIT = 0x4000
INC   = 4

# 4 バイトの正整数値を出力
def write_number(num, fout):
    fout.putc((num >> 24) & 0xff)
    fout.putc((num >> 16) & 0xff)
    fout.putc((num >> 8) & 0xff)
    fout.putc(num & 0xff)

# 4 バイトの正整数値を入力
def read_number(fin):
    num = 0
    for _ in range(4):
        num = (num << 8) + fin.getc()
    return num

# 符号化
def bs_encode(fin, fout, size):
    # 入力
    buff = array('B')
    buff.fromfile(fin, size)
    buff *= 2
    # BlockSorting
    work, top = suffix_sort(buff, size)
    # Move To Front
    mtf2_encode(work)
    print(work.count(0))
    write_number(top, fout)
    # RangeCoder
    rc = RangeCoder(fout, ENCODE)
    freq = Freq(256, INC, LIMIT)
    for x in range(size):
        freq.encode(rc, work[x])
    rc.finish()

# ブロックソートの復号
def bs_decode(fin, fout, size):
    top = read_number(fin)
    buff = array('B')
    # RangeCoder
    rc = RangeCoder(fin, DECODE)
    freq = Freq(256, INC, LIMIT)
    for _ in range(size):
        buff.append(freq.decode(rc))
    # Move To Front
    mtf2_decode(buff)
    # BlockSorting
    idx = array('I')
    for _ in range(size): idx.append(0)
    # 分布数えソート
    count = [0] * 256
    for x in range(size): count[ buff[x] ] += 1
    for x in range(1, 256): count[x] += count[x - 1]
    for x in range(size - 1, -1, -1):
        c = buff[x]
        count[c] -= 1
        idx[ count[c] ] = x
    # 出力
    x = idx[top]
    for _ in range(size):
        fout.putc(buff[x])
        x = idx[x]

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with open(name1, 'rb') as fin, ByteIO(name2, WOPEN) as fout:
        write_number(size, fout)
        if size > 0: bs_encode(fin, fout, size)

# 復号
def decode_file(name1, name2):
    with ByteIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = read_number(fin)
        if size > 0: bs_decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='ブロックソート + MTF + レンジコーダ')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

初版 2007 年 8 月 5 日
改訂 2022 年 10 月 1 日

Copyright (C) 2007-2022 Makoto Hiroi
All rights reserved.

[ PrevPage | Python | NextPage ]