[Notes] [Git][BuildGrid/buildgrid][mablanch/connection-drop-simulation] 11 commits: Only add status of lease if result not cached.



Title: GitLab

Martin Blanchard pushed to branch mablanch/connection-drop-simulation at BuildGrid / buildgrid

Commits:

14 changed files:

Changes:

  • buildgrid/_app/bots/buildbox.py
    ... ... @@ -46,11 +46,6 @@ def work_buildbox(context, lease):
    46 46
             command = downloader.get_message(action.command_digest,
    
    47 47
                                              remote_execution_pb2.Command())
    
    48 48
     
    
    49
    -    environment = {}
    
    50
    -    for variable in command.environment_variables:
    
    51
    -        if variable.name not in ['PWD']:
    
    52
    -            environment[variable.name] = variable.value
    
    53
    -
    
    54 49
         if command.working_directory:
    
    55 50
             working_directory = command.working_directory
    
    56 51
         else:
    
    ... ... @@ -82,6 +77,12 @@ def work_buildbox(context, lease):
    82 77
                 if context.cas_server_cert:
    
    83 78
                     command_line.append('--server-cert={}'.format(context.cas_server_cert))
    
    84 79
     
    
    80
    +            command_line.append('--clearenv')
    
    81
    +            for variable in command.environment_variables:
    
    82
    +                command_line.append('--setenv')
    
    83
    +                command_line.append(variable.name)
    
    84
    +                command_line.append(variable.value)
    
    85
    +
    
    85 86
                 command_line.append(context.fuse_dir)
    
    86 87
                 command_line.extend(command.arguments)
    
    87 88
     
    

  • buildgrid/_app/commands/cmd_bot.py
    ... ... @@ -52,16 +52,19 @@ from ..cli import pass_context
    52 52
                   help="Public CAS client certificate for TLS (PEM-encoded)")
    
    53 53
     @click.option('--cas-server-cert', type=click.Path(exists=True, dir_okay=False), default=None,
    
    54 54
                   help="Public CAS server certificate for TLS (PEM-encoded)")
    
    55
    +@click.option('--update-period', type=click.FLOAT, default=0.5, show_default=True,
    
    56
    +              help="Time period for bot updates to the server in seconds.")
    
    55 57
     @click.option('--parent', type=click.STRING, default='main', show_default=True,
    
    56 58
                   help="Targeted farm resource.")
    
    57 59
     @pass_context
    
    58
    -def cli(context, parent, remote, client_key, client_cert, server_cert,
    
    60
    +def cli(context, parent, update_period, remote, client_key, client_cert, server_cert,
    
    59 61
             remote_cas, cas_client_key, cas_client_cert, cas_server_cert):
    
    60 62
         # Setup the remote execution server channel:
    
    61 63
         url = urlparse(remote)
    
    62 64
     
    
    63 65
         context.remote = '{}:{}'.format(url.hostname, url.port or 50051)
    
    64 66
         context.remote_url = remote
    
    67
    +    context.update_period = update_period
    
    65 68
         context.parent = parent
    
    66 69
     
    
    67 70
         if url.scheme == 'http':
    
    ... ... @@ -138,7 +141,7 @@ def run_dummy(context):
    138 141
         Creates a session, accepts leases, does fake work and updates the server.
    
    139 142
         """
    
    140 143
         try:
    
    141
    -        b = bot.Bot(context.bot_session)
    
    144
    +        b = bot.Bot(context.bot_session, context.update_period)
    
    142 145
             b.session(dummy.work_dummy,
    
    143 146
                       context)
    
    144 147
         except KeyboardInterrupt:
    
    ... ... @@ -153,7 +156,7 @@ def run_host_tools(context):
    153 156
         result back to CAS.
    
    154 157
         """
    
    155 158
         try:
    
    156
    -        b = bot.Bot(context.bot_session)
    
    159
    +        b = bot.Bot(context.bot_session, context.update_period)
    
    157 160
             b.session(host.work_host_tools,
    
    158 161
                       context)
    
    159 162
         except KeyboardInterrupt:
    
    ... ... @@ -174,7 +177,7 @@ def run_buildbox(context, local_cas, fuse_dir):
    174 177
         context.fuse_dir = fuse_dir
    
    175 178
     
    
    176 179
         try:
    
    177
    -        b = bot.Bot(context.bot_session)
    
    180
    +        b = bot.Bot(context.bot_session, context.update_period)
    
    178 181
             b.session(buildbox.work_buildbox,
    
    179 182
                       context)
    
    180 183
         except KeyboardInterrupt:
    

  • buildgrid/_app/settings/cas.yml
    1 1
     server:
    
    2 2
       - !channel
    
    3
    -    port: 50051
    
    3
    +    port: 50052
    
    4 4
         insecure_mode: true
    
    5 5
     #    credentials:
    
    6 6
     #      tls-server-key: null
    

  • buildgrid/client/cas.py
    ... ... @@ -241,7 +241,7 @@ class Downloader:
    241 241
             """Fetches a blob using ByteStream.Read()"""
    
    242 242
             read_blob = bytearray()
    
    243 243
     
    
    244
    -        if self.instance_name is not None:
    
    244
    +        if self.instance_name:
    
    245 245
                 resource_name = '/'.join([self.instance_name, 'blobs',
    
    246 246
                                           digest.hash, str(digest.size_bytes)])
    
    247 247
             else:
    
    ... ... @@ -313,7 +313,7 @@ class Downloader:
    313 313
     
    
    314 314
         def _fetch_file(self, digest, file_path):
    
    315 315
             """Fetches a file using ByteStream.Read()"""
    
    316
    -        if self.instance_name is not None:
    
    316
    +        if self.instance_name:
    
    317 317
                 resource_name = '/'.join([self.instance_name, 'blobs',
    
    318 318
                                           digest.hash, str(digest.size_bytes)])
    
    319 319
             else:
    
    ... ... @@ -699,7 +699,7 @@ class Uploader:
    699 699
             else:
    
    700 700
                 blob_digest.hash = HASH(blob).hexdigest()
    
    701 701
                 blob_digest.size_bytes = len(blob)
    
    702
    -        if self.instance_name is not None:
    
    702
    +        if self.instance_name:
    
    703 703
                 resource_name = '/'.join([self.instance_name, 'uploads', self.u_uid, 'blobs',
    
    704 704
                                           blob_digest.hash, str(blob_digest.size_bytes)])
    
    705 705
             else:
    

  • buildgrid/server/actioncache/service.py
    ... ... @@ -52,7 +52,7 @@ class ActionCacheService(remote_execution_pb2_grpc.ActionCacheServicer):
    52 52
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    53 53
     
    
    54 54
             except NotFoundError as e:
    
    55
    -            self.logger.info(e)
    
    55
    +            self.logger.debug(e)
    
    56 56
                 context.set_code(grpc.StatusCode.NOT_FOUND)
    
    57 57
     
    
    58 58
             return remote_execution_pb2.ActionResult()
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -66,7 +66,9 @@ class BotsInterface:
    66 66
             self._bot_sessions[name] = bot_session
    
    67 67
             self.logger.info("Created bot session name=[{}] with bot_id=[{}]".format(name, bot_id))
    
    68 68
     
    
    69
    -        for lease in self._scheduler.create_leases():
    
    69
    +        # For now, one lease at a time.
    
    70
    +        lease = self._scheduler.create_lease()
    
    71
    +        if lease:
    
    70 72
                 bot_session.leases.extend([lease])
    
    71 73
     
    
    72 74
             return bot_session
    
    ... ... @@ -83,8 +85,11 @@ class BotsInterface:
    83 85
             del bot_session.leases[:]
    
    84 86
             bot_session.leases.extend(leases)
    
    85 87
     
    
    86
    -        for lease in self._scheduler.create_leases():
    
    87
    -            bot_session.leases.extend([lease])
    
    88
    +        # For now, one lease at a time
    
    89
    +        if not bot_session.leases:
    
    90
    +            lease = self._scheduler.create_lease()
    
    91
    +            if lease:
    
    92
    +                bot_session.leases.extend([lease])
    
    88 93
     
    
    89 94
             self._bot_sessions[name] = bot_session
    
    90 95
             return bot_session
    

  • buildgrid/server/cas/service.py
    ... ... @@ -46,8 +46,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    46 46
     
    
    47 47
         def FindMissingBlobs(self, request, context):
    
    48 48
             try:
    
    49
    +            self.logger.debug("FindMissingBlobs request: [{}]".format(request))
    
    49 50
                 instance = self._get_instance(request.instance_name)
    
    50
    -            return instance.find_missing_blobs(request.blob_digests)
    
    51
    +            response = instance.find_missing_blobs(request.blob_digests)
    
    52
    +            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
    
    53
    +            return response
    
    51 54
     
    
    52 55
             except InvalidArgumentError as e:
    
    53 56
                 self.logger.error(e)
    
    ... ... @@ -58,8 +61,11 @@ class ContentAddressableStorageService(remote_execution_pb2_grpc.ContentAddressa
    58 61
     
    
    59 62
         def BatchUpdateBlobs(self, request, context):
    
    60 63
             try:
    
    64
    +            self.logger.debug("BatchUpdateBlobs request: [{}]".format(request))
    
    61 65
                 instance = self._get_instance(request.instance_name)
    
    62
    -            return instance.batch_update_blobs(request.requests)
    
    66
    +            response = instance.batch_update_blobs(request.requests)
    
    67
    +            self.logger.debug("FindMissingBlobs response: [{}]".format(response))
    
    68
    +            return response
    
    63 69
     
    
    64 70
             except InvalidArgumentError as e:
    
    65 71
                 self.logger.error(e)
    
    ... ... @@ -102,6 +108,7 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    102 108
     
    
    103 109
         def Read(self, request, context):
    
    104 110
             try:
    
    111
    +            self.logger.debug("Read request: [{}]".format(request))
    
    105 112
                 path = request.resource_name.split("/")
    
    106 113
                 instance_name = path[0]
    
    107 114
     
    
    ... ... @@ -141,10 +148,13 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    141 148
                 context.set_code(grpc.StatusCode.OUT_OF_RANGE)
    
    142 149
                 yield bytestream_pb2.ReadResponse()
    
    143 150
     
    
    151
    +            self.logger.debug("Read finished.")
    
    152
    +
    
    144 153
         def Write(self, requests, context):
    
    145 154
             try:
    
    146 155
                 requests, request_probe = tee(requests, 2)
    
    147 156
                 first_request = next(request_probe)
    
    157
    +            self.logger.debug("First write request: [{}]".format(first_request))
    
    148 158
     
    
    149 159
                 path = first_request.resource_name.split("/")
    
    150 160
     
    
    ... ... @@ -164,7 +174,9 @@ class ByteStreamService(bytestream_pb2_grpc.ByteStreamServicer):
    164 174
                     raise InvalidArgumentError("Invalid resource name: [{}]".format(first_request.resource_name))
    
    165 175
     
    
    166 176
                 instance = self._get_instance(instance_name)
    
    167
    -            return instance.write(requests)
    
    177
    +            response = instance.write(requests)
    
    178
    +            self.logger.debug("Write response: [{}]".format(response))
    
    179
    +            return response
    
    168 180
     
    
    169 181
             except NotImplementedError as e:
    
    170 182
                 self.logger.error(e)
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -21,8 +21,11 @@ An instance of the Remote Execution Service.
    21 21
     
    
    22 22
     import logging
    
    23 23
     
    
    24
    +import grpc
    
    25
    +
    
    24 26
     from buildgrid._exceptions import FailedPreconditionError, InvalidArgumentError
    
    25 27
     from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action
    
    28
    +from buildgrid._protos.google.longrunning import operations_pb2
    
    26 29
     
    
    27 30
     from ..job import Job
    
    28 31
     
    
    ... ... @@ -71,7 +74,12 @@ class ExecutionInstance:
    71 74
     
    
    72 75
         def stream_operation_updates(self, message_queue, operation_name):
    
    73 76
             operation = message_queue.get()
    
    77
    +        last_operation = operation
    
    74 78
             while not operation.done:
    
    75 79
                 yield operation
    
    80
    +            last_operation = operation
    
    76 81
                 operation = message_queue.get()
    
    82
    +            if not isinstance(operation, operations_pb2.Operation):
    
    83
    +                message_queue.context_.cancel()
    
    84
    +                operation = last_operation
    
    77 85
             yield operation

  • buildgrid/server/execution/service.py
    ... ... @@ -52,6 +52,8 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    52 52
                 context.add_callback(partial(instance.unregister_message_client,
    
    53 53
                                              operation.name, message_queue))
    
    54 54
     
    
    55
    +            message_queue.context_ = context
    
    56
    +
    
    55 57
                 yield from instance.stream_operation_updates(message_queue,
    
    56 58
                                                              operation.name)
    
    57 59
     
    
    ... ... @@ -68,9 +70,10 @@ class ExecutionService(remote_execution_pb2_grpc.ExecutionServicer):
    68 70
                 yield operations_pb2.Operation()
    
    69 71
     
    
    70 72
         def WaitExecution(self, request, context):
    
    73
    +        print('Client reopened the operation stream!')
    
    74
    +
    
    71 75
             try:
    
    72 76
                 names = request.name.split("/")
    
    73
    -
    
    74 77
                 # Operation name should be in format:
    
    75 78
                 # {instance/name}/{operation_id}
    
    76 79
                 instance_name = ''.join(names[0:-1])
    

  • buildgrid/server/job.py
    ... ... @@ -122,8 +122,11 @@ class Job:
    122 122
             if self.result is not None:
    
    123 123
                 self._operation.done = True
    
    124 124
                 response = remote_execution_pb2.ExecuteResponse(result=self.result,
    
    125
    -                                                            cached_result=self.result_cached,
    
    126
    -                                                            status=self.lease.status)
    
    125
    +                                                            cached_result=self.result_cached)
    
    126
    +
    
    127
    +            if not self.result_cached:
    
    128
    +                response.status.CopyFrom(self.lease.status)
    
    129
    +
    
    127 130
                 self._operation.response.CopyFrom(self._pack_any(response))
    
    128 131
     
    
    129 132
             return self._operation
    

  • buildgrid/server/operations/service.py
    ... ... @@ -43,14 +43,16 @@ class OperationsService(operations_pb2_grpc.OperationsServicer):
    43 43
     
    
    44 44
         def GetOperation(self, request, context):
    
    45 45
             try:
    
    46
    -            name = request.name
    
    47
    -            operation_name = self._get_operation_name(name)
    
    46
    +            operation_name, job = request.name, None
    
    47
    +            for instance in self._instances.values():
    
    48
    +                if operation_name in instance._scheduler.jobs:
    
    49
    +                    job = instance._scheduler.jobs.get(operation_name)
    
    50
    +                    break
    
    48 51
     
    
    49
    -            instance = self._get_instance(name)
    
    52
    +            print('Connection drop simulation request received')
    
    50 53
     
    
    51
    -            operation = instance.get_operation(operation_name)
    
    52
    -            operation.name = name
    
    53
    -            return operation
    
    54
    +            for queue in job._operation_update_queues:
    
    55
    +                queue.put('drop')
    
    54 56
     
    
    55 57
             except InvalidArgumentError as e:
    
    56 58
                 self.logger.error(e)
    

  • buildgrid/server/referencestorage/service.py
    ... ... @@ -47,7 +47,8 @@ class ReferenceStorageService(buildstream_pb2_grpc.ReferenceStorageServicer):
    47 47
                 context.set_details(str(e))
    
    48 48
                 context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
    
    49 49
     
    
    50
    -        except NotFoundError:
    
    50
    +        except NotFoundError as e:
    
    51
    +            self.logger.debug(e)
    
    51 52
                 context.set_code(grpc.StatusCode.NOT_FOUND)
    
    52 53
     
    
    53 54
             return buildstream_pb2.GetReferenceResponse()
    

  • buildgrid/server/scheduler.py
    ... ... @@ -23,8 +23,6 @@ Schedules jobs.
    23 23
     
    
    24 24
     from collections import deque
    
    25 25
     
    
    26
    -from google.protobuf import any_pb2
    
    27
    -
    
    28 26
     from buildgrid._exceptions import NotFoundError
    
    29 27
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    30 28
     from buildgrid._protos.google.longrunning import operations_pb2
    
    ... ... @@ -60,9 +58,7 @@ class Scheduler:
    60 58
                     job.update_execute_stage(ExecuteStage.QUEUED)
    
    61 59
     
    
    62 60
                 else:
    
    63
    -                cached_result_any = any_pb2.Any()
    
    64
    -                cached_result_any.Pack(cached_result)
    
    65
    -                job.result = cached_result_any
    
    61
    +                job.result = cached_result
    
    66 62
                     job.result_cached = True
    
    67 63
                     job.update_execute_stage(ExecuteStage.COMPLETED)
    
    68 64
     
    
    ... ... @@ -112,10 +108,11 @@ class Scheduler:
    112 108
             if state in (LeaseState.PENDING.value, LeaseState.ACTIVE.value):
    
    113 109
                 self.retry_job(name)
    
    114 110
     
    
    115
    -    def create_leases(self):
    
    116
    -        while self.queue:
    
    111
    +    def create_lease(self):
    
    112
    +        if self.queue:
    
    117 113
                 job = self.queue.popleft()
    
    118 114
                 job.update_execute_stage(ExecuteStage.EXECUTING)
    
    119 115
                 job.create_lease()
    
    120 116
                 job.lease.state = LeaseState.PENDING.value
    
    121
    -            yield job.lease
    
    117
    +            return job.lease
    
    118
    +        return None

  • tests/integration/bots_service.py
    ... ... @@ -129,7 +129,7 @@ def test_number_of_leases(number_of_jobs, bot_session, context, instance):
    129 129
         request = bots_pb2.CreateBotSessionRequest(bot_session=bot_session)
    
    130 130
         response = instance.CreateBotSession(request, context)
    
    131 131
     
    
    132
    -    assert len(response.leases) == number_of_jobs
    
    132
    +    assert len(response.leases) == min(number_of_jobs, 1)
    
    133 133
     
    
    134 134
     
    
    135 135
     def test_update_leases_with_work(bot_session, context, instance):
    



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]