[Notes] [Git][BuildGrid/buildgrid][arber/91-get-tree] Start the implementation of the getTree method.




Arber Xhindoli pushed to branch arber/91-get-tree at BuildGrid / buildgrid

Commits:

6 changed files:

Changes:

  • buildgrid/client/cas.py
    @@ -24,7 +24,7 @@ from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_p
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.google.rpc import code_pb2
     from buildgrid.settings import HASH
    -from buildgrid.utils import merkle_tree_maker
    +from buildgrid.utils import merkle_tree_maker, MAX_REQUEST_COUNT


     # Maximum size for a queueable file:

    @@ -33,9 +33,6 @@ FILE_SIZE_THRESHOLD = 1 * 1024 * 1024
     # Maximum size for a single gRPC request:
     MAX_REQUEST_SIZE = 2 * 1024 * 1024

    -# Maximum number of elements per gRPC request:
    -MAX_REQUEST_COUNT = 500
    -

     class _CallCache:
         """Per remote grpc.StatusCode.UNIMPLEMENTED call cache."""

    @@ -87,8 +84,10 @@ class Downloader:

             self.instance_name = instance

    -        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    -        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(
    +            self.channel)
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(
    +            self.channel)

             self.__file_requests = {}
             self.__file_request_count = 0

    @@ -245,7 +244,8 @@ class Downloader:
                 resource_name = '/'.join([self.instance_name, 'blobs',
                                           digest.hash, str(digest.size_bytes)])
             else:
    -            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +            resource_name = '/'.join(['blobs', digest.hash,
    +                                      str(digest.size_bytes)])

             read_request = bytestream_pb2.ReadRequest()
             read_request.resource_name = resource_name

    @@ -261,7 +261,8 @@ class Downloader:
             except grpc.RpcError as e:
                 status_code = e.code()
                 if status_code == grpc.StatusCode.NOT_FOUND:
    -                raise NotFoundError("Requested data does not exist on the remote.")
    +                raise NotFoundError(
    +                    "Requested data does not exist on the remote.")

                 else:
                     assert False

    @@ -295,7 +296,8 @@ class Downloader:
                 except grpc.RpcError as e:
                     status_code = e.code()
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    -                    _CallCache.mark_unimplemented(self.channel, 'BatchReadBlobs')
    +                    _CallCache.mark_unimplemented(
    +                        self.channel, 'BatchReadBlobs')

                     elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                         read_blobs.clear()

    @@ -317,7 +319,8 @@ class Downloader:
                 resource_name = '/'.join([self.instance_name, 'blobs',
                                           digest.hash, str(digest.size_bytes)])
             else:
    -            resource_name = '/'.join(['blobs', digest.hash, str(digest.size_bytes)])
    +            resource_name = '/'.join(['blobs', digest.hash,
    +                                      str(digest.size_bytes)])

             read_request = bytestream_pb2.ReadRequest()
             read_request.resource_name = resource_name

    @@ -391,10 +394,12 @@ class Downloader:
                 except grpc.RpcError as e:
                     status_code = e.code()
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    -                    _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
    +                    _CallCache.mark_unimplemented(
    +                        self.channel, 'BatchUpdateBlobs')

                     elif status_code == grpc.StatusCode.NOT_FOUND:
    -                    raise NotFoundError("Requested directory does not exist on the remote.")
    +                    raise NotFoundError(
    +                        "Requested directory does not exist on the remote.")

                     else:
                         assert False

    @@ -422,7 +427,8 @@ class Downloader:
                     directory = directories[directory_node.digest.hash]
                 else:
                     directory = remote_execution_pb2.Directory()
    -                directory.ParseFromString(self._fetch_blob(directory_node.digest))
    +                directory.ParseFromString(
    +                    self._fetch_blob(directory_node.digest))

                 os.makedirs(directory_path, exist_ok=True)

    @@ -484,8 +490,10 @@ class Uploader:
             else:
                 self.u_uid = str(uuid.uuid4())

    -        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(self.channel)
    -        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(self.channel)
    +        self.__bytestream_stub = bytestream_pb2_grpc.ByteStreamStub(
    +            self.channel)
    +        self.__cas_stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(
    +            self.channel)

             self.__requests = {}
             self.__request_count = 0

    @@ -770,7 +778,8 @@ class Uploader:
                     request.data = blob

                 try:
    -                batch_response = self.__cas_stub.BatchUpdateBlobs(batch_request)
    +                batch_response = self.__cas_stub.BatchUpdateBlobs(
    +                    batch_request)
                     for response in batch_response.responses:
                         assert response.digest.hash in batch

    @@ -783,7 +792,8 @@ class Uploader:
                 except grpc.RpcError as e:
                     status_code = e.code()
                     if status_code == grpc.StatusCode.UNIMPLEMENTED:
    -                    _CallCache.mark_unimplemented(self.channel, 'BatchUpdateBlobs')
    +                    _CallCache.mark_unimplemented(
    +                        self.channel, 'BatchUpdateBlobs')

                     elif status_code == grpc.StatusCode.INVALID_ARGUMENT:
                         written_digests.clear()

  • buildgrid/server/cas/instance.py
    @@ -54,6 +54,19 @@ class ContentAddressableStorageInstance:

             return response

    +    def get_tree(self, request, directory_list, digest_list):
    +        """Reads directories starting at request.root_digest, appending
    +        each directory and its digest to directory_list and digest_list.
    +
    +        The traversal is level-order and continues until either the end
    +        of directory_list is reached or request.page_size reads have
    +        been made. In the latter case it returns len(directory_list) - 1
    +        so that subsequent calls can pick up where this one left off;
    +        otherwise it returns None, meaning the whole directory tree has
    +        been read.
    +        """
    +        return None
    +

     class ByteStreamInstance:

    @@ -127,13 +140,15 @@ class ByteStreamInstance:

                for request in requests:
                    if finished:
    -                    raise InvalidArgumentError("Write request sent after write finished")
    +                    raise InvalidArgumentError(
    +                        "Write request sent after write finished")

                    elif request.write_offset != bytes_written:
                        raise InvalidArgumentError("Invalid write offset")

                    elif request.resource_name and request.resource_name != first_request.resource_name:
    -                    raise InvalidArgumentError("Resource name changed mid-write")
    +                    raise InvalidArgumentError(
    +                        "Resource name changed mid-write")

                    finished = request.finish_write
                    bytes_written += len(request.data)

    @@ -145,7 +160,8 @@ class ByteStreamInstance:

            # Check that the data matches the provided digest.
            if bytes_written != digest.size_bytes or not finished:
    -            raise NotImplementedError("Cannot close stream before finishing write")
    +            raise NotImplementedError(
    +                "Cannot close stream before finishing write")

            elif hash_.hexdigest() != digest.hash:
                raise InvalidArgumentError("Data does not match hash")
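    The get_tree() stub above only fixes the contract. A minimal sketch of
    the level-order traversal its docstring describes could look like the
    following; the storage backend and its get_message(digest, message_type)
    helper are assumptions (mirroring put_message in storage_abc.py below),
    not part of this push:

        # Hypothetical body for ContentAddressableStorageInstance.get_tree();
        # assumes self.__storage.get_message(digest, Directory) fetches and
        # parses a Directory proto, and that Directory is imported.
        def get_tree(self, request, directory_list, digest_list):
            if not directory_list:
                # Seed the traversal with the root directory.
                root = self.__storage.get_message(request.root_digest, Directory)
                directory_list.append(root)
                digest_list.append(request.root_digest)

            index = int(request.page_token or 0)
            reads = 0
            while index < len(directory_list):
                # Queue every child of the current directory for a later visit.
                for node in directory_list[index].directories:
                    child = self.__storage.get_message(node.digest, Directory)
                    directory_list.append(child)
                    digest_list.append(node.digest)
                index += 1
                reads += 1
                if reads >= request.page_size:
                    # Page full: tell the caller where to resume.
                    return len(directory_list) - 1
            # The whole tree has been read.
            return None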
    

  • buildgrid/server/cas/service.py
    @@ -30,6 +30,7 @@ from buildgrid._exceptions import InvalidArgumentError, NotFoundError, OutOfRang
     from buildgrid._protos.google.bytestream import bytestream_pb2, bytestream_pb2_grpc
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2_grpc
    +from buildgrid.utils import MAX_REQUEST_COUNT


     class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressableStorageServicer):

    @@ -39,7 +40,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa

             self._instances = {}

    -        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(self, server)
    +        remote_execution_pb2_grpc.add_ContentAddressableStorageServicer_to_server(
    +            self, server)

         def add_instance(self, name, instance):
             self._instances[name] = instance

    @@ -49,7 +51,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
                 self.logger.debug("FindMissingBlobs request: [{}]".format(request))
                 instance = self._get_instance(request.instance_name)
                 response = instance.find_missing_blobs(request.blob_digests)
    -            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
    +            self.logger.debug(
    +                "FindMissingBlobs response: [{}]".format(response))
                 return response

             except InvalidArgumentError as e:

    @@ -64,7 +67,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
                 self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
                 instance = self._get_instance(request.instance_name)
                 response = instance.batch_update_blobs(request.requests)
    -            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
    +            self.logger.debug(
    +                "BatchUpdateBlobs response: [{}]".format(response))
                 return response

             except InvalidArgumentError as e:

    @@ -81,8 +85,63 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
             return remote_execution_pb2.BatchReadBlobsResponse()

         def GetTree(self, request, context):
    +
             context.set_code(grpc.StatusCode.UNIMPLEMENTED)
             context.set_details('Method not implemented!')
    +        return iter([remote_execution_pb2.GetTreeResponse()])
    +
    +        # Unreachable while the instance-side get_tree() is a stub.
    +        # Stores the directories as long as a page token is returned.
    +        directories = []
    +        # Stores the digests of those directories.
    +        digests = []
    +
    +        # If page_size is not set, default it to MAX_REQUEST_COUNT.
    +        if request.page_size == 0:
    +            request.page_size = MAX_REQUEST_COUNT
    +
    +        # The page token doubles as an index into the directory list;
    +        # instance.get_tree() advances it on each page.
    +        request.page_token = "0"
    +
    +        # Start at index 1 so the root directory itself is not returned.
    +        start_index = 1
    +
    +        try:
    +            instance = self._get_instance(request.instance_name)
    +            while True:
    +                self.logger.debug("GetTree request: [{}]".format(request))
    +                # get_tree() returns the next page token once page_size
    +                # directories have been read: an index into the lists.
    +                page_token = instance.get_tree(
    +                    request, directories, digests)
    +
    +                response = remote_execution_pb2.GetTreeResponse()
    +                if not page_token:
    +                    # Last page: return everything read since the previous
    +                    # request; an unset next_page_token ends the stream.
    +                    response.directories.extend(directories[start_index:])
    +                    yield response
    +                    return
    +                else:
    +                    # Return the directories read between the previous
    +                    # request and this one.
    +                    response.directories.extend(
    +                        directories[start_index:page_token])
    +                    response.next_page_token = str(page_token)
    +                    yield response
    +
    +                # Build the next request from the returned page token.
    +                request = remote_execution_pb2.GetTreeRequest()
    +                request.page_size = MAX_REQUEST_COUNT
    +                request.page_token = str(page_token)
    +                request.root_digest.CopyFrom(digests[page_token])
    +                start_index = page_token
    +
    +        except InvalidArgumentError as e:
    +            self.logger.error(e)
    +            context.set_details(str(e))
    +            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)

             return iter([remote_execution_pb2.GetTreeResponse()])
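    For reference, a GetTree caller consumes this server-streaming RPC by
    iterating the responses. A hypothetical client-side loop (the channel and
    root_digest are assumed to exist already; none of this is in the push):

        # Hypothetical consumption of the streaming GetTree RPC.
        stub = remote_execution_pb2_grpc.ContentAddressableStorageStub(channel)
        request = remote_execution_pb2.GetTreeRequest(instance_name='main',
                                                      root_digest=root_digest)

        directories = []
        for response in stub.GetTree(request):
            # Each streamed response carries one page of Directory messages.
            directories.extend(response.directories)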
    
    @@ -91,7 +150,8 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
                 return self._instances[instance_name]

             except KeyError:
    -            raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
    +            raise InvalidArgumentError(
    +                "Invalid instance name: [{}]".format(instance_name))


     class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):

    @@ -115,15 +175,18 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
                 # TODO: Decide on default instance name
                 if path[0] == "blobs":
                     if len(path) < 3 or not path[2].isdigit():
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    +                    raise InvalidArgumentError(
    +                        "Invalid resource name: [{}]".format(request.resource_name))
                     instance_name = ""

                 elif path[1] == "blobs":
                     if len(path) < 4 or not path[3].isdigit():
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    +                    raise InvalidArgumentError(
    +                        "Invalid resource name: [{}]".format(request.resource_name))

                 else:
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(request.resource_name))
    +                raise InvalidArgumentError(
    +                    "Invalid resource name: [{}]".format(request.resource_name))

                 instance = self._get_instance(instance_name)
                 yield from instance.read(path,

    @@ -154,7 +217,8 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
             try:
                 requests, request_probe = tee(requests, 2)
                 first_request = next(request_probe)
    -            self.logger.debug("First write request: [{}]".format(first_request))
    +            self.logger.debug(
    +                "First write request: [{}]".format(first_request))

                 path = first_request.resource_name.split("/")

    @@ -163,15 +227,18 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
                 # TODO: Sort out no instance name
                 if path[0] == "uploads":
                     if len(path) < 5 or path[2] != "blobs" or not path[4].isdigit():
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    +                    raise InvalidArgumentError(
    +                        "Invalid resource name: [{}]".format(first_request.resource_name))
                     instance_name = ""

                 elif path[1] == "uploads":
                     if len(path) < 6 or path[3] != "blobs" or not path[5].isdigit():
    -                    raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    +                    raise InvalidArgumentError(
    +                        "Invalid resource name: [{}]".format(first_request.resource_name))

                 else:
    -                raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    +                raise InvalidArgumentError(
    +                    "Invalid resource name: [{}]".format(first_request.resource_name))

                 instance = self._get_instance(instance_name)
                 response = instance.write(requests)

    @@ -206,4 +273,5 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
                 return self._instances[instance_name]

             except KeyError:
    -            raise InvalidArgumentError("Invalid instance name: [{}]".format(instance_name))
    +            raise InvalidArgumentError(
    +                "Invalid instance name: [{}]".format(instance_name))
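    The validation above encodes the ByteStream resource-name layouts from
    the Remote Execution API. Illustrative names the checks accept (hash,
    size, and UUID values are made up):

        # Read:  [{instance}/]blobs/{hash}/{size}
        READ_RESOURCE_NAMES = [
            "blobs/0a1b2c3d/1024",            # anonymous instance
            "main/blobs/0a1b2c3d/1024",       # instance named "main"
        ]
        # Write: [{instance}/]uploads/{uuid}/blobs/{hash}/{size}
        WRITE_RESOURCE_NAMES = [
            "uploads/3fa85f64-5717-4562-b3fc-2c963f66afa6/blobs/0a1b2c3d/1024",
            "main/uploads/3fa85f64-5717-4562-b3fc-2c963f66afa6/blobs/0a1b2c3d/1024",
        ]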

  • buildgrid/server/cas/storage/storage_abc.py
    @@ -22,7 +22,7 @@ The abstract base class for storage providers.

     import abc

    -from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    +from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest, Directory
     from buildgrid._protos.google.rpc.status_pb2 import Status
     from buildgrid._protos.google.rpc import code_pb2

    @@ -92,7 +92,8 @@ class StorageABC(abc.ABC):
                        write_session.write(data)
                        self.commit_write(digest, write_session)
                    except IOError as ex:
    -                    result.append(Status(code=code_pb2.UNKNOWN, message=str(ex)))
    +                    result.append(
    +                        Status(code=code_pb2.UNKNOWN, message=str(ex)))
                    else:
                        result.append(Status(code=code_pb2.OK))
            return result

    @@ -100,7 +101,8 @@ class StorageABC(abc.ABC):
        def put_message(self, message):
            """Store the given Protobuf message in CAS, returning its digest."""
            message_blob = message.SerializeToString()
    -        digest = Digest(hash=HASH(message_blob).hexdigest(), size_bytes=len(message_blob))
    +        digest = Digest(hash=HASH(message_blob).hexdigest(),
    +                        size_bytes=len(message_blob))
            session = self.begin_write(digest)
            session.write(message_blob)
            self.commit_write(digest, session)
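    The new Directory import hints at the read path get_tree() will need. A
    read-side counterpart to put_message might look like this sketch; it
    assumes get_blob(digest) returns a file-like object, or None when the
    blob is missing, and is not part of this push:

        # Hypothetical counterpart to put_message.
        def get_message(self, digest, message_type):
            """Fetch a blob from CAS and parse it as the given Protobuf type."""
            blob = self.get_blob(digest)
            if blob is None:
                return None
            message = message_type()
            message.ParseFromString(blob.read())
            return message

    With such a helper, the instance layer could walk a tree via
    storage.get_message(node.digest, Directory).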
    

  • buildgrid/utils.py
    @@ -19,6 +19,9 @@ import os
     from buildgrid.settings import HASH
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2

    +# Maximum number of elements per gRPC request:
    +MAX_REQUEST_COUNT = 500
    +

     def create_digest(bytes_to_digest):
         """Computes the :obj:`Digest` of a piece of data.
    
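    Moving MAX_REQUEST_COUNT here lets both the client and the server cap the
    number of elements packed into one gRPC request. A hypothetical helper
    (not in the push) showing the intended use:

        # Illustrative only: split a digest list into request-sized batches.
        def iter_batches(digests, max_count=MAX_REQUEST_COUNT):
            """Yield slices of digests, each at most max_count elements long."""
            for start in range(0, len(digests), max_count):
                yield digests[start:start + max_count]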

  • setup.cfg
    @@ -16,4 +16,4 @@ pep8ignore =
         *_pb2_grpc.py ALL
     filterwarnings =
         ignore::DeprecationWarning
    -    ignore::PendingDeprecationWarning
    \ No newline at end of file
    +    ignore::PendingDeprecationWarning


