M.Hiroi's Home Page

Algorithms with Python

LZB 符号と LZH 符号

[ PrevPage | Python | NextPage ]

はじめに

LZSS 符号の続きです。今回は LZSS 符号のバリエーションの一つである LZB 符号と、LZSS 符号とハフマン符号を組み合わせた LZH 符号を取り上げます。

●LZSS 符号の弱点

LZSS 符号の場合、長さと位置 (距離) は固定長符号で符号化しました。このため、最長一致系列の最大長やスライド窓を大きくしても、圧縮率が向上するとは限りません。前回作成した lzss.py を改造して、スライド窓の大きさと最大長を変えて試してみましょう。

テストデータは Canterbury Corpus で配布されている The Canterbury Corpus です。まずは最大長 (MAX_LEN) を 18 に固定して、スライド窓の大きさを変えてみました。結果は次のようになります。

            表 : LZSS 符号の結果1 (MAX_LEN = 18, MIN_LEN = 3)

  ファイル名      サイズ     8 k      16 k     32 k     64 k     128k
  ---------------------------------------------------------------------
  alice29.txt    152,089    68,332   64,480   61,949   60,664   61,489
  asyoulik.txt   125,179    61,789   58,832   57,023   56,464   57,780
  cp.html         24,603    10,278   10,055   10.275   10,635   10,994
  fields.c        11,150     3,859    4,002    4,156    4,310    4,464
  grammar.lsp      3,721     1,594    1,647    1,699    1,752    1,805
  kennedy.xls  1,029,744   291,968  295,661  300,684  307,614  315,675
  lcet10.txt     426,754   184.684  173,643  165,361  159,078  155,538
  plrabn12.txt   481,861   247,780  235.405  224,554  216,287  211,197
  ptt5           513,216   107,289  109,904  112,685  115,943  119,209
  sum             38,240    17,500   16,678   17,131   17,742   18,355
  xargs.1          4,227     2,198    2.268    2.338    2,408    2,478
  ---------------------------------------------------------------------
  合計         2,810,784   997,271  972,766  957,855  952,897  958,984

スライド窓を大きくすると、大きなテキストファイルの圧縮率は大幅に向上します。逆に、kennedy.xls や ptt5 などのバイナリファイルは圧縮率が低下してしまいます。今回のテストデータでは 64 k バイトでよさそうです。

ただし、スライド窓を大きくすると、最長一致系列の探索に時間がかかるようになります。そこで、今回は LHA のプログラム 参考文献, URL 2 を参考に、最長一致系列の探索処理を改良しています。Python の場合、それでも実行時間はかなり遅くなります。詳しくは プログラムリスト1 をお読みください。

次は、スライド窓の大きさを 64 k バイトに固定して、最大一致系列の最大長 (MAX_LEN) を変更してみましょう。結果は次のようになりました。

      表 : LZSS 符号の結果2 (スライド窓 = 64 k, MIN_LEN = 3)

  ファイル名      サイズ      18        64        128       256
  ----------------------------------------------------------------
  alice29.txt    152,089    60,664    65,300     67,979    70,661
  asyoulik.txt   125,179    56,464    61,145     63,664    66,185
  cp.html         24,603    10,635    10,585     10,877    11,201
  fields.c        11,150     4,310     4,321      4,450     4,585
  grammar.lsp      3,721     1,752     1,792      1,842     1,892
  kennedy.xls  1,029,744   307,614   328,559    339,031   349,503
  lcet10.txt     426,754   159,078   169,748    176,444   183,423
  plrabn12.txt   481,861   216,287   235,945    245,929   255,922
  ptt5           513,216   115,943    79,405     74,621    74,033
  sum             38,240    17,742    17,858     18,258    18,741
  xargs.1          4,227     2,408     2,521      2,590     2,659
  ----------------------------------------------------------------
  合計         2,810,784   952,897   977,179  1,005,685 1,038,805

ptt5 を除いて、MAX_LEN を長くすると圧縮率は低下してしまいました。特に、MAX_LEN を 256 にすると、テキストファイルの圧縮率はとても悪くなります。ptt5 は記号 0 が連続しているデータなので、MAX_LEN が長い方が高い圧縮率になります。

●LZB 符号

一般的なテキストファイルを LZSS 符号で符号化する場合、最長一致系列の長さは短い場合が多く、長い場合は少ないと考えられます。したがって、長さを固定長で符号化すると圧縮率はどうしても悪くなってしまいます。このような場合、小さな正整数ほど短い符号語で表せる可変長符号、具体的には Elias のγ符号を使うと圧縮率を改善することができます。γ符号については拙作のページ 整数の符号化 をお読みください。

1987 年に T. C. Bell 氏が開発した LZB 符号は、長さと位置の符号化に可変長符号を用いることで圧縮率を改善しています。今回は LZB 符号を参考にして、LZSS 符号の圧縮率を改善してみましょう。

●長さの符号化

最初に、長さをγ符号で符号化します。これは長さを符号化するときにクラス BitIO のメソッド gamma_encode() を、復号ではメソッド gamma_decode() を呼び出すだけです。プログラムは簡単なので説明は割愛いたします。実行結果は次のようになりました。

    表 : 長さをγ符号で符号化 (スライド窓 = 64 k, MIN_LEN = 3)

  ファイル名      サイズ      18        64       128       256
  ---------------------------------------------------------------
  alice29.txt    152,089    61,383    60,588    60,578    60,572
  asyoulik.txt   125,179    56,082    55,685    55,679    55,676
  cp.html         24,603    10,680     9,891     9,859     9,859
  fields.c        11,150     4,426     4,125     4,113     4,106
  grammar.lsp      3,721     1,766     1,701     1,701     1,701
  kennedy.xls  1,029,744   336,181   336,181   336,181   336,181
  lcet10.txt     426,754   162,692   158,897   158,542   158,495
  plrabn12.txt   481,861   215,529   215,159   215,147   215,144
  ptt5           513,216   128,660    78,485    70,504    67,331
  sum             38,240    17,736    16,556    16,381    16,300
  xargs.1          4,227     2,376     2,352     2,352     2,352
  ---------------------------------------------------------------
  合計         2,810,784   997,511   939,620   931,037   927,697

MAX_LEN が 18 と短い場合、γ符号は逆効果になるようです。MAX_LEN が長い場合、γ符号の効果により圧縮率は高くなりました。kennedy.xls などバイナリファイルの圧縮率は悪いですが、テキストファイルは高い圧縮率になりました。もちろん、スライド窓を大きくした効果が一番大きいのですが、MAX_LEN を長くしてγ符号を適用した効果も十分に出ていると思います。

●距離の符号化

今度は距離の符号化に可変長符号を用いてみましょう。基本的な考え方はとても簡単です。LZSS 符号の場合、スライド窓の先頭位置 (0) からデータを格納していきます。データ数が少ない場合、たとえば 256 個とすると、距離は 0 - 255 の範囲になりますね。この場合、距離を 8 ビットで表すことができます。1024 個ならば、距離を 10 ビットで表すことができます。このように、スライド窓に格納されているデータ数によって、距離を表す符号語の長さを変化させるのです。

M.Hiroi は 参考文献, URL 1 で LZB 符号を勉強したとき、長さをγ符号で符号化する効果は大きくても、位置を可変長符号で符号化する効果は少ないだろうと思っていました。ところが、今回のようにスライド窓を大きくすると、この効果が大きくなるのです。また、小さなファイルを符号化する場合、スライド窓が大きいと圧縮率はどうしても低下してしまいます。距離を可変長符号で符号化することにより、小さなファイルでも効率よく圧縮できると思われます。

ここで CBT 符号を用いると、さらに圧縮率を向上させることができます。CBT 符号については拙作のページ 整数の符号化 をお読みください。プログラムは次のようになります。

リスト : 距離の符号化

# 距離の符号語長
pos_bits = 0

# 更新
def update_pos_bits(rp):
    global pos_bits
    if pos_bits < POS_BITS:
        while rp > (1 << pos_bits) - 1: pos_bits += 1

# 符号化
def pos_encode(fout, rp, pos):
    global pos_bits
    if 1 < pos_bits < POS_BITS:
        fout.cbt_encode(pos, rp, pos_bits)
    else:
        fout.putbits(pos_bits, pos)

# 復号
def pos_decode(fin, rp):
    global pos_bits
    if 1 < pos_bits < POS_BITS:
        pos = fin.cbt_decode(rp, pos_bits)
    else:
        pos = fin.getbits(pos_bits)
    return pos

関数 update_pos_bits() は距離の符号語長 pos_bits の値を更新します。pos_bits はグローバル変数として定義します。引数 rp はスライド窓内のデータ数を表します。POS_BITS はスライド窓の大きさをビットで表したものです。符号語長は POS_BITS が最大値になります。

スライド窓の更新が終了したら、update_pos_bits() を呼び出して、符号語長 pos_bits を更新します。あとは、符号化するときに距離を求めて、CBT 符号の符号化メソッド cbt_encode() で符号化します。

復号のときも update_pos_bits() を呼び出して、CBT 符号の復号メソッド cbt_decode() で距離を復号します。pos_bits が POS_BITS と等しい場合は、CBT 符号を使わずに putbits(), getbits() で符号化・復号を行います。

●評価結果

それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus を圧縮してみましょう。実行結果は次のようになりました。

          表 : LZB 符号の結果 (MAX_LEN = 256, MIN_LEN = 3)

  ファイル名      サイズ     32k       64k       128k   LHA(lh5)
  ---------------------------------------------------------------
  alice29.txt    152,089    60,434    58,824    58,293    59,117
  asyoulik.txt   125,179    54,733    53,841    53,531    52,341
  cp.html         24,603     8,973     8,890     8,890     8,384
  fields.c        11,150     3,507     3,507     3,507     3,170
  grammar.lsp      3,721     1,424     1,424     1,424     1,271
  kennedy.xls  1,029,744   328,111   334,869   342,095   198,342
  lcet10.txt     426,754   162,202   156,919   153,102   159,558
  plrabn12.txt   481,861   218,899   213,135   208,696   210,045
  ptt5           513,216    66,627    67,262    67,711    52,305
  sum             38,240    15,001    14,845    14,794    13,993
  xargs.1          4,227     1,985     1,985     1,985     1,778
  ---------------------------------------------------------------
  合計         2,810,784   921,896   915,501   914,028   760,304

距離を可変長符号にしたことにより、小さなファイルの圧縮率は高くなっています。スライド窓が大きくても、小さなファイルの圧縮率が高いのは LZB 符号の優れた特徴だと思います。ほかのファイルでも圧縮率は改善されているので、距離の符号化に CBT 符号を用いた効果は十分に出ていると思います。kennedy.xls などバイナリファイルの圧縮率は悪いですが、テキストファイルは LHA (lh5 形式 : スライド窓 8 k) に迫る圧縮率になりました。LZB 符号は優れた符号化方式だと思います。

●LZH 符号

LZSS 符号はスライド窓から最長一致系列を探して、その位置と長さで符号化しました。また、LZB 符号は位置と長さの符号化に可変長符号を用いることで圧縮率を改善しました。最長一致系列が見つからなかった場合、LZSS 符号と LZB 符号は記号をそのまま出力しています。この場合も含めて、可変長の符号語を使って平均符号長を短くすることができれば、圧縮率はさらに良くなるはずです。この二段階目の圧縮にハフマン符号を用いたものを一般に「LZH 符号」といいます。

LZSS 符号の場合、一致位置とその長さ、不一致のときに出力される記号の 3 つの情報があります。これらを別々にハフマン符号で符号化すると、ハフマン木が 3 つ必要になります。これをそのままファイルに付加すると、圧縮率は大きく低下してしまうでしょう。そこで、3 つの情報を一つのハフマン符号で圧縮することを考えてみます。

前回作成した LZSS 符号の場合、位置は 0 - 8191 までの値ですから 8192 個あり、一致長は 3 - 18 の値を 0 - 15 で表すので 16 個あります。それから、記号は 1 バイトなので 256 個あります。単純に考えると、8192 + 16 + 256 = 8464 個の記号でハフマン符号を構成することになり、最悪の場合 8464 ビット長の符号語が生成される場合もあるでしょう。ハフマン符号の符号化・復号処理にも時間がかかることになり、実用的な方法とはいえません。

そこで、位置を 6 ビットと 7 ビットの 2 つの数値に分けて符号化します。7 ビットで表せる値は 0 から 127 までなので、これを記号と混在させてみましょう。この場合、0 - 255 の値は記号を表し、256 - 271 の値は一致長を表すと定義します。すると、一致長の後ろに位置を表す符号語を出力すれば、0 - 127 の値が記号なのか位置なのかを区別することができます。この方法の場合、ハフマン符号の構成は 272 種類の記号だけで済ますことができます。また、記号と位置を区別するために付加した 1 ビットのフラグを省略することができます。

ところが、この方法で圧縮率を改善するのはちょっと難しいのです。実際に M.Hiroi が試してみたところ、結果は改善ではなく改悪となってしまいました。その原因を M.Hiroi なりに考えてみました。

まず、最長一致系列の長さですが、これは短い場合が多いと思われるので、ハフマン符号で効率的に圧縮することができそうです。また、不一致のときに出力される記号も、ランダムデータでなければそれなりの圧縮率を期待できるでしょう。

ところが位置情報の場合、特定の場所と一致する確率が高いとは考えにくいので、0 から 8191 までの値が均等に現れると考えた方が妥当です。距離を符号化する場合でも、一般的なテキストファイルでは同様に考えてよさそうです。このような場合、記号といっしょに符号化すると、記号の出現確率によっては位置の符号語長が 7 ビットよりも長くなる場合もあるでしょう。その結果、圧縮ではなく伸長という結果になったと思われます。

そこで、参考文献, URL 1 に書かれていた LHA の処理概要を参考にプログラムを作ることにします。LHA は一致長と不一致記号をまとめてハフマン符号で符号化するのですが、距離はこれとは別にしてハフマン符号で符号化しています。それも数値そのままではなく、距離を「最上位のビット 1 の位置 + 残りのビット」と分けて考え、前者をハフマン符号で符号化し、残りのビットはそのまま出力するという方法をとっています。

そこで、今回は距離を符号化するとき、γ符号のように整数をビット数とビット列に分け、ビット数をハフマン符号で符号化し、ビット列はそのまま出力することにします。特定の記号列が周期的に現れるようなファイルの場合、LZSS 符号で符号化すると特定の距離の出現確率がとても高くなります。距離の符号化でハフマン符号を用いると、このようなファイルの圧縮率を改善できる場合があります。

●LZH 符号の処理概要

それでは、具体的に処理内容を考えていきましょう。まず符号化ですが、LZSS 符号の出力はバッファに蓄えることにします。バッファが満杯になった時点で、ハフマン符号で符号化してファイルへ出力します。

バッファサイズが小さいと何回にも分けてハフマン符号化を行うため、その回数分だけハフマン木の情報をファイルに付加することになり、圧縮率が低下してしまいます。逆に、大きいバッファを用意すれば、圧縮率は良くなるのですが、メモリが少ない環境では圧縮できないことになります。

今回は LHA (lh5 形式) に合わせて、バッファサイズを 16 k 個としました。ここに LZSS 符号の符号語を格納します。このほかに、バッファの代わりにテンポラリファイルを用意し、LZSS 符号をすべてファイルに書き出してから、ハフマン符号でいっきに符号化する方法も考えられるでしょう。興味のある方はいろいろ試してみてください。

次は復号処理です。

復号は入力ファイルからデータを読み込み、ハフマン符号で復号してバッファへセットします。次に、バッファの内容を LZSS 符号で復号して、出力ファイルへ書き出します。バッファが空になったら、ファイルからの入力処理とハフマン符号の復号を行い、バッファへデータをセットします。

●LZSS 符号部の処理

LZH 符号はハフマン符号と LZSS 符号のプログラムを改造することで簡単に作成できます。最初に、LZSS 符号の符号化を行う関数 encode() を作ります。次のリストを見てください。

リスト : LZSS 符号の符号化

def encode(fin, fout):
    s = Slide(fin)
    rp = 0
    huff_buff = [None] * HUFF_SIZE
    hcnt = 0
    while rp < s.data_size:
        s.search(rp)
        if s.match_len < MIN_LEN:
            length = 1
            huff_buff[hcnt] = (s.buff[rp], None)
        else:
            length = s.match_len
            huff_buff[hcnt] = ((length - MIN_LEN), rp - s.match_pos - 1)
        hcnt += 1
        if hcnt >= HUFF_SIZE:
            huff_encode(huff_buff, hcnt, fout)
            hcnt = 0
        for _ in range(length):
            s.insert(rp)
            rp += 1
            if rp >= s.limit: rp = s.update(rp)
    if hcnt > 0: huff_encode(huff_buff, hcnt, fout)

基本的には lzss.py の関数 encode() と同じですが、符号語をファイルへ出力してた処理をバッファへ書き出すように変更します。バッファは huff_buff で、サイズは HUFF_SIZE (16 k) です。変数 hcnt がカウンタで、最初は 0 に初期化します。メソッド search() で最長一致系列を探索し、その長さが MIN_LEN 未満であれば (記号, None) をバッファに書き込みます。今回は MIN_LEN を 4 としました。

LZH 符号の場合、記号の符号化にハフマン符号を用いるため、それだけでも圧縮することが可能です。つまり、不一致の場合が多くても、ある程度は圧縮できるわけです。実際に試してみたところ、LZH 符号の場合、MIN_LEN は 3 よりも 4 の方が少しですが圧縮率は高くなります。

MIN_LEN 以上の最長一致系列が見つかった場合はバッファに (長さ, 距離) を書き込み、バッファが満杯になったら関数 huff_encode() を呼び出してハフマン符号で符号化します。最後に、hcnt が 0 よりも大きい場合は、バッファにデータが残っているので、huff_encode() を呼び出してハフマン符号で符号化します。

次に、LZSS 符号の復号処理を作ります。

リスト : LZSS 符号の復号

def decode(fin, fout, size):
    rp = 0
    buff = [0] * (1 << POS_BITS)
    huff_buff = [None] * HUFF_SIZE
    hcnt = 0
    hsize = 0
    while size > 0:
        if hcnt == hsize:
            hsize = huff_decode(huff_buff, fin)
            hcnt = 0
        n, m = huff_buff[hcnt]
        hcnt += 1
        if m is not None:
            length = n + MIN_LEN
            pos = rp - (m + 1)
            if pos < 0: pos += len(buff)
            for _ in range(length):
                c = buff[pos]
                fout.putc(c)
                buff[rp] = c
                pos += 1
                rp += 1
                if pos >= len(buff): pos = 0
                if rp >= len(buff): rp = 0
        else:
            length = 1
            fout.putc(n)
            buff[rp] = n
            rp += 1
            if rp >= len(buff): rp = 0
        size -= length

基本的には lzss.py の関数 decode() をバッファからの入力に変更するだけです。バッファは huff_buff で、hcnt がリードカウンタ、hsize がバッファ内のデータ数です。最初に、hcnt が hsize と等しい場合はバッファが空になったので、関数 huff_decode() を呼び出してハフマン符号の復号処理を行います。

次に、バッファからデータを取り出して変数 n, m にセットします。変数 m が None でない場合は、n が長さで m が距離を表しています。m が None の場合は n が記号を表しています。あとの処理は LZSS 符号のプログラムと同じなので、説明は割愛いたします。

●ハフマン符号部の処理

次は、ハフマン符号の処理を作りましょう。

リスト : ハフマン符号の符号化

# ビット数を求める
def get_bit_num(n):
    n1 = 0
    n2 = (n + 1) >> 1
    while n2 > 0:
        n1 += 1
        n2 >>= 1
    return n1

# ハフマン符号化
def huff_encode(buff, size, fout):
    count1 = [Node(x) for x in range(256 + 256 - MIN_LEN + 1)]
    count2 = [Node(x) for x in range(POS_BITS + 1)]
    code1 = [None] * (256 + 256 - MIN_LEN + 1)
    code2 = [None] * (POS_BITS + 1)
    # 出現頻度表の作成
    for x in range(size):
        n, m = buff[x]
        if m is None:
            count1[n].count += 1
        else:
            count1[n + 256].count += 1
            n1 = get_bit_num(m)
            count2[n1].count += 1
    # ハフマン木の生成と出力
    root1 = make_tree(count1)
    root2 = make_tree(count2)
    make_code(code1, root1, 0, 0)
    make_code(code2, root2, 0, 0)
    fout.putbits(HUFF_BITS, size - 1)
    write_tree(root1, fout, CODE_BITS)
    write_tree(root2, fout, CODE1_BITS)
    # 符号化
    for x in range(size):
        n, m = buff[x]
        if m is None:
            fout.putbits(*code1[n])
        else:
            fout.putbits(*code1[n + 256])
            n1 = get_bit_num(m)
            fout.putbits(*code2[n1])
            if n1 > 0: fout.putbits(n1, m + 1)

関数 huff_encode() の引数 buff がバッファ、size がバッファ内のデータ数、fout が出力ファイル (BitIO のオブジェクト) です。count1 は記号と長さの出現頻度表で、count2 が距離のビット数の出現頻度表です。code1 は記号と長さのハフマン符号を格納し、code2 は 距離のビット数のハフマン符号を格納します。

次に、データの出現頻度を求めます。バッファからデータを取り出して、変数 n と m にセットします。m が None の場合、n は記号を表しています。count1[n].count を +1 します。そうでなければ、n が長さで m が距離を表しています。この場合、n に 256 を加えて、count1[n + 256].count を +1 します。そして、m のビット数を関数 get_bit_num() で求めて、count2[n1].count を +1 します。

それから、関数 make_tree() を呼び出してハフマン木を作成し、関数 make_code() でハフマン符号を配列 code1 と code2 にセットします。次に、符号化を行った記号数 size をファイルへ出力します。バッファが満杯の場合、size をそのまま出力すると、HUFF_BITS (14) ビットに収まらないので、size - 1 を出力することに注意してください。

そして、関数 write_tree() でハフマン木を出力します。write_tree() の第 3 引数に葉の記号を出力するときのビット数を指定します。CODE_BITS の値は 9 でいいのですが、CODE1_BITS は POS_BITS の値によって変わります。距離のビット数が収まるように設定してください。

最後に、バッファのデータを符号化して出力します。記号の場合は code1[n] を出力します。長さと距離の場合は、code1[n + 256] を出力してから、距離のビット数 n1 を求めて code2[n1] を出力し、m + 1 の下位 n1 ビットをそのまま出力します。

次は復号処理です。

リスト : ハフマン符号の復号

# 記号の復号
def huff_decode_sub(node, fin):
    while node.code is None:
        if fin.getbit() == 0:
            node = node.left
        else:
            node = node.right
    return node.code

# ハフマン復号
def huff_decode(buff, fin):
    size = fin.getbits(HUFF_BITS) + 1
    root1 = read_tree(fin, CODE_BITS)
    root2 = read_tree(fin, CODE1_BITS)
    for x in range(size):
        n = huff_decode_sub(root1, fin)
        if n < 256:
            buff[x] = (n, None)
        else:
            n -= 256
            m = huff_decode_sub(root2, fin)
            if m > 0:
                m = (1 << m) + fin.getbits(m) - 1
            buff[x] = (n, m)
    return size

関数 huff_decode() の引数 buff は復号したデータを格納するバッファで、fin が入力ファイル (BitIO のオブジェクト) です。最初に復号するデータ数 size を getbits() で求めます。次に、2 つのハフマン木を関数 read_tree() で読み込みます。そして、for ループで size 個のデータを復号します。

まず、関数 huff_decode_sub() を呼び出してデータを一つ復号します。それが 256 よりも小さい場合は記号なので、(n, None) を buff にセットします。256 以上の場合は長さと距離を復号します。長さは n - 256 で求めることができます。

次に、距離のビット数を huff_decode_sub() で求めます。m が 0 よりも大きい場合は、getbits() で m ビット読み込んで距離を求めます。そして、buff に (n, m) をセットします。最後に size を返します。

主な変更はこれで終わりです。あとのプログラムは簡単なので説明は割愛いたします。詳細は プログラムリスト2 をお読みください。

●修正

記号が 1 種類しか出現していない場合や、距離のビット数が 1 種類以下しか出現しない場合、ハフマン木を正しく構築できずに符号化でエラーが発生します。関数 make_tree() を次のように修正します。

リスト : ハフマン木の生成

def make_tree(sym_table):
    q = PQueue()   # ヒープの生成
    for x in sym_table:
        if x.count > 0: q.push(x)
    if q.isEmpty():
        return Node(None, 0, sym_table[0], sym_table[1])
    while True:
        n1 = q.pop()
        if q.isEmpty():
            if n1.code is not None:
                for x in sym_table:
                    if x.count == 0: return Node(None, 0, n1, x)
            return n1
        n2 = q.pop()
        q.push(Node(None, n1.count + n2.count, n1, n2))

繰り返しが存在しないデータの場合、距離を表すハフマン木は空になりますが、今回のプログラムでは無条件にハフマン木をファイルへ出力しているため、ダミーのハフマン木を返すことにします。ヒープ q を構築したあと、ヒープにデータがない場合は、ダミーのハフマン木を作成して返します。ヒープにデータが一つしかない場合、n1 の値を返すところで、ダミーの葉を一つ加えたハフマン木を作成して返します。

●評価結果

それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus を圧縮してみましょう。実行結果は次のようになりました。

              表 : LZH 符号の結果 (MAX_LEN = 256, MIN_LEN = 4)

  ファイル名      サイズ     8 k       32 k      64 k      128k   LHA(lh5)
  -------------------------------------------------------------------------
  alice29.txt    152,089    59,919    55,217    53,748    53,187    59,117
  asyoulik.txt   125,179    53,039    49,757    48,922    48,657    52,341
  cp.html         24,603     8,510     8,151     8,151     8,151     8,384
  fields.c        11,150     3,286     3,288     3,288     3,288     3,170
  grammar.lsp      3,721     1,312     1,313     1,313     1,313     1,271
  kennedy.xls  1,029,744   232,791   221,779   218,046   215,569   198,342
  lcet10.txt     426,754   161,981   147,323   141,562   137,432   159,558
  plrabn12.txt   481,861   212,063   197,735   191,684   186,949   210,045
  ptt5           513,216    55,073    56,023    56,509    57,087    52,305
  sum             38,240    15,124    14,038    14,044    14,044    13,993
  xargs.1          4,227     1,819     1,820     1,820     1,820     1,778
  -------------------------------------------------------------------------
  合計         2,810,784   804,917   756,444   739,087   727,497   760,304

どのファイルも LZB 符号より圧縮率が向上しています。ハフマン符号による二段回目の圧縮効果はとても大きいことがわかります。スライド窓を大きくすると、圧縮率は大きく向上しますが、ptt5 のように圧縮率が低下するファイルもあります。スライド窓が 8 k の場合、LHA よりも圧縮率は悪くなりました。LHA は圧縮率を高めるための工夫がいろいろ施されているので、今回のような単純な方法では LHA よりも高い圧縮率を達成するのは難しいようです。

●参考文献, URL

  1. 植松友彦, 『文書データ圧縮アルゴリズム入門』, CQ出版社, 1994
  2. LHa for UNIX with Autoconf - GitHub, (Koji Arai さん)
  3. 広井誠, 『LZ77 符号によるファイルの圧縮とその改良 (前編)』, Interface 2006 年 6 月号, CQ出版社

●プログラムリスト1

#
# lzb.py : LZB coding
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from bitio import *

# 定数
MIN_LEN = 3
MAX_LEN = 256
POS_BITS = 15

# スライド窓
class Slide:
    def __init__(self, s):
        self.stream = s
        self.size = 1 << POS_BITS
        self.limit = self.size * 2
        self.link = [None] * self.size
        self.ht = {}
        self.too_many_flag = {}
        self.buff = [0] * (self.limit + MAX_LEN)
        self.data_size = self.fread(0, self.limit + MAX_LEN)
        self.match_len = 0
        self.match_pos = 0

    # ハッシュ値を求める
    def hash_value(self, rp):
        value = 0
        for x in range(MIN_LEN):
            value = (value << 8) + self.buff[rp + x]
        return value

    # データ入力
    def fread(self, start, size):
        for i in range(size):
            c = self.stream.getc()
            if c is None: return i
            self.buff[start + i] = c
        return size

    # データ移動
    def move_data(self, to, from_, size):
        for n in range(size):
            self.buff[to + n] = self.buff[from_ + n]

    # スライド窓の更新
    def update(self, rp):
        if self.data_size < self.limit + MAX_LEN: return rp
        # buffer update
        self.move_data(0, self.size, self.size + MAX_LEN)
        n = self.fread(self.size + MAX_LEN, self.size)
        self.data_size = self.size + MAX_LEN + n
        # hash update
        for k, v in list(self.ht.items()):
            if v < self.size:
                del self.ht[k]
            else:
                self.ht[k] = v - self.size
        #
        for x in range(self.size):
            v = self.link[x]
            if v == None or v < self.size:
                self.link[x] = None
            else:
                self.link[x] = v - self.size
        self.too_many_flag.clear()
        return rp - self.size

    # データの挿入
    def insert(self, rp):
        value = self.hash_value(rp)
        if value in self.ht:
            self.link[rp & (self.size - 1)] = self.ht[value]
        else:
            self.link[rp & (self.size - 1)] = None
        self.ht[value] = rp

    # データの探索
    def search(self, rp):
        b = self.buff
        value = self.hash_value(rp)
        limit = rp - self.size
        self.match_len = 0
        self.match_pos = 0
        offset = 0
        while value in self.too_many_flag:
            if offset >= MAX_LEN - MIN_LEN or \
               len(b) - (rp + offset) < MIN_LEN:
                offset = 0
                value = self.hash_value(rp)
                break
            offset += 1
            value = self.hash_value(rp + offset)
        #
        while True:
            count = 0
            if value in self.ht:
                n = self.ht[value]
            else:
                n = None
            while n is not None and n - offset >= limit:
                count += 1
                if b[rp + self.match_len] == b[n + self.match_len - offset]:
                    x = 0
                    while x < MAX_LEN:
                        if b[rp + x] != b[n + x - offset]: break
                        x += 1
                    if self.match_len < x:
                        self.match_len = x
                        self.match_pos = n - offset
                        if x == MAX_LEN: break
                n = self.link[n & (self.size - 1)]
            if count > 256: self.too_many_flag[value] = True
            if self.match_len >= offset + MIN_LEN or offset == 0: break
            # 再検索
            offset = 0
            value = self.hash_value(rp)
        # データの終端をチェック
        if self.match_len >= self.data_size - rp:
            self.match_len = self.data_size - rp

# 距離の符号化 (CBT 符号)
pos_bits = 0

# 更新
def update_pos_bits(rp):
    global pos_bits
    if pos_bits < POS_BITS:
        while rp > (1 << pos_bits) - 1: pos_bits += 1

# 符号化
def pos_encode(fout, rp, pos):
    global pos_bits
    if 1 < pos_bits < POS_BITS:
        fout.cbt_encode(pos, rp, pos_bits)
    else:
        fout.putbits(pos_bits, pos)

# 復号
def pos_decode(fin, rp):
    global pos_bits
    if 1 < pos_bits < POS_BITS:
        pos = fin.cbt_decode(rp, pos_bits)
    else:
        pos = fin.getbits(pos_bits)
    return pos

# LZB 符号の符号化
def encode(fin, fout):
    s = Slide(fin)
    rp = 0
    while rp < s.data_size:
        s.search(rp)
        if s.match_len < MIN_LEN:
            num = 1
            fout.putbit(0)
            fout.putbits(8, s.buff[rp])
        else:
            num = s.match_len
            fout.putbit(1)
            fout.gamma_encode(num - MIN_LEN)
            pos_encode(fout, rp, rp - s.match_pos - 1)
        for _ in range(num):
            s.insert(rp)
            rp += 1
            if rp >= s.limit: rp = s.update(rp)
        update_pos_bits(rp)

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout:
        fout.putbits(32, size)
        if size > 0: encode(fin, fout)

# LZB 符号の復号
def decode(fin, fout, size):
    rp = 0
    buff = [0] * (1 << POS_BITS)
    while size > 0:
        if fin.getbit() == 1:
            num = fin.gamma_decode() + MIN_LEN
            pos = rp - (pos_decode(fin, rp) + 1)
            if pos < 0: pos += len(buff)
            for _ in range(num):
                c = buff[pos]
                fout.putc(c)
                buff[rp] = c
                pos += 1
                rp += 1
                if pos >= len(buff): pos = 0
                if rp >= len(buff): rp = 0
        else:
            num = 1
            c = fin.getbits(8)
            fout.putc(c)
            buff[rp] = c
            rp += 1
            if rp >= len(buff): rp = 0
        size -= num
        update_pos_bits(rp)

# 復号
def decode_file(name1, name2):
    with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = fin.getbits(32)
        if size > 0: decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='LZB符号')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

●プログラムリスト2

#
# lzh.py : LZH coding (LZSS + Huffman coding)
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from bitio import *
from pqueue import *

# 定数
MIN_LEN = 4
MAX_LEN = 256
POS_BITS = 16
HUFF_BITS = 14
HUFF_SIZE = 1 << HUFF_BITS
CODE_BITS = 9
CODE1_BITS = 5

##### Huffman coding #####

# ハフマン木の節
class Node:
    def __init__(self, code, count = 0, left = None, right = None):
        self.code = code
        self.count = count
        self.left = left
        self.right = right

    def __gt__(x, y): return x.count > y.count
    def __le__(x, y): return x.count <= y.count

# 符号の生成
def make_code(table, node, n, code):
    if node.code is not None:
        # 葉
        table[node.code] = (n, code)
    else:
        make_code(table, node.left, n + 1, code << 1)         # left  is 0
        make_code(table, node.right, n + 1, (code << 1) | 1)  # right is 1

# ハフマン木の出力
def write_tree(node, fout, bits):
    if node.code is not None:
        # 葉
        fout.putbit(1)
        fout.putbits(bits, node.code)
    else:
        fout.putbit(0)
        write_tree(node.left, fout, bits)
        write_tree(node.right, fout, bits)

# ハフマン木の読み込み
def read_tree(fin, bits):
    if fin.getbit() == 1:
        # 葉
        node = Node(fin.getbits(bits))
    else:
        node = Node(None)
        node.left = read_tree(fin, bits)
        node.right = read_tree(fin, bits)
    return node

# ハフマン木の生成 (修正 2007/10/06)
def make_tree(sym_table):
    q = PQueue()   # ヒープの生成
    for x in sym_table:
        if x.count > 0: q.push(x)
    if q.isEmpty():
        return Node(None, 0, sym_table[0], sym_table[1])
    while True:
        n1 = q.pop()
        if q.isEmpty():
            if n1.code is not None:
                for x in sym_table:
                    if x.count == 0: return Node(None, 0, n1, x)
            return n1
        n2 = q.pop()
        q.push(Node(None, n1.count + n2.count, n1, n2))

#
# code1 : 0 - 255 (ASCII)
#         256 - (512 - MIN_LEN)
# code2 : 距離のビット数
#

# ビット数を求める
def get_bit_num(n):
    n1 = 0
    n2 = (n + 1) >> 1
    while n2 > 0:
        n1 += 1
        n2 >>= 1
    return n1

# ハフマン符号化
def huff_encode(buff, size, fout):
    count1 = [Node(x) for x in range(256 + 256 - MIN_LEN + 1)]
    count2 = [Node(x) for x in range(POS_BITS + 1)]
    code1 = [None] * (256 + 256 - MIN_LEN + 1)
    code2 = [None] * (POS_BITS + 1)
    # 出現頻度表の作成
    for x in range(size):
        n, m = buff[x]
        if m is None:
            count1[n].count += 1
        else:
            count1[n + 256].count += 1
            n1 = get_bit_num(m)
            count2[n1].count += 1
    # ハフマン木の生成と出力
    root1 = make_tree(count1)
    root2 = make_tree(count2)
    make_code(code1, root1, 0, 0)
    make_code(code2, root2, 0, 0)
    fout.putbits(HUFF_BITS, size - 1)
    write_tree(root1, fout, CODE_BITS)
    write_tree(root2, fout, CODE1_BITS)
    # 符号化
    for x in range(size):
        n, m = buff[x]
        if m is None:
            fout.putbits(*code1[n])
        else:
            fout.putbits(*code1[n + 256])
            n1 = get_bit_num(m)
            fout.putbits(*code2[n1])
            if n1 > 0: fout.putbits(n1, m + 1)

# 記号の復号
def huff_decode_sub(node, fin):
    while node.code is None:
        if fin.getbit() == 0:
            node = node.left
        else:
            node = node.right
    return node.code

# ハフマン復号
def huff_decode(buff, fin):
    size = fin.getbits(HUFF_BITS) + 1
    root1 = read_tree(fin, CODE_BITS)
    root2 = read_tree(fin, CODE1_BITS)
    for x in range(size):
        n = huff_decode_sub(root1, fin)
        if n < 256:
            buff[x] = (n, None)
        else:
            n -= 256
            m = huff_decode_sub(root2, fin)
            if m > 0:
                m = (1 << m) + fin.getbits(m) - 1
            buff[x] = (n, m)
    return size


##### LZSS coding #####

# スライド窓
class Slide:
    def __init__(self, s):
        self.stream = s
        self.size = 1 << POS_BITS
        self.limit = self.size * 2
        self.link = [None] * self.size
        self.ht = {}
        self.too_many_flag = {}
        self.buff = [0] * (self.limit + MAX_LEN)
        self.data_size = self.fread(0, self.limit + MAX_LEN)
        self.match_len = 0
        self.match_pos = 0

    # データ入力
    def fread(self, start, size):
        for i in range(size):
            c = self.stream.getc()
            if c is None: return i
            self.buff[start + i] = c
        return size

    # データ移動
    def move_data(self, to, from_, size):
        for n in range(size):
            self.buff[to + n] = self.buff[from_ + n]

    # スライド窓の更新
    def update(self, rp):
        if self.data_size < self.limit + MAX_LEN: return rp
        # buffer update
        self.move_data(0, self.size, self.size + MAX_LEN)
        n = self.fread(self.size + MAX_LEN, self.size)
        self.data_size = self.size + MAX_LEN + n
        # hash update
        for k, v in list(self.ht.items()):
            if v < self.size:
                del self.ht[k]
            else:
                self.ht[k] = v - self.size
        #
        for x in range(self.size):
            v = self.link[x]
            if v == None or v < self.size:
                self.link[x] = None
            else:
                self.link[x] = v - self.size
        self.too_many_flag.clear()
        return rp - self.size

    # ハッシュ関数
    def hash_value(self, rp):
        value = 0
        for x in range(MIN_LEN):
            value = (value << 8) + self.buff[rp + x]
        return value

    # データの挿入
    def insert(self, rp):
        value = self.hash_value(rp)
        if value in self.ht:
            self.link[rp & (self.size - 1)] = self.ht[value]
        else:
            self.link[rp & (self.size - 1)] = None
        self.ht[value] = rp

    # データの探索
    def search(self, rp):
        b = self.buff
        value = self.hash_value(rp)
        limit = rp - self.size
        self.match_len = 0
        self.match_pos = 0
        offset = 0
        while value in self.too_many_flag:
            if offset >= MAX_LEN - MIN_LEN or len(b) - (rp + offset) < MIN_LEN:
                offset = 0
                value = self.hash_value(rp)
                break
            offset += 1
            value = self.hash_value(rp + offset)
        #
        while True:
            count = 0
            if value in self.ht:
                n = self.ht[value]
            else:
                n = None
            while n is not None and n - offset >= limit:
                count += 1
                if b[rp + self.match_len] == b[n + self.match_len - offset]:
                    x = 0
                    while x < MAX_LEN:
                        if b[rp + x] != b[n + x - offset]: break
                        x += 1
                    if self.match_len < x:
                        self.match_len = x
                        self.match_pos = n - offset
                        if x == MAX_LEN: break
                n = self.link[n & (self.size - 1)]
            if count > 256: self.too_many_flag[value] = True
            if self.match_len >= offset + MIN_LEN or offset == 0: break
            # 再検索
            offset = 0
            value = self.hash_value(rp)
        # データの終端をチェック
        if self.match_len >= self.data_size - rp:
            self.match_len = self.data_size - rp

# LZSS 符号の符号化
def encode(fin, fout):
    s = Slide(fin)
    rp = 0
    huff_buff = [None] * HUFF_SIZE
    hcnt = 0
    while rp < s.data_size:
        s.search(rp)
        if s.match_len < MIN_LEN:
            length = 1
            huff_buff[hcnt] = (s.buff[rp], None)
        else:
            length = s.match_len
            huff_buff[hcnt] = ((length - MIN_LEN), rp - s.match_pos - 1)
        hcnt += 1
        if hcnt >= HUFF_SIZE:
            huff_encode(huff_buff, hcnt, fout)
            hcnt = 0
        for _ in range(length):
            s.insert(rp)
            rp += 1
            if rp >= s.limit: rp = s.update(rp)
    if hcnt > 0: huff_encode(huff_buff, hcnt, fout)

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout:
        fout.putbits(32, size)
        if size > 0: encode(fin, fout)

# LZSS 符号の復号
def decode(fin, fout, size):
    rp = 0
    buff = [0] * (1 << POS_BITS)
    huff_buff = [None] * HUFF_SIZE
    hcnt = 0
    hsize = 0
    while size > 0:
        if hcnt == hsize:
            hsize = huff_decode(huff_buff, fin)
            hcnt = 0
        n, m = huff_buff[hcnt]
        hcnt += 1
        if m is not None:
            length = n + MIN_LEN
            pos = rp - (m + 1)
            if pos < 0: pos += len(buff)
            for _ in range(length):
                c = buff[pos]
                fout.putc(c)
                buff[rp] = c
                pos += 1
                rp += 1
                if pos >= len(buff): pos = 0
                if rp >= len(buff): rp = 0
        else:
            length = 1
            fout.putc(n)
            buff[rp] = n
            rp += 1
            if rp >= len(buff): rp = 0
        size -= length

# 復号
def decode_file(name1, name2):
    with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = fin.getbits(32)
        if size > 0: decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='LZH符号')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

初版 2007 年 5 月 26 日
改訂 2022 年 9 月 17 日

Copyright (C) 2007-2022 Makoto Hiroi
All rights reserved.

[ PrevPage | Python | NextPage ]