class Struct(object): def __init__(self, **attrs): for name, value in attrs.items(): setattr(self, name, value) ReferencePrefix = "`" ReferencePrefixCode = ord(ReferencePrefix) ReferenceIntBase = 96 ReferenceIntFloorCode = ord(" ") ReferenceIntCeilCode = ReferenceIntFloorCode + ReferenceIntBase - 1 MaxStringDistance = ReferenceIntBase ** 2 - 1 MinStringLength = 5 MaxStringLength = ReferenceIntBase ** 1 - 1 + MinStringLength MaxWindowLength = MaxStringDistance + MinStringLength; def encodeReferenceInt(value, width): if value >= 0 and value < (ReferenceIntBase ** width - 1): encoded = "" while value > 0: encoded = chr((value % ReferenceIntBase) + ReferenceIntFloorCode) + encoded value = int(value / ReferenceIntBase) missingLength = width - len(encoded) for i in range(missingLength): encoded = chr(ReferenceIntFloorCode) + encoded return encoded else: raise Exception("Reference int out of range: %d (width = %d)" % (value, width)) def encodeReferenceLength(length): return encodeReferenceInt(length - MinStringLength, 1) def decodeReferenceInt(data, width): value = 0 for i in range(width): value *= ReferenceIntBase charCode = ord(data[i]) if charCode >= ReferenceIntFloorCode and charCode <= ReferenceIntCeilCode: value += charCode - ReferenceIntFloorCode else: raise Exception("Invalid char code in reference int: %d" % charCode) return value def decodeReferenceLength(data): return decodeReferenceInt(data, 1) + MinStringLength def compress(data, windowLength): if windowLength > MaxWindowLength: raise Exception("Window length too large") compressed = "" pos = 0 lastPos = len(data) - MinStringLength while pos < lastPos: searchStart = max(pos - windowLength, 0) matchLength = MinStringLength foundMatch = False bestMatch = Struct(distance = MaxStringDistance, length = 0) newCompressed = None while (searchStart + matchLength) < pos: isValidMatch = ( data[searchStart:searchStart + matchLength] == data[pos:pos + matchLength] and matchLength < MaxStringLength ) if isValidMatch: matchLength += 1 foundMatch = True else: realMatchLength = matchLength - 1 if foundMatch and realMatchLength > bestMatch.length: bestMatch.distance = pos - searchStart - realMatchLength bestMatch.length = realMatchLength matchLength = MinStringLength searchStart += 1 foundMatch = False if bestMatch.length: newCompressed = ( ReferencePrefix + encodeReferenceInt(bestMatch.distance, 2) + encodeReferenceLength(bestMatch.length) ) pos += bestMatch.length else: if data[pos] != ReferencePrefix: newCompressed = data[pos] else: newCompressed = ReferencePrefix + ReferencePrefix pos += 1 compressed += newCompressed return compressed + data[pos:].replace("`", "``") def decompress(data): decompressed = "" pos = 0 while pos < len(data): currentChar = data[pos]; if currentChar != ReferencePrefix: decompressed += currentChar pos += 1 else: nextChar = data[pos + 1] if nextChar != ReferencePrefix: distance = decodeReferenceInt(data[pos + 1:(pos + 1) + 2], 2) length = decodeReferenceLength(data[pos + 3]) decompressed += decompressed[-distance - length:distance - 1] pos += MinStringLength - 1 else: decompressed += ReferencePrefix pos += 2 return decompressed if __name__ == "__main__": import optparse, sys, time parser = optparse.OptionParser( usage = "%prog [OPTIONS] ( compress WINDOW-LENGTH | decompress )", description = "Compress or decompress text using the LZ77 algorithm" ) parser.add_option( "-e", "--encoding", dest = "encoding", default = "utf-8", help = "use the specified encoding both for input and output [default = %default]" ) parser.add_option( "-s", "--statistics", dest = "stats", action = "store_true", default = False, help = "output statistics about what was done instead of the common output" ) options, args = parser.parse_args() if len(args) < 1 or len(args) > 2: parser.print_help() sys.exit(1) action = args[0] if action == "compress": try: windowLength = int(args[1]) assert 0 < windowLength < MaxWindowLength except: parser.print_usage() print "WINDOW-LENGTH must be an integer in the range (0, %d)" % MaxWindowLength sys.exit(1) elif action != "decompress": parser.print_usage() sys.exit(1) original = sys.stdin.read().decode(options.encoding) start = time.time() if action == "compress": result = compress(original, windowLength) else: result = decompress(original) end = time.time() if not options.stats: sys.stdout.write(result.encode(options.encoding)) else: ratio = (1 - 1.0 * min(len(original), len(result)) / max(len(original), len(result))) * 100 elapsed = end - start if elapsed: throughput = " (%.2f characters/sec)" % (1.0 * len(original) / elapsed) else: throughput = "" print action.capitalize() + "ed in %.2f milliseconds" % (elapsed) + throughput, if action == "compress": print "with window length of %d" % windowLength else: print print "Original text size was %d characters" % len(original) print "Result text size is %d characters (%s ratio of %.2f%%)" % (len(result), action, ratio)