LZSS 符号の続きです。今回は LZSS 符号のバリエーションの一つである LZB 符号と、LZSS 符号とハフマン符号を組み合わせた LZH 符号を取り上げます。
LZSS 符号の場合、長さと位置 (距離) は固定長符号で符号化しました。このため、最長一致系列の最大長やスライド窓を大きくしても、圧縮率が向上するとは限りません。前回作成した lzss.py を改造して、スライド窓の大きさと最大長を変えて試してみましょう。
テストデータは Canterbury Corpus で配布されている The Canterbury Corpus です。まずは最大長 (MAX_LEN) を 18 に固定して、スライド窓の大きさを変えてみました。結果は次のようになります。
表 : LZSS 符号の結果1 (MAX_LEN = 18, MIN_LEN = 3) ファイル名 サイズ 8 k 16 k 32 k 64 k 128k --------------------------------------------------------------------- alice29.txt 152,089 68,332 64,480 61,949 60,664 61,489 asyoulik.txt 125,179 61,789 58,832 57,023 56,464 57,780 cp.html 24,603 10,278 10,055 10.275 10,635 10,994 fields.c 11,150 3,859 4,002 4,156 4,310 4,464 grammar.lsp 3,721 1,594 1,647 1,699 1,752 1,805 kennedy.xls 1,029,744 291,968 295,661 300,684 307,614 315,675 lcet10.txt 426,754 184.684 173,643 165,361 159,078 155,538 plrabn12.txt 481,861 247,780 235.405 224,554 216,287 211,197 ptt5 513,216 107,289 109,904 112,685 115,943 119,209 sum 38,240 17,500 16,678 17,131 17,742 18,355 xargs.1 4,227 2,198 2.268 2.338 2,408 2,478 --------------------------------------------------------------------- 合計 2,810,784 997,271 972,766 957,855 952,897 958,984
スライド窓を大きくすると、大きなテキストファイルの圧縮率は大幅に向上します。逆に、kennedy.xls や ptt5 などのバイナリファイルは圧縮率が低下してしまいます。今回のテストデータでは 64 k バイトでよさそうです。
ただし、スライド窓を大きくすると、最長一致系列の探索に時間がかかるようになります。そこで、今回は LHA のプログラム (LHa for UNIX with Autoconf - GitHub) を参考に、最長一致系列の探索処理を改良しています。Python の場合、それでも実行時間はかなり遅くなります。詳しくはプログラムリスト1をお読みください。
次は、スライド窓の大きさを 64 k バイトに固定して、最大一致系列の最大長 (MAX_LEN) を変更してみましょう。結果は次のようになりました。
表 : LZSS 符号の結果2 (スライド窓 = 64 k, MIN_LEN = 3) ファイル名 サイズ 18 64 128 256 ---------------------------------------------------------------- alice29.txt 152,089 60,664 65,300 67,979 70,661 asyoulik.txt 125,179 56,464 61,145 63,664 66,185 cp.html 24,603 10,635 10,585 10,877 11,201 fields.c 11,150 4,310 4,321 4,450 4,585 grammar.lsp 3,721 1,752 1,792 1,842 1,892 kennedy.xls 1,029,744 307,614 328,559 339,031 349,503 lcet10.txt 426,754 159,078 169,748 176,444 183,423 plrabn12.txt 481,861 216,287 235,945 245,929 255,922 ptt5 513,216 115,943 79,405 74,621 74,033 sum 38,240 17,742 17,858 18,258 18,741 xargs.1 4,227 2,408 2,521 2,590 2,659 ---------------------------------------------------------------- 合計 2,810,784 952,897 977,179 1,005,685 1,038,805
ptt5 を除いて、MAX_LEN を長くすると圧縮率は低下してしまいました。特に、MAX_LEN を 256 にすると、テキストファイルの圧縮率はとても悪くなります。ptt5 は記号 0 が連続しているデータなので、MAX_LEN が長い方が高い圧縮率になります。
一般的なテキストファイルを LZSS 符号で符号化する場合、最長一致系列の長さは短い場合が多く、長い場合は少ないと考えられます。したがって、長さを固定長で符号化すると圧縮率はどうしても悪くなってしまいます。このような場合、小さな正整数ほど短い符号語で表せる可変長符号、具体的には Elias のγ符号を使うと圧縮率を改善することができます。γ符号については拙作のページ「整数の符号化」をお読みください。
1987 年に T. C. Bell 氏が開発した LZB 符号は、長さと位置の符号化に可変長符号を用いることで圧縮率を改善しています。今回は LZB 符号を参考にして、LZSS 符号の圧縮率を改善してみましょう。
最初に、長さをγ符号で符号化します。これは長さを符号化するときにクラス BitIO のメソッド gamma_encode() を、復号ではメソッド gamma_decode() を呼び出すだけです。プログラムは簡単なので説明は割愛いたします。実行結果は次のようになりました。
表 : 長さをγ符号で符号化 (スライド窓 = 64 k, MIN_LEN = 3) ファイル名 サイズ 18 64 128 256 --------------------------------------------------------------- alice29.txt 152,089 61,383 60,588 60,578 60,572 asyoulik.txt 125,179 56,082 55,685 55,679 55,676 cp.html 24,603 10,680 9,891 9,859 9,859 fields.c 11,150 4,426 4,125 4,113 4,106 grammar.lsp 3,721 1,766 1,701 1,701 1,701 kennedy.xls 1,029,744 336,181 336,181 336,181 336,181 lcet10.txt 426,754 162,692 158,897 158,542 158,495 plrabn12.txt 481,861 215,529 215,159 215,147 215,144 ptt5 513,216 128,660 78,485 70,504 67,331 sum 38,240 17,736 16,556 16,381 16,300 xargs.1 4,227 2,376 2,352 2,352 2,352 --------------------------------------------------------------- 合計 2,810,784 997,511 939,620 931,037 927,697
MAX_LEN が 18 と短い場合、γ符号は逆効果になるようです。MAX_LEN が長い場合、γ符号の効果により圧縮率は高くなりました。kennedy.xls などバイナリファイルの圧縮率は悪いですが、テキストファイルは高い圧縮率になりました。もちろん、スライド窓を大きくした効果が一番大きいのですが、MAX_LEN を長くしてγ符号を適用した効果も十分に出ていると思います。
今度は距離の符号化に可変長符号を用いてみましょう。基本的な考え方はとても簡単です。LZSS 符号の場合、スライド窓の先頭位置 (0) からデータを格納していきます。データ数が少ない場合、たとえば 256 個とすると、距離は 0 - 255 の範囲になりますね。この場合、距離を 8 ビットで表すことができます。1024 個ならば、距離を 10 ビットで表すことができます。このように、スライド窓に格納されているデータ数によって、距離を表す符号語の長さを変化させるのです。
M.Hiroi は参考文献『文書データ圧縮アルゴリズム入門』で LZB 符号を勉強したとき、長さをγ符号で符号化する効果は大きくても、位置を可変長符号で符号化する効果は少ないだろうと思っていました。ところが、今回のようにスライド窓を大きくすると、この効果が大きくなるのです。また、小さなファイルを符号化する場合、スライド窓が大きいと圧縮率はどうしても低下してしまいます。距離を可変長符号で符号化することにより、小さなファイルでも効率よく圧縮できると思われます。
ここで CBT 符号を用いると、さらに圧縮率を向上させることができます。CBT 符号については拙作のページ「整数の符号化」をお読みください。プログラムは次のようになります。
リスト : 距離の符号化 # 距離の符号語長 pos_bits = 0 # 更新 def update_pos_bits(rp): global pos_bits if pos_bits < POS_BITS: while rp > (1 << pos_bits) - 1: pos_bits += 1 # 符号化 def pos_encode(fout, rp, pos): global pos_bits if 1 < pos_bits < POS_BITS: fout.cbt_encode(pos, rp, pos_bits) else: fout.putbits(pos_bits, pos) # 復号 def pos_decode(fin, rp): global pos_bits if 1 < pos_bits < POS_BITS: pos = fin.cbt_decode(rp, pos_bits) else: pos = fin.getbits(pos_bits) return pos
関数 update_pos_bits() は距離の符号語長 pos_bits の値を更新します。pos_bits はグローバル変数として定義します。引数 rp はスライド窓内のデータ数を表します。POS_BITS はスライド窓の大きさをビットで表したものです。符号語長は POS_BITS が最大値になります。
スライド窓の更新が終了したら、update_pos_bits() を呼び出して、符号語長 pos_bits を更新します。あとは、符号化するときに距離を求めて、CBT 符号の符号化メソッド cbt_encode() で符号化します。
復号のときも update_pos_bits() を呼び出して、CBT 符号の復号メソッド cbt_decode() で距離を復号します。pos_bits が POS_BITS と等しい場合は、CBT 符号を使わずに putbits(), getbits() で符号化・復号を行います。
それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus を圧縮してみましょう。実行結果は次のようになりました。
表 : LZB 符号の結果 (MAX_LEN = 256, MIN_LEN = 3) ファイル名 サイズ 32k 64k 128k LHA(lh5) --------------------------------------------------------------- alice29.txt 152,089 60,434 58,824 58,293 59,117 asyoulik.txt 125,179 54,733 53,841 53,531 52,341 cp.html 24,603 8,973 8,890 8,890 8,384 fields.c 11,150 3,507 3,507 3,507 3,170 grammar.lsp 3,721 1,424 1,424 1,424 1,271 kennedy.xls 1,029,744 328,111 334,869 342,095 198,342 lcet10.txt 426,754 162,202 156,919 153,102 159,558 plrabn12.txt 481,861 218,899 213,135 208,696 210,045 ptt5 513,216 66,627 67,262 67,711 52,305 sum 38,240 15,001 14,845 14,794 13,993 xargs.1 4,227 1,985 1,985 1,985 1,778 --------------------------------------------------------------- 合計 2,810,784 921,896 915,501 914,028 760,304
距離を可変長符号にしたことにより、小さなファイルの圧縮率は高くなっています。スライド窓が大きくても、小さなファイルの圧縮率が高いのは LZB 符号の優れた特徴だと思います。ほかのファイルでも圧縮率は改善されているので、距離の符号化に CBT 符号を用いた効果は十分に出ていると思います。kennedy.xls などバイナリファイルの圧縮率は悪いですが、テキストファイルは LHA (lh5 形式 : スライド窓 8 k) に迫る圧縮率になりました。LZB 符号は優れた符号化方式だと思います。
LZSS 符号はスライド窓から最長一致系列を探して、その位置と長さで符号化しました。また、LZB 符号は位置と長さの符号化に可変長符号を用いることで圧縮率を改善しました。最長一致系列が見つからなかった場合、LZSS 符号と LZB 符号は記号をそのまま出力しています。この場合も含めて、可変長の符号語を使って平均符号長を短くすることができれば、圧縮率はさらに良くなるはずです。この二段階目の圧縮にハフマン符号を用いたものを一般に「LZH 符号」といいます。
LZSS 符号の場合、一致位置とその長さ、不一致のときに出力される記号の 3 つの情報があります。これらを別々にハフマン符号で符号化すると、ハフマン木が 3 つ必要になります。これをそのままファイルに付加すると、圧縮率は大きく低下してしまうでしょう。そこで、3 つの情報を一つのハフマン符号で圧縮することを考えてみます。
前回作成した LZSS 符号の場合、位置は 0 - 8191 までの値ですから 8192 個あり、一致長は 3 - 18 の値を 0 - 15 で表すので 16 個あります。それから、記号は 1 バイトなので 256 個あります。単純に考えると、8192 + 16 + 256 = 8464 個の記号でハフマン符号を構成することになり、最悪の場合 8464 ビット長の符号語が生成される場合もあるでしょう。ハフマン符号の符号化・復号処理にも時間がかかることになり、実用的な方法とはいえません。
そこで、位置を 6 ビットと 7 ビットの 2 つの数値に分けて符号化します。7 ビットで表せる値は 0 から 127 までなので、これを記号と混在させてみましょう。この場合、0 - 255 の値は記号を表し、256 - 271 の値は一致長を表すと定義します。すると、一致長の後ろに位置を表す符号語を出力すれば、0 - 127 の値が記号なのか位置なのかを区別することができます。この方法の場合、ハフマン符号の構成は 272 種類の記号だけで済ますことができます。また、記号と位置を区別するために付加した 1 ビットのフラグを省略することができます。
ところが、この方法で圧縮率を改善するのはちょっと難しいのです。実際に M.Hiroi が試してみたところ、結果は改善ではなく改悪となってしまいました。その原因を M.Hiroi なりに考えてみました。
まず、最長一致系列の長さですが、これは短い場合が多いと思われるので、ハフマン符号で効率的に圧縮することができそうです。また、不一致のときに出力される記号も、ランダムデータでなければそれなりの圧縮率を期待できるでしょう。
ところが位置情報の場合、特定の場所と一致する確率が高いとは考えにくいので、0 から 8191 までの値が均等に現れると考えた方が妥当です。距離を符号化する場合でも、一般的なテキストファイルでは同様に考えてよさそうです。このような場合、記号といっしょに符号化すると、記号の出現確率によっては位置の符号語長が 7 ビットよりも長くなる場合もあるでしょう。その結果、圧縮ではなく伸長という結果になったと思われます。
そこで、参考文献『文書データ圧縮アルゴリズム入門』に書かれていた LHA の処理概要を参考にプログラムを作ることにします。LHA は一致長と不一致記号をまとめてハフマン符号で符号化するのですが、距離はこれとは別にしてハフマン符号で符号化しています。それも数値そのままではなく、距離を「最上位のビット 1 の位置 + 残りのビット」と分けて考え、前者をハフマン符号で符号化し、残りのビットはそのまま出力するという方法をとっています。
そこで、今回は距離を符号化するとき、γ符号のように整数をビット数とビット列に分け、ビット数をハフマン符号で符号化し、ビット列はそのまま出力することにします。特定の記号列が周期的に現れるようなファイルの場合、LZSS 符号で符号化すると特定の距離の出現確率がとても高くなります。距離の符号化でハフマン符号を用いると、このようなファイルの圧縮率を改善できる場合があります。
それでは、具体的に処理内容を考えていきましょう。まず符号化ですが、LZSS 符号の出力はバッファに蓄えることにします。バッファが満杯になった時点で、ハフマン符号で符号化してファイルへ出力します。
┌──────┐ ┌──────┐ │入力ファイル│─→│LZSS符号│ └──────┘ └──────┘ ↓ ┌────┐ │バッファ│ LZSS 符号を蓄積 └────┘ ↓ ┌──────┐ ┌──────┐ バッファが満杯に │ハフマン符号│─→│出力ファイル│ なったら符号化する └──────┘ └──────┘ 図 1 : LZH 符号の符号化処理概要
バッファサイズが小さいと何回にも分けてハフマン符号化を行うため、その回数分だけハフマン木の情報をファイルに付加することになり、圧縮率が低下してしまいます。逆に、大きいバッファを用意すれば、圧縮率は良くなるのですが、メモリが少ない環境では圧縮できないことになります。
今回は LHA (lh5 形式) に合わせて、バッファサイズを 16 k 個としました。ここに LZSS 符号の符号語を格納します。このほかに、バッファの代わりにテンポラリファイルを用意し、LZSS 符号をすべてファイルに書き出してから、ハフマン符号でいっきに符号化する方法も考えられるでしょう。興味のある方はいろいろ試してみてください。
次は復号処理です。
┌──────┐ ┌──────┐ バッファが空になったら │入力ファイル│─→│ハフマン符号│ ファイルのデータを復号 └──────┘ └──────┘ ↓ ┌────┐ │バッファ│ LZSS 符号に復号 └────┘ ↓ ┌──────┐ ┌──────┐ バッファの内容を │LZSS符号│─→│出力ファイル│ 復号して出力する └──────┘ └──────┘ 図 2 : LZH 符号の復号処理概要
復号は入力ファイルからデータを読み込み、ハフマン符号で復号してバッファへセットします。次に、バッファの内容を LZSS 符号で復号して、出力ファイルへ書き出します。バッファが空になったら、ファイルからの入力処理とハフマン符号の復号を行い、バッファへデータをセットします。
LZH 符号はハフマン符号と LZSS 符号のプログラムを改造することで簡単に作成できます。最初に、LZSS 符号の符号化を行う関数 encode() を作ります。次のリストを見てください。
リスト : LZSS 符号の符号化 def encode(fin, fout): s = Slide(fin) rp = 0 huff_buff = [None] * HUFF_SIZE hcnt = 0 while rp < s.data_size: s.search(rp) if s.match_len < MIN_LEN: length = 1 huff_buff[hcnt] = (s.buff[rp], None) else: length = s.match_len huff_buff[hcnt] = ((length - MIN_LEN), rp - s.match_pos - 1) hcnt += 1 if hcnt >= HUFF_SIZE: huff_encode(huff_buff, hcnt, fout) hcnt = 0 for _ in range(length): s.insert(rp) rp += 1 if rp >= s.limit: rp = s.update(rp) if hcnt > 0: huff_encode(huff_buff, hcnt, fout)
基本的には lzss.py の関数 encode() と同じですが、符号語をファイルへ出力してた処理をバッファへ書き出すように変更します。バッファは huff_buff で、サイズは HUFF_SIZE (16 k) です。変数 hcnt がカウンタで、最初は 0 に初期化します。メソッド search() で最長一致系列を探索し、その長さが MIN_LEN 未満であれば (記号, None) をバッファに書き込みます。今回は MIN_LEN を 4 としました。
LZH 符号の場合、記号の符号化にハフマン符号を用いるため、それだけでも圧縮することが可能です。つまり、不一致の場合が多くても、ある程度は圧縮できるわけです。実際に試してみたところ、LZH 符号の場合、MIN_LEN は 3 よりも 4 の方が少しですが圧縮率は高くなります。
MIN_LEN 以上の最長一致系列が見つかった場合はバッファに (長さ, 距離) を書き込み、バッファが満杯になったら関数 huff_encode() を呼び出してハフマン符号で符号化します。最後に、hcnt が 0 よりも大きい場合は、バッファにデータが残っているので、huff_encode() を呼び出してハフマン符号で符号化します。
次に、LZSS 符号の復号処理を作ります。
リスト : LZSS 符号の復号 def decode(fin, fout, size): rp = 0 buff = [0] * (1 << POS_BITS) huff_buff = [None] * HUFF_SIZE hcnt = 0 hsize = 0 while size > 0: if hcnt == hsize: hsize = huff_decode(huff_buff, fin) hcnt = 0 n, m = huff_buff[hcnt] hcnt += 1 if m is not None: length = n + MIN_LEN pos = rp - (m + 1) if pos < 0: pos += len(buff) for _ in range(length): c = buff[pos] fout.putc(c) buff[rp] = c pos += 1 rp += 1 if pos >= len(buff): pos = 0 if rp >= len(buff): rp = 0 else: length = 1 fout.putc(n) buff[rp] = n rp += 1 if rp >= len(buff): rp = 0 size -= length
基本的には lzss.py の関数 decode() をバッファからの入力に変更するだけです。バッファは huff_buff で、hcnt がリードカウンタ、hsize がバッファ内のデータ数です。最初に、hcnt が hsize と等しい場合はバッファが空になったので、関数 huff_decode() を呼び出してハフマン符号の復号処理を行います。
次に、バッファからデータを取り出して変数 n, m にセットします。変数 m が None でない場合は、n が長さで m が距離を表しています。m が None の場合は n が記号を表しています。あとの処理は LZSS 符号のプログラムと同じなので、説明は割愛いたします。
次は、ハフマン符号の処理を作りましょう。
リスト : ハフマン符号の符号化 # ビット数を求める def get_bit_num(n): n1 = 0 n2 = (n + 1) >> 1 while n2 > 0: n1 += 1 n2 >>= 1 return n1 # ハフマン符号化 def huff_encode(buff, size, fout): count1 = [Node(x) for x in range(256 + 256 - MIN_LEN + 1)] count2 = [Node(x) for x in range(POS_BITS + 1)] code1 = [None] * (256 + 256 - MIN_LEN + 1) code2 = [None] * (POS_BITS + 1) # 出現頻度表の作成 for x in range(size): n, m = buff[x] if m is None: count1[n].count += 1 else: count1[n + 256].count += 1 n1 = get_bit_num(m) count2[n1].count += 1 # ハフマン木の生成と出力 root1 = make_tree(count1) root2 = make_tree(count2) make_code(code1, root1, 0, 0) make_code(code2, root2, 0, 0) fout.putbits(HUFF_BITS, size - 1) write_tree(root1, fout, CODE_BITS) write_tree(root2, fout, CODE1_BITS) # 符号化 for x in range(size): n, m = buff[x] if m is None: fout.putbits(*code1[n]) else: fout.putbits(*code1[n + 256]) n1 = get_bit_num(m) fout.putbits(*code2[n1]) if n1 > 0: fout.putbits(n1, m + 1)
関数 huff_encode() の引数 buff がバッファ、size がバッファ内のデータ数、fout が出力ファイル (BitIO のオブジェクト) です。count1 は記号と長さの出現頻度表で、count2 が距離のビット数の出現頻度表です。code1 は記号と長さのハフマン符号を格納し、code2 は 距離のビット数のハフマン符号を格納します。
次に、データの出現頻度を求めます。バッファからデータを取り出して、変数 n と m にセットします。m が None の場合、n は記号を表しています。count1[n].count を +1 します。そうでなければ、n が長さで m が距離を表しています。この場合、n に 256 を加えて、count1[n + 256].count を +1 します。そして、m のビット数を関数 get_bit_num() で求めて、count2[n1].count を +1 します。
それから、関数 make_tree() を呼び出してハフマン木を作成し、関数 make_code() でハフマン符号を配列 code1 と code2 にセットします。次に、符号化を行った記号数 size をファイルへ出力します。バッファが満杯の場合、size をそのまま出力すると、HUFF_BITS (14) ビットに収まらないので、size - 1 を出力することに注意してください。
そして、関数 write_tree() でハフマン木を出力します。write_tree() の第 3 引数に葉の記号を出力するときのビット数を指定します。CODE_BITS の値は 9 でいいのですが、CODE1_BITS は POS_BITS の値によって変わります。距離のビット数が収まるように設定してください。
最後に、バッファのデータを符号化して出力します。記号の場合は code1[n] を出力します。長さと距離の場合は、code1[n + 256] を出力してから、距離のビット数 n1 を求めて code2[n1] を出力し、m + 1 の下位 n1 ビットをそのまま出力します。
次は復号処理です。
リスト : ハフマン符号の復号 # 記号の復号 def huff_decode_sub(node, fin): while node.code is None: if fin.getbit() == 0: node = node.left else: node = node.right return node.code # ハフマン復号 def huff_decode(buff, fin): size = fin.getbits(HUFF_BITS) + 1 root1 = read_tree(fin, CODE_BITS) root2 = read_tree(fin, CODE1_BITS) for x in range(size): n = huff_decode_sub(root1, fin) if n < 256: buff[x] = (n, None) else: n -= 256 m = huff_decode_sub(root2, fin) if m > 0: m = (1 << m) + fin.getbits(m) - 1 buff[x] = (n, m) return size
関数 huff_decode() の引数 buff は復号したデータを格納するバッファで、fin が入力ファイル (BitIO のオブジェクト) です。最初に復号するデータ数 size を getbits() で求めます。次に、2 つのハフマン木を関数 read_tree() で読み込みます。そして、for ループで size 個のデータを復号します。
まず、関数 huff_decode_sub() を呼び出してデータを一つ復号します。それが 256 よりも小さい場合は記号なので、(n, None) を buff にセットします。256 以上の場合は長さと距離を復号します。長さは n - 256 で求めることができます。
次に、距離のビット数を huff_decode_sub() で求めます。m が 0 よりも大きい場合は、getbits() で m ビット読み込んで距離を求めます。そして、buff に (n, m) をセットします。最後に size を返します。
主な変更はこれで終わりです。あとのプログラムは簡単なので説明は割愛いたします。詳細はプログラムリスト2をお読みください。
記号が 1 種類しか出現していない場合や、距離のビット数が 1 種類以下しか出現しない場合、ハフマン木を正しく構築できずに符号化でエラーが発生します。関数 make_tree() を次のように修正します。
リスト : ハフマン木の生成 def make_tree(sym_table): q = PQueue() # ヒープの生成 for x in sym_table: if x.count > 0: q.push(x) if q.isEmpty(): return Node(None, 0, sym_table[0], sym_table[1]) while True: n1 = q.pop() if q.isEmpty(): if n1.code is not None: for x in sym_table: if x.count == 0: return Node(None, 0, n1, x) return n1 n2 = q.pop() q.push(Node(None, n1.count + n2.count, n1, n2))
繰り返しが存在しないデータの場合、距離を表すハフマン木は空になりますが、今回のプログラムでは無条件にハフマン木をファイルへ出力しているため、ダミーのハフマン木を返すことにします。ヒープ q を構築したあと、ヒープにデータがない場合は、ダミーのハフマン木を作成して返します。ヒープにデータが一つしかない場合、n1 の値を返すところで、ダミーの葉を一つ加えたハフマン木を作成して返します。
それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus を圧縮してみましょう。実行結果は次のようになりました。
表 : LZH 符号の結果 (MAX_LEN = 256, MIN_LEN = 4) ファイル名 サイズ 8 k 32 k 64 k 128k LHA(lh5) ------------------------------------------------------------------------- alice29.txt 152,089 59,919 55,217 53,748 53,187 59,117 asyoulik.txt 125,179 53,039 49,757 48,922 48,657 52,341 cp.html 24,603 8,510 8,151 8,151 8,151 8,384 fields.c 11,150 3,286 3,288 3,288 3,288 3,170 grammar.lsp 3,721 1,312 1,313 1,313 1,313 1,271 kennedy.xls 1,029,744 232,791 221,779 218,046 215,569 198,342 lcet10.txt 426,754 161,981 147,323 141,562 137,432 159,558 plrabn12.txt 481,861 212,063 197,735 191,684 186,949 210,045 ptt5 513,216 55,073 56,023 56,509 57,087 52,305 sum 38,240 15,124 14,038 14,044 14,044 13,993 xargs.1 4,227 1,819 1,820 1,820 1,820 1,778 ------------------------------------------------------------------------- 合計 2,810,784 804,917 756,444 739,087 727,497 760,304
どのファイルも LZB 符号より圧縮率が向上しています。ハフマン符号による二段回目の圧縮効果はとても大きいことがわかります。スライド窓を大きくすると、圧縮率は大きく向上しますが、ptt5 のように圧縮率が低下するファイルもあります。スライド窓が 8 k の場合、LHA よりも圧縮率は悪くなりました。LHA は圧縮率を高めるための工夫がいろいろ施されているので、今回のような単純な方法では LHA よりも高い圧縮率を達成するのは難しいようです。
# # lzb.py : LZB coding # # Copyright (C) 2007-2022 Makoto Hiroi # import argparse, os.path, time from bitio import * # 定数 MIN_LEN = 3 MAX_LEN = 256 POS_BITS = 15 # スライド窓 class Slide: def __init__(self, s): self.stream = s self.size = 1 << POS_BITS self.limit = self.size * 2 self.link = [None] * self.size self.ht = {} self.too_many_flag = {} self.buff = [0] * (self.limit + MAX_LEN) self.data_size = self.fread(0, self.limit + MAX_LEN) self.match_len = 0 self.match_pos = 0 # ハッシュ値を求める def hash_value(self, rp): value = 0 for x in range(MIN_LEN): value = (value << 8) + self.buff[rp + x] return value # データ入力 def fread(self, start, size): for i in range(size): c = self.stream.getc() if c is None: return i self.buff[start + i] = c return size # データ移動 def move_data(self, to, from_, size): for n in range(size): self.buff[to + n] = self.buff[from_ + n] # スライド窓の更新 def update(self, rp): if self.data_size < self.limit + MAX_LEN: return rp # buffer update self.move_data(0, self.size, self.size + MAX_LEN) n = self.fread(self.size + MAX_LEN, self.size) self.data_size = self.size + MAX_LEN + n # hash update for k, v in list(self.ht.items()): if v < self.size: del self.ht[k] else: self.ht[k] = v - self.size # for x in range(self.size): v = self.link[x] if v == None or v < self.size: self.link[x] = None else: self.link[x] = v - self.size self.too_many_flag.clear() return rp - self.size # データの挿入 def insert(self, rp): value = self.hash_value(rp) if value in self.ht: self.link[rp & (self.size - 1)] = self.ht[value] else: self.link[rp & (self.size - 1)] = None self.ht[value] = rp # データの探索 def search(self, rp): b = self.buff value = self.hash_value(rp) limit = rp - self.size self.match_len = 0 self.match_pos = 0 offset = 0 while value in self.too_many_flag: if offset >= MAX_LEN - MIN_LEN or \ len(b) - (rp + offset) < MIN_LEN: offset = 0 value = self.hash_value(rp) break offset += 1 value = self.hash_value(rp + offset) # while True: count = 0 if value in self.ht: n = self.ht[value] else: n = None while n is not None and n - offset >= limit: count += 1 if b[rp + self.match_len] == b[n + self.match_len - offset]: x = 0 while x < MAX_LEN: if b[rp + x] != b[n + x - offset]: break x += 1 if self.match_len < x: self.match_len = x self.match_pos = n - offset if x == MAX_LEN: break n = self.link[n & (self.size - 1)] if count > 256: self.too_many_flag[value] = True if self.match_len >= offset + MIN_LEN or offset == 0: break # 再検索 offset = 0 value = self.hash_value(rp) # データの終端をチェック if self.match_len >= self.data_size - rp: self.match_len = self.data_size - rp # 距離の符号化 (CBT 符号) pos_bits = 0 # 更新 def update_pos_bits(rp): global pos_bits if pos_bits < POS_BITS: while rp > (1 << pos_bits) - 1: pos_bits += 1 # 符号化 def pos_encode(fout, rp, pos): global pos_bits if 1 < pos_bits < POS_BITS: fout.cbt_encode(pos, rp, pos_bits) else: fout.putbits(pos_bits, pos) # 復号 def pos_decode(fin, rp): global pos_bits if 1 < pos_bits < POS_BITS: pos = fin.cbt_decode(rp, pos_bits) else: pos = fin.getbits(pos_bits) return pos # LZB 符号の符号化 def encode(fin, fout): s = Slide(fin) rp = 0 while rp < s.data_size: s.search(rp) if s.match_len < MIN_LEN: num = 1 fout.putbit(0) fout.putbits(8, s.buff[rp]) else: num = s.match_len fout.putbit(1) fout.gamma_encode(num - MIN_LEN) pos_encode(fout, rp, rp - s.match_pos - 1) for _ in range(num): s.insert(rp) rp += 1 if rp >= s.limit: rp = s.update(rp) update_pos_bits(rp) # 符号化 def encode_file(name1, name2): size = os.path.getsize(name1) with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout: fout.putbits(32, size) if size > 0: encode(fin, fout) # LZB 符号の復号 def decode(fin, fout, size): rp = 0 buff = [0] * (1 << POS_BITS) while size > 0: if fin.getbit() == 1: num = fin.gamma_decode() + MIN_LEN pos = rp - (pos_decode(fin, rp) + 1) if pos < 0: pos += len(buff) for _ in range(num): c = buff[pos] fout.putc(c) buff[rp] = c pos += 1 rp += 1 if pos >= len(buff): pos = 0 if rp >= len(buff): rp = 0 else: num = 1 c = fin.getbits(8) fout.putc(c) buff[rp] = c rp += 1 if rp >= len(buff): rp = 0 size -= num update_pos_bits(rp) # 復号 def decode_file(name1, name2): with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout: size = fin.getbits(32) if size > 0: decode(fin, fout, size) # オプション解析 parser = argparse.ArgumentParser(description='LZB符号') parser.add_argument('name1', help='入力ファイル') parser.add_argument('name2', help='出力ファイル') parser.add_argument('-e', '--encode', action='store_true', help='符号化') parser.add_argument('-d', '--decode', action='store_true', help='復号') args = parser.parse_args() # 実行 s = time.time() if args.encode and not args.decode: encode_file(args.name1, args.name2) elif args.decode: decode_file(args.name1, args.name2) else: print('option error') e = time.time() print('{:.3f}'.format(e - s))
# # lzh.py : LZH coding (LZSS + Huffman coding) # # Copyright (C) 2007-2022 Makoto Hiroi # import argparse, os.path, time from bitio import * from pqueue import * # 定数 MIN_LEN = 4 MAX_LEN = 256 POS_BITS = 16 HUFF_BITS = 14 HUFF_SIZE = 1 << HUFF_BITS CODE_BITS = 9 CODE1_BITS = 5 ##### Huffman coding ##### # ハフマン木の節 class Node: def __init__(self, code, count = 0, left = None, right = None): self.code = code self.count = count self.left = left self.right = right def __gt__(x, y): return x.count > y.count def __le__(x, y): return x.count <= y.count # 符号の生成 def make_code(table, node, n, code): if node.code is not None: # 葉 table[node.code] = (n, code) else: make_code(table, node.left, n + 1, code << 1) # left is 0 make_code(table, node.right, n + 1, (code << 1) | 1) # right is 1 # ハフマン木の出力 def write_tree(node, fout, bits): if node.code is not None: # 葉 fout.putbit(1) fout.putbits(bits, node.code) else: fout.putbit(0) write_tree(node.left, fout, bits) write_tree(node.right, fout, bits) # ハフマン木の読み込み def read_tree(fin, bits): if fin.getbit() == 1: # 葉 node = Node(fin.getbits(bits)) else: node = Node(None) node.left = read_tree(fin, bits) node.right = read_tree(fin, bits) return node # ハフマン木の生成 (修正 2007/10/06) def make_tree(sym_table): q = PQueue() # ヒープの生成 for x in sym_table: if x.count > 0: q.push(x) if q.isEmpty(): return Node(None, 0, sym_table[0], sym_table[1]) while True: n1 = q.pop() if q.isEmpty(): if n1.code is not None: for x in sym_table: if x.count == 0: return Node(None, 0, n1, x) return n1 n2 = q.pop() q.push(Node(None, n1.count + n2.count, n1, n2)) # # code1 : 0 - 255 (ASCII) # 256 - (512 - MIN_LEN) # code2 : 距離のビット数 # # ビット数を求める def get_bit_num(n): n1 = 0 n2 = (n + 1) >> 1 while n2 > 0: n1 += 1 n2 >>= 1 return n1 # ハフマン符号化 def huff_encode(buff, size, fout): count1 = [Node(x) for x in range(256 + 256 - MIN_LEN + 1)] count2 = [Node(x) for x in range(POS_BITS + 1)] code1 = [None] * (256 + 256 - MIN_LEN + 1) code2 = [None] * (POS_BITS + 1) # 出現頻度表の作成 for x in range(size): n, m = buff[x] if m is None: count1[n].count += 1 else: count1[n + 256].count += 1 n1 = get_bit_num(m) count2[n1].count += 1 # ハフマン木の生成と出力 root1 = make_tree(count1) root2 = make_tree(count2) make_code(code1, root1, 0, 0) make_code(code2, root2, 0, 0) fout.putbits(HUFF_BITS, size - 1) write_tree(root1, fout, CODE_BITS) write_tree(root2, fout, CODE1_BITS) # 符号化 for x in range(size): n, m = buff[x] if m is None: fout.putbits(*code1[n]) else: fout.putbits(*code1[n + 256]) n1 = get_bit_num(m) fout.putbits(*code2[n1]) if n1 > 0: fout.putbits(n1, m + 1) # 記号の復号 def huff_decode_sub(node, fin): while node.code is None: if fin.getbit() == 0: node = node.left else: node = node.right return node.code # ハフマン復号 def huff_decode(buff, fin): size = fin.getbits(HUFF_BITS) + 1 root1 = read_tree(fin, CODE_BITS) root2 = read_tree(fin, CODE1_BITS) for x in range(size): n = huff_decode_sub(root1, fin) if n < 256: buff[x] = (n, None) else: n -= 256 m = huff_decode_sub(root2, fin) if m > 0: m = (1 << m) + fin.getbits(m) - 1 buff[x] = (n, m) return size ##### LZSS coding ##### # スライド窓 class Slide: def __init__(self, s): self.stream = s self.size = 1 << POS_BITS self.limit = self.size * 2 self.link = [None] * self.size self.ht = {} self.too_many_flag = {} self.buff = [0] * (self.limit + MAX_LEN) self.data_size = self.fread(0, self.limit + MAX_LEN) self.match_len = 0 self.match_pos = 0 # データ入力 def fread(self, start, size): for i in range(size): c = self.stream.getc() if c is None: return i self.buff[start + i] = c return size # データ移動 def move_data(self, to, from_, size): for n in range(size): self.buff[to + n] = self.buff[from_ + n] # スライド窓の更新 def update(self, rp): if self.data_size < self.limit + MAX_LEN: return rp # buffer update self.move_data(0, self.size, self.size + MAX_LEN) n = self.fread(self.size + MAX_LEN, self.size) self.data_size = self.size + MAX_LEN + n # hash update for k, v in list(self.ht.items()): if v < self.size: del self.ht[k] else: self.ht[k] = v - self.size # for x in range(self.size): v = self.link[x] if v == None or v < self.size: self.link[x] = None else: self.link[x] = v - self.size self.too_many_flag.clear() return rp - self.size # ハッシュ関数 def hash_value(self, rp): value = 0 for x in range(MIN_LEN): value = (value << 8) + self.buff[rp + x] return value # データの挿入 def insert(self, rp): value = self.hash_value(rp) if value in self.ht: self.link[rp & (self.size - 1)] = self.ht[value] else: self.link[rp & (self.size - 1)] = None self.ht[value] = rp # データの探索 def search(self, rp): b = self.buff value = self.hash_value(rp) limit = rp - self.size self.match_len = 0 self.match_pos = 0 offset = 0 while value in self.too_many_flag: if offset >= MAX_LEN - MIN_LEN or len(b) - (rp + offset) < MIN_LEN: offset = 0 value = self.hash_value(rp) break offset += 1 value = self.hash_value(rp + offset) # while True: count = 0 if value in self.ht: n = self.ht[value] else: n = None while n is not None and n - offset >= limit: count += 1 if b[rp + self.match_len] == b[n + self.match_len - offset]: x = 0 while x < MAX_LEN: if b[rp + x] != b[n + x - offset]: break x += 1 if self.match_len < x: self.match_len = x self.match_pos = n - offset if x == MAX_LEN: break n = self.link[n & (self.size - 1)] if count > 256: self.too_many_flag[value] = True if self.match_len >= offset + MIN_LEN or offset == 0: break # 再検索 offset = 0 value = self.hash_value(rp) # データの終端をチェック if self.match_len >= self.data_size - rp: self.match_len = self.data_size - rp # LZSS 符号の符号化 def encode(fin, fout): s = Slide(fin) rp = 0 huff_buff = [None] * HUFF_SIZE hcnt = 0 while rp < s.data_size: s.search(rp) if s.match_len < MIN_LEN: length = 1 huff_buff[hcnt] = (s.buff[rp], None) else: length = s.match_len huff_buff[hcnt] = ((length - MIN_LEN), rp - s.match_pos - 1) hcnt += 1 if hcnt >= HUFF_SIZE: huff_encode(huff_buff, hcnt, fout) hcnt = 0 for _ in range(length): s.insert(rp) rp += 1 if rp >= s.limit: rp = s.update(rp) if hcnt > 0: huff_encode(huff_buff, hcnt, fout) # 符号化 def encode_file(name1, name2): size = os.path.getsize(name1) with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout: fout.putbits(32, size) if size > 0: encode(fin, fout) # LZSS 符号の復号 def decode(fin, fout, size): rp = 0 buff = [0] * (1 << POS_BITS) huff_buff = [None] * HUFF_SIZE hcnt = 0 hsize = 0 while size > 0: if hcnt == hsize: hsize = huff_decode(huff_buff, fin) hcnt = 0 n, m = huff_buff[hcnt] hcnt += 1 if m is not None: length = n + MIN_LEN pos = rp - (m + 1) if pos < 0: pos += len(buff) for _ in range(length): c = buff[pos] fout.putc(c) buff[rp] = c pos += 1 rp += 1 if pos >= len(buff): pos = 0 if rp >= len(buff): rp = 0 else: length = 1 fout.putc(n) buff[rp] = n rp += 1 if rp >= len(buff): rp = 0 size -= length # 復号 def decode_file(name1, name2): with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout: size = fin.getbits(32) if size > 0: decode(fin, fout, size) # オプション解析 parser = argparse.ArgumentParser(description='LZH符号') parser.add_argument('name1', help='入力ファイル') parser.add_argument('name2', help='出力ファイル') parser.add_argument('-e', '--encode', action='store_true', help='符号化') parser.add_argument('-d', '--decode', action='store_true', help='復号') args = parser.parse_args() # 実行 s = time.time() if args.encode and not args.decode: encode_file(args.name1, args.name2) elif args.decode: decode_file(args.name1, args.name2) else: print('option error') e = time.time() print('{:.3f}'.format(e - s))