mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-11-04 02:01:03 +01:00 
			
		
		
		
	Metrics for number of RDATA commands received
I found myself wishing we had this.
This commit is contained in:
		
							parent
							
								
									5e16c1dc8c
								
							
						
					
					
						commit
						5c3c32f16f
					
				@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 | 
			
		||||
            self.send_error("Wrong remote")
 | 
			
		||||
 | 
			
		||||
    def on_RDATA(self, cmd):
 | 
			
		||||
        stream_name = cmd.stream_name
 | 
			
		||||
        inbound_rdata_count.inc(stream_name)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row)
 | 
			
		||||
            row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
 | 
			
		||||
        except Exception:
 | 
			
		||||
            logger.exception(
 | 
			
		||||
                "[%s] Failed to parse RDATA: %r %r",
 | 
			
		||||
                self.id(), cmd.stream_name, cmd.row
 | 
			
		||||
                self.id(), stream_name, cmd.row
 | 
			
		||||
            )
 | 
			
		||||
            raise
 | 
			
		||||
 | 
			
		||||
        if cmd.token is None:
 | 
			
		||||
            # I.e. this is part of a batch of updates for this stream. Batch
 | 
			
		||||
            # until we get an update for the stream with a non None token
 | 
			
		||||
            self.pending_batches.setdefault(cmd.stream_name, []).append(row)
 | 
			
		||||
            self.pending_batches.setdefault(stream_name, []).append(row)
 | 
			
		||||
        else:
 | 
			
		||||
            # Check if this is the last of a batch of updates
 | 
			
		||||
            rows = self.pending_batches.pop(cmd.stream_name, [])
 | 
			
		||||
            rows = self.pending_batches.pop(stream_name, [])
 | 
			
		||||
            rows.append(row)
 | 
			
		||||
 | 
			
		||||
            self.handler.on_rdata(cmd.stream_name, cmd.token, rows)
 | 
			
		||||
            self.handler.on_rdata(stream_name, cmd.token, rows)
 | 
			
		||||
 | 
			
		||||
    def on_POSITION(self, cmd):
 | 
			
		||||
        self.handler.on_position(cmd.stream_name, cmd.token)
 | 
			
		||||
@ -644,3 +647,9 @@ metrics.register_callback(
 | 
			
		||||
    },
 | 
			
		||||
    labels=["command", "name", "conn_id"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
# number of updates received for each RDATA stream
 | 
			
		||||
inbound_rdata_count = metrics.register_counter(
 | 
			
		||||
    "inbound_rdata_count",
 | 
			
		||||
    labels=["stream_name"],
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user