mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-11-04 10:11:05 +01:00 
			
		
		
		
	Merge pull request #2833 from matrix-org/rav/pusher_hacks
Logging and metrics for the http pusher
This commit is contained in:
		
						commit
						d1fe4db882
					
				@ -13,21 +13,30 @@
 | 
				
			|||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
					# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
				
			||||||
# See the License for the specific language governing permissions and
 | 
					# See the License for the specific language governing permissions and
 | 
				
			||||||
# limitations under the License.
 | 
					# limitations under the License.
 | 
				
			||||||
 | 
					import logging
 | 
				
			||||||
from synapse.push import PusherConfigException
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
from twisted.internet import defer, reactor
 | 
					from twisted.internet import defer, reactor
 | 
				
			||||||
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 | 
					from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import logging
 | 
					 | 
				
			||||||
import push_rule_evaluator
 | 
					import push_rule_evaluator
 | 
				
			||||||
import push_tools
 | 
					import push_tools
 | 
				
			||||||
 | 
					import synapse
 | 
				
			||||||
 | 
					from synapse.push import PusherConfigException
 | 
				
			||||||
from synapse.util.logcontext import LoggingContext
 | 
					from synapse.util.logcontext import LoggingContext
 | 
				
			||||||
from synapse.util.metrics import Measure
 | 
					from synapse.util.metrics import Measure
 | 
				
			||||||
 | 
					
 | 
				
			||||||
logger = logging.getLogger(__name__)
 | 
					logger = logging.getLogger(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					metrics = synapse.metrics.get_metrics_for(__name__)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					http_push_processed_counter = metrics.register_counter(
 | 
				
			||||||
 | 
					    "http_pushes_processed",
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					http_push_failed_counter = metrics.register_counter(
 | 
				
			||||||
 | 
					    "http_pushes_failed",
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
class HttpPusher(object):
 | 
					class HttpPusher(object):
 | 
				
			||||||
    INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
 | 
					    INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
 | 
				
			||||||
@ -152,9 +161,16 @@ class HttpPusher(object):
 | 
				
			|||||||
            self.user_id, self.last_stream_ordering, self.max_stream_ordering
 | 
					            self.user_id, self.last_stream_ordering, self.max_stream_ordering
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.info(
 | 
				
			||||||
 | 
					            "Processing %i unprocessed push actions for %s starting at "
 | 
				
			||||||
 | 
					            "stream_ordering %s",
 | 
				
			||||||
 | 
					            len(unprocessed), self.name, self.last_stream_ordering,
 | 
				
			||||||
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        for push_action in unprocessed:
 | 
					        for push_action in unprocessed:
 | 
				
			||||||
            processed = yield self._process_one(push_action)
 | 
					            processed = yield self._process_one(push_action)
 | 
				
			||||||
            if processed:
 | 
					            if processed:
 | 
				
			||||||
 | 
					                http_push_processed_counter.inc()
 | 
				
			||||||
                self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
 | 
					                self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
 | 
				
			||||||
                self.last_stream_ordering = push_action['stream_ordering']
 | 
					                self.last_stream_ordering = push_action['stream_ordering']
 | 
				
			||||||
                yield self.store.update_pusher_last_stream_ordering_and_success(
 | 
					                yield self.store.update_pusher_last_stream_ordering_and_success(
 | 
				
			||||||
@ -169,6 +185,7 @@ class HttpPusher(object):
 | 
				
			|||||||
                        self.failing_since
 | 
					                        self.failing_since
 | 
				
			||||||
                    )
 | 
					                    )
 | 
				
			||||||
            else:
 | 
					            else:
 | 
				
			||||||
 | 
					                http_push_failed_counter.inc()
 | 
				
			||||||
                if not self.failing_since:
 | 
					                if not self.failing_since:
 | 
				
			||||||
                    self.failing_since = self.clock.time_msec()
 | 
					                    self.failing_since = self.clock.time_msec()
 | 
				
			||||||
                    yield self.store.update_pusher_failing_since(
 | 
					                    yield self.store.update_pusher_failing_since(
 | 
				
			||||||
@ -316,7 +333,10 @@ class HttpPusher(object):
 | 
				
			|||||||
        try:
 | 
					        try:
 | 
				
			||||||
            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
 | 
					            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
 | 
				
			||||||
        except Exception:
 | 
					        except Exception:
 | 
				
			||||||
            logger.warn("Failed to push %s ", self.url)
 | 
					            logger.warn(
 | 
				
			||||||
 | 
					                "Failed to push event %s to %s",
 | 
				
			||||||
 | 
					                event.event_id, self.name, exc_info=True,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
            defer.returnValue(False)
 | 
					            defer.returnValue(False)
 | 
				
			||||||
        rejected = []
 | 
					        rejected = []
 | 
				
			||||||
        if 'rejected' in resp:
 | 
					        if 'rejected' in resp:
 | 
				
			||||||
@ -325,7 +345,7 @@ class HttpPusher(object):
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    @defer.inlineCallbacks
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
    def _send_badge(self, badge):
 | 
					    def _send_badge(self, badge):
 | 
				
			||||||
        logger.info("Sending updated badge count %d to %r", badge, self.user_id)
 | 
					        logger.info("Sending updated badge count %d to %s", badge, self.name)
 | 
				
			||||||
        d = {
 | 
					        d = {
 | 
				
			||||||
            'notification': {
 | 
					            'notification': {
 | 
				
			||||||
                'id': '',
 | 
					                'id': '',
 | 
				
			||||||
@ -347,7 +367,10 @@ class HttpPusher(object):
 | 
				
			|||||||
        try:
 | 
					        try:
 | 
				
			||||||
            resp = yield self.http_client.post_json_get_json(self.url, d)
 | 
					            resp = yield self.http_client.post_json_get_json(self.url, d)
 | 
				
			||||||
        except Exception:
 | 
					        except Exception:
 | 
				
			||||||
            logger.exception("Failed to push %s ", self.url)
 | 
					            logger.warn(
 | 
				
			||||||
 | 
					                "Failed to send badge count to %s",
 | 
				
			||||||
 | 
					                self.name, exc_info=True,
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
            defer.returnValue(False)
 | 
					            defer.returnValue(False)
 | 
				
			||||||
        rejected = []
 | 
					        rejected = []
 | 
				
			||||||
        if 'rejected' in resp:
 | 
					        if 'rejected' in resp:
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user