前回作成した LZW 符号のプログラムは、辞書が満杯になった時点で辞書を固定して、あとはその辞書をそのまま使ってファイルを圧縮しました。ところが、この方法では不都合な場合があるのです。たとえば、データの種類が異なる複数のファイルをアーカイバでまとめてから圧縮することを考えてみましょう。途中で辞書が満杯になりデータの種類が変化すると、それ以降のデータはほとんど圧縮することができません。
実際に、Canterbury Corpus で配布されている The Canterbury Corpus をアーカイバ tar でまとめたファイル canterbury.tar を LZW 符号で圧縮してみましょう。結果は次のようになりました。
表 : canterbury.tar の評価結果 辞書サイズ PROG サイズ 8k 32k 128k ---------------------------------------------------- LZW 2,821,120 2,410,308 2,647,698 1,196,012
このように、辞書サイズが小さい場合、ファイルを圧縮することはほとんどできません。この場合は、大域的な辞書を作成していることが裏目となり、データの変化についていけないのです。このような問題を解決するために、辞書が満杯になった時の対処方法がいくつか考案されています。今回はその中の一つである「LZT 符号」を取り上げます。
LZT 符号は 1987 年に Tischer 氏によって開発されました。おもな改良点は、辞書が満杯になった場合、長い間使われていない (最長時間未使用 : Least Recently Used) 語を取り除くことで辞書の空きスペースを作るところです。このような操作を「LRU スキーム」と呼びます。
LZT 符号は LRU スキームを行うため符号化と復号に時間がかかることになりますが、少ないメモリでも高い圧縮率を期待することができます。また、データの局所的な偏在も辞書に反映することが可能になります。
辞書の管理は「キュー (queue)」を使って行うと簡単です。キューは先入れ先出し (FIFO) のデータ構造です。トライをたどっていくときに、通過した節をキューから取り出して最後尾へ移動します。そうすると、アクセスされない節ほどキューの先頭に集まることになるので、節を削除する場合はキューの先頭から行えばいいわけです。
一般に、キューはリングバッファや連結リストを使うと簡単に実装することができます。データの出し入れがキューの先頭と最後尾だけならば、これらのデータ構造で十分なのですが、キューの途中からデータを削除する場合には適しているとはいえません。
リングバッファは配列を使って実装するので、削除した分だけデータを移動しないといけません。また、連結リストでデータを削除する場合、そのデータを保持しているセルと、ひとつ手前のセルの両方が必要になるので、リストの先頭から順番にセルをたどらないといけません。どちらの方法でも、キューの途中からデータを削除するには時間がかかるのです。
そこで、今回は「双方向リスト」を使って節の順番を管理することにします。双方向リストは名前が示すように、直後のセルだけでなく直前のセルへの参照を持たせたデータ構造です。下図に双方向リストの構造を示します。
PREV NEXT PREV NEXT PREV NEXT ┌─┬─┬─┐ ┌─┬─┬─┐ ┌─┬─┬─┐ ←─┼・│ │・│←─┼・│ │・│←─┼・│ │・│←─ ─→│・│ │・┼─→│・│ │・┼─→│・│ │・┼─→ └─┴─┴─┘ └─┴─┴─┘ └─┴─┴─┘ DATA DATA DATA 図 1 : 双方向リスト
双方向リストは拙作のページ「連結リストとキュー」で詳しく説明しています。よろしければ参考にしてください。
ところで、前回作成した LZW 符号のプログラムはトライを二分木のように構成しています。この方法では子を探す処理が線形探索になるため、記号の種類が多くなるほど処理に時間がかかる、という欠点があります。LZW 符号の高速化は、参考文献『C言語による最新アルゴリズム事典』の LZ 法で次のように説明されています。参考文献より引用します。
『より高速化するには、木のノード p の文字 c に当たる子の位置を見つけるために p, c によるハッシュ法を使う。』
ようするに、節の子をハッシュ法で管理するわけです。親節の番号 (辞書番号) と子の値 (記号) を用いてハッシュ値を計算すると、一つのハッシュ表で節の子を管理することができます。
Python の辞書 (ハッシュ) はタプルをキーにできるので、(辞書番号, 記号) のようにタプルにまとめれば簡単です。数値に変換する場合は次のようにするといいでしょう。
リスト : ハッシュ関数 def hash_func(n, c): return ((n << 8) + c) % HASH_SIZE
p が親の辞書番号、c が子の記号です。p を 8 ビットシフトして c を加算し、ハッシュ表の大きさ HASH_SIZE で割り算した余りをハッシュ値としています。とても簡単な方法ですが、これでも十分にハッシュ法は機能します。
今回はハッシュ法を使ってトライを実装することにしましょう。具体的な説明は、プログラムを作成するところで行います。なお、LZW 符号の高速化はハッシュ法だけではなく Ternary Search Tree を用いる方法もあります。M.Hiroi はC言語で試したことがあるのですが、テキストファイルでは Ternary Search Tree の方がハッシュ法よりも高速でした。興味のある方は挑戦してみてください。
最初はキューから作成しましょう。次のリストを見てください。
リスト : 双方向リストによるキューの実装 class Queue: def __init__(self): self.prev = [None] * DIC_SIZE self.next_ = [None] * DIC_SIZE self.prev[HEAD] = HEAD self.next_[HEAD] = HEAD
今回作成するキューは辞書番号を管理すればいいので、双方向リストは配列を使って実装します。辞書番号を配列の添字に対応させ、配列 prev にひとつ前のデータ (辞書番号) を格納し、配列 next_ にひとつ後ろの辞書番号を格納します。トライの節 x をアクセスした場合は、prev[x] と next_[x] を双方向リストから削除して、それを最後尾へ追加することで LRU スキームを実現することができます。
それから、prev[0] と next_[0] を双方向リストのヘッダとして使います。LZW 符号の場合、256 種類の記号は必ず辞書に登録されるので、0 番目から 255 番目の辞書番号は LRU スキームを行う必要はありません。0 - 255 は双方向リストに登録しないので、prev[0], next_[0] をヘッダに使っても問題ないわけです。変数 HEAD (0) がヘッダを表していて、prev[HEAD] と next_[HEAD] を HEAD に初期化します。これでキューを空に初期化することができます。
次は、双方向リストの最後尾にデータを追加するメソッド insert() を作ります。次のリストを見てください。
リスト : キューの最後尾に追加 def insert(self, x): last = self.prev[HEAD] self.prev[x] = last self.next_[x] = HEAD self.next_[last] = x self.prev[HEAD] = x
ヘッダから最後尾の辞書番号を取り出して last にセットします。この後ろに辞書番号 n を追加します。次の図を見てください。
L Header ┌─┬─┬─┐ ┌─┬─┬─┐ ←─┼・│ │ │←─┼L│ │ │←─ ─→│ │ │H┼─→│ │ │・┼─→ └─┴─┴─┘ └─┴─┴─┘ PREV NEXT PREV NEXT L と Header の間に N を追加する (a) 挿入前 L N Header ┌─┬─┬─┐ ┌─┬─┬─┐ ┌─┬─┬─┐ ←─┼・│ │ │←─┼L│ │ │←─┼N│ │ │←─ ─→│ │ │N┼─→│ │ │H┼─→│ │ │・┼─→ └─┴─┴─┘ └─┴─┴─┘ └─┴─┴─┘ PREV NEXT PREV NEXT PREV NEXT (b) 挿入後 図 2 : 双方向リストへの挿入
Header の一つ後ろのセル L の後ろに N を追加します。L は Header の PREV から求めることができます。N の PREV に L をセットし、NEXT に Header をセットします。あとは、L の NEXT と Header の PREV を N に書き換えます。プログラムでは、prev[n] に last をセットし、next_[n] に HEAD をセットします。あとは、prev[HEAD] と next_[last] を n に書き換えます。これで、HEAD と last の間に n を挿入することができます。
次は、双方向リストからデータを削除するメソッド delete() を作ります。次のリストを見てください。
リスト : キューからデータを削除する def delete(self, x): p = self.prev[x] q = self.next_[x] self.next_[p] = q self.prev[q] = p
辞書番号 n 自身の削除はとても簡単です。次の図を見てください。
セル N の直前のセルを P とし直後のセルを Q とすると、P の NEXT を Q に、Q の PREV を P に書き換えれば N を削除することができます。プログラムでは、prev[n] から p を求め、next_[n] から q を求めます。あとは、next_[p] を q に、prev[q] を p に書き換えます。これで n を削除することができます。
P N Q ┌─┬─┬─┐ ┌─┬─┬─┐ ┌─┬─┬─┐ ←─┼・│ │ │←─┼P│ │ │←─┼N│ │ │←─ ─→│ │ │N┼─→│ │ │Q┼─→│ │ │・┼─→ └─┴─┴─┘ └─┴─┴─┘ └─┴─┴─┘ PREV NEXT PREV NEXT PREV NEXT セル N を削除する (a) 削除前 P Q ┌─┬─┬─┐ ┌─┬─┬─┐ ←─┼・│ │ │←─┼P│ │ │←─ ─→│ │ │Q┼─→│ │ │・┼─→ └─┴─┴─┘ └─┴─┴─┘ PREV NEXT PREV NEXT (b) 削除後 図 3 : 双方向リストからの削除
最後に、双方向リストの先頭からデータを求めるメソッド traverse() を作ります。
リスト : 巡回 def traverse(self): n = self.next_[HEAD] while n != HEAD: yield n n = self.next_[n]
traverse() はジェネレータで実装しました。ヘッダから next_ を順番にたどっていき、yield で辞書番号を返します。
次は辞書を表すクラス Dic を作成します。次のリストを見てください。
リスト : 辞書の定義 class Dic: def __init__(self): self.sym = [None] * DIC_SIZE self.parent = [None] * DIC_SIZE self.child = [0] * DIC_SIZE self.ht = {} for x in range(256): self.sym[x] = x self.num = 256
辞書は配列を使って実装します。配列の添字が辞書番号に対応します。sym は記号を格納し、parent は親の辞書番号を格納します。そして、child が子の個数を格納します。ht は親子関係を表すハッシュで、num は辞書番号を管理します。
ハッシュを使ってトライを実装すると、節 (辞書番号) が「葉」なのかすぐに判別することができません。そこで、子の個数を配列 child で管理します。たとえば、n に子を追加するときは child[n] を +1 します。n の子を削除するときは child[n] を -1 します。child[n] が 0 の場合、n は葉であることがすぐにわかります。
次は記号 c を持つ子を探すメソッド search() を作ります。
リスト : 記号 c を持つ子を探す def search(self, n, c): key = (n, c) if key in self.ht: return self.ht[key] return None
search() は n の子の中から記号 c を持つ子を探します。最初に、タプル (n, c) を変数 key にセットします。そして、key がハッシュ ht にある場合は、その値 ht[key] を返します。そうでなければ None を返します。探索にハッシュを使っているので、高速に動作することが期待できます。
次は子を挿入するメソッド insert() を作ります。
リスト : 記号 c を持つ子を挿入 def insert(self, q, p, c): if self.num == DIC_SIZE: n = self.search_leaf(q) q.delete(n) self.delete(n) else: n = self.num self.num += 1 self.sym[n] = c self.parent[n] = p self.child[n] = 0 self.child[p] += 1 self.ht[(p, c)] = n q.insert(n)
引数 q はキュー、p が親の辞書番号、c が記号です。num が DIC_SIZE と等しい場合は辞書に空きがないので、メソッド search_leaf() で最も使われていない「葉」をキューから探し出します。このとき、トライの途中にある節 (辞書番号) は、削除できないことに注意してください。
見つけた葉を n にセットし、q.delete() でキューから削除し、self.delete() でトライから削除します。辞書に空きがある場合は、num が子の辞書番号になります。
それから、sym[n] に記号 c をセットし、parent[n] に親 p をセットします。child[n] は 0 で初期化して、child[p] の値は +1 します。そして、ハッシュ ht[(p, c)] に n をセットします。これで、p と n の親子関係がリンクされます。最後に、キューの最後尾に n を追加します。
次は、キューの先頭から葉を探すメソッド search_leaf() と削除するメソッド delete() を作ります。
リスト : 葉の探索と削除 # 葉を探す def search_leaf(self, q): for x in q.traverse(): if self.child[x] == 0: return x return None # 削除 def delete(self, n): p = self.parent[n] c = self.sym[n] del self.ht[(p, c)] self.child[p] -= 1
search_leaf() の引数 q はキューを表します。葉を探す処理は簡単です。キューの traverse() を使って、辞書番号 x を取り出します。child[x] が 0 ならば return で x を返します。見つからない場合は None を返していますが、必ず見つかるはずなので、raise でエラーを送出してもいいでしょう。
delete() の引数 n は削除する辞書番号です。まず、parent[n] から親を、sym[n] から記号を求めて、変数 p, c にセットします。そして、ハッシュ ht からキー (p, c) を削除し、child[p] を -1 します。これでトライから n を削除することができます。
次は符号化を行う関数 encode() を作ります。
リスト : LZT 符号の符号化 def encode(fin, fout): dic = Dic() que = Queue() p = fin.getc() while True: c = fin.getc() if c is None: encode1(fout, p) break q = dic.search(p, c) if q is None: encode1(fout, p) dic.insert(que, p, c) p = c else: que.delete(q) que.insert(q) p = q
引数 fin は入力ファイル (ByteIO のオブジェクト)、fout が出力ファイル (BitIO のオブジェクト) です。最初に、辞書 Dic のオブジェクトとキュー Queue のオブジェクトを生成して、変数 dic と que にセットします。次に、fin から 1 記号読み込んで変数 p にセットします。p がひとつ前の辞書番号を保持します。
次の while ループで符号化を行います。fin から 1 記号読み込んで変数 c にセットします。c が None の場合、p が最長一致系列になるので、encode1() で p を符号化して break でループを終了します。そうでなければ dic.search() を呼び出して、p に記号 c を持つ子がないか探索します。
q が None の場合、p が最長一致系列になるので、encode1() で p を符号化します。そして、dic.insert() で p に記号 c を持つ子を追加します。子 q が見つかった場合、q を que.delete() でキューから削除し、que.insert() で最後尾に追加します。これで LRU スキームを行うことができます。
次は復号を行う関数 decode() を作ります。
リスト : LZT 符号の復号 def decode(fin, fout, size): que = Queue() dic = Dic() p = decode1(fin) c, i = dic.output(que, p, fout) size -= i while size > 0: q = decode1(fin) if dic.check_code(que, q): c, i = dic.output(que, q, fout) dic.insert(que, p, c) else: dic.insert(que, p, c) c, i = dic.output(que, q, fout) p = q size -= i
LZT 符号の場合、復号でも符号化と同じタイミングで LRU スキームを行う必要があります。このため、今回は復号の処理でも辞書 Dic を使うことにします。
最初に、辞書 Dic とキュー Queue のオブジェクトを生成して、変数 que と dic にセットします。次に、decode1() で辞書番号を復号して変数 p にセットし、Dic のメソッド output() で記号列に復号します。output() の返り値は LZW 符号と同じで、記号列の先頭文字と記号列の長さです。
それから while ループで復号処理を行います。ポイントは decode1() で復号した辞書番号 q が辞書に登録されているかチェックするところです。LZW 符号では簡単にチェックすることができましたが、LZT 符号では辞書番号を削除して再登録するときにもチェックが必要になります。この処理を Dic のメソッド check_code() で行います。
リスト : 辞書番号のチェック def check_code(self, q, n): if self.num == DIC_SIZE: return self.search_leaf(q) != n return n < self.num
まず、辞書が満杯かチェックします。満杯でなければ LZW 符号と同じく、n < self.num であれば、復号した n は辞書に登録されています。return で n < self.num の結果を返します。
辞書が満杯の場合は、最も使われていない葉を削除することになります。これから削除する葉と復号した n が同じ場合は、その n はまだ辞書に登録されていません。削除される葉は search_leaf() で求めることができます。この返り値と n が異なっていれば True を返します。そうでなければ False を返します。
最後に Dic のメソッド output() を作ります。
リスト : 記号列の出力 def output(self, q, n, fout): if self.parent[n] is None: fout.putc(n) return n, 1 else: m, i = self.output(q, self.parent[n], fout) fout.putc(self.sym[n]) q.delete(n) q.insert(n) return m, i + 1
output() の引数 q がキューで、n が辞書番号、fout が出力ファイルです。注意する点は、符号化したときと同じ順番で節を双方向リストの最後尾へ移動することです。output() を再帰呼び出しして、記号 sym[n] を出力します。そのあとで、q.delete() で n を削除し、q.insert() で最後尾に n を追加します。これで LRU スキームを行うことができます。
それでは、実際にファイルを圧縮してみましょう。最初に The Canterbury Corpus をアーカイバ tar でまとめた canterbury.tar の結果を表に示します。
表 : canterbury.tar の評価結果 辞書サイズ PROG サイズ 8k 32k 128k ---------------------------------------------------- LZW 2,821,120 2,410,308 2,647,698 1,196,012 LZT 2,821,120 897,810 901,337 946,024
LZT 符号は LZW 符号よりも高い圧縮率になりました。LZT 符号の LRU スキームは大きな効果を発揮していることがわかります。次は The Canterbury Corpus の評価結果を表に示します。
表 : LZT 符号の評価結果 (辞書サイズ 8k) ファイル名 サイズ LZW LZT 符号化 復号 ---------------------------------------------------------- alice29.txt 152,089 68,448 64,193 0.490 0.551 asyoulik.txt 125,179 59,085 56,937 0.405 0.462 cp.html 24,603 12,150 11.111 0.079 0.087 fields.c 11,150 5,760 4,810 0.037 0.041 grammar.lsp 3,721 2,294 1,747 0.014 0.020 kennedy.xls 1,029,744 339,542 274,242 2.528 2.720 lcet10.txt 426,754 194,996 180,431 1.354 1.542 plrabn12.txt 481,861 220,850 215,221 1.622 1.771 ptt5 513,216 66,101 61,407 0.846 0.941 sum 38,240 30,163 19,364 0.135 0.150 xargs.1 4,227 2,916 2,249 0.017 0.019 ---------------------------------------------------------- 合計 2,810,784 1,002,305 891,716 7.527 8.304 符号化と復号の単位 : 秒
実行環境 : Python 3.8.10, Ubuntu 20.04 LTS (WSL1), Intel Core i5-6200U 2.30GHz
辞書サイズが 8 k の場合、LZT 符号の圧縮率は LZW 符号よりも高くなりました。辞書が小さくでも LRU スキームの効果により、LZT 符号は高い圧縮率を達成できることがわかります。符号化と復号の処理時間は、LZW 符号よりも遅くなりました。LZT 符号は LRU スキームを行っているので、その分だけ LZW 符号よりも時間がかかるのはしかたがありません。
次は辞書サイズを増やして試してみました。結果は次のようになります。
表 : LZT 符号の評価結果 (2) ファイル名 サイズ 8 k 32 k 64 k --------------------------------------------------- alice29.txt 152,089 64,193 61.097 60,513 asyoulik.txt 125,179 56,937 54,117 53,206 cp.html 24,603 11.111 10,877 10,877 fields.c 11,150 4,810 4,810 4,810 grammar.lsp 3,721 1,747 1,747 1,747 kennedy.xls 1,029,744 274,242 303,277 316,931 lcet10.txt 426,754 180,431 163,278 159,994 plrabn12.txt 481,861 215,221 198,821 194,643 ptt5 513,216 61,407 61,064 60,333 sum 38,240 19,364 19,473 19,473 xargs.1 4,227 2,249 2,249 2,249 --------------------------------------------------- 合計 2,810,784 891,716 880,810 884,776
辞書サイズを増やすとテキストファイルの圧縮率は向上しますが、kennedy.xls のように圧縮率が低下するファイルもあります。LZB 符号に比べると、辞書サイズを増やしても圧縮率はそれほど高くなりません。LZT 符号の場合、辞書サイズが小さいときは LRU スキームが効果的に働くのですが、辞書サイズが大きくなると LRU スキームの効果は少なくなるようです。
なお、実行時間の結果は M.Hiroi のコーディング、実行したマシン、プログラミング言語などの環境に大きく依存しています。また、これらの環境だけではなく、データの種類によっても実行時間はかなり左右されます。興味のある方は、いろいろなデータをご自分の環境で試してみてください。
ところで、LZSS 符号とハフマン符号を組み合わせた LZH 符号は、二段階目のハフマン符号の効果が大きく、高い圧縮率を達成することができました。それでは、LZT 符号とハフマン符号を組み合わせるとどうなるのでしょうか。実際に試してみましょう。
今回の方法は、辞書番号をγ符号のようにビット数とビット列に分け、ビット数をハフマン符号で符号化します。プログラムの説明は割愛いたしますので、詳細はプログラムリスト2をお読みください。
実行結果は次のようになりました。
表 : LZT 符号 + ハフマン符号の評価結果 ファイル名 サイズ 8 k 32 k 64 k --------------------------------------------------- alice29.txt 152,089 64,311 62.210 62.292 asyoulik.txt 125,179 57,076 55,074 55,074 cp.html 24,603 11.369 11,369 11,369 fields.c 11,150 5,114 5,144 5,144 grammar.lsp 3,721 1,832 1,832 1,832 kennedy.xls 1,029,744 269,891 281,721 286,480 lcet10.txt 426,754 180,242 164,269 161,511 plrabn12.txt 481,861 215,001 199,219 195,599 ptt5 513,216 61,251 61,598 61,761 sum 38,240 19,383 20,533 20,533 xargs.1 4,227 2,330 2,330 2,330 --------------------------------------------------- 合計 2,810,784 887,800 865,269 863,895
kennedy.xls のように圧縮率が高くなるファイルもありますが、圧縮率が低下するファイルが多くなりました。LZT 符号の場合、ハフマン符号と単純に組み合わせるだけでは、高い圧縮率を達成するのは難しいようです。興味のある方はいろいろ試してみてください。
# # lzt.py : LZT coding + CBT coding # # Copyright (C) 2007-2022 Makoto Hiroi # import argparse, os.path, time from bitio import * # 定数 DIC_BITS = 13 DIC_SIZE = 1 << DIC_BITS HEAD = 0 # 双方向リストによるキュー # 0 がヘッダ, 1 - 255 はダミー class Queue: def __init__(self): self.prev = [None] * DIC_SIZE self.next_ = [None] * DIC_SIZE self.prev[HEAD] = HEAD self.next_[HEAD] = HEAD # 最後尾に追加 def insert(self, x): last = self.prev[HEAD] self.prev[x] = last self.next_[x] = HEAD self.next_[last] = x self.prev[HEAD] = x # 削除 def delete(self, x): p = self.prev[x] q = self.next_[x] self.next_[p] = q self.prev[q] = p # 巡回 def traverse(self): n = self.next_[HEAD] while n != HEAD: yield n n = self.next_[n] # 辞書 class Dic: def __init__(self): self.sym = [None] * DIC_SIZE self.parent = [None] * DIC_SIZE self.child = [0] * DIC_SIZE self.ht = {} for x in range(256): self.sym[x] = x self.num = 256 # 探索 def search(self, n, c): key = (n, c) if key in self.ht: return self.ht[key] return None # 葉を探す def search_leaf(self, q): for x in q.traverse(): if self.child[x] == 0: return x return None # 削除 def delete(self, n): p = self.parent[n] c = self.sym[n] del self.ht[(p, c)] self.child[p] -= 1 # 挿入 def insert(self, q, p, c): if self.num == DIC_SIZE: n = self.search_leaf(q) q.delete(n) self.delete(n) else: n = self.num self.num += 1 self.sym[n] = c self.parent[n] = p self.child[n] = 0 self.child[p] += 1 self.ht[(p, c)] = n q.insert(n) # 辞書番号のチェック def check_code(self, q, n): if self.num == DIC_SIZE: return self.search_leaf(q) != n return n < self.num # 出力 def output(self, q, n, fout): if self.parent[n] is None: fout.putc(n) return n, 1 else: m, i = self.output(q, self.parent[n], fout) fout.putc(self.sym[n]) q.delete(n) q.insert(n) return m, i + 1 # 符号語長 dic_bits = 9 code_count = 256 # 辞書番号の符号化 def encode1(fout, n): global dic_bits, code_count if dic_bits < DIC_BITS: fout.cbt_encode(n, code_count, dic_bits) code_count += 1 if code_count > (1 << dic_bits) - 1: dic_bits += 1 else: fout.putbits(DIC_BITS, n) # 辞書番号の復号 def decode1(fin): global dic_bits, code_count if dic_bits < DIC_BITS: n = fin.cbt_decode(code_count, dic_bits) code_count += 1 if code_count > (1 << dic_bits) - 1: dic_bits += 1 else: n = fin.getbits(DIC_BITS) return n # LZT 符号の符号化 def encode(fin, fout): dic = Dic() que = Queue() p = fin.getc() while True: c = fin.getc() if c is None: encode1(fout, p) break q = dic.search(p, c) if q is None: encode1(fout, p) dic.insert(que, p, c) p = c else: que.delete(q) que.insert(q) p = q # LZT 符号の復号 def decode(fin, fout, size): que = Queue() dic = Dic() p = decode1(fin) c, i = dic.output(que, p, fout) size -= i while size > 0: q = decode1(fin) if dic.check_code(que, q): c, i = dic.output(que, q, fout) dic.insert(que, p, c) else: dic.insert(que, p, c) c, i = dic.output(que, q, fout) p = q size -= i # 符号化 def encode_file(name1, name2): size = os.path.getsize(name1) with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout: fout.putbits(32, size) if size > 0: encode(fin, fout) # 復号 def decode_file(name1, name2): with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout: size = fin.getbits(32) if size > 0: decode(fin, fout, size) # オプション解析 parser = argparse.ArgumentParser(description='LZT符号') parser.add_argument('name1', help='入力ファイル') parser.add_argument('name2', help='出力ファイル') parser.add_argument('-e', '--encode', action='store_true', help='符号化') parser.add_argument('-d', '--decode', action='store_true', help='復号') args = parser.parse_args() # 実行 s = time.time() if args.encode and not args.decode: encode_file(args.name1, args.name2) elif args.decode: decode_file(args.name1, args.name2) else: print('option error') e = time.time() print('{:.3f}'.format(e - s))
# # lzth.py : LZT coding + Huffman coding # # Copyright (C) 2007-2022 Makoto Hiroi # import argparse, os.path, time from bitio import * from pqueue import * # 定数 DIC_BITS = 13 DIC_SIZE = 1 << DIC_BITS HEAD = 0 HUFF_BITS = 14 HUFF_SIZE = 1 << HUFF_BITS CODE_BITS = 5 ##### Huffman coding ##### # ハフマン木の節 class Node: def __init__(self, code, count = 0, left = None, right = None): self.code = code self.count = count self.left = left self.right = right def __gt__(x, y): return x.count > y.count def __le__(x, y): return x.count <= y.count # 符号の生成 def make_code(table, node, n, code): if node.code is not None: # 葉 table[node.code] = (n, code) else: make_code(table, node.left, n + 1, code << 1) # left is 0 make_code(table, node.right, n + 1, (code << 1) | 1) # right is 1 # ハフマン木の出力 def write_tree(node, fout): if node.code is not None: # 葉 fout.putbit(1) fout.putbits(CODE_BITS, node.code) else: fout.putbit(0) write_tree(node.left, fout) write_tree(node.right, fout) # ハフマン木の読み込み def read_tree(fin): if fin.getbit() == 1: # 葉 node = Node(fin.getbits(CODE_BITS)) else: node = Node(None) node.left = read_tree(fin) node.right = read_tree(fin) return node # ハフマン木の生成 def make_tree(sym_table): q = PQueue() # ヒープの生成 for x in sym_table: if x.count > 0: q.push(x) while True: n1 = q.pop() if q.isEmpty(): return n1 n2 = q.pop() q.push(Node(None, n1.count + n2.count, n1, n2)) # ビット数を求める def get_bit_num(n): n1 = 0 n2 = (n + 1) >> 1 while n2 > 0: n1 += 1 n2 >>= 1 return n1 # ハフマン符号化 def huff_encode(buff, size, fout): count = [Node(x) for x in range(DIC_BITS + 1)] code = [None] * (DIC_BITS + 1) # 出現頻度表の作成 for x in range(size): n = buff[x] n1 = get_bit_num(n) count[n1].count += 1 # ハフマン木の生成と出力 root = make_tree(count) make_code(code, root, 0, 0) fout.putbits(HUFF_BITS, size - 1) write_tree(root, fout) # 符号化 for x in range(size): n = buff[x] n1 = get_bit_num(n) fout.putbits(*code[n1]) if n1 > 0: fout.putbits(n1, n + 1) # 記号の復号 def huff_decode_sub(node, fin): while node.code is None: if fin.getbit() == 0: node = node.left else: node = node.right return node.code # ハフマン復号 def huff_decode(buff, fin): size = fin.getbits(HUFF_BITS) + 1 root = read_tree(fin) for x in range(size): n = huff_decode_sub(root, fin) if n > 0: n = (1 << n) + fin.getbits(n) - 1 buff[x] = n return size ##### LZT 符号 ##### # 双方向リストによるキュー # 0 がヘッダ, 1 - 255 はダミー class Queue: def __init__(self): self.prev = [None] * DIC_SIZE self.next_ = [None] * DIC_SIZE self.prev[HEAD] = HEAD self.next_[HEAD] = HEAD # 最後尾に追加 def insert(self, x): last = self.prev[HEAD] self.prev[x] = last self.next_[x] = HEAD self.next_[last] = x self.prev[HEAD] = x # 削除 def delete(self, x): p = self.prev[x] q = self.next_[x] self.next_[p] = q self.prev[q] = p # 巡回 def traverse(self): n = self.next_[HEAD] while n != HEAD: yield n n = self.next_[n] # 辞書 class Dic: def __init__(self): self.sym = [None] * DIC_SIZE self.parent = [None] * DIC_SIZE self.child = [0] * DIC_SIZE self.ht = {} for x in range(256): self.sym[x] = x self.num = 256 # 探索 def search(self, n, c): key = (n, c) if key in self.ht: return self.ht[key] return None # 葉を探す def search_leaf(self, q): for x in q.traverse(): if self.child[x] == 0: return x return None # 削除 def delete(self, n): p = self.parent[n] c = self.sym[n] del self.ht[(p, c)] self.child[p] -= 1 # 挿入 def insert(self, q, p, c): if self.num == DIC_SIZE: n = self.search_leaf(q) q.delete(n) self.delete(n) else: n = self.num self.num += 1 self.sym[n] = c self.parent[n] = p self.child[n] = 0 self.child[p] += 1 self.ht[(p, c)] = n q.insert(n) # 辞書番号のチェック def check_code(self, q, n): if self.num == DIC_SIZE: return self.search_leaf(q) != n return n < self.num # 出力 def output(self, q, n, fout): if self.parent[n] is None: fout.putc(n) return n, 1 else: m, i = self.output(q, self.parent[n], fout) fout.putc(self.sym[n]) q.delete(n) q.insert(n) return m, i + 1 # LZT 符号の符号化 def encode(fin, fout): huff_buff = [None] * HUFF_SIZE hcnt = 0 dic = Dic() que = Queue() p = fin.getc() while True: c = fin.getc() if c is None: huff_buff[hcnt] = p hcnt += 1 huff_encode(huff_buff, hcnt, fout) break q = dic.search(p, c) if q is None: huff_buff[hcnt] = p hcnt += 1 if hcnt == HUFF_SIZE: huff_encode(huff_buff, hcnt, fout) hcnt = 0 dic.insert(que, p, c) p = c else: que.delete(q) que.insert(q) p = q # LZT 符号の復号 def decode(fin, fout, size): que = Queue() dic = Dic() huff_buff = [None] * HUFF_SIZE hsize = huff_decode(huff_buff, fin) hcnt = 1 p = huff_buff[0] c, i = dic.output(que, p, fout) size -= i while size > 0: if hcnt == hsize: hsize = huff_decode(huff_buff, fin) hcnt = 0 q = huff_buff[hcnt] hcnt += 1 if dic.check_code(que, q): c, i = dic.output(que, q, fout) dic.insert(que, p, c) else: dic.insert(que, p, c) c, i = dic.output(que, q, fout) p = q size -= i # 符号化 def encode_file(name1, name2): size = os.path.getsize(name1) with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout: fout.putbits(32, size) if size > 0: encode(fin, fout) # 復号 def decode_file(name1, name2): with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout: size = fin.getbits(32) if size > 0: decode(fin, fout, size) # 復号 def decode_file(name1, name2): with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout: size = fin.getbits(32) if size > 0: decode(fin, fout, size) # オプション解析 parser = argparse.ArgumentParser(description='LZT+Huffman符号') parser.add_argument('name1', help='入力ファイル') parser.add_argument('name2', help='出力ファイル') parser.add_argument('-e', '--encode', action='store_true', help='符号化') parser.add_argument('-d', '--decode', action='store_true', help='復号') args = parser.parse_args() # 実行 s = time.time() if args.encode and not args.decode: encode_file(args.name1, args.name2) elif args.decode: decode_file(args.name1, args.name2) else: print('option error') e = time.time() print('{:.3f}'.format(e - s))