# original (C) 2023 Tekky, MIT licensed. https://pypi.org/project/python-protobuf from enum import IntEnum, unique @unique class ProtoFieldType(IntEnum): VARINT = 0 INT64 = 1 STRING = 2 GROUPSTART = 3 GROUPEND = 4 INT32 = 5 ERROR1 = 6 ERROR2 = 7 class ProtoField: def __init__(self, idx, type, val): self.idx = idx self.type = type self.val = val class ProtoWriter: def __init__(self): self.data = bytearray() def write0(self, byte): self.data.append(byte & 0xFF) def write(self, bytes): self.data.extend(bytes) def writeInt32(self, int32): bs = int32.to_bytes(4, byteorder="little", signed=False) self.write(bs) def writeInt64(self, int64): bs = int64.to_bytes(8, byteorder="little", signed=False) self.write(bs) def writeVarint(self, vint): vint = vint & 0xFFFFFFFF while vint > 0x80: self.write0((vint & 0x7F) | 0x80) vint >>= 7 self.write0(vint & 0x7F) def writeString(self, bytes): self.writeVarint(len(bytes)) self.write(bytes) def toBytes(self): return bytes(self.data) class ProtoBuf: def __init__(self, data=None): self.fields = [] if data != None: if type(data) != dict: raise ValueError("unsupport type(%s) to protobuf" % (type(data))) self.__parseDict(data) def toBuf(self): writer = ProtoWriter() for field in self.fields: key = (field.idx << 3) | (field.type & 7) writer.writeVarint(key) if field.type == ProtoFieldType.INT32: writer.writeInt32(field.val) elif field.type == ProtoFieldType.INT64: writer.writeInt64(field.val) elif field.type == ProtoFieldType.VARINT: writer.writeVarint(field.val) elif field.type == ProtoFieldType.STRING: writer.writeString(field.val) else: raise ValueError( "encode to protobuf error, unexpected field type: %s" % (field.type.name) ) return writer.toBytes() def put(self, field: ProtoField): self.fields.append(field) def putInt32(self, idx, int32): self.put(ProtoField(idx, ProtoFieldType.INT32, int32)) def putInt64(self, idx, int64): self.put(ProtoField(idx, ProtoFieldType.INT64, int64)) def putVarint(self, idx, vint): self.put(ProtoField(idx, ProtoFieldType.VARINT, vint)) def putBytes(self, idx, data): self.put(ProtoField(idx, ProtoFieldType.STRING, data)) def putUtf8(self, idx, data): self.put(ProtoField(idx, ProtoFieldType.STRING, data.encode("utf-8"))) def putProtoBuf(self, idx, data): self.put(ProtoField(idx, ProtoFieldType.STRING, data.toBuf())) def __parseDict(self, data): """ Convert dict object to ProtoBuf object """ for k, v in data.items(): if isinstance(v, int): self.putVarint(k, v) elif isinstance(v, str): self.putUtf8(k, v) elif isinstance(v, bytes): self.putBytes(k, v) elif isinstance(v, dict): self.putProtoBuf(k, ProtoBuf(v)) elif v is None: continue else: raise ValueError("unsupport type(%s) to protobuf" % (type(v)))