]>
git.gir.st - subscriptionfeed.git/blob - app/browse/pyproto.py
1 # original (C) 2023 Tekky, MIT licensed. https://pypi.org/project/python-protobuf
2 from enum
import IntEnum
, unique
6 class ProtoFieldType(IntEnum
):
def __init__(self, idx, type, val):
    """Store one field: its number, its field type and its payload.

    Args:
        idx: Field number; shifted left by 3 to build the tag when encoding.
        type: Field type; its low 3 bits go into the tag when encoding.
        val: Payload to be written for this field.
    """
    # NOTE(review): the assignments are reconstructed from how the encoder
    # reads `field.idx`, `field.type` and `field.val` -- confirm upstream.
    self.idx = idx
    self.type = type
    self.val = val
def __init__(self):
    """Create a writer with an empty output buffer."""
    # All write* methods append to this single growable buffer.
    self.data = bytearray()
def write0(self, byte):
    """Append a single byte to the output buffer.

    Args:
        byte: Integer whose low 8 bits are appended; higher bits are
            discarded, so negative or oversized values never raise.
    """
    self.data.append(byte & 0xFF)
def write(self, bytes):
    """Append a run of bytes to the output buffer.

    Args:
        bytes: Any iterable of byte values (``bytes``, ``bytearray`` or
            ints in ``range(256)``). The name shadows the builtin but is
            kept because it is part of the method's signature.
    """
    self.data.extend(bytes)
def writeInt32(self, int32):
    """Append ``int32`` as 4 little-endian bytes (unsigned).

    Args:
        int32: Non-negative integer that fits in 32 bits.

    Raises:
        OverflowError: If ``int32`` is negative or needs more than 4 bytes.
    """
    bs = int32.to_bytes(4, byteorder="little", signed=False)
    # The encoded bytes must actually reach the buffer; computing `bs`
    # alone has no effect.
    self.write(bs)
def writeInt64(self, int64):
    """Append ``int64`` as 8 little-endian bytes (unsigned).

    Args:
        int64: Non-negative integer that fits in 64 bits.

    Raises:
        OverflowError: If ``int64`` is negative or needs more than 8 bytes.
    """
    bs = int64.to_bytes(8, byteorder="little", signed=False)
    # The encoded bytes must actually reach the buffer; computing `bs`
    # alone has no effect.
    self.write(bs)
def writeVarint(self, vint):
    """Append ``vint`` as a base-128 varint, low 7-bit group first.

    The value is masked to 32 bits before encoding, so a negative input is
    written as its 32-bit two's-complement pattern and anything above
    32 bits is truncated.

    Args:
        vint: Integer to encode.
    """
    vint = vint & 0xFFFFFFFF
    # Emit 7 bits per byte with the high bit set while more bits remain.
    # The test is `>=`: 0x80 itself does not fit in 7 bits and needs a
    # continuation byte.
    while vint >= 0x80:
        self.write0((vint & 0x7F) | 0x80)
        vint >>= 7
    # Final group: high bit clear marks the end of the varint.
    self.write0(vint & 0x7F)
def writeString(self, bytes):
    """Append a length-prefixed byte string.

    The length is written as a varint, followed by the raw bytes.

    Args:
        bytes: Byte string to write. The name shadows the builtin but is
            kept because it is part of the method's signature.
    """
    self.writeVarint(len(bytes))
    # The payload has to follow its length prefix; a prefix on its own
    # would corrupt everything written afterwards.
    self.write(bytes)
def toBytes(self):
    """Return everything written so far as an immutable ``bytes`` copy."""
    # Kept as its own method: inside a method whose parameter is named
    # `bytes`, this call would hit the parameter instead of the builtin.
    return bytes(self.data)
def __init__(self, data=None):
    """Create a message, optionally pre-filled from a dict.

    Args:
        data: ``None`` for an empty message, or a dict that is handed to
            ``__parseDict`` to populate the fields.

    Raises:
        ValueError: If ``data`` is neither ``None`` nor a dict.
    """
    # Always start with a field list: put() appends to it and the encoder
    # iterates over it.
    self.fields = []
    if data is None:
        # The default argument means "empty message", not an error.
        return
    if not isinstance(data, dict):
        raise ValueError("unsupport type(%s) to protobuf" % (type(data)))
    self.__parseDict(data)
def toBuf(self):
    """Encode all fields, in insertion order, and return the bytes.

    Each field is written as a varint tag ``(idx << 3) | (type & 7)``
    followed by its value in the encoding that matches the field type.

    Returns:
        The encoded message as ``bytes``.

    Raises:
        ValueError: If a field has a type this encoder cannot write.
    """
    writer = ProtoWriter()
    for field in self.fields:
        # Tag: field number in the high bits, type in the low three.
        key = (field.idx << 3) | (field.type & 7)
        writer.writeVarint(key)
        if field.type == ProtoFieldType.INT32:
            writer.writeInt32(field.val)
        elif field.type == ProtoFieldType.INT64:
            writer.writeInt64(field.val)
        elif field.type == ProtoFieldType.VARINT:
            writer.writeVarint(field.val)
        elif field.type == ProtoFieldType.STRING:
            writer.writeString(field.val)
        else:
            # NOTE(review): exception class and format argument are
            # reconstructed; only the message text was available.
            raise ValueError(
                "encode to protobuf error, unexpected field type: %s"
                % (field.type,)
            )
    return writer.toBytes()
def put(self, field: "ProtoField"):
    """Append an already-built field to this message.

    Args:
        field: The field to add; fields keep their insertion order.
    """
    # The annotation is a forward reference so it is not evaluated when
    # the method is defined.
    self.fields.append(field)
def putInt32(self, idx, int32):
    """Add field ``idx`` holding ``int32`` with the INT32 field type.

    Args:
        idx: Field number.
        int32: Value to store.
    """
    self.put(ProtoField(idx, ProtoFieldType.INT32, int32))
def putInt64(self, idx, int64):
    """Add field ``idx`` holding ``int64`` with the INT64 field type.

    Args:
        idx: Field number.
        int64: Value to store.
    """
    self.put(ProtoField(idx, ProtoFieldType.INT64, int64))
def putVarint(self, idx, vint):
    """Add field ``idx`` holding ``vint`` with the VARINT field type.

    Args:
        idx: Field number.
        vint: Integer value to store.
    """
    self.put(ProtoField(idx, ProtoFieldType.VARINT, vint))
def putBytes(self, idx, data):
    """Add field ``idx`` holding raw bytes, stored with the STRING type.

    Args:
        idx: Field number.
        data: Payload, stored as given (no copy, no encoding).
    """
    self.put(ProtoField(idx, ProtoFieldType.STRING, data))
def putUtf8(self, idx, data):
    """Add field ``idx`` holding text, stored as its UTF-8 bytes.

    Args:
        idx: Field number.
        data: ``str`` to encode; the field uses the STRING type.
    """
    encoded = data.encode("utf-8")
    self.put(ProtoField(idx, ProtoFieldType.STRING, encoded))
def putProtoBuf(self, idx, data):
    """Add field ``idx`` holding a nested message.

    The nested message is encoded immediately via ``data.toBuf()`` and the
    resulting bytes are stored with the STRING type, so later changes to
    ``data`` do not affect this field.

    Args:
        idx: Field number.
        data: Object exposing ``toBuf()``.
    """
    self.put(ProtoField(idx, ProtoFieldType.STRING, data.toBuf()))
def __parseDict(self, data):
    """
    Convert dict object to ProtoBuf object

    Each key is used as the field number and the value's Python type
    selects how it is stored; a nested dict becomes a nested message.

    Raises:
        ValueError: If a value has a type that cannot be stored.
    """
    # NOTE(review): the int/str/bytes branch bodies are reconstructed from
    # the available put* methods (int -> putVarint is an assumption, and
    # further branches may exist upstream) -- confirm before relying on it.
    for k, v in data.items():
        if isinstance(v, int):
            self.putVarint(k, v)
        elif isinstance(v, str):
            self.putUtf8(k, v)
        elif isinstance(v, bytes):
            self.putBytes(k, v)
        elif isinstance(v, dict):
            self.putProtoBuf(k, ProtoBuf(v))
        else:
            raise ValueError("unsupport type(%s) to protobuf" % (type(v)))