mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-11-04 02:01:03 +01:00 
			
		
		
		
	Merge pull request #2924 from matrix-org/erikj/split_stream_store
Split out stream store
This commit is contained in:
		
						commit
						92789199a9
					
				@ -22,9 +22,8 @@ from synapse.storage.event_push_actions import EventPushActionsWorkerStore
 | 
			
		||||
from synapse.storage.events_worker import EventsWorkerStore
 | 
			
		||||
from synapse.storage.roommember import RoomMemberWorkerStore
 | 
			
		||||
from synapse.storage.state import StateGroupWorkerStore
 | 
			
		||||
from synapse.storage.stream import StreamStore
 | 
			
		||||
from synapse.storage.stream import StreamWorkerStore
 | 
			
		||||
from synapse.storage.signatures import SignatureStore
 | 
			
		||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
 | 
			
		||||
from ._base import BaseSlavedStore
 | 
			
		||||
from ._slaved_id_tracker import SlavedIdTracker
 | 
			
		||||
 | 
			
		||||
@ -41,33 +40,18 @@ logger = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
 | 
			
		||||
                       EventsWorkerStore, StateGroupWorkerStore,
 | 
			
		||||
                       StreamWorkerStore, EventsWorkerStore, StateGroupWorkerStore,
 | 
			
		||||
                       BaseSlavedStore):
 | 
			
		||||
 | 
			
		||||
    def __init__(self, db_conn, hs):
 | 
			
		||||
        super(SlavedEventStore, self).__init__(db_conn, hs)
 | 
			
		||||
        self._stream_id_gen = SlavedIdTracker(
 | 
			
		||||
            db_conn, "events", "stream_ordering",
 | 
			
		||||
        )
 | 
			
		||||
        self._backfill_id_gen = SlavedIdTracker(
 | 
			
		||||
            db_conn, "events", "stream_ordering", step=-1
 | 
			
		||||
        )
 | 
			
		||||
        events_max = self._stream_id_gen.get_current_token()
 | 
			
		||||
        event_cache_prefill, min_event_val = self._get_cache_dict(
 | 
			
		||||
            db_conn, "events",
 | 
			
		||||
            entity_column="room_id",
 | 
			
		||||
            stream_column="stream_ordering",
 | 
			
		||||
            max_value=events_max,
 | 
			
		||||
        )
 | 
			
		||||
        self._events_stream_cache = StreamChangeCache(
 | 
			
		||||
            "EventsRoomStreamChangeCache", min_event_val,
 | 
			
		||||
            prefilled_cache=event_cache_prefill,
 | 
			
		||||
        )
 | 
			
		||||
        self._membership_stream_cache = StreamChangeCache(
 | 
			
		||||
            "MembershipStreamChangeCache", events_max,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._stream_order_on_start = self.get_room_max_stream_ordering()
 | 
			
		||||
        super(SlavedEventStore, self).__init__(db_conn, hs)
 | 
			
		||||
 | 
			
		||||
    # Cached functions can't be accessed through a class instance so we need
 | 
			
		||||
    # to reach inside the __dict__ to extract them.
 | 
			
		||||
@ -75,30 +59,6 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
 | 
			
		||||
        "get_latest_event_ids_in_room"
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    get_recent_event_ids_for_room = (
 | 
			
		||||
        StreamStore.__dict__["get_recent_event_ids_for_room"]
 | 
			
		||||
    )
 | 
			
		||||
    has_room_changed_since = DataStore.has_room_changed_since.__func__
 | 
			
		||||
 | 
			
		||||
    get_membership_changes_for_user = (
 | 
			
		||||
        DataStore.get_membership_changes_for_user.__func__
 | 
			
		||||
    )
 | 
			
		||||
    get_room_events_max_id = DataStore.get_room_events_max_id.__func__
 | 
			
		||||
    get_room_events_stream_for_room = (
 | 
			
		||||
        DataStore.get_room_events_stream_for_room.__func__
 | 
			
		||||
    )
 | 
			
		||||
    get_events_around = DataStore.get_events_around.__func__
 | 
			
		||||
 | 
			
		||||
    get_recent_events_for_room = DataStore.get_recent_events_for_room.__func__
 | 
			
		||||
    get_room_events_stream_for_rooms = (
 | 
			
		||||
        DataStore.get_room_events_stream_for_rooms.__func__
 | 
			
		||||
    )
 | 
			
		||||
    get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__
 | 
			
		||||
 | 
			
		||||
    _set_before_and_after = staticmethod(DataStore._set_before_and_after)
 | 
			
		||||
 | 
			
		||||
    _get_events_around_txn = DataStore._get_events_around_txn.__func__
 | 
			
		||||
 | 
			
		||||
    get_backfill_events = DataStore.get_backfill_events.__func__
 | 
			
		||||
    _get_backfill_events = DataStore._get_backfill_events.__func__
 | 
			
		||||
    get_missing_events = DataStore.get_missing_events.__func__
 | 
			
		||||
@ -119,8 +79,11 @@ class SlavedEventStore(RoomMemberWorkerStore, EventPushActionsWorkerStore,
 | 
			
		||||
 | 
			
		||||
    get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__
 | 
			
		||||
 | 
			
		||||
    get_federation_out_pos = DataStore.get_federation_out_pos.__func__
 | 
			
		||||
    update_federation_out_pos = DataStore.update_federation_out_pos.__func__
 | 
			
		||||
    def get_room_max_stream_ordering(self):
 | 
			
		||||
        return self._stream_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    def get_room_min_stream_ordering(self):
 | 
			
		||||
        return self._backfill_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    get_latest_event_ids_and_hashes_in_room = (
 | 
			
		||||
        DataStore.get_latest_event_ids_and_hashes_in_room.__func__
 | 
			
		||||
 | 
			
		||||
@ -140,22 +140,6 @@ class DataStore(RoomMemberStore, RoomStore,
 | 
			
		||||
        else:
 | 
			
		||||
            self._cache_id_gen = None
 | 
			
		||||
 | 
			
		||||
        events_max = self._stream_id_gen.get_current_token()
 | 
			
		||||
        event_cache_prefill, min_event_val = self._get_cache_dict(
 | 
			
		||||
            db_conn, "events",
 | 
			
		||||
            entity_column="room_id",
 | 
			
		||||
            stream_column="stream_ordering",
 | 
			
		||||
            max_value=events_max,
 | 
			
		||||
        )
 | 
			
		||||
        self._events_stream_cache = StreamChangeCache(
 | 
			
		||||
            "EventsRoomStreamChangeCache", min_event_val,
 | 
			
		||||
            prefilled_cache=event_cache_prefill,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._membership_stream_cache = StreamChangeCache(
 | 
			
		||||
            "MembershipStreamChangeCache", events_max,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._presence_on_startup = self._get_active_presence(db_conn)
 | 
			
		||||
 | 
			
		||||
        presence_cache_prefill, min_presence_val = self._get_cache_dict(
 | 
			
		||||
@ -203,6 +187,7 @@ class DataStore(RoomMemberStore, RoomStore,
 | 
			
		||||
            "DeviceListFederationStreamChangeCache", device_list_max,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        events_max = self._stream_id_gen.get_current_token()
 | 
			
		||||
        curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict(
 | 
			
		||||
            db_conn, "current_state_delta_stream",
 | 
			
		||||
            entity_column="room_id",
 | 
			
		||||
 | 
			
		||||
@ -35,13 +35,17 @@ what sort order was used:
 | 
			
		||||
 | 
			
		||||
from twisted.internet import defer
 | 
			
		||||
 | 
			
		||||
from ._base import SQLBaseStore
 | 
			
		||||
from synapse.storage._base import SQLBaseStore
 | 
			
		||||
from synapse.storage.events import EventsWorkerStore
 | 
			
		||||
 | 
			
		||||
from synapse.util.caches.descriptors import cached
 | 
			
		||||
from synapse.api.constants import EventTypes
 | 
			
		||||
from synapse.types import RoomStreamToken
 | 
			
		||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
 | 
			
		||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
 | 
			
		||||
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 | 
			
		||||
 | 
			
		||||
import abc
 | 
			
		||||
import logging
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -143,81 +147,41 @@ def filter_to_clause(event_filter):
 | 
			
		||||
    return " AND ".join(clauses), args
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StreamStore(SQLBaseStore):
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
 | 
			
		||||
        # NB this lives here instead of appservice.py so we can reuse the
 | 
			
		||||
        # 'private' StreamToken class in this file.
 | 
			
		||||
        if limit:
 | 
			
		||||
            limit = max(limit, MAX_STREAM_SIZE)
 | 
			
		||||
        else:
 | 
			
		||||
            limit = MAX_STREAM_SIZE
 | 
			
		||||
class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 | 
			
		||||
    """This is an abstract base class where subclasses must implement
 | 
			
		||||
    `get_room_max_stream_ordering` and `get_room_min_stream_ordering`
 | 
			
		||||
    which can be called in the initializer.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
        # From and to keys should be integers from ordering.
 | 
			
		||||
        from_id = RoomStreamToken.parse_stream_token(from_key)
 | 
			
		||||
        to_id = RoomStreamToken.parse_stream_token(to_key)
 | 
			
		||||
    __metaclass__ = abc.ABCMeta
 | 
			
		||||
 | 
			
		||||
        if from_key == to_key:
 | 
			
		||||
            defer.returnValue(([], to_key))
 | 
			
		||||
            return
 | 
			
		||||
    def __init__(self, db_conn, hs):
 | 
			
		||||
        super(StreamWorkerStore, self).__init__(db_conn, hs)
 | 
			
		||||
 | 
			
		||||
        # select all the events between from/to with a sensible limit
 | 
			
		||||
        sql = (
 | 
			
		||||
            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
 | 
			
		||||
            "e.stream_ordering FROM events AS e "
 | 
			
		||||
            "LEFT JOIN state_events as s ON "
 | 
			
		||||
            "e.event_id = s.event_id "
 | 
			
		||||
            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
 | 
			
		||||
            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
 | 
			
		||||
        ) % {
 | 
			
		||||
            "limit": limit
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        def f(txn):
 | 
			
		||||
            # pull out all the events between the tokens
 | 
			
		||||
            txn.execute(sql, (from_id.stream, to_id.stream,))
 | 
			
		||||
            rows = self.cursor_to_dict(txn)
 | 
			
		||||
 | 
			
		||||
            # Logic:
 | 
			
		||||
            #  - We want ALL events which match the AS room_id regex
 | 
			
		||||
            #  - We want ALL events which match the rooms represented by the AS
 | 
			
		||||
            #    room_alias regex
 | 
			
		||||
            #  - We want ALL events for rooms that AS users have joined.
 | 
			
		||||
            # This is currently supported via get_app_service_rooms (which is
 | 
			
		||||
            # used for the Notifier listener rooms). We can't reasonably make a
 | 
			
		||||
            # SQL query for these room IDs, so we'll pull all the events between
 | 
			
		||||
            # from/to and filter in python.
 | 
			
		||||
            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
 | 
			
		||||
            room_ids_for_as = [r.room_id for r in rooms_for_as]
 | 
			
		||||
 | 
			
		||||
            def app_service_interested(row):
 | 
			
		||||
                if row["room_id"] in room_ids_for_as:
 | 
			
		||||
                    return True
 | 
			
		||||
 | 
			
		||||
                if row["type"] == EventTypes.Member:
 | 
			
		||||
                    if service.is_interested_in_user(row.get("state_key")):
 | 
			
		||||
                        return True
 | 
			
		||||
                return False
 | 
			
		||||
 | 
			
		||||
            return [r for r in rows if app_service_interested(r)]
 | 
			
		||||
 | 
			
		||||
        rows = yield self.runInteraction("get_appservice_room_stream", f)
 | 
			
		||||
 | 
			
		||||
        ret = yield self._get_events(
 | 
			
		||||
            [r["event_id"] for r in rows],
 | 
			
		||||
            get_prev_content=True
 | 
			
		||||
        events_max = self.get_room_max_stream_ordering()
 | 
			
		||||
        event_cache_prefill, min_event_val = self._get_cache_dict(
 | 
			
		||||
            db_conn, "events",
 | 
			
		||||
            entity_column="room_id",
 | 
			
		||||
            stream_column="stream_ordering",
 | 
			
		||||
            max_value=events_max,
 | 
			
		||||
        )
 | 
			
		||||
        self._events_stream_cache = StreamChangeCache(
 | 
			
		||||
            "EventsRoomStreamChangeCache", min_event_val,
 | 
			
		||||
            prefilled_cache=event_cache_prefill,
 | 
			
		||||
        )
 | 
			
		||||
        self._membership_stream_cache = StreamChangeCache(
 | 
			
		||||
            "MembershipStreamChangeCache", events_max,
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._set_before_and_after(ret, rows, topo_order=from_id is None)
 | 
			
		||||
        self._stream_order_on_start = self.get_room_max_stream_ordering()
 | 
			
		||||
 | 
			
		||||
        if rows:
 | 
			
		||||
            key = "s%d" % max(r["stream_ordering"] for r in rows)
 | 
			
		||||
        else:
 | 
			
		||||
            # Assume we didn't get anything because there was nothing to
 | 
			
		||||
            # get.
 | 
			
		||||
            key = to_key
 | 
			
		||||
    @abc.abstractmethod
 | 
			
		||||
    def get_room_max_stream_ordering(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
        defer.returnValue((ret, key))
 | 
			
		||||
    @abc.abstractmethod
 | 
			
		||||
    def get_room_min_stream_ordering(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
 | 
			
		||||
@ -380,88 +344,6 @@ class StreamStore(SQLBaseStore):
 | 
			
		||||
 | 
			
		||||
        defer.returnValue(ret)
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def paginate_room_events(self, room_id, from_key, to_key=None,
 | 
			
		||||
                             direction='b', limit=-1, event_filter=None):
 | 
			
		||||
        # Tokens really represent positions between elements, but we use
 | 
			
		||||
        # the convention of pointing to the event before the gap. Hence
 | 
			
		||||
        # we have a bit of asymmetry when it comes to equalities.
 | 
			
		||||
        args = [False, room_id]
 | 
			
		||||
        if direction == 'b':
 | 
			
		||||
            order = "DESC"
 | 
			
		||||
            bounds = upper_bound(
 | 
			
		||||
                RoomStreamToken.parse(from_key), self.database_engine
 | 
			
		||||
            )
 | 
			
		||||
            if to_key:
 | 
			
		||||
                bounds = "%s AND %s" % (bounds, lower_bound(
 | 
			
		||||
                    RoomStreamToken.parse(to_key), self.database_engine
 | 
			
		||||
                ))
 | 
			
		||||
        else:
 | 
			
		||||
            order = "ASC"
 | 
			
		||||
            bounds = lower_bound(
 | 
			
		||||
                RoomStreamToken.parse(from_key), self.database_engine
 | 
			
		||||
            )
 | 
			
		||||
            if to_key:
 | 
			
		||||
                bounds = "%s AND %s" % (bounds, upper_bound(
 | 
			
		||||
                    RoomStreamToken.parse(to_key), self.database_engine
 | 
			
		||||
                ))
 | 
			
		||||
 | 
			
		||||
        filter_clause, filter_args = filter_to_clause(event_filter)
 | 
			
		||||
 | 
			
		||||
        if filter_clause:
 | 
			
		||||
            bounds += " AND " + filter_clause
 | 
			
		||||
            args.extend(filter_args)
 | 
			
		||||
 | 
			
		||||
        if int(limit) > 0:
 | 
			
		||||
            args.append(int(limit))
 | 
			
		||||
            limit_str = " LIMIT ?"
 | 
			
		||||
        else:
 | 
			
		||||
            limit_str = ""
 | 
			
		||||
 | 
			
		||||
        sql = (
 | 
			
		||||
            "SELECT * FROM events"
 | 
			
		||||
            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
 | 
			
		||||
            " ORDER BY topological_ordering %(order)s,"
 | 
			
		||||
            " stream_ordering %(order)s %(limit)s"
 | 
			
		||||
        ) % {
 | 
			
		||||
            "bounds": bounds,
 | 
			
		||||
            "order": order,
 | 
			
		||||
            "limit": limit_str
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        def f(txn):
 | 
			
		||||
            txn.execute(sql, args)
 | 
			
		||||
 | 
			
		||||
            rows = self.cursor_to_dict(txn)
 | 
			
		||||
 | 
			
		||||
            if rows:
 | 
			
		||||
                topo = rows[-1]["topological_ordering"]
 | 
			
		||||
                toke = rows[-1]["stream_ordering"]
 | 
			
		||||
                if direction == 'b':
 | 
			
		||||
                    # Tokens are positions between events.
 | 
			
		||||
                    # This token points *after* the last event in the chunk.
 | 
			
		||||
                    # We need it to point to the event before it in the chunk
 | 
			
		||||
                    # when we are going backwards so we subtract one from the
 | 
			
		||||
                    # stream part.
 | 
			
		||||
                    toke -= 1
 | 
			
		||||
                next_token = str(RoomStreamToken(topo, toke))
 | 
			
		||||
            else:
 | 
			
		||||
                # TODO (erikj): We should work out what to do here instead.
 | 
			
		||||
                next_token = to_key if to_key else from_key
 | 
			
		||||
 | 
			
		||||
            return rows, next_token,
 | 
			
		||||
 | 
			
		||||
        rows, token = yield self.runInteraction("paginate_room_events", f)
 | 
			
		||||
 | 
			
		||||
        events = yield self._get_events(
 | 
			
		||||
            [r["event_id"] for r in rows],
 | 
			
		||||
            get_prev_content=True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._set_before_and_after(events, rows)
 | 
			
		||||
 | 
			
		||||
        defer.returnValue((events, token))
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
 | 
			
		||||
        rows, token = yield self.get_recent_event_ids_for_room(
 | 
			
		||||
@ -542,7 +424,7 @@ class StreamStore(SQLBaseStore):
 | 
			
		||||
        `room_id` causes it to return the current room specific topological
 | 
			
		||||
        token.
 | 
			
		||||
        """
 | 
			
		||||
        token = yield self._stream_id_gen.get_current_token()
 | 
			
		||||
        token = yield self.get_room_max_stream_ordering()
 | 
			
		||||
        if room_id is None:
 | 
			
		||||
            defer.returnValue("s%d" % (token,))
 | 
			
		||||
        else:
 | 
			
		||||
@ -552,12 +434,6 @@ class StreamStore(SQLBaseStore):
 | 
			
		||||
            )
 | 
			
		||||
            defer.returnValue("t%d-%d" % (topo, token))
 | 
			
		||||
 | 
			
		||||
    def get_room_max_stream_ordering(self):
 | 
			
		||||
        return self._stream_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    def get_room_min_stream_ordering(self):
 | 
			
		||||
        return self._backfill_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    def get_stream_token_for_event(self, event_id):
 | 
			
		||||
        """The stream token for an event
 | 
			
		||||
        Args:
 | 
			
		||||
@ -832,3 +708,168 @@ class StreamStore(SQLBaseStore):
 | 
			
		||||
 | 
			
		||||
    def has_room_changed_since(self, room_id, stream_id):
 | 
			
		||||
        return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class StreamStore(StreamWorkerStore):
 | 
			
		||||
    def get_room_max_stream_ordering(self):
 | 
			
		||||
        return self._stream_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    def get_room_min_stream_ordering(self):
 | 
			
		||||
        return self._backfill_id_gen.get_current_token()
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
 | 
			
		||||
        # NB this lives here instead of appservice.py so we can reuse the
 | 
			
		||||
        # 'private' StreamToken class in this file.
 | 
			
		||||
        if limit:
 | 
			
		||||
            limit = max(limit, MAX_STREAM_SIZE)
 | 
			
		||||
        else:
 | 
			
		||||
            limit = MAX_STREAM_SIZE
 | 
			
		||||
 | 
			
		||||
        # From and to keys should be integers from ordering.
 | 
			
		||||
        from_id = RoomStreamToken.parse_stream_token(from_key)
 | 
			
		||||
        to_id = RoomStreamToken.parse_stream_token(to_key)
 | 
			
		||||
 | 
			
		||||
        if from_key == to_key:
 | 
			
		||||
            defer.returnValue(([], to_key))
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        # select all the events between from/to with a sensible limit
 | 
			
		||||
        sql = (
 | 
			
		||||
            "SELECT e.event_id, e.room_id, e.type, s.state_key, "
 | 
			
		||||
            "e.stream_ordering FROM events AS e "
 | 
			
		||||
            "LEFT JOIN state_events as s ON "
 | 
			
		||||
            "e.event_id = s.event_id "
 | 
			
		||||
            "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
 | 
			
		||||
            "ORDER BY stream_ordering ASC LIMIT %(limit)d "
 | 
			
		||||
        ) % {
 | 
			
		||||
            "limit": limit
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        def f(txn):
 | 
			
		||||
            # pull out all the events between the tokens
 | 
			
		||||
            txn.execute(sql, (from_id.stream, to_id.stream,))
 | 
			
		||||
            rows = self.cursor_to_dict(txn)
 | 
			
		||||
 | 
			
		||||
            # Logic:
 | 
			
		||||
            #  - We want ALL events which match the AS room_id regex
 | 
			
		||||
            #  - We want ALL events which match the rooms represented by the AS
 | 
			
		||||
            #    room_alias regex
 | 
			
		||||
            #  - We want ALL events for rooms that AS users have joined.
 | 
			
		||||
            # This is currently supported via get_app_service_rooms (which is
 | 
			
		||||
            # used for the Notifier listener rooms). We can't reasonably make a
 | 
			
		||||
            # SQL query for these room IDs, so we'll pull all the events between
 | 
			
		||||
            # from/to and filter in python.
 | 
			
		||||
            rooms_for_as = self._get_app_service_rooms_txn(txn, service)
 | 
			
		||||
            room_ids_for_as = [r.room_id for r in rooms_for_as]
 | 
			
		||||
 | 
			
		||||
            def app_service_interested(row):
 | 
			
		||||
                if row["room_id"] in room_ids_for_as:
 | 
			
		||||
                    return True
 | 
			
		||||
 | 
			
		||||
                if row["type"] == EventTypes.Member:
 | 
			
		||||
                    if service.is_interested_in_user(row.get("state_key")):
 | 
			
		||||
                        return True
 | 
			
		||||
                return False
 | 
			
		||||
 | 
			
		||||
            return [r for r in rows if app_service_interested(r)]
 | 
			
		||||
 | 
			
		||||
        rows = yield self.runInteraction("get_appservice_room_stream", f)
 | 
			
		||||
 | 
			
		||||
        ret = yield self._get_events(
 | 
			
		||||
            [r["event_id"] for r in rows],
 | 
			
		||||
            get_prev_content=True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._set_before_and_after(ret, rows, topo_order=from_id is None)
 | 
			
		||||
 | 
			
		||||
        if rows:
 | 
			
		||||
            key = "s%d" % max(r["stream_ordering"] for r in rows)
 | 
			
		||||
        else:
 | 
			
		||||
            # Assume we didn't get anything because there was nothing to
 | 
			
		||||
            # get.
 | 
			
		||||
            key = to_key
 | 
			
		||||
 | 
			
		||||
        defer.returnValue((ret, key))
 | 
			
		||||
 | 
			
		||||
    @defer.inlineCallbacks
 | 
			
		||||
    def paginate_room_events(self, room_id, from_key, to_key=None,
 | 
			
		||||
                             direction='b', limit=-1, event_filter=None):
 | 
			
		||||
        # Tokens really represent positions between elements, but we use
 | 
			
		||||
        # the convention of pointing to the event before the gap. Hence
 | 
			
		||||
        # we have a bit of asymmetry when it comes to equalities.
 | 
			
		||||
        args = [False, room_id]
 | 
			
		||||
        if direction == 'b':
 | 
			
		||||
            order = "DESC"
 | 
			
		||||
            bounds = upper_bound(
 | 
			
		||||
                RoomStreamToken.parse(from_key), self.database_engine
 | 
			
		||||
            )
 | 
			
		||||
            if to_key:
 | 
			
		||||
                bounds = "%s AND %s" % (bounds, lower_bound(
 | 
			
		||||
                    RoomStreamToken.parse(to_key), self.database_engine
 | 
			
		||||
                ))
 | 
			
		||||
        else:
 | 
			
		||||
            order = "ASC"
 | 
			
		||||
            bounds = lower_bound(
 | 
			
		||||
                RoomStreamToken.parse(from_key), self.database_engine
 | 
			
		||||
            )
 | 
			
		||||
            if to_key:
 | 
			
		||||
                bounds = "%s AND %s" % (bounds, upper_bound(
 | 
			
		||||
                    RoomStreamToken.parse(to_key), self.database_engine
 | 
			
		||||
                ))
 | 
			
		||||
 | 
			
		||||
        filter_clause, filter_args = filter_to_clause(event_filter)
 | 
			
		||||
 | 
			
		||||
        if filter_clause:
 | 
			
		||||
            bounds += " AND " + filter_clause
 | 
			
		||||
            args.extend(filter_args)
 | 
			
		||||
 | 
			
		||||
        if int(limit) > 0:
 | 
			
		||||
            args.append(int(limit))
 | 
			
		||||
            limit_str = " LIMIT ?"
 | 
			
		||||
        else:
 | 
			
		||||
            limit_str = ""
 | 
			
		||||
 | 
			
		||||
        sql = (
 | 
			
		||||
            "SELECT * FROM events"
 | 
			
		||||
            " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
 | 
			
		||||
            " ORDER BY topological_ordering %(order)s,"
 | 
			
		||||
            " stream_ordering %(order)s %(limit)s"
 | 
			
		||||
        ) % {
 | 
			
		||||
            "bounds": bounds,
 | 
			
		||||
            "order": order,
 | 
			
		||||
            "limit": limit_str
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        def f(txn):
 | 
			
		||||
            txn.execute(sql, args)
 | 
			
		||||
 | 
			
		||||
            rows = self.cursor_to_dict(txn)
 | 
			
		||||
 | 
			
		||||
            if rows:
 | 
			
		||||
                topo = rows[-1]["topological_ordering"]
 | 
			
		||||
                toke = rows[-1]["stream_ordering"]
 | 
			
		||||
                if direction == 'b':
 | 
			
		||||
                    # Tokens are positions between events.
 | 
			
		||||
                    # This token points *after* the last event in the chunk.
 | 
			
		||||
                    # We need it to point to the event before it in the chunk
 | 
			
		||||
                    # when we are going backwards so we subtract one from the
 | 
			
		||||
                    # stream part.
 | 
			
		||||
                    toke -= 1
 | 
			
		||||
                next_token = str(RoomStreamToken(topo, toke))
 | 
			
		||||
            else:
 | 
			
		||||
                # TODO (erikj): We should work out what to do here instead.
 | 
			
		||||
                next_token = to_key if to_key else from_key
 | 
			
		||||
 | 
			
		||||
            return rows, next_token,
 | 
			
		||||
 | 
			
		||||
        rows, token = yield self.runInteraction("paginate_room_events", f)
 | 
			
		||||
 | 
			
		||||
        events = yield self._get_events(
 | 
			
		||||
            [r["event_id"] for r in rows],
 | 
			
		||||
            get_prev_content=True
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
        self._set_before_and_after(events, rows)
 | 
			
		||||
 | 
			
		||||
        defer.returnValue((events, token))
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user