[Notes] [Git][BuildGrid/buildgrid][mablanch/157-action-browser-url] 7 commits: Make server instances aware of their name



Title: GitLab

Martin Blanchard pushed to branch mablanch/157-action-browser-url at BuildGrid / buildgrid

Commits:

17 changed files:

Changes:

  • buildgrid/_app/settings/parser.py
    ... ... @@ -235,8 +235,8 @@ class Execution(YamlFactory):
    235 235
     
    
    236 236
         yaml_tag = u'!execution'
    
    237 237
     
    
    238
    -    def __new__(cls, storage, action_cache=None):
    
    239
    -        return ExecutionController(action_cache, storage)
    
    238
    +    def __new__(cls, storage, action_cache=None, action_browser_url=None):
    
    239
    +        return ExecutionController(storage, action_cache, action_browser_url)
    
    240 240
     
    
    241 241
     
    
    242 242
     class Action(YamlFactory):
    

  • buildgrid/_app/settings/reference.yml
    ... ... @@ -75,7 +75,7 @@ instances:
    75 75
             # Whether or not writing to the cache is allowed.
    
    76 76
             allow-updates: true
    
    77 77
             ##
    
    78
    -        # Whether failed actions (non-zero exit code) are stored
    
    78
    +        # Whether failed actions (non-zero exit code) are stored.
    
    79 79
             cache-failed-actions: true
    
    80 80
     
    
    81 81
           - !execution
    
    ... ... @@ -85,6 +85,9 @@ instances:
    85 85
             ##
    
    86 86
             # Alias to an action-cache service.
    
    87 87
             action-cache: *main-action
    
    88
    +        ##
    
    89
    +        # Base URL for external build action (web) browser service.
    
    90
    +        action-browser-url: http://localhost:8080
    
    88 91
     
    
    89 92
           - !cas
    
    90 93
             ##
    

  • buildgrid/server/actioncache/storage.py
    ... ... @@ -40,12 +40,25 @@ class ActionCache(ReferenceCache):
    40 40
     
    
    41 41
             self.__logger = logging.getLogger(__name__)
    
    42 42
     
    
    43
    +        self._instance_name = None
    
    44
    +
    
    43 45
             self._cache_failed_actions = cache_failed_actions
    
    44 46
     
    
    45 47
         # --- Public API ---
    
    46 48
     
    
    49
    +    @property
    
    50
    +    def instance_name(self):
    
    51
    +        return self._instance_name
    
    52
    +
    
    47 53
         def register_instance_with_server(self, instance_name, server):
    
    48
    -        server.add_action_cache_instance(self, instance_name)
    
    54
    +        """Names and registers the action-cache instance with a given server."""
    
    55
    +        if self._instance_name is None:
    
    56
    +            server.add_action_cache_instance(self, instance_name)
    
    57
    +
    
    58
    +            self._instance_name = instance_name
    
    59
    +
    
    60
    +        else:
    
    61
    +            raise AssertionError("Instance already registered")
    
    49 62
     
    
    50 63
         def get_action_result(self, action_digest):
    
    51 64
             """Retrieves the cached result for an action."""
    

  • buildgrid/server/bots/instance.py
    ... ... @@ -33,16 +33,31 @@ class BotsInterface:
    33 33
         def __init__(self, scheduler):
    
    34 34
             self.__logger = logging.getLogger(__name__)
    
    35 35
     
    
    36
    +        self._scheduler = scheduler
    
    37
    +        self._instance_name = None
    
    38
    +
    
    36 39
             self._bot_ids = {}
    
    37 40
             self._assigned_leases = {}
    
    38
    -        self._scheduler = scheduler
    
    41
    +
    
    42
    +    # --- Public API ---
    
    43
    +
    
    44
    +    @property
    
    45
    +    def instance_name(self):
    
    46
    +        return self._instance_name
    
    39 47
     
    
    40 48
         @property
    
    41 49
         def scheduler(self):
    
    42 50
             return self._scheduler
    
    43 51
     
    
    44 52
         def register_instance_with_server(self, instance_name, server):
    
    45
    -        server.add_bots_interface(self, instance_name)
    
    53
    +        """Names and registers the bots interface with a given server."""
    
    54
    +        if self._instance_name is None:
    
    55
    +            server.add_bots_interface(self, instance_name)
    
    56
    +
    
    57
    +            self._instance_name = instance_name
    
    58
    +
    
    59
    +        else:
    
    60
    +            raise AssertionError("Instance already registered")
    
    46 61
     
    
    47 62
         def create_bot_session(self, parent, bot_session):
    
    48 63
             """ Creates a new bot session. Server should assign a unique
    
    ... ... @@ -98,6 +113,8 @@ class BotsInterface:
    98 113
             self._request_leases(bot_session)
    
    99 114
             return bot_session
    
    100 115
     
    
    116
    +    # --- Private API ---
    
    117
    +
    
    101 118
         def _request_leases(self, bot_session):
    
    102 119
             # Only send one lease at a time currently.
    
    103 120
             if not bot_session.leases:
    

  • buildgrid/server/capabilities/instance.py
    ... ... @@ -22,12 +22,28 @@ class CapabilitiesInstance:
    22 22
     
    
    23 23
         def __init__(self, cas_instance=None, action_cache_instance=None, execution_instance=None):
    
    24 24
             self.__logger = logging.getLogger(__name__)
    
    25
    +
    
    26
    +        self._instance_name = None
    
    27
    +
    
    25 28
             self.__cas_instance = cas_instance
    
    26 29
             self.__action_cache_instance = action_cache_instance
    
    27 30
             self.__execution_instance = execution_instance
    
    28 31
     
    
    32
    +    # --- Public API ---
    
    33
    +
    
    34
    +    @property
    
    35
    +    def instance_name(self):
    
    36
    +        return self._instance_name
    
    37
    +
    
    29 38
         def register_instance_with_server(self, instance_name, server):
    
    30
    -        server.add_capabilities_instance(self, instance_name)
    
    39
    +        """Names and registers the capabilities instance with a given server."""
    
    40
    +        if self._instance_name is None:
    
    41
    +            server.add_capabilities_instance(self, instance_name)
    
    42
    +
    
    43
    +            self._instance_name = instance_name
    
    44
    +
    
    45
    +        else:
    
    46
    +            raise AssertionError("Instance already registered")
    
    31 47
     
    
    32 48
         def add_cas_instance(self, cas_instance):
    
    33 49
             self.__cas_instance = cas_instance
    
    ... ... @@ -50,6 +66,8 @@ class CapabilitiesInstance:
    50 66
             # server_capabilities.high_api_version =
    
    51 67
             return server_capabilities
    
    52 68
     
    
    69
    +    # --- Private API ---
    
    70
    +
    
    53 71
         def _get_cache_capabilities(self):
    
    54 72
             capabilities = remote_execution_pb2.CacheCapabilities()
    
    55 73
             action_cache_update_capabilities = remote_execution_pb2.ActionCacheUpdateCapabilities()
    

  • buildgrid/server/cas/instance.py
    ... ... @@ -34,10 +34,25 @@ class ContentAddressableStorageInstance:
    34 34
         def __init__(self, storage):
    
    35 35
             self.__logger = logging.getLogger(__name__)
    
    36 36
     
    
    37
    -        self._storage = storage
    
    37
    +        self._instance_name = None
    
    38
    +
    
    39
    +        self.__storage = storage
    
    40
    +
    
    41
    +    # --- Public API ---
    
    42
    +
    
    43
    +    @property
    
    44
    +    def instance_name(self):
    
    45
    +        return self._instance_name
    
    38 46
     
    
    39 47
         def register_instance_with_server(self, instance_name, server):
    
    40
    -        server.add_cas_instance(self, instance_name)
    
    48
    +        """Names and registers the CAS instance with a given server."""
    
    49
    +        if self._instance_name is None:
    
    50
    +            server.add_cas_instance(self, instance_name)
    
    51
    +
    
    52
    +            self._instance_name = instance_name
    
    53
    +
    
    54
    +        else:
    
    55
    +            raise AssertionError("Instance already registered")
    
    41 56
     
    
    42 57
         def hash_type(self):
    
    43 58
             return get_hash_type()
    
    ... ... @@ -51,12 +66,12 @@ class ContentAddressableStorageInstance:
    51 66
             return re_pb2.CacheCapabilities().DISALLOWED
    
    52 67
     
    
    53 68
         def find_missing_blobs(self, blob_digests):
    
    54
    -        storage = self._storage
    
    69
    +        storage = self.__storage
    
    55 70
             return re_pb2.FindMissingBlobsResponse(
    
    56 71
                 missing_blob_digests=storage.missing_blobs(blob_digests))
    
    57 72
     
    
    58 73
         def batch_update_blobs(self, requests):
    
    59
    -        storage = self._storage
    
    74
    +        storage = self.__storage
    
    60 75
             store = []
    
    61 76
             for request_proto in requests:
    
    62 77
                 store.append((request_proto.digest, request_proto.data))
    
    ... ... @@ -72,7 +87,7 @@ class ContentAddressableStorageInstance:
    72 87
             return response
    
    73 88
     
    
    74 89
         def batch_read_blobs(self, digests):
    
    75
    -        storage = self._storage
    
    90
    +        storage = self.__storage
    
    76 91
     
    
    77 92
             response = re_pb2.BatchReadBlobsResponse()
    
    78 93
     
    
    ... ... @@ -101,7 +116,7 @@ class ContentAddressableStorageInstance:
    101 116
             return response
    
    102 117
     
    
    103 118
         def get_tree(self, request):
    
    104
    -        storage = self._storage
    
    119
    +        storage = self.__storage
    
    105 120
     
    
    106 121
             response = re_pb2.GetTreeResponse()
    
    107 122
             page_size = request.page_size
    
    ... ... @@ -143,10 +158,25 @@ class ByteStreamInstance:
    143 158
         def __init__(self, storage):
    
    144 159
             self.__logger = logging.getLogger(__name__)
    
    145 160
     
    
    146
    -        self._storage = storage
    
    161
    +        self._instance_name = None
    
    162
    +
    
    163
    +        self.__storage = storage
    
    164
    +
    
    165
    +    # --- Public API ---
    
    166
    +
    
    167
    +    @property
    
    168
    +    def instance_name(self):
    
    169
    +        return self._instance_name
    
    147 170
     
    
    148 171
         def register_instance_with_server(self, instance_name, server):
    
    149
    -        server.add_bytestream_instance(self, instance_name)
    
    172
    +        """Names and registers the byte-stream instance with a given server."""
    
    173
    +        if self._instance_name is None:
    
    174
    +            server.add_bytestream_instance(self, instance_name)
    
    175
    +
    
    176
    +            self._instance_name = instance_name
    
    177
    +
    
    178
    +        else:
    
    179
    +            raise AssertionError("Instance already registered")
    
    150 180
     
    
    151 181
         def read(self, digest_hash, digest_size, read_offset, read_limit):
    
    152 182
             if len(digest_hash) != HASH_LENGTH or not digest_size.isdigit():
    
    ... ... @@ -169,7 +199,7 @@ class ByteStreamInstance:
    169 199
                 raise InvalidArgumentError("Negative read_limit is invalid")
    
    170 200
     
    
    171 201
             # Read the blob from storage and send its contents to the client.
    
    172
    -        result = self._storage.get_blob(digest)
    
    202
    +        result = self.__storage.get_blob(digest)
    
    173 203
             if result is None:
    
    174 204
                 raise NotFoundError("Blob not found")
    
    175 205
     
    
    ... ... @@ -191,7 +221,7 @@ class ByteStreamInstance:
    191 221
     
    
    192 222
             digest = re_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))
    
    193 223
     
    
    194
    -        write_session = self._storage.begin_write(digest)
    
    224
    +        write_session = self.__storage.begin_write(digest)
    
    195 225
     
    
    196 226
             # Start the write session and write the first request's data.
    
    197 227
             write_session.write(first_block)
    
    ... ... @@ -213,6 +243,6 @@ class ByteStreamInstance:
    213 243
             elif computed_hash.hexdigest() != digest.hash:
    
    214 244
                 raise InvalidArgumentError("Data does not match hash")
    
    215 245
     
    
    216
    -        self._storage.commit_write(digest, write_session)
    
    246
    +        self.__storage.commit_write(digest, write_session)
    
    217 247
     
    
    218 248
             return bytestream_pb2.WriteResponse(committed_size=bytes_written)

  • buildgrid/server/controller.py
    ... ... @@ -36,19 +36,19 @@ from .operations.instance import OperationsInstance
    36 36
     
    
    37 37
     class ExecutionController:
    
    38 38
     
    
    39
    -    def __init__(self, action_cache=None, storage=None):
    
    39
    +    def __init__(self, storage=None, action_cache=None, action_browser_url=None):
    
    40 40
             self.__logger = logging.getLogger(__name__)
    
    41 41
     
    
    42
    -        scheduler = Scheduler(action_cache)
    
    42
    +        scheduler = Scheduler(action_cache, action_browser_url=action_browser_url)
    
    43 43
     
    
    44 44
             self._execution_instance = ExecutionInstance(scheduler, storage)
    
    45 45
             self._bots_interface = BotsInterface(scheduler)
    
    46 46
             self._operations_instance = OperationsInstance(scheduler)
    
    47 47
     
    
    48 48
         def register_instance_with_server(self, instance_name, server):
    
    49
    -        server.add_execution_instance(self._execution_instance, instance_name)
    
    50
    -        server.add_bots_interface(self._bots_interface, instance_name)
    
    51
    -        server.add_operations_instance(self._operations_instance, instance_name)
    
    49
    +        self._execution_instance.register_instance_with_server(instance_name, server)
    
    50
    +        self._bots_interface.register_instance_with_server(instance_name, server)
    
    51
    +        self._operations_instance.register_instance_with_server(instance_name, server)
    
    52 52
     
    
    53 53
         def stream_operation_updates(self, message_queue, operation_name):
    
    54 54
             operation = message_queue.get()
    

  • buildgrid/server/execution/instance.py
    ... ... @@ -31,15 +31,32 @@ class ExecutionInstance:
    31 31
         def __init__(self, scheduler, storage):
    
    32 32
             self.__logger = logging.getLogger(__name__)
    
    33 33
     
    
    34
    -        self._storage = storage
    
    35 34
             self._scheduler = scheduler
    
    35
    +        self._instance_name = None
    
    36
    +
    
    37
    +        self.__storage = storage
    
    38
    +
    
    39
    +    # --- Public API ---
    
    40
    +
    
    41
    +    @property
    
    42
    +    def instance_name(self):
    
    43
    +        return self._instance_name
    
    36 44
     
    
    37 45
         @property
    
    38 46
         def scheduler(self):
    
    39 47
             return self._scheduler
    
    40 48
     
    
    41 49
         def register_instance_with_server(self, instance_name, server):
    
    42
    -        server.add_execution_instance(self, instance_name)
    
    50
    +        """Names and registers the execution instance with a given server."""
    
    51
    +        if self._instance_name is None:
    
    52
    +            server.add_execution_instance(self, instance_name)
    
    53
    +
    
    54
    +            self._instance_name = instance_name
    
    55
    +            if self._scheduler is not None:
    
    56
    +                self._scheduler.set_instance_name(instance_name)
    
    57
    +
    
    58
    +        else:
    
    59
    +            raise AssertionError("Instance already registered")
    
    43 60
     
    
    44 61
         def hash_type(self):
    
    45 62
             return get_hash_type()
    
    ... ... @@ -49,11 +66,12 @@ class ExecutionInstance:
    49 66
             Queues an action and creates an Operation instance to be associated with
    
    50 67
             this action.
    
    51 68
             """
    
    52
    -        action = self._storage.get_message(action_digest, Action)
    
    69
    +        action = self.__storage.get_message(action_digest, Action)
    
    70
    +
    
    53 71
             if not action:
    
    54 72
                 raise FailedPreconditionError("Could not get action from storage.")
    
    55 73
     
    
    56
    -        command = self._storage.get_message(action.command_digest, Command)
    
    74
    +        command = self.__storage.get_message(action.command_digest, Command)
    
    57 75
     
    
    58 76
             if not command:
    
    59 77
                 raise FailedPreconditionError("Could not get command from storage.")
    

  • buildgrid/server/job.py
    ... ... @@ -37,10 +37,8 @@ class Job:
    37 37
             self._action = remote_execution_pb2.Action()
    
    38 38
             self._lease = None
    
    39 39
     
    
    40
    -        self.__execute_response = None
    
    40
    +        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    41 41
             self.__operation_metadata = remote_execution_pb2.ExecuteOperationMetadata()
    
    42
    -        self.__operations_by_name = {}  # Name to Operation 1:1 mapping
    
    43
    -        self.__operations_by_peer = {}  # Peer to Operation 1:1 mapping
    
    44 42
     
    
    45 43
             self.__queued_timestamp = timestamp_pb2.Timestamp()
    
    46 44
             self.__queued_time_duration = duration_pb2.Duration()
    
    ... ... @@ -48,6 +46,8 @@ class Job:
    48 46
             self.__worker_completed_timestamp = timestamp_pb2.Timestamp()
    
    49 47
     
    
    50 48
             self.__operations_message_queues = {}
    
    49
    +        self.__operations_by_name = {}  # Name to Operation 1:1 mapping
    
    50
    +        self.__operations_by_peer = {}  # Peer to Operation 1:1 mapping
    
    51 51
             self.__operations_cancelled = set()
    
    52 52
             self.__lease_cancelled = False
    
    53 53
             self.__job_cancelled = False
    
    ... ... @@ -146,6 +146,11 @@ class Job:
    146 146
             else:
    
    147 147
                 return False
    
    148 148
     
    
    149
    +    def set_action_url(self, url):
    
    150
    +        """Generates a CAS browser URL for the job's action."""
    
    151
    +        if url.for_message('action', self.__operation_metadata.action_digest):
    
    152
    +            self.__execute_response.message = url.generate()
    
    153
    +
    
    149 154
         def set_cached_result(self, action_result):
    
    150 155
             """Allows specifying an action result from the action cache for the job.
    
    151 156
     
    
    ... ... @@ -155,7 +160,6 @@ class Job:
    155 160
             Args:
    
    156 161
                 action_result (ActionResult): The result from cache.
    
    157 162
             """
    
    158
    -        self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    159 163
             self.__execute_response.result.CopyFrom(action_result)
    
    160 164
             self.__execute_response.cached_result = True
    
    161 165
     
    
    ... ... @@ -445,7 +449,6 @@ class Job:
    445 449
                 action_metadata.worker_start_timestamp.CopyFrom(self.__worker_start_timestamp)
    
    446 450
                 action_metadata.worker_completed_timestamp.CopyFrom(self.__worker_completed_timestamp)
    
    447 451
     
    
    448
    -            self.__execute_response = remote_execution_pb2.ExecuteResponse()
    
    449 452
                 self.__execute_response.result.CopyFrom(action_result)
    
    450 453
                 self.__execute_response.cached_result = False
    
    451 454
                 self.__execute_response.status.CopyFrom(status)
    

  • buildgrid/server/operations/instance.py
    ... ... @@ -31,13 +31,27 @@ class OperationsInstance:
    31 31
             self.__logger = logging.getLogger(__name__)
    
    32 32
     
    
    33 33
             self._scheduler = scheduler
    
    34
    +        self._instance_name = None
    
    35
    +
    
    36
    +    # --- Public API ---
    
    37
    +
    
    38
    +    @property
    
    39
    +    def instance_name(self):
    
    40
    +        return self._instance_name
    
    34 41
     
    
    35 42
         @property
    
    36 43
         def scheduler(self):
    
    37 44
             return self._scheduler
    
    38 45
     
    
    39 46
         def register_instance_with_server(self, instance_name, server):
    
    40
    -        server.add_operations_instance(self, instance_name)
    
    47
    +        """Names and registers the operations instance with a given server."""
    
    48
    +        if self._instance_name is None:
    
    49
    +            server.add_operations_instance(self, instance_name)
    
    50
    +
    
    51
    +            self._instance_name = instance_name
    
    52
    +
    
    53
    +        else:
    
    54
    +            raise AssertionError("Instance already registered")
    
    41 55
     
    
    42 56
         def get_operation(self, job_name):
    
    43 57
             try:
    

  • buildgrid/server/referencestorage/storage.py
    ... ... @@ -41,13 +41,29 @@ class ReferenceCache:
    41 41
             """
    
    42 42
             self.__logger = logging.getLogger(__name__)
    
    43 43
     
    
    44
    +        self._instance_name = None
    
    45
    +
    
    46
    +        self.__storage = storage
    
    47
    +
    
    44 48
             self._allow_updates = allow_updates
    
    45
    -        self._storage = storage
    
    46 49
             self._max_cached_refs = max_cached_refs
    
    47 50
             self._digest_map = collections.OrderedDict()
    
    48 51
     
    
    52
    +    # --- Public API ---
    
    53
    +
    
    54
    +    @property
    
    55
    +    def instance_name(self):
    
    56
    +        return self._instance_name
    
    57
    +
    
    49 58
         def register_instance_with_server(self, instance_name, server):
    
    50
    -        server.add_reference_storage_instance(self, instance_name)
    
    59
    +        """Names and registers the refs instance with a given server."""
    
    60
    +        if self._instance_name is None:
    
    61
    +            server.add_reference_storage_instance(self, instance_name)
    
    62
    +
    
    63
    +            self._instance_name = instance_name
    
    64
    +
    
    65
    +        else:
    
    66
    +            raise AssertionError("Instance already registered")
    
    51 67
     
    
    52 68
         @property
    
    53 69
         def allow_updates(self):
    
    ... ... @@ -64,7 +80,8 @@ class ReferenceCache:
    64 80
                 NotFoundError.
    
    65 81
             """
    
    66 82
             if key in self._digest_map:
    
    67
    -            reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.Digest)
    
    83
    +            reference_result = self.__storage.get_message(self._digest_map[key],
    
    84
    +                                                          remote_execution_pb2.Digest)
    
    68 85
     
    
    69 86
                 if reference_result is not None:
    
    70 87
                     return reference_result
    
    ... ... @@ -84,7 +101,8 @@ class ReferenceCache:
    84 101
                 NotFoundError.
    
    85 102
             """
    
    86 103
             if key in self._digest_map:
    
    87
    -            reference_result = self._storage.get_message(self._digest_map[key], remote_execution_pb2.ActionResult)
    
    104
    +            reference_result = self.__storage.get_message(self._digest_map[key],
    
    105
    +                                                          remote_execution_pb2.ActionResult)
    
    88 106
     
    
    89 107
                 if reference_result is not None:
    
    90 108
                     if self._action_result_blobs_still_exist(reference_result):
    
    ... ... @@ -115,9 +133,11 @@ class ReferenceCache:
    115 133
             while len(self._digest_map) >= self._max_cached_refs:
    
    116 134
                 self._digest_map.popitem(last=False)
    
    117 135
     
    
    118
    -        result_digest = self._storage.put_message(result)
    
    136
    +        result_digest = self.__storage.put_message(result)
    
    119 137
             self._digest_map[key] = result_digest
    
    120 138
     
    
    139
    +    # --- Private API ---
    
    140
    +
    
    121 141
         def _action_result_blobs_still_exist(self, action_result):
    
    122 142
             """Checks CAS for ActionResult output blobs existence.
    
    123 143
     
    
    ... ... @@ -135,8 +155,8 @@ class ReferenceCache:
    135 155
     
    
    136 156
             for output_directory in action_result.output_directories:
    
    137 157
                 blobs_needed.append(output_directory.tree_digest)
    
    138
    -            tree = self._storage.get_message(output_directory.tree_digest,
    
    139
    -                                             remote_execution_pb2.Tree)
    
    158
    +            tree = self.__storage.get_message(output_directory.tree_digest,
    
    159
    +                                              remote_execution_pb2.Tree)
    
    140 160
                 if tree is None:
    
    141 161
                     return False
    
    142 162
     
    
    ... ... @@ -153,5 +173,5 @@ class ReferenceCache:
    153 173
             if action_result.stderr_digest.hash and not action_result.stderr_raw:
    
    154 174
                 blobs_needed.append(action_result.stderr_digest)
    
    155 175
     
    
    156
    -        missing = self._storage.missing_blobs(blobs_needed)
    
    176
    +        missing = self.__storage.missing_blobs(blobs_needed)
    
    157 177
             return len(missing) == 0

  • buildgrid/server/scheduler.py
    ... ... @@ -26,15 +26,18 @@ import logging
    26 26
     from buildgrid._enums import LeaseState, OperationStage
    
    27 27
     from buildgrid._exceptions import NotFoundError
    
    28 28
     from buildgrid.server.job import Job
    
    29
    +from buildgrid.utils import BrowserURL
    
    29 30
     
    
    30 31
     
    
    31 32
     class Scheduler:
    
    32 33
     
    
    33 34
         MAX_N_TRIES = 5
    
    34 35
     
    
    35
    -    def __init__(self, action_cache=None, monitor=False):
    
    36
    +    def __init__(self, action_cache=None, action_browser_url=False, monitor=False):
    
    36 37
             self.__logger = logging.getLogger(__name__)
    
    37 38
     
    
    39
    +        self._instance_name = None
    
    40
    +
    
    38 41
             self.__build_metadata_queues = None
    
    39 42
     
    
    40 43
             self.__operations_by_stage = None
    
    ... ... @@ -43,6 +46,7 @@ class Scheduler:
    43 46
             self.__retries_count = 0
    
    44 47
     
    
    45 48
             self._action_cache = action_cache
    
    49
    +        self._action_browser_url = action_browser_url
    
    46 50
     
    
    47 51
             self.__jobs_by_action = {}  # Action to Job 1:1 mapping
    
    48 52
             self.__jobs_by_operation = {}  # Operation to Job 1:1 mapping
    
    ... ... @@ -57,6 +61,14 @@ class Scheduler:
    57 61
     
    
    58 62
         # --- Public API ---
    
    59 63
     
    
    64
    +    @property
    
    65
    +    def instance_name(self):
    
    66
    +        return self._instance_name
    
    67
    +
    
    68
    +    def set_instance_name(self, instance_name):
    
    69
    +        if not self._instance_name:
    
    70
    +            self._instance_name = instance_name
    
    71
    +
    
    60 72
         def list_current_jobs(self):
    
    61 73
             """Returns a list of the :class:`Job` names currently managed."""
    
    62 74
             return self.__jobs_by_name.keys()
    
    ... ... @@ -186,6 +198,10 @@ class Scheduler:
    186 198
                       platform_requirements=platform_requirements,
    
    187 199
                       priority=priority)
    
    188 200
     
    
    201
    +        if self._action_browser_url:
    
    202
    +            job.set_action_url(
    
    203
    +                BrowserURL(self._action_browser_url, self._instance_name))
    
    204
    +
    
    189 205
             self.__logger.debug("Job created for action [%s]: [%s]",
    
    190 206
                                 action_digest.hash[:8], job.name)
    
    191 207
     
    

  • buildgrid/settings.py
    ... ... @@ -35,3 +35,11 @@ MAX_REQUEST_COUNT = 500
    35 35
     LOG_RECORD_FORMAT = '%(asctime)s:[%(name)36.36s][%(levelname)5.5s]: %(message)s'
    
    36 36
     # The different log record attributes are documented here:
    
    37 37
     # https://docs.python.org/3/library/logging.html#logrecord-attributes
    
    38
    +
    
    39
    +# URL scheme for the CAS content browser:
    
    40
    +BROWSER_URL_FORMAT = '%(type)s/%(instance)s/%(hash)s/%(sizebytes)s/'
    
    41
    +# The string markers that are substituted are:
    
    42
    +#  instance   - CAS instance's name.
    
    43
    +#  type       - Type of CAS object, e.g. 'action_result', 'command'...
    
    44
    +#  hash       - Object's digest hash.
    
    45
    +#  sizebytes  - Object's digest size in bytes.

  • buildgrid/utils.py
    ... ... @@ -13,14 +13,61 @@
    13 13
     # limitations under the License.
    
    14 14
     
    
    15 15
     
    
    16
    +from urllib.parse import urljoin
    
    16 17
     from operator import attrgetter
    
    17 18
     import os
    
    18 19
     import socket
    
    19 20
     
    
    20
    -from buildgrid.settings import HASH, HASH_LENGTH
    
    21
    +from buildgrid.settings import HASH, HASH_LENGTH, BROWSER_URL_FORMAT
    
    21 22
     from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    22 23
     
    
    23 24
     
    
    25
    +class BrowserURL:
    
    26
    +
    
    27
    +    __url_markers = (
    
    28
    +        '%(instance)s',
    
    29
    +        '%(type)s',
    
    30
    +        '%(hash)s',
    
    31
    +        '%(sizebytes)s',
    
    32
    +    )
    
    33
    +
    
    34
    +    def __init__(self, base_url, instance_name=None):
    
    35
    +        """Begins browser URL helper initialization."""
    
    36
    +        self.__base_url = base_url
    
    37
    +        self.__initialized = False
    
    38
    +        self.__url_spec = {
    
    39
    +            '%(instance)s': instance_name or '',
    
    40
    +        }
    
    41
    +
    
    42
    +    def for_message(self, message_type, message_digest):
    
    43
    +        """Completes browser URL initialization for a protobuf message."""
    
    44
    +        if self.__initialized:
    
    45
    +            return False
    
    46
    +
    
    47
    +        self.__url_spec['%(type)s'] = message_type
    
    48
    +        self.__url_spec['%(hash)s'] = message_digest.hash
    
    49
    +        self.__url_spec['%(sizebytes)s'] = str(message_digest.size_bytes)
    
    50
    +
    
    51
    +        self.__initialized = True
    
    52
    +        return True
    
    53
    +
    
    54
    +    def generate(self):
    
    55
    +        """Generates a browser URL string."""
    
    56
    +        if not self.__base_url or not self.__initialized:
    
    57
    +            return None
    
    58
    +
    
    59
    +        url_tail = BROWSER_URL_FORMAT
    
    60
    +
    
    61
    +        for url_marker in self.__url_markers:
    
    62
    +            if url_marker not in self.__url_spec:
    
    63
    +                return None
    
    64
    +            if url_marker not in url_tail:
    
    65
    +                continue
    
    66
    +            url_tail = url_tail.replace(url_marker, self.__url_spec[url_marker])
    
    67
    +
    
    68
    +        return urljoin(self.__base_url, url_tail)
    
    69
    +
    
    70
    +
    
    24 71
     def get_hostname():
    
    25 72
         """Returns the hostname of the machine executing that function.
    
    26 73
     
    

  • tests/integration/execution_service.py
    ... ... @@ -66,10 +66,10 @@ def controller(request):
    66 66
     
    
    67 67
         if request.param == "action-cache":
    
    68 68
             cache = ActionCache(storage, 50)
    
    69
    -        yield ExecutionController(cache, storage)
    
    69
    +        yield ExecutionController(storage=storage, action_cache=cache)
    
    70 70
     
    
    71 71
         else:
    
    72
    -        yield ExecutionController(None, storage)
    
    72
    +        yield ExecutionController(storage=storage)
    
    73 73
     
    
    74 74
     
    
    75 75
     # Instance to test
    

  • tests/integration/operations_service.py
    ... ... @@ -71,7 +71,7 @@ def controller():
    71 71
         write_session.write(action.SerializeToString())
    
    72 72
         storage.commit_write(action_digest, write_session)
    
    73 73
     
    
    74
    -    yield ExecutionController(None, storage)
    
    74
    +    yield ExecutionController(storage=storage)
    
    75 75
     
    
    76 76
     
    
    77 77
     # Instance to test
    

  • tests/test_utils.py
    1
    +# Copyright (C) 2018 Bloomberg LP
    
    2
    +#
    
    3
    +# Licensed under the Apache License, Version 2.0 (the "License");
    
    4
    +# you may not use this file except in compliance with the License.
    
    5
    +# You may obtain a copy of the License at
    
    6
    +#
    
    7
    +#  <http://www.apache.org/licenses/LICENSE-2.0>
    
    8
    +#
    
    9
    +# Unless required by applicable law or agreed to in writing, software
    
    10
    +# distributed under the License is distributed on an "AS IS" BASIS,
    
    11
    +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    
    12
    +# See the License for the specific language governing permissions and
    
    13
    +# limitations under the License.
    
    14
    +
    
    15
    +
    
    16
    +from urllib.parse import urlparse
    
    17
    +
    
    18
    +import pytest
    
    19
    +
    
    20
    +from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
    
    21
    +from buildgrid.utils import BrowserURL
    
    22
    +from buildgrid.utils import get_hash_type
    
    23
    +from buildgrid.utils import create_digest, parse_digest
    
    24
    +
    
    25
    +
    
    26
    +BLOBS = (b'', b'non-empty-blob',)
    
    27
    +BLOB_HASHES = (
    
    28
    +    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
    
    29
    +    '89070dfb3175a2c75835d70147b52bd97afd8228819566d84eecd2d20e9b19fc',)
    
    30
    +BLOB_SIZES = (0, 14,)
    
    31
    +BLOB_DATA = zip(BLOBS, BLOB_HASHES, BLOB_SIZES)
    
    32
    +
    
    33
    +STRINGS = (
    
    34
    +    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855/0',
    
    35
    +    '89070dfb3175a2c75835d70147b52bd97afd8228819566d84eecd2d20e9b19fc/14',
    
    36
    +    'e1ca41574914ba00e8ed5c8fc78ec8efdfd48941c7e48ad74dad8ada7f2066d/12', )
    
    37
    +BLOB_HASHES = (
    
    38
    +    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
    
    39
    +    '89070dfb3175a2c75835d70147b52bd97afd8228819566d84eecd2d20e9b19fc',
    
    40
    +    None, )
    
    41
    +BLOB_SIZES = (0, 14, None,)
    
    42
    +STRING_VALIDITIES = (True, True, False,)
    
    43
    +STRING_DATA = zip(STRINGS, BLOB_HASHES, BLOB_SIZES, STRING_VALIDITIES)
    
    44
    +
    
    45
    +BASE_URL = 'http://localhost:8080'
    
    46
    +INSTANCES = (None, '', 'instance',)
    
    47
    +URL_HASHES = (
    
    48
    +    'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855',
    
    49
    +    '89070dfb3175a2c75835d70147b52bd97afd8228819566d84eecd2d20e9b19fc',)
    
    50
    +URL_SIZES = (0, 14,)
    
    51
    +URL_DATA = zip(URL_HASHES, URL_SIZES)
    
    52
    +
    
    53
    +
    
    54
    +@pytest.mark.parametrize('blob,digest_hash,digest_size', BLOB_DATA)
    
    55
    +def test_create_digest(blob, digest_hash, digest_size):
    
    56
    +    # Generate a Digest message from given blob:
    
    57
    +    blob_digest = create_digest(blob)
    
    58
    +
    
    59
    +    assert get_hash_type() == remote_execution_pb2.SHA256
    
    60
    +
    
    61
    +    assert hasattr(blob_digest, 'DESCRIPTOR')
    
    62
    +    assert blob_digest.DESCRIPTOR == remote_execution_pb2.Digest.DESCRIPTOR
    
    63
    +    assert blob_digest.hash == digest_hash
    
    64
    +    assert blob_digest.size_bytes == digest_size
    
    65
    +
    
    66
    +
    
    67
    +@pytest.mark.parametrize('string,digest_hash,digest_size,validity', STRING_DATA)
    
    68
    +def test_parse_digest(string, digest_hash, digest_size, validity):
    
    69
    +    # Generate a Digest message from given string:
    
    70
    +    string_digest = parse_digest(string)
    
    71
    +
    
    72
    +    assert get_hash_type() == remote_execution_pb2.SHA256
    
    73
    +
    
    74
    +    if validity:
    
    75
    +        assert hasattr(string_digest, 'DESCRIPTOR')
    
    76
    +        assert string_digest.DESCRIPTOR == remote_execution_pb2.Digest.DESCRIPTOR
    
    77
    +        assert string_digest.hash == digest_hash
    
    78
    +        assert string_digest.size_bytes == digest_size
    
    79
    +
    
    80
    +    else:
    
    81
    +        assert string_digest is None
    
    82
    +
    
    83
    +
    
    84
    +@pytest.mark.parametrize('instance', INSTANCES)
    
    85
    +@pytest.mark.parametrize('digest_hash,digest_size', URL_DATA)
    
    86
    +def test_browser_url_initialization(instance, digest_hash, digest_size):
    
    87
    +    # Initialize and generate a browser compatible URL:
    
    88
    +    browser_url = BrowserURL(BASE_URL, instance)
    
    89
    +    browser_digest = remote_execution_pb2.Digest(hash=digest_hash,
    
    90
    +                                                 size_bytes=digest_size)
    
    91
    +
    
    92
    +    assert browser_url.generate() is None
    
    93
    +    assert browser_url.for_message('type', browser_digest)
    
    94
    +    assert not browser_url.for_message(None, None)
    
    95
    +
    
    96
    +    url = browser_url.generate()
    
    97
    +
    
    98
    +    assert url is not None
    
    99
    +
    
    100
    +    parsed_url = urlparse(url)
    
    101
    +
    
    102
    +    if instance:
    
    103
    +        assert parsed_url.path.find(instance)
    
    104
    +    assert parsed_url.path.find('type') > 0
    
    105
    +    assert parsed_url.path.find(digest_hash) > 0
    
    106
    +    assert parsed_url.path.find(str(digest_size)) > 0



  • [Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]