M.Hiroi's Home Page

Algorithms with Python

Prediction by Partial Matching (PPM) [1]

[ PrevPage | Python | NextPage ]

はじめに

圧縮ツールで用いられているアルゴリズムにはいろいろな方法がありますが、その中で ブロックソート (BlockSorting) は圧縮性能が優れている方法として注目を集めています。実際、ブロックソートを用いた圧縮ツール bzip2 は LHA, zip, gzip よりも高い圧縮率になります。このほかにも、高い圧縮率で注目を集めている方法に Prediction by Partial Matching (PPM) があります。今回は PPM について詳しく説明します。そして、実際にファイルを圧縮してみましょう。

●PPM の基礎知識

PPM は 1984 年に J. G. Cleary 氏と I. H. Witten 氏によって提案された圧縮アルゴリズムで、文脈から作られる統計的モデルによって次の文字の出現確率を予測する方法です。このように記号の出現確率を算出し、その確率に基づいて符号化を行う方法を「統計型の手法」といいます。

簡単にいえば高次の 有限文脈モデル を使って記号の出現確率を求め、算術符号またはレンジコーダを用いて記号を符号化します。このとき、PPM はまだ現れていない記号(未出現記号)をエスケープ (escape) 記号とし、このエスケープ記号を使って適切なモデルを選択するところが特徴です。

もう少し具体的に説明しましょう。PPM の基本的な考え方はそれほど難しくありません。たとえば、有限文脈モデルの次数を 5 次 (order-5) としましょう。記号 a を符号化する場合、まず order-5 の出現頻度表を調べます。ここで、order-5 に記号 a が存在すれば、それを算術符号 (またはレンジコーダ) で符号化します。もし、記号 a が存在しない場合は、エスケープ記号を符号化して order-4 の出現頻度表を調べます。つまり、次数を下げながら記号 a が出現しているモデルを探すわけです。

それから、PPM はあらかじめモデルの最高次数を決めておきますが、1995 年に J. G. Cleary 氏, W. J. Teahan 氏, I. H. Witten 氏が提案した PPM* という方法では、モデルの最高次数に制限がありません。ただし、PPM* のプログラムは PPM よりも複雑でとても難しくなります。PPM* を提案した J. G. Cleary 氏と W. J. Teahan 氏の論文 "Unbounded length contexts for PPM" によると、PPM の最高次数は 5 次 (order-5) で十分のようです。逆に、それ以上次数を増やしても圧縮率は向上しないようです。

一般に、統計型の手法は LZSS 符号や LZW 符号などの「辞書法」よりも圧縮性能が優れているといわれています。しかしながら、統計型の手法は処理速度が遅く、必要なメモリ量も多くなるため、これまではあまり実用的ではありませんでした。しかし、近年になってコンピュータの性能が著しく向上したことにより、統計的な手法を用いた圧縮ツールが実用化されるようになりました。

今回は基本的な PPM を使ってファイルを圧縮してみましょう。ただし、Python で PPM を実装する場合、処理速度はとても遅くなります。あらかじめ覚悟しておいてください。

それでは、具体的に PPM のアルゴリズムを説明します。PPM は高次の有限文脈モデルとエスケープ記号の使い方がポイントになります。特に、エスケープ記号に与える出現確率が圧縮率を左右する重要な要素になります。

●エスケープ確率の計算方法

PPM の場合、エスケープ記号に与える出現確率をエスケープ確率と呼びます。そして、エスケープ確率の計算方法を Method と呼びます。Method にはいくつかの方法が提案されていますが、PPM では Method C や Method D が経験的に良い性能を持つと言われています。

Method C : u / (n + u) 
Method D : (u / 2) / n
n : そのモデルで出現した記号の総数
u : そのモデルで出現したエスケープ記号の総数

レンジコーダで符号化する場合、Method C はとても簡単に実現できます。まず、記号の出現頻度表は、エスケープ記号を 1 とし他の記号は 0 に初期化します。たとえば、記号 a を符号化する場合、表に a がない (個数 0) 場合はエスケープ記号を符号化し、出現頻度表を更新します。このとき、a とエスケープ記号の個数を増やせば、通常のレンジコーダで Method C を実現することができます。

つまり、a を符号化したときは出現頻度表の a の個数を +1 して、エスケープ記号を符号化したときは a とエスケープ記号の個数を +1 するのです。これでエスケープ記号の出現確率は u / (n + u) になるので、あとは通常のレンジコーダで符号化すればいいわけです。とても簡単ですね。

Method D は Method C の改良版です。Method C でエスケープ記号を符号化したとき、記号 a とエスケープ記号の個数を +1 ずつしましたが、これを 0.5 ずつにして合わせて +1 とするのが Method D です。Method C では、エスケープ記号の個数を +1 しているため、記号の出現確率よりもエスケープ確率の割合が増大する場合がありますが、Method D ではこれを補正することができます。

Method D を簡単に実現するには、記号 a を符号化するときに記号の個数を +2 すればいいでしょう。これでエスケープ記号の割合が増大するのを補正することができます。あとは通常のレンジコーダで符号化すれば、Method D に相当する出現確率で符号化することができます。

●符号化のアルゴリズム

PPM の基本的な符号化アルゴリズムを疑似コードで表すと、次のようになります。

リスト:PPM の符号化 (疑似コード)

符号化():
    初期化処理
    while ファイルの終了まで:
        ファイルから 1 記号読み込む
        N = 5
        while N >= -1:
            直前の N 記号から N 次モデルの出現頻度表 freq を求める
            if freq で記号の個数が 0 である:
                エスケープ記号をレンジコーダで符号化
                freq の更新 (記号とエスケープ記号の個数を +1 する)
            else:
                記号をレンジコーダで符号化
                freq の更新 (Method C : 記号の個数を +1 する)
                            (Method D : 記号の個数を +2 する)
                break
            N -= 1
        直前の記号を更新する
    終了処理

有限文脈モデルの最高次数を 5 次とし、直前の N 個の記号から N 次のモデルの出現頻度表 freq を求めます。freq で記号の個数が 0 であれば、エスケープ記号を符号化して次のモデルを探します。たとえば、記号が a, b, c, d, e の順番で出現した場合、最初は a, b, c, d, e で 5 次のモデルを探します。次は b, c, d, e で 4 次のモデルを探します。このように、次数を減らして記号が出現しているモデルを探します。

N が 0 になったら order-0 のモデルを探します。これは直前に出現した文字とは無関係のモデル、つまり無記憶情報源モデルになります。ただし、このモデルでもエスケープ記号を使うことに注意してください。そして、N が -1 になったら order-(-1) のモデルを探します。このモデルも「無記憶情報源モデル」ですが、すべての記号が出現しているところが order-0 とは異なります。どの記号でも order-(-1) のモデルで必ず見つかるので、記号を符号化することができます。

ちなみに、order-0 をすべての記号が出現しているモデルと設定してもかまいません。最初、そのように設定したのですが、order-0 と order-(-1) を分けてみたところ、わずかですが圧縮率が向上しました。

PPM は有限文脈モデルの中から直前 N 個の記号と一致するモデルを探す処理が必要になります。この処理に適しているデータ構造やアルゴリズムはいくつかあると思いますが、今回はオーソドックスにトライ (trie) を使ってみましょう。トライについては拙作のページ トライとパトリシア を参考にしてください。

●update exclusion

ここで、出現頻度表 freq の更新に注目してください。記号を符号化したあと、freq の更新は行っていませんね。もし、4 次のモデルで記号を符号化したら、4 次未満のモデルの freq は更新されません。このように出現頻度表を更新するのは高次のモデルだけなのです。これを update exclusion といいます。

exclusion には「除外」という意味があり、低次のモデルを更新から除外することで、圧縮率を向上させることができます。本来ならば 5 次から -1 次までのモデルの出現頻度表を更新するところなのですが、実際に試してみると高次のモデルだけ更新した方が圧縮率は高くなります。また、低次のモデルを更新する必要がないので、実行速度の点でも有利になります。

●exclusion

PPM にはもうひとつ重要な方法があります。高次のモデルでエスケープ記号を符号化したあと、低次のモデルで記号を符号化するとき、高次のモデルで出現している記号を低次のモデルの出現頻度表から除外することができます。これを exclusion といいます。

実は、低次のモデルで符号化するとき、高次のモデルで出現した記号は必要ありません。復号処理から見た場合、エスケープ記号が復号されるということは、復号される記号はそのモデルに出現していない記号であることがわかります。これらの記号は復号処理に必要ありませんね。したがって、低次のモデルで高次モデルの出現記号を取り除いて符号化しても、復号処理で同じように高次モデルの出現記号を除外すれば、記号をきちんと復号することができるのです。

そして exclusion を行うことにより、符号化する記号に割り当てられる出現確率を増やすことができます。これが exclusion の真の目的です。PPM ではこの効果が絶大で、圧縮率を大幅に向上させることができます。簡単な例を示しましょう。記号 c を符号化することを考えてみます。

5 次と 4 次のモデルに c はないので、エスケープ記号が符号化されます。3 次のモデルには c があるので、このモデルで c が符号化されます。exclusion がない場合、確率は 1/2 * 2/4 * 1/6 = 1/24 になります。

exclusion がある場合、5 次のモデルではエスケープ記号の確率は同じですが、4 次のモデルでは a を除外できるので、エスケープ記号の確率は 2/3 となります。同様に 3 次のモデルで c を符号化するときは、5 次と 4 次で出現している a と b を除外できるので、確率は 1/4 になります。したがって、c の確率は全体で 1/2 * 2/3 * 1/4 = 1/12 になります。

情報量 (-log2確率) を計算すると、exclusion がない場合が 4.58 bit になり、exclusion がある場合が 3.58 bit になります。exclusion を行った方が 1 bit 少なくなりますね。それだけ短い符号語で記号を符号化することができるわけです。これが exclusion の効果です。

この exclusion と update exclusion は効果がとても高く、PPM では常套手段といえる方法です。

●出現頻度表と exclusion の処理

それではプログラムを作りましょう。最初にエスケープ記号付きの出現頻度表を作成します。次のリストを見てください。

リスト : ESC 記号付き出現頻度表

from rangecoder import *

# 定数
GR = 16
ESC = 256
CHAR_SIZE = 257

# ESC 記号付き出現頻度表
class FreqEsc:
    def __init__(self, inc, limit):
        self.sym = []
        self.inc = inc
        self.limit = limit
        self.count = [0] * CHAR_SIZE
        self.count[ESC] = 1
        self.count_group = [0] * (CHAR_SIZE // GR + 1)
        self.count_group[ESC // GR] = 1
        self.sum_ = 1

エスケープ記号を付け加えるので、クラス名は FreqEsc としました。記号の種類は CHAR_SIZE (257) で、ESC 記号の値は ESC (256) になります。inc は記号の増分値で、limit は累積度数の上限値です。出現頻度表 count と count_group は 0 に初期化しておいて、count[ESC] と count_group[ESC / GR] に 1 をセットします。count_group の説明は拙作のページ レンジコーダ [3] にある「適応型レンジコーダの高速化」をお読みください。出現した記号は配列 sym に格納します。これは exclusion の処理で使用します。

次は出現頻度表を更新するメソッド update() を作ります。

リスト : 出現頻度表の更新

    def update(self, c0, c1 = None):
        self.count[c0] += self.inc
        self.count_group[c0 // GR] += self.inc
        self.sum_ += self.inc
        if c1 is not None:
            self.count[c1] += self.inc
            self.count_group[c1 // GR] += self.inc
            self.sum_ += self.inc
        if self.sum_ >= self.limit:
            n = 0
            for x in range(len(self.count_group)):
                self.count_group[x] = 0
            for x in range(len(self.count)):
                if self.count[x] > 0:
                    self.count[x] = (self.count[x] >> 1) | 1
                    self.count_group[x // GR] += self.count[x]
                    n += self.count[x]
            self.sum_ = n

エスケープ記号を符号化したあと、記号とエスケープ記号の出現頻度を更新します。記号は引数 c0 に、エスケープ記号は c1 に渡します。記号を符号化した場合は、引数 c0 に記号を渡します。これで Method C に相当するエスケープ確率になります。出現頻度の合計値 sum_ が limit 以上になった場合、出現している記号の個数を半分にします。このとき、個数が 0 にならないように最下位ビットを 1 にしています。

なお、記号の増分値 inc に大きな値を設定すると、圧縮率が向上する場合があります。これはあとで試してみましょう。

次は記号を符号化するメソッド encode() を作ります。

リスト : 記号の符号化

    def encode(self, rc, c, exclusion):
        # 記号の累積度数を求める
        def cumul(c):
            n = 0
            for x in range(c // GR): n += count_group[x]
            for x in range((c // GR)*GR, c):
                if x not in exclusion: n += self.count[x]
            return n

        # exclusion の処理
        count_group = self.count_group[:]
        sum_ = self.sum_
        for x in exclusion:
            count_group[x // GR] -= self.count[x]
            sum_ -= self.count[x]
        #
        temp = rc.range_ // sum_
        rc.low += cumul(c) * temp
        rc.range_ = self.count[c] * temp
        rc.encode_normalize()

ポイントは exclusion の処理です。引数 exclusion には高次モデルで出現した記号が格納されています。今回は Python のセット (集合) を使います。最初に self.count_group と self.sum_ の値をコピーして局所変数 count_group と sum_ にセットします。そして、exclusion に格納されている記号の出現頻度を count_group と sum_ から引き算します。

内部関数 cumul() は count_group を使って記号の累積度数を求めます。count_group は exclusion に含まれている記号を除外してあるので、今までと同じ計算方法で大丈夫です。次に self.count の値を加算するとき、その記号が exclusion に含まれていれば、それを除外して計算します。これで exclusion した累積度数を求めることができます。あとは、sum_ と cumul の返り値を使ってレンジコーダで符号化すればいいわけです。

次は記号を復号するメソッド decode() を作ります。

リスト : 記号の復号

    def decode(self, rc, exclusion):
        # 記号の探索
        def search_code(value):
            n = 0
            for x in range(len(count_group)):
                if value < n + count_group[x]: break
                n += count_group[x]
            for c in range(x*GR, len(self.count)):
                if c not in exclusion:
                    if value < n + self.count[c]: break
                    n += self.count[c]
            return c, n

        # exclusion の処理
        count_group = self.count_group[:]
        sum_ = self.sum_
        for x in exclusion:
            count_group[x // GR] -= self.count[x]
            sum_ -= self.count[x]
        #
        temp = rc.range_ // sum_
        c, num = search_code(rc.low // temp)
        rc.low -= temp * num
        rc.range_ = temp * self.count[c]
        rc.decode_normalize()
        return c

exclusion の処理は encode() と同じです。内部関数 search_code() は、count_group を使って復号する記号を線形探索します。そして self.count を線形探索するときは、exclusion に含まれている記号を除外します。あとは今までの処理と同じです。

最後に order-(-1) 用の出現頻度表を作成します。

リスト : order-(-1) の出現頻度表

class Freq(FreqEsc):
    def __init__(self, inc, limit):
        self.inc = inc
        self.limit = limit
        self.count = [1] * (CHAR_SIZE - 1)
        self.count_group = [GR] * ((CHAR_SIZE - 1) // GR)
        self.sum_ = CHAR_SIZE - 1

出現頻度表 Freq は FreqEsc を継承しているので、FreqEsc のメソッドをそのまま利用することができます。エスケープ記号は必要ないので、出現頻度表の大きさは CHAR_SIZE - 1 になります。そして、count の各要素は 1 で初期化することに注意してください。これでどの記号でも符号化・復号することができます。

●トライの作成

次はトライを表すクラス Trie を作成します。次のリストを見てください。

リスト : トライの定義

class Trie:
    def __init__(self):
        self.sym = [None] * TRIE_SIZE
        self.freq = [None] * TRIE_SIZE
        self.ht = {}
        for x in range(256):
            self.sym[x] = x
            self.freq[x] = FreqEsc(INC, LIMIT)  # order-1
        self.num = 256

今回のプログラムは、トライの節のレベルと有限文脈モデルの次数を一致させることにします。たとえば、記号が a, b, c, d, e の順番で出現した場合、トライには e - d - c - b - a の順番で記号を挿入します。すると、ルートの次の節 (レベル 1) が order-1 に対応し、次の節 (レベル 2) が order-2 に、そして、記号 a を格納する節 (レベル 5) が order-5 に対応します。このようにすると、e - d - c - b - a を一度探索するだけで、oreder-1 から order-5 までの出現頻度表を求めることができます。

トライは配列を使って実装します。sym は記号を格納し、freq が出現頻度表を格納します。ht は親子関係を表すハッシュで、num は使用する節の番号を管理します。ハッシュを使ったトライの実装方法は拙作のページ LZT 符号 で詳しく説明しています。そちらをお読みくださいませ。そして、あらかじめ 0 - 255 に order-1 の出現頻度表をセットしておきます。増分値 INC は +4 で、累積度数の上限値は LIMIT (0x4000) としました。

次は記号 c を持つ子を探すメソッド search() を作ります。

リスト : 記号 c を持つ子を探す

    def search(self, n, c):
        key = (n, c)
        if key in self.ht: return self.ht[key]
        return None

search() は n の子の中から記号 c を持つ子を探します。最初に、タプル (n, c) を変数 key にセットします。そして、key がハッシュ ht にある場合は、その値 ht[key] を返します。そうでなければ None を返します。

次は出現頻度表を求めるメソッド get_freq() を作ります。

リスト : 出現頻度表の取得

    def get_freq(self, buff, prev_code):
        n = prev_code[0]
        buff.append(self.freq[n])       # order-1 をセット
        for x in range(1, MAX_ORDER):
            c = prev_code[x]
            m = self.search(n, c)
            if m is None:
                # Trie に追加する
                m = self.num
                if m < TRIE_SIZE:
                    freq = FreqEsc(INC, LIMIT)
                    self.num += 1
                    self.sym[m] = c
                    self.freq[m] = freq
                    self.ht[(n, c)] = m
                else:
                    # メモリを使い切った場合は
                    # ESC 記号だけを符号化する
                    freq = freq_e
            else:
                freq = self.freq[m]
            # 出現頻度表を追加する
            buff.append(freq)
            n = m

引数 buff は出現頻度表を格納する配列、prev_code は直前に出現した記号を格納する配列です。buff にはあらかじめ order-(-1), order-0 の出現頻度表がセットされていて、このあとトライをたどって order-1 から order-5 までの出現頻度表をセットします。

最初に order-1 の出現頻度表を求めて buff に追加します。次に、for ループで prev_code から記号 c を取り出してトライをたどります。節 n に記号 c を持つ子が存在しない場合、新しい出現頻度表を freq にセットして、新しい節を作ってトライに挿入します。

トライの節を使い切った場合、出現頻度表 freq_e を buff にセットします。freq_e はエスケープ記号の個数を 1 に、ほかの記号の個数は 0 に初期化されていて、出現頻度の更新は行いません。この場合、レンジコーダで符号化を行っても状態 (low と range の値) に変化はありません。つまり、符号化の処理をスキップしていることと同じになります。これでメモリを使い切っても、今までに作成した有限文脈モデルを使って符号化を続けることができます。

●符号化のプログラム

次は PPM で符号化を行う関数 encode() を作ります。

リスト : 符号化

##### 出現頻度表 #####
trie = Trie()                  # order-1 - MAX_ORDER
freq0 = FreqEsc(INC, LIMIT)    # order-0
freq_1 = Freq(INC, LIMIT)      # order-(-1)
freq_e = FreqEsc(0, LIMIT)     # ESC 記号を符号化・復号する

# PPM の符号化
def encode(fin, fout):
    rc = RangeCoder(fout, ENCODE)
    prev_code = [0] * MAX_ORDER
    exclusion = set()
    for x in read_file(fin):
        exclusion.clear()
        buff = [freq_1, freq0]
        trie.get_freq(buff, prev_code)
        for i in range(MAX_ORDER + 1, -1, -1):
            freq = buff[i]
            if freq.count[x] == 0:
                freq.encode(rc, ESC, exclusion)
                if freq is not freq_e:
                    freq.update(x, ESC)
                    exclusion.update(freq.sym)
                    freq.sym.append(x)
            else:
                freq.encode(rc, x, exclusion)
                freq.update(x)       # Method C
                # freq.update(x, x)  # Method D
                break
        #
        prev_code.pop()
        prev_code.insert(0, x)
    rc.finish()

最初に、RangeCoder と直前の記号列を格納する配列 prev_codeと exclusion 用のセットを初期化します。それから、ファイルから 1 記号読み込み PPM で符号化します。このとき、exclusion を clear で空にしておきます。そして、メソッド get_freq() でトライから出現頻度表を求めます。

次の for ループで、buff の後ろから出現頻度表を取り出して freq にセットし、記号 x を符号化します。最後尾の出現頻度表が order-5 で、先頭の出現頻度表が order-(-1) です。どんな記号でも先頭の出現頻度表で必ず符号化されます。

freq.count[x] が 0 の場合は、メソッド encode() でエスケープ記号 (ESC) を符号化します。freq が freq_e でなければ、メソッド update() で記号の出現頻度を更新し、freq に出現している記号を exclusion に追加します。この処理はセットのメソッド update() を呼び出すだけです。そして、新しい記号 x を freq.sym に追加します。

freq.count[x] が 0 でなければ、メソッド encode() で記号 x を符号化します。それから、freq.update(x) で記号の出現頻度を更新します。これでエスケープ確率は Method C になります。ここで freq.update(x, x) とするとエスケープ確率は Method D になります。あとは、prev_code を更新するだけです。

●復号のプログラム

最後に、復号を行う関数 decode() を作ります。

リスト : PPM による復号

def decode(fin, fout, size):
    rc = RangeCoder(fin, DECODE)
    prev_code = [0] * MAX_ORDER
    exclusion = set()
    for _ in range(size):
        exclusion.clear()
        buff = [freq_1, freq0]
        trie.get_freq(buff, prev_code)
        for i in range(MAX_ORDER + 1, -1, -1):
            freq = buff[i]
            c = freq.decode(rc, exclusion)
            if c == ESC:
                if freq is not freq_e:
                    exclusion.update(freq.sym)
            else:
                for x in range(MAX_ORDER + 1, i, -1):
                    if buff[x] is not freq_e:
                        buff[x].update(c, ESC)
                        buff[x].sym.append(c)
                freq.update(c)       # Method C
                # freq.update(c, c)  # Method D
                break
        #
        fout.putc(c)
        prev_code.pop()
        prev_code.insert(0, c)

関数 decode() のポイントは出現頻度表の更新処理です。記号が復号されないと、出現頻度表を更新できません。そこで、記号を復号したときに、高次モデルの出現頻度表をまとめて更新しています。

あとはとくに難しいところはないでしょう。詳細は プログラムリスト2 をお読みくださいませ。

●評価結果

それでは、実際に Canterbury Corpus で配布されているテストデータ The Canterbury Corpus を圧縮してみましょう。結果は次にようになりました。

      表 : PPM の評価結果 (節の個数 100,000, INC = +4)

  ファイル名      サイズ    PPM   節の個数     LHA     bzip2
  -----------------------------------------------------------
  alice29.txt    152,089   42,585   65,009   59,117   43,202
  asyoulik.txt   125,179   39,405   67,311   52,341   39,569
  cp.html         24,603    7,117   27,149    8,384    7,624
  fields.c        11,150    3,008    9,433    3,170    3,039
  grammar.lsp      3,721    1,141    4,659    1,271    1,283
  kennedy.xls  1,029,744  184,738  100,000  198,342  130,280
  lcet10.txt     426,754  106,399  100,000  159,558  107,706
  plrabn12.txt   481,861  143,284  100,000  210,045  145,577
  ptt5           513,216   51,339  100,000   52,305   49,759
  sum             38,240   13,044   41,518   13,993   12,909
  xargs.1          4,227    1,592    6,721    1,778    1,762
  -----------------------------------------------------------
  合計         2,810,784  593,652           760,304  542,710


            表 : PPM の実行時間
    (節の個数 100,000, INC = +4, 単位 秒)

                    Python3         PyPy3
  ファイル名     符号化  復号    符号化  復号
  ---------------------------------------------
  alice29.txt    2.697   3.027   1.233   1.194
  asyoulik.txt   2.573   2.809   1.206   1.161
  cp.html        0.619   0.685   0.506   0.460
  fields.c       0.229   0.253   0.338   0.339
  grammar.lsp    0.096   0.105   0.220   0.216
  kennedy.xls   16.268  17.762   3.603   3.743
  lcet10.txt     6.075   6.805   2.261   2.229
  plrabn12.txt   7.041   7.945   2.469   2.458
  ptt5           5.401   5.913   1.924   1.881
  sum            1.091   1.199   0.782   0.712
  xargs.1        0.141   0.150   0.252   0.245
  ---------------------------------------------
  合計          42.231  46.653  14.794  14.638

符号化と復号の単位 : 秒
実行環境 : Python 3.8.10, PyPy 7.3.1, Ubuntu 20.04 LTS (WSL1), Intel Core i5-6200U 2.30GHz

PPM の圧縮率は LHA を大きく上回り、bzip2 にせまる高い圧縮率になりました。PPM は優れたアルゴリズムだと思います。テキストファイルは bzip2 よりも高い圧縮率になりますが、kennedy.xls, ptt5, sum などのバイナリファイルの圧縮率は悪くなりました。PPM はバイナリファイルが苦手なアルゴリズムのようです。

実行速度は符号化と復号どちらでも大変遅くなりました。ブロックソートの符号化よりも遅いですね。PPM は記号を 1 つ符号化するのにレンジコーダを複数回実行するので、処理時間はどうしても遅くなります。また、トライの探索にも時間がかかります。order-1 と order-2 は単純な配列にして、order-3, 4, 5 をトライにする、または order-3, 4, 5 をハッシュにすると、もう少し速くなるかもしれません。

なお、実行時間の結果は M.Hiroi のコーディング、実行したマシン、プログラミング言語などの環境に大きく依存しています。また、これらの環境だけではなく、データの種類によっても実行時間はかなり左右されます。興味のある方は、いろいろなデータをご自分の環境で試してみてください。

次回は、このプログラムを改良して Method D や LRU スキームを試してみましょう。

●参考文献, URL

  1. 儘田真吾, 『PPM系列の符号化法について』, (リンク切れ)
  2. 広井誠, 『PPM によるファイルの圧縮 (前編)』, Interface 2005 年 3 月号, CQ出版社
  3. 広井誠, 『PPM によるファイルの圧縮 (後編)』, Interface 2005 年 4 月号, CQ出版社

●プログラムリスト1

#
# ppmfreq.py : PPM 用の出現頻度表
#
#              Copyright (C) 2007-2022 Makoto Hiroi
#
from rangecoder import *

# 定数
GR = 16
ESC = 256
CHAR_SIZE = 257

# ESC 記号付き出現頻度表
class FreqEsc:
    def __init__(self, inc, limit):
        self.sym = []
        self.inc = inc
        self.limit = limit
        self.count = [0] * CHAR_SIZE
        self.count[ESC] = 1
        self.count_group = [0] * (CHAR_SIZE // GR + 1)
        self.count_group[ESC // GR] = 1
        self.sum_ = 1

    # 出現頻度表の更新
    def update(self, c0, c1 = None):
        self.count[c0] += self.inc
        self.count_group[c0 // GR] += self.inc
        self.sum_ += self.inc
        if c1 is not None:
            self.count[c1] += self.inc
            self.count_group[c1 // GR] += self.inc
            self.sum_ += self.inc
        if self.sum_ >= self.limit:
            n = 0
            for x in range(len(self.count_group)):
                self.count_group[x] = 0
            for x in range(len(self.count)):
                if self.count[x] > 0:
                    self.count[x] = (self.count[x] >> 1) | 1
                    self.count_group[x // GR] += self.count[x]
                    n += self.count[x]
            self.sum_ = n

    # 符号化
    def encode(self, rc, c, exclusion):
        # 記号の累積度数を求める
        def cumul(c):
            n = 0
            for x in range(c // GR): n += count_group[x]
            for x in range((c // GR)*GR, c):
                if x not in exclusion: n += self.count[x]
            return n

        # exclusion の処理
        count_group = self.count_group[:]
        sum_ = self.sum_
        for x in exclusion:
            count_group[x // GR] -= self.count[x]
            sum_ -= self.count[x]
        #
        temp = rc.range_ // sum_
        rc.low += cumul(c) * temp
        rc.range_ = self.count[c] * temp
        rc.encode_normalize()

    # 復号
    def decode(self, rc, exclusion):
        # 記号の探索
        def search_code(value):
            n = 0
            for x in range(len(count_group)):
                if value < n + count_group[x]: break
                n += count_group[x]
            for c in range(x*GR, len(self.count)):
                if c not in exclusion:
                    if value < n + self.count[c]: break
                    n += self.count[c]
            return c, n

        # exclusion の処理
        count_group = self.count_group[:]
        sum_ = self.sum_
        for x in exclusion:
            count_group[x // GR] -= self.count[x]
            sum_ -= self.count[x]
        #
        temp = rc.range_ // sum_
        c, num = search_code(rc.low // temp)
        rc.low -= temp * num
        rc.range_ = temp * self.count[c]
        rc.decode_normalize()
        return c

# order-(-1) 用の出現頻度表
class Freq(FreqEsc):
    def __init__(self, inc, limit):
        self.inc = inc
        self.limit = limit
        self.count = [1] * (CHAR_SIZE - 1)
        self.count_group = [GR] * ((CHAR_SIZE - 1) // GR)
        self.sum_ = CHAR_SIZE - 1

●プログラムリスト2

#
# ppm.py : PPM のテスト
#
#          Copyright (C) 2007-2022 Makoto Hiroi
#
import argparse, os.path, time
from ppmfreq import *

# 定数
MAX_ORDER = 5
INC = 4
LIMIT = 0x4000
TRIE_SIZE = 100000

# 4 バイトの正整数値を出力
def write_number(num, fout):
    fout.putc((num >> 24) & 0xff)
    fout.putc((num >> 16) & 0xff)
    fout.putc((num >> 8) & 0xff)
    fout.putc(num & 0xff)

# 4 バイトの正整数値を入力
def read_number(fin):
    num = 0
    for _ in range(4):
        num = (num << 8) + fin.getc()
    return num

##### Trie #####

class Trie:
    def __init__(self):
        self.sym = [None] * TRIE_SIZE
        self.freq = [None] * TRIE_SIZE
        self.ht = {}
        for x in range(256):
            self.sym[x] = x
            self.freq[x] = FreqEsc(INC, LIMIT)  # order-1
        self.num = 256

    # 子の探索
    def search(self, n, c):
        key = (n, c)
        if key in self.ht: return self.ht[key]
        return None

    # 出現頻度表の取得
    def get_freq(self, buff, prev_code):
        n = prev_code[0]
        buff.append(self.freq[n])       # order-1 をセット
        for x in range(1, MAX_ORDER):
            c = prev_code[x]
            m = self.search(n, c)
            if m is None:
                # Trie に追加する
                m = self.num
                if m < TRIE_SIZE:
                    freq = FreqEsc(INC, LIMIT)
                    self.num += 1
                    self.sym[m] = c
                    self.freq[m] = freq
                    self.ht[(n, c)] = m
                else:
                    # メモリを使い切った場合は
                    # ESC 記号だけを符号化する
                    freq = freq_e
            else:
                freq = self.freq[m]
            # 出現頻度表を追加する
            buff.append(freq)
            n = m


##### 出現頻度表 #####
trie = Trie()                  # order-1 - MAX_ORDER
freq0 = FreqEsc(INC, LIMIT)    # order-0
freq_1 = Freq(INC, LIMIT)      # order-(-1)
freq_e = FreqEsc(0, LIMIT)     # ESC 記号を符号化・復号する

# ファイルの読み込み
def read_file(fin):
    while True:
        c = fin.getc()
        if c is None: break
        yield c

# PPM の符号化
def encode(fin, fout):
    rc = RangeCoder(fout, ENCODE)
    prev_code = [0] * MAX_ORDER
    exclusion = set()
    for x in read_file(fin):
        exclusion.clear()
        buff = [freq_1, freq0]
        trie.get_freq(buff, prev_code)
        for i in range(MAX_ORDER + 1, -1, -1):
            freq = buff[i]
            if freq.count[x] == 0:
                freq.encode(rc, ESC, exclusion)
                if freq is not freq_e:
                    freq.update(x, ESC)
                    exclusion.update(freq.sym)
                    freq.sym.append(x)
            else:
                freq.encode(rc, x, exclusion)
                freq.update(x)       # Method C
                # freq.update(x, x)  # Method D
                break
        #
        prev_code.pop()
        prev_code.insert(0, x)
    rc.finish()

# PPM の復号
def decode(fin, fout, size):
    rc = RangeCoder(fin, DECODE)
    prev_code = [0] * MAX_ORDER
    exclusion = set()
    for _ in range(size):
        exclusion.clear()
        buff = [freq_1, freq0]
        trie.get_freq(buff, prev_code)
        for i in range(MAX_ORDER + 1, -1, -1):
            freq = buff[i]
            c = freq.decode(rc, exclusion)
            if c == ESC:
                if freq is not freq_e:
                    exclusion.update(freq.sym)
            else:
                for x in range(MAX_ORDER + 1, i, -1):
                    if buff[x] is not freq_e:
                        buff[x].update(c, ESC)
                        buff[x].sym.append(c)
                freq.update(c)       # Method C
                # freq.update(c, c)  # Method D
                break
        #
        fout.putc(c)
        prev_code.pop()
        prev_code.insert(0, c)

# 符号化
def encode_file(name1, name2):
    size = os.path.getsize(name1)
    with ByteIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        write_number(size, fout)
        if size > 0: encode(fin, fout)

# 復号
def decode_file(name1, name2):
    with ByteIO(name1, ROPEN) as fin, ByteIO(name2, WOPEN) as fout:
        size = read_number(fin)
        if size > 0: decode(fin, fout, size)

# オプション解析
parser = argparse.ArgumentParser(description='PPM によるファイルの圧縮')
parser.add_argument('name1', help='入力ファイル')
parser.add_argument('name2', help='出力ファイル')
parser.add_argument('-e', '--encode', action='store_true', help='符号化')
parser.add_argument('-d', '--decode', action='store_true', help='復号')
args = parser.parse_args()

# 実行
s = time.time()
if args.encode and not args.decode:
    encode_file(args.name1, args.name2)
elif args.decode:
    decode_file(args.name1, args.name2)
else:
    print('option error')
e = time.time()
print('{:.3f}'.format(e - s))

初版 2007 年 8 月 26 日
改訂 2022 年 10 月 2 日

Copyright (C) 2007-2022 Makoto Hiroi
All rights reserved.

[ PrevPage | Python | NextPage ]