M.Hiroi's Home Page

Algorithms with Python

LZT 符号

[ PrevPage | Python | NextPage ]

はじめに

前回作成した LZW 符号のプログラムは、辞書が満杯になった時点で辞書を固定して、あとはその辞書をそのまま使ってファイルを圧縮しました。ところが、この方法では不都合な場合があるのです。たとえば、データの種類が異なる複数のファイルをアーカイバでまとめてから圧縮することを考えてみましょう。途中で辞書が満杯になりデータの種類が変化すると、それ以降のデータはほとんど圧縮することができません。

実際に、Canterbury Corpus で配布されている The Canterbury Corpus をアーカイバ tar でまとめたファイル canterbury.tar を LZW 符号で圧縮してみましょう。結果は次のようになりました。

        表 : canterbury.tar の評価結果

                              辞書サイズ
 PROG    サイズ         8k        32k        128k
----------------------------------------------------
 LZW    2,821,120   2,410,308  2,647,698  1,196,012

このように、辞書サイズが小さい場合、ファイルを圧縮することはほとんどできません。この場合は、大域的な辞書を作成していることが裏目となり、データの変化についていけないのです。このような問題を解決するために、辞書が満杯になった時の対処方法がいくつか考案されています。今回はその中の一つである「LZT 符号」を取り上げます。

●LZT 符号

LZT 符号は 1987 年に Tischer 氏によって開発されました。おもな改良点は、辞書が満杯になった場合、長い間使われていない (最長時間未使用 : Least Recently Used) 語を取り除くことで辞書の空きスペースを作るところです。このような操作を「LRU スキーム」と呼びます。

LZT 符号は LRU スキームを行うため符号化と復号に時間がかかることになりますが、少ないメモリでも高い圧縮率を期待することができます。また、データの局所的な偏在も辞書に反映することが可能になります。

辞書の管理は「キュー (queue)」を使って行うと簡単です。キューは先入れ先出し (FIFO) のデータ構造です。トライをたどっていくときに、通過した節をキューから取り出して最後尾へ移動します。そうすると、アクセスされない節ほどキューの先頭に集まることになるので、節を削除する場合はキューの先頭から行えばいいわけです。

一般に、キューはリングバッファや連結リストを使うと簡単に実装することができます。データの出し入れがキューの先頭と最後尾だけならば、これらのデータ構造で十分なのですが、キューの途中からデータを削除する場合には適しているとはいえません。

リングバッファは配列を使って実装するので、削除した分だけデータを移動しないといけません。また、連結リストでデータを削除する場合、そのデータを保持しているセルと、ひとつ手前のセルの両方が必要になるので、リストの先頭から順番にセルをたどらないといけません。どちらの方法でも、キューの途中からデータを削除するには時間がかかるのです。

そこで、今回は「双方向リスト」を使って節の順番を管理することにします。双方向リストは名前が示すように、直後のセルだけでなく直前のセルへの参照を持たせたデータ構造です。下図に双方向リストの構造を示します。

双方向リストは拙作のページ 連結リストとキュー で詳しく説明しています。よろしければ参考にしてください。

●ハッシュ法によるトライの実装

ところで、前回作成した LZW 符号のプログラムはトライを二分木のように構成しています。この方法では子を探す処理が線形探索になるため、記号の種類が多くなるほど処理に時間がかかる、という欠点があります。LZW 符号の高速化は、参考文献 2 の LZ 法で次のように説明されています。参考文献 2 より引用します。

『より高速化するには、木のノード p の文字 c に当たる子の位置を見つけるために p, c によるハッシュ法を使う。』

ようするに、節の子をハッシュ法で管理するわけです。親節の番号 (辞書番号) と子の値 (記号) を用いてハッシュ値を計算すると、一つのハッシュ表で節の子を管理することができます。

Python の辞書 (ハッシュ) はタプルをキーにできるので、(辞書番号, 記号) のようにタプルにまとめれば簡単です。数値に変換する場合は次のようにするといいでしょう。

リスト : ハッシュ関数

def hash_func(n, c):
    return ((n << 8) + c) % HASH_SIZE

p が親の辞書番号、c が子の記号です。p を 8 ビットシフトして c を加算し、ハッシュ表の大きさ HASH_SIZE で割り算した余りをハッシュ値としています。とても簡単な方法ですが、これでも十分にハッシュ法は機能します。

今回はハッシュ法を使ってトライを実装することにしましょう。具体的な説明は、プログラムを作成するところで行います。なお、LZW 符号の高速化はハッシュ法だけではなく Ternary Search Tree を用いる方法もあります。M.Hiroi はC言語で試したことがあるのですが、テキストファイルでは Ternary Search Tree の方がハッシュ法よりも高速でした。興味のある方は挑戦してみてください。

●キューの作成

最初はキューから作成しましょう。次のリストを見てください。

リスト : 双方向リストによるキューの実装

class Queue:
    def __init__(self):
        self.prev = [None] * DIC_SIZE
        self.next_ = [None] * DIC_SIZE
        self.prev[HEAD] = HEAD
        self.next_[HEAD] = HEAD

今回作成するキューは辞書番号を管理すればいいので、双方向リストは配列を使って実装します。辞書番号を配列の添字に対応させ、配列 prev にひとつ前のデータ (辞書番号) を格納し、配列 next_ にひとつ後ろの辞書番号を格納します。トライの節 x をアクセスした場合は、prev[x] と next_[x] を双方向リストから削除して、それを最後尾へ追加することで LRU スキームを実現することができます。

それから、prev[0] と next_[0] を双方向リストのヘッダとして使います。LZW 符号の場合、256 種類の記号は必ず辞書に登録されるので、0 番目から 255 番目の辞書番号は LRU スキームを行う必要はありません。0 - 255 は双方向リストに登録しないので、prev[0], next_[0] をヘッダに使っても問題ないわけです。変数 HEAD (0) がヘッダを表していて、prev[HEAD] と next_[HEAD] を HEAD に初期化します。これでキューを空に初期化することができます。

次は、双方向リストの最後尾にデータを追加するメソッド insert() を作ります。次のリストを見てください。

リスト : キューの最後尾に追加

    def insert(self, x):
        last = self.prev[HEAD]
        self.prev[x] = last
        self.next_[x] = HEAD
        self.next_[last] = x
        self.prev[HEAD] = x

ヘッダから最後尾の辞書番号を取り出して last にセットします。この後ろに辞書番号 n を追加します。次の図を見てください。

Header の一つ後ろのセル L の後ろに N を追加します。L は Header の PREV から求めることができます。N の PREV に L をセットし、NEXT に Header をセットします。あとは、L の NEXT と Header の PREV を N に書き換えます。プログラムでは、prev[n] に last をセットし、next_[n] に HEAD をセットします。あとは、prev[HEAD] と next_[last] を n に書き換えます。これで、HEAD と last の間に n を挿入することができます。

次は、双方向リストからデータを削除するメソッド delete() を作ります。次のリストを見てください。

リスト : キューからデータを削除する

    def delete(self, x):
        p = self.prev[x]
        q = self.next_[x]
        self.next_[p] = q
        self.prev[q] = p

辞書番号 n 自身の削除はとても簡単です。次の図を見てください。

セル N の直前のセルを P とし直後のセルを Q とすると、P の NEXT を Q に、Q の PREV を P に書き換えれば N を削除することができます。プログラムでは、prev[n] から p を求め、next_[n] から q を求めます。あとは、next_[p] を q に、prev[q] を p に書き換えます。これで n を削除することができます。

最後に、双方向リストの先頭からデータを求めるメソッド traverse() を作ります。

リスト : 巡回

    def traverse(self):
        n = self.next_[HEAD]
        while n != HEAD:
            yield n
            n = self.next_[n]

traverse() はジェネレータで実装しました。ヘッダから next_ を順番にたどっていき、yield で辞書番号を返します。

●辞書の作成

次は辞書を表すクラス Dic を作成します。次のリストを見てください。

リスト : 辞書の定義

class Dic:
    def __init__(self):
        self.sym = [None] * DIC_SIZE
        self.parent = [None] * DIC_SIZE
        self.child = [0] * DIC_SIZE
        self.ht = {}
        for x in range(256): self.sym[x] = x
        self.num = 256

辞書は配列を使って実装します。配列の添字が辞書番号に対応します。sym は記号を格納し、parent は親の辞書番号を格納します。そして、child が子の個数を格納します。ht は親子関係を表すハッシュで、num は辞書番号を管理します。

ハッシュを使ってトライを実装すると、節 (辞書番号) が「葉」なのかすぐに判別することができません。そこで、子の個数を配列 child で管理します。たとえば、n に子を追加するときは child[n] を +1 します。n の子を削除するときは child[n] を -1 します。child[n] が 0 の場合、n は葉であることがすぐにわかります。

次は記号 c を持つ子を探すメソッド search() を作ります。

リスト : 記号 c を持つ子を探す

    def search(self, n, c):
        key = (n, c)
        if key in self.ht: return self.ht[key]
        return None

search() は n の子の中から記号 c を持つ子を探します。最初に、タプル (n, c) を変数 key にセットします。そして、key がハッシュ ht にある場合は、その値 ht[key] を返します。そうでなければ None を返します。探索にハッシュを使っているので、高速に動作することが期待できます。

次は子を挿入するメソッド insert() を作ります。

リスト : 記号 c を持つ子を挿入

    def insert(self, q, p, c):
        if self.num == DIC_SIZE:
            n = self.search_leaf(q)
            q.delete(n)
            self.delete(n)
        else:
            n = self.num
            self.num += 1
        self.sym[n] = c
        self.parent[n] = p
        self.child[n] = 0
        self.child[p] += 1
        self.ht[(p, c)] = n
        q.insert(n)

引数 q はキュー、p が親の辞書番号、c が記号です。num が DIC_SIZE と等しい場合は辞書に空きがないので、メソッド search_leaf() で最も使われていない「葉」をキューから探し出します。このとき、トライの途中にある節 (辞書番号) は、削除できないことに注意してください。

見つけた葉を n にセットし、q.delete() でキューから削除し、self.delete() でトライから削除します。辞書に空きがある場合は、num が子の辞書番号になります。

それから、sym[n] に記号 c をセットし、parent[n] に親 p をセットします。child[n] は 0 で初期化して、child[p] の値は +1 します。そして、ハッシュ ht[(p, c)] に n をセットします。これで、p と n の親子関係がリンクされます。最後に、キューの最後尾に n を追加します。

次は、キューの先頭から葉を探すメソッド search_leaf() と削除するメソッド delete() を作ります。

リスト : 葉の探索と削除

    # 葉を探す
    def search_leaf(self, q):
        for x in q.traverse():
            if self.child[x] == 0: return x
        return None

    # 削除
    def delete(self, n):
        p = self.parent[n]
        c = self.sym[n]
        del self.ht[(p, c)]
        self.child[p] -= 1

search_leaf() の引数 q はキューを表します。葉を探す処理は簡単です。キューの traverse() を使って、辞書番号 x を取り出します。child[x] が 0 ならば return で x を返します。見つからない場合は None を返していますが、必ず見つかるはずなので、raise でエラーを送出してもいいでしょう。

delete() の引数 n は削除する辞書番号です。まず、parent[n] から親を、sym[n] から記号を求めて、変数 p, c にセットします。そして、ハッシュ ht からキー (p, c) を削除し、child[p] を -1 します。これでトライから n を削除することができます。

●符号化のプログラム

次は符号化を行う関数 encode() を作ります。

リスト : LZT 符号の符号化

def encode(fin, fout):
    dic = Dic()
    que = Queue()
    p = fin.getc()
    while True:
        c = fin.getc()
        if c is None:
            encode1(fout, p)
            break
        q = dic.search(p, c)
        if q is None:
            encode1(fout, p)
            dic.insert(que, p, c)
            p = c
        else:
            que.delete(q)
            que.insert(q)
            p = q

引数 fin は入力ファイル (ByteIO のオブジェクト)、fout が出力ファイル (BitIO のオブジェクト) です。最初に、辞書 Dic のオブジェクトとキュー Queue のオブジェクトを生成して、変数 dic と que にセットします。次に、fin から 1 記号読み込んで変数 p にセットします。p がひとつ前の辞書番号を保持します。

次の while ループで符号化を行います。fin から 1 記号読み込んで変数 c にセットします。c が None の場合、p が最長一致系列になるので、encode1() で p を符号化して break でループを終了します。そうでなければ dic.search() を呼び出して、p に記号 c を持つ子がないか探索します。

q が None の場合、p が最長一致系列になるので、encode1() で p を符号化します。そして、dic.insert() で p に記号 c を持つ子を追加します。子 q が見つかった場合、q を que.delete() でキューから削除し、que.insert() で最後尾に追加します。これで LRU スキームを行うことができます。

●復号のプログラム

次は復号を行う関数 decode() を作ります。

リスト : LZT 符号の復号

def decode(fin, fout, size):
    que = Queue()
    dic = Dic()
    p = decode1(fin)
    c, i = dic.output(que, p, fout)
    size -= i
    while size > 0:
        q = decode1(fin)
        if dic.check_code(que, q):
            c, i = dic.output(que, q, fout)
            dic.insert(que, p, c)
        else:
            dic.insert(que, p, c)
            c, i = dic.output(que, q, fout)
        p = q
        size -= i

LZT 符号の場合、復号でも符号化と同じタイミングで LRU スキームを行う必要があります。このため、今回は復号の処理でも辞書 Dic を使うことにします。

最初に、辞書 Dic とキュー Queue のオブジェクトを生成して、変数 que と dic にセットします。次に、decode1() で辞書番号を復号して変数 p にセットし、Dic のメソッド output() で記号列に復号します。output() の返り値は LZW 符号と同じで、記号列の先頭文字と記号列の長さです。

それから while ループで復号処理を行います。ポイントは decode1() で復号した辞書番号 q が辞書に登録されているかチェックするところです。LZW 符号では簡単にチェックすることができましたが、LZT 符号では辞書番号を削除して再登録するときにもチェックが必要になります。この処理を Dic のメソッド check_code() で行います。

リスト : 辞書番号のチェック

    def check_code(self, q, n):
        if self.num == DIC_SIZE:
            return self.search_leaf(q) != n
        return n < self.num

まず、辞書が満杯かチェックします。満杯でなければ LZW 符号と同じく、n < self.num であれば、復号した n は辞書に登録されています。return で n < self.num の結果を返します。

辞書が満杯の場合は、最も使われていない葉を削除することになります。これから削除する葉と復号した n が同じ場合は、その n はまだ辞書に登録されていません。削除される葉は search_leaf() で求めることができます。この返り値と n が異なっていれば True を返します。そうでなければ False を返します。

最後に Dic のメソッド output() を作ります。

リスト : 記号列の出力

    def output(self, q, n, fout):
        if self.parent[n] is None:
            fout.putc(n)
            return n, 1
        else:
            m, i = self.output(q, self.parent[n], fout)
            fout.putc(self.sym[n])
            q.delete(n)
            q.insert(n)
            return m, i + 1

output() の引数 q がキューで、n が辞書番号、fout が出力ファイルです。注意する点は、符号化したときと同じ順番で節を双方向リストの最後尾へ移動することです。output() を再帰呼び出しして、記号 sym[n] を出力します。そのあとで、q.delete() で n を削除し、q.insert() で最後尾に n を追加します。これで LRU スキームを行うことができます。

●評価結果

それでは、実際にファイルを圧縮してみましょう。最初に The Canterbury Corpus をアーカイバ tar でまとめた canterbury.tar の結果を表に示します。

        表 : canterbury.tar の評価結果

                              辞書サイズ
 PROG    サイズ         8k        32k        128k
----------------------------------------------------
 LZW    2,821,120   2,410,308  2,647,698  1,196,012 
 LZT    2,821,120     897,810    901,337    946,024

LZT 符号は LZW 符号よりも高い圧縮率になりました。LZT 符号の LRU スキームは大きな効果を発揮していることがわかります。次は The Canterbury Corpus の評価結果を表に示します。

    表 : LZT 符号の評価結果 (辞書サイズ 8k)

  ファイル名      サイズ     LZW      LZT    符号化  復号
  ----------------------------------------------------------
  alice29.txt    152,089    68,448   64,193  0.490   0.551
  asyoulik.txt   125,179    59,085   56,937  0.405   0.462
  cp.html         24,603    12,150   11.111  0.079   0.087
  fields.c        11,150     5,760    4,810  0.037   0.041
  grammar.lsp      3,721     2,294    1,747  0.014   0.020
  kennedy.xls  1,029,744   339,542  274,242  2.528   2.720
  lcet10.txt     426,754   194,996  180,431  1.354   1.542
  plrabn12.txt   481,861   220,850  215,221  1.622   1.771
  ptt5           513,216    66,101   61,407  0.846   0.941
  sum             38,240    30,163   19,364  0.135   0.150
  xargs.1          4,227     2,916    2,249  0.017   0.019
  ----------------------------------------------------------
  合計         2,810,784 1,002,305  891,716  7.527   8.304

符号化と復号の単位 : 秒
実行環境 : Python 3.8.10, Ubuntu 20.04 LTS (WSL1), Intel Core i5-6200U 2.30GHz

辞書サイズが 8 k の場合、LZT 符号の圧縮率は LZW 符号よりも高くなりました。辞書が小さくでも LRU スキームの効果により、LZT 符号は高い圧縮率を達成できることがわかります。符号化と復号の処理時間は、LZW 符号よりも遅くなりました。LZT 符号は LRU スキームを行っているので、その分だけ LZW 符号よりも時間がかかるのはしかたがありません。

次は辞書サイズを増やして試してみました。結果は次のようになります。

    表 : LZT 符号の評価結果 (2)

  ファイル名      サイズ     8 k      32 k     64 k
  ---------------------------------------------------
  alice29.txt    152,089    64,193   61.097   60,513
  asyoulik.txt   125,179    56,937   54,117   53,206
  cp.html         24,603    11.111   10,877   10,877
  fields.c        11,150     4,810    4,810    4,810
  grammar.lsp      3,721     1,747    1,747    1,747
  kennedy.xls  1,029,744   274,242  303,277  316,931
  lcet10.txt     426,754   180,431  163,278  159,994
  plrabn12.txt   481,861   215,221  198,821  194,643
  ptt5           513,216    61,407   61,064   60,333
  sum             38,240    19,364   19,473   19,473
  xargs.1          4,227     2,249    2,249    2,249
  ---------------------------------------------------
  合計         2,810,784   891,716  880,810  884,776

辞書サイズを増やすとテキストファイルの圧縮率は向上しますが、kennedy.xls のように圧縮率が低下するファイルもあります。LZB 符号に比べると、辞書サイズを増やしても圧縮率はそれほど高くなりません。LZT 符号の場合、辞書サイズが小さいときは LRU スキームが効果的に働くのですが、辞書サイズが大きくなると LRU スキームの効果は少なくなるようです。

なお、実行時間の結果は M.Hiroi のコーディング、実行したマシン、プログラミング言語などの環境に大きく依存しています。また、これらの環境だけではなく、データの種類によっても実行時間はかなり左右されます。興味のある方は、いろいろなデータをご自分の環境で試してみてください。

●LZT 符号とハフマン符号の組み合わせ

ところで、LZSS 符号とハフマン符号を組み合わせた LZH 符号は、二段階目のハフマン符号の効果が大きく、高い圧縮率を達成することができました。それでは、LZT 符号とハフマン符号を組み合わせるとどうなるのでしょうか。実際に試してみましょう。

今回の方法は、辞書番号をγ符号のようにビット数とビット列に分け、ビット数をハフマン符号で符号化します。プログラムの説明は割愛いたしますので、詳細は プログラムリスト2 をお読みください。

実行結果は次のようになりました。

    表 : LZT 符号 + ハフマン符号の評価結果

  ファイル名      サイズ     8 k      32 k     64 k
  ---------------------------------------------------
  alice29.txt    152,089    64,311   62.210   62.292 
  asyoulik.txt   125,179    57,076   55,074   55,074 
  cp.html         24,603    11.369   11,369   11,369 
  fields.c        11,150     5,114    5,144    5,144 
  grammar.lsp      3,721     1,832    1,832    1,832 
  kennedy.xls  1,029,744   269,891  281,721  286,480 
  lcet10.txt     426,754   180,242  164,269  161,511 
  plrabn12.txt   481,861   215,001  199,219  195,599 
  ptt5           513,216    61,251   61,598   61,761 
  sum             38,240    19,383   20,533   20,533 
  xargs.1          4,227     2,330    2,330    2,330 
  ---------------------------------------------------
  合計         2,810,784   887,800  865,269  863,895 

kennedy.xls のように圧縮率が高くなるファイルもありますが、圧縮率が低下するファイルが多くなりました。LZT 符号の場合、ハフマン符号と単純に組み合わせるだけでは、高い圧縮率を達成するのは難しいようです。興味のある方はいろいろ試してみてください。

●参考文献

  1. 植松友彦, 『文書データ圧縮アルゴリズム入門』, CQ出版社, 1994
  2. 広井誠, 『LZ78 符号によるファイルの圧縮と改良 (前編)』, Interface 2006 年 11 月号, CQ出版社

●プログラムリスト1

#
# lzt.py : LZT coding + CBT coding
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from bitio import *

# 定数
DIC_BITS = 13
DIC_SIZE = 1 << DIC_BITS
HEAD = 0


# 双方向リストによるキュー
# 0 がヘッダ, 1 - 255 はダミー
class Queue:
    def __init__(self):
        self.prev = [None] * DIC_SIZE
        self.next_ = [None] * DIC_SIZE
        self.prev[HEAD] = HEAD
        self.next_[HEAD] = HEAD

    # 最後尾に追加
    def insert(self, x):
        last = self.prev[HEAD]
        self.prev[x] = last
        self.next_[x] = HEAD
        self.next_[last] = x
        self.prev[HEAD] = x

    # 削除
    def delete(self, x):
        p = self.prev[x]
        q = self.next_[x]
        self.next_[p] = q
        self.prev[q] = p

    # 巡回
    def traverse(self):
        n = self.next_[HEAD]
        while n != HEAD:
            yield n
            n = self.next_[n]

# 辞書
class Dic:
    def __init__(self):
        self.sym = [None] * DIC_SIZE
        self.parent = [None] * DIC_SIZE
        self.child = [0] * DIC_SIZE
        self.ht = {}
        for x in range(256): self.sym[x] = x
        self.num = 256

    # 探索
    def search(self, n, c):
        key = (n, c)
        if key in self.ht: return self.ht[key]
        return None

    # 葉を探す
    def search_leaf(self, q):
        for x in q.traverse():
            if self.child[x] == 0: return x
        return None

    # 削除
    def delete(self, n):
        p = self.parent[n]
        c = self.sym[n]
        del self.ht[(p, c)]
        self.child[p] -= 1

    # 挿入
    def insert(self, q, p, c):
        if self.num == DIC_SIZE:
            n = self.search_leaf(q)
            q.delete(n)
            self.delete(n)
        else:
            n = self.num
            self.num += 1
        self.sym[n] = c
        self.parent[n] = p
        self.child[n] = 0
        self.child[p] += 1
        self.ht[(p, c)] = n
        q.insert(n)

    # 辞書番号のチェック
    def check_code(self, q, n):
        if self.num == DIC_SIZE:
            return self.search_leaf(q) != n
        return n < self.num

    # 出力
    def output(self, q, n, fout):
        if self.parent[n] is None:
            fout.putc(n)
            return n, 1
        else:
            m, i = self.output(q, self.parent[n], fout)
            fout.putc(self.sym[n])
            q.delete(n)
            q.insert(n)
            return m, i + 1

# 符号語長
dic_bits = 9
code_count = 256

# 辞書番号の符号化
def encode1(fout, n):
    global dic_bits, code_count
    if dic_bits < DIC_BITS:
        fout.cbt_encode(n, code_count, dic_bits)
        code_count += 1
        if code_count > (1 << dic_bits) - 1: dic_bits += 1
    else:
        fout.putbits(DIC_BITS, n)

# 辞書番号の復号
def decode1(fin):
    global dic_bits, code_count
    if dic_bits < DIC_BITS:
        n = fin.cbt_decode(code_count, dic_bits)
        code_count += 1
        if code_count > (1 << dic_bits) - 1: dic_bits += 1
    else:
        n = fin.getbits(DIC_BITS)
    return n


# LZT 符号の符号化
def encode(fin, fout):
    dic = Dic()
    que = Queue()
    p = fin.getc()
    while True:
        c = fin.getc()
        if c is None:
            encode1(fout, p)
            break
        q = dic.search(p, c)
        if q is None:
            encode1(fout, p)
            dic.insert(que, p, c)
            p = c
        else:
            que.delete(q)
            que.insert(q)
            p = q


# LZT 符号の復号
def decode(fin, fout, size):
    que = Queue()
    dic = Dic()
    p = decode1(fin)
    c, i = dic.output(que, p, fout)
    size -= i
    while size > 0:
        q = decode1(fin)
        if dic.check_code(que, q):
            c, i = dic.output(que, q, fout)
            dic.insert(que, p, c)
        else:
            dic.insert(que, p, c)
            c, i = dic.output(que, q, fout)
        p = q
        size -= i

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout:
        fout.putbits(32, size)
        if size > 0: encode(fin, fout)

# 復号
def decode_file(name1, name2):
    with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = fin.getbits(32)
        if size > 0: decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='LZT符号')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

●プログラムリスト2

#
# lzth.py : LZT coding + Huffman coding
#
#           Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from bitio import *
from pqueue import *

# 定数
DIC_BITS = 13
DIC_SIZE = 1 << DIC_BITS
HEAD = 0

HUFF_BITS = 14
HUFF_SIZE = 1 << HUFF_BITS
CODE_BITS = 5

##### Huffman coding #####

# ハフマン木の節
class Node:
    def __init__(self, code, count = 0, left = None, right = None):
        self.code = code
        self.count = count
        self.left = left
        self.right = right

    def __gt__(x, y): return x.count > y.count
    def __le__(x, y): return x.count <= y.count

# 符号の生成
def make_code(table, node, n, code):
    if node.code is not None:
        # 葉
        table[node.code] = (n, code)
    else:
        make_code(table, node.left, n + 1, code << 1)         # left  is 0
        make_code(table, node.right, n + 1, (code << 1) | 1)  # right is 1

# ハフマン木の出力
def write_tree(node, fout):
    if node.code is not None:
        # 葉
        fout.putbit(1)
        fout.putbits(CODE_BITS, node.code)
    else:
        fout.putbit(0)
        write_tree(node.left, fout)
        write_tree(node.right, fout)

# ハフマン木の読み込み
def read_tree(fin):
    if fin.getbit() == 1:
        # 葉
        node = Node(fin.getbits(CODE_BITS))
    else:
        node = Node(None)
        node.left = read_tree(fin)
        node.right = read_tree(fin)
    return node

# ハフマン木の生成
def make_tree(sym_table):
    q = PQueue()   # ヒープの生成
    for x in sym_table:
        if x.count > 0: q.push(x)
    while True:
        n1 = q.pop()
        if q.isEmpty(): return n1
        n2 = q.pop()
        q.push(Node(None, n1.count + n2.count, n1, n2))

# ビット数を求める
def get_bit_num(n):
    n1 = 0
    n2 = (n + 1) >> 1
    while n2 > 0:
        n1 += 1
        n2 >>= 1
    return n1

# ハフマン符号化
def huff_encode(buff, size, fout):
    count = [Node(x) for x in range(DIC_BITS + 1)]
    code = [None] * (DIC_BITS + 1)
    # 出現頻度表の作成
    for x in range(size):
        n = buff[x]
        n1 = get_bit_num(n)
        count[n1].count += 1
    # ハフマン木の生成と出力
    root = make_tree(count)
    make_code(code, root, 0, 0)
    fout.putbits(HUFF_BITS, size - 1)
    write_tree(root, fout)
    # 符号化
    for x in range(size):
        n = buff[x]
        n1 = get_bit_num(n)
        fout.putbits(*code[n1])
        if n1 > 0: fout.putbits(n1, n + 1)

# 記号の復号
def huff_decode_sub(node, fin):
    while node.code is None:
        if fin.getbit() == 0:
            node = node.left
        else:
            node = node.right
    return node.code

# ハフマン復号
def huff_decode(buff, fin):
    size = fin.getbits(HUFF_BITS) + 1
    root = read_tree(fin)
    for x in range(size):
        n = huff_decode_sub(root, fin)
        if n > 0:
            n = (1 << n) + fin.getbits(n) - 1
        buff[x] = n
    return size

##### LZT 符号 #####

# 双方向リストによるキュー
# 0 がヘッダ, 1 - 255 はダミー
class Queue:
    def __init__(self):
        self.prev = [None] * DIC_SIZE
        self.next_ = [None] * DIC_SIZE
        self.prev[HEAD] = HEAD
        self.next_[HEAD] = HEAD

    # 最後尾に追加
    def insert(self, x):
        last = self.prev[HEAD]
        self.prev[x] = last
        self.next_[x] = HEAD
        self.next_[last] = x
        self.prev[HEAD] = x

    # 削除
    def delete(self, x):
        p = self.prev[x]
        q = self.next_[x]
        self.next_[p] = q
        self.prev[q] = p

    # 巡回
    def traverse(self):
        n = self.next_[HEAD]
        while n != HEAD:
            yield n
            n = self.next_[n]

# 辞書
class Dic:
    def __init__(self):
        self.sym = [None] * DIC_SIZE
        self.parent = [None] * DIC_SIZE
        self.child = [0] * DIC_SIZE
        self.ht = {}
        for x in range(256): self.sym[x] = x
        self.num = 256

    # 探索
    def search(self, n, c):
        key = (n, c)
        if key in self.ht: return self.ht[key]
        return None

    # 葉を探す
    def search_leaf(self, q):
        for x in q.traverse():
            if self.child[x] == 0: return x
        return None

    # 削除
    def delete(self, n):
        p = self.parent[n]
        c = self.sym[n]
        del self.ht[(p, c)]
        self.child[p] -= 1

    # 挿入
    def insert(self, q, p, c):
        if self.num == DIC_SIZE:
            n = self.search_leaf(q)
            q.delete(n)
            self.delete(n)
        else:
            n = self.num
            self.num += 1
        self.sym[n] = c
        self.parent[n] = p
        self.child[n] = 0
        self.child[p] += 1
        self.ht[(p, c)] = n
        q.insert(n)

    # 辞書番号のチェック
    def check_code(self, q, n):
        if self.num == DIC_SIZE:
            return self.search_leaf(q) != n
        return n < self.num

    # 出力
    def output(self, q, n, fout):
        if self.parent[n] is None:
            fout.putc(n)
            return n, 1
        else:
            m, i = self.output(q, self.parent[n], fout)
            fout.putc(self.sym[n])
            q.delete(n)
            q.insert(n)
            return m, i + 1

# LZT 符号の符号化
def encode(fin, fout):
    huff_buff = [None] * HUFF_SIZE
    hcnt = 0
    dic = Dic()
    que = Queue()
    p = fin.getc()
    while True:
        c = fin.getc()
        if c is None:
            huff_buff[hcnt] = p
            hcnt += 1
            huff_encode(huff_buff, hcnt, fout)
            break
        q = dic.search(p, c)
        if q is None:
            huff_buff[hcnt] = p
            hcnt += 1
            if hcnt == HUFF_SIZE:
                huff_encode(huff_buff, hcnt, fout)
                hcnt = 0
            dic.insert(que, p, c)
            p = c
        else:
            que.delete(q)
            que.insert(q)
            p = q


# LZT 符号の復号
def decode(fin, fout, size):
    que = Queue()
    dic = Dic()
    huff_buff = [None] * HUFF_SIZE
    hsize = huff_decode(huff_buff, fin)
    hcnt = 1
    p = huff_buff[0]
    c, i = dic.output(que, p, fout)
    size -= i
    while size > 0:
        if hcnt == hsize:
            hsize = huff_decode(huff_buff, fin)
            hcnt = 0
        q = huff_buff[hcnt]
        hcnt += 1
        if dic.check_code(que, q):
            c, i = dic.output(que, q, fout)
            dic.insert(que, p, c)
        else:
            dic.insert(que, p, c)
            c, i = dic.output(que, q, fout)
        p = q
        size -= i

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with ByteIO(name1, ROPEN) as fin, BitIO(name2, WOPEN) as fout:
        fout.putbits(32, size)
        if size > 0: encode(fin, fout)

# 復号
def decode_file(name1, name2):
    with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = fin.getbits(32)
        if size > 0: decode(fin, fout, size)

# 復号
def decode_file(name1, name2):
    with BitIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = fin.getbits(32)
        if size > 0: decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='LZT+Huffman符号')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

初版 2007 年 6 月 2 日
改訂 2022 年 9 月 17 日

Copyright (C) 2007-2022 Makoto Hiroi
All rights reserved.

[ PrevPage | Python | NextPage ]