mirror of
				https://github.com/matrix-org/synapse.git
				synced 2025-11-04 10:11:05 +01:00 
			
		
		
		
	Merge pull request #2214 from matrix-org/rav/hurry_up_purge
When purging, don't de-delta state groups we're about to delete
This commit is contained in:
		
						commit
						29ded770b1
					
				@ -2033,6 +2033,8 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
        for event_id, state_key in event_rows:
 | 
					        for event_id, state_key in event_rows:
 | 
				
			||||||
            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 | 
					            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] Finding new backward extremities")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # We calculate the new entries for the backward extremeties by finding
 | 
					        # We calculate the new entries for the backward extremeties by finding
 | 
				
			||||||
        # all events that point to events that are to be purged
 | 
					        # all events that point to events that are to be purged
 | 
				
			||||||
        txn.execute(
 | 
					        txn.execute(
 | 
				
			||||||
@ -2045,6 +2047,8 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
        new_backwards_extrems = txn.fetchall()
 | 
					        new_backwards_extrems = txn.fetchall()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        txn.execute(
 | 
					        txn.execute(
 | 
				
			||||||
            "DELETE FROM event_backward_extremities WHERE room_id = ?",
 | 
					            "DELETE FROM event_backward_extremities WHERE room_id = ?",
 | 
				
			||||||
            (room_id,)
 | 
					            (room_id,)
 | 
				
			||||||
@ -2059,6 +2063,8 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
            ]
 | 
					            ]
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] finding redundant state groups")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Get all state groups that are only referenced by events that are
 | 
					        # Get all state groups that are only referenced by events that are
 | 
				
			||||||
        # to be deleted.
 | 
					        # to be deleted.
 | 
				
			||||||
        txn.execute(
 | 
					        txn.execute(
 | 
				
			||||||
@ -2074,15 +2080,19 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        state_rows = txn.fetchall()
 | 
					        state_rows = txn.fetchall()
 | 
				
			||||||
        state_groups_to_delete = [sg for sg, in state_rows]
 | 
					
 | 
				
			||||||
 | 
					        # make a set of the redundant state groups, so that we can look them up
 | 
				
			||||||
 | 
					        # efficiently
 | 
				
			||||||
 | 
					        state_groups_to_delete = set([sg for sg, in state_rows])
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Now we get all the state groups that rely on these state groups
 | 
					        # Now we get all the state groups that rely on these state groups
 | 
				
			||||||
        new_state_edges = []
 | 
					        logger.debug("[purge] finding state groups which depend on redundant"
 | 
				
			||||||
        chunks = [
 | 
					                     " state groups")
 | 
				
			||||||
            state_groups_to_delete[i:i + 100]
 | 
					        remaining_state_groups = []
 | 
				
			||||||
            for i in xrange(0, len(state_groups_to_delete), 100)
 | 
					        for i in xrange(0, len(state_rows), 100):
 | 
				
			||||||
        ]
 | 
					            chunk = [sg for sg, in state_rows[i:i + 100]]
 | 
				
			||||||
        for chunk in chunks:
 | 
					            # look for state groups whose prev_state_group is one we are about
 | 
				
			||||||
 | 
					            # to delete
 | 
				
			||||||
            rows = self._simple_select_many_txn(
 | 
					            rows = self._simple_select_many_txn(
 | 
				
			||||||
                txn,
 | 
					                txn,
 | 
				
			||||||
                table="state_group_edges",
 | 
					                table="state_group_edges",
 | 
				
			||||||
@ -2091,21 +2101,28 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
                retcols=["state_group"],
 | 
					                retcols=["state_group"],
 | 
				
			||||||
                keyvalues={},
 | 
					                keyvalues={},
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            new_state_edges.extend(row["state_group"] for row in rows)
 | 
					            remaining_state_groups.extend(
 | 
				
			||||||
 | 
					                row["state_group"] for row in rows
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Now we turn the state groups that reference to-be-deleted state groups
 | 
					                # exclude state groups we are about to delete: no point in
 | 
				
			||||||
        # to non delta versions.
 | 
					                # updating them
 | 
				
			||||||
        for new_state_edge in new_state_edges:
 | 
					                if row["state_group"] not in state_groups_to_delete
 | 
				
			||||||
            curr_state = self._get_state_groups_from_groups_txn(
 | 
					 | 
				
			||||||
                txn, [new_state_edge], types=None
 | 
					 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
            curr_state = curr_state[new_state_edge]
 | 
					
 | 
				
			||||||
 | 
					        # Now we turn the state groups that reference to-be-deleted state
 | 
				
			||||||
 | 
					        # groups to non delta versions.
 | 
				
			||||||
 | 
					        for sg in remaining_state_groups:
 | 
				
			||||||
 | 
					            logger.debug("[purge] de-delta-ing remaining state group %s", sg)
 | 
				
			||||||
 | 
					            curr_state = self._get_state_groups_from_groups_txn(
 | 
				
			||||||
 | 
					                txn, [sg], types=None
 | 
				
			||||||
 | 
					            )
 | 
				
			||||||
 | 
					            curr_state = curr_state[sg]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            self._simple_delete_txn(
 | 
					            self._simple_delete_txn(
 | 
				
			||||||
                txn,
 | 
					                txn,
 | 
				
			||||||
                table="state_groups_state",
 | 
					                table="state_groups_state",
 | 
				
			||||||
                keyvalues={
 | 
					                keyvalues={
 | 
				
			||||||
                    "state_group": new_state_edge,
 | 
					                    "state_group": sg,
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -2113,7 +2130,7 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
                txn,
 | 
					                txn,
 | 
				
			||||||
                table="state_group_edges",
 | 
					                table="state_group_edges",
 | 
				
			||||||
                keyvalues={
 | 
					                keyvalues={
 | 
				
			||||||
                    "state_group": new_state_edge,
 | 
					                    "state_group": sg,
 | 
				
			||||||
                }
 | 
					                }
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -2122,7 +2139,7 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
                table="state_groups_state",
 | 
					                table="state_groups_state",
 | 
				
			||||||
                values=[
 | 
					                values=[
 | 
				
			||||||
                    {
 | 
					                    {
 | 
				
			||||||
                        "state_group": new_state_edge,
 | 
					                        "state_group": sg,
 | 
				
			||||||
                        "room_id": room_id,
 | 
					                        "room_id": room_id,
 | 
				
			||||||
                        "type": key[0],
 | 
					                        "type": key[0],
 | 
				
			||||||
                        "state_key": key[1],
 | 
					                        "state_key": key[1],
 | 
				
			||||||
@ -2132,6 +2149,7 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
                ],
 | 
					                ],
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] removing redundant state groups")
 | 
				
			||||||
        txn.executemany(
 | 
					        txn.executemany(
 | 
				
			||||||
            "DELETE FROM state_groups_state WHERE state_group = ?",
 | 
					            "DELETE FROM state_groups_state WHERE state_group = ?",
 | 
				
			||||||
            state_rows
 | 
					            state_rows
 | 
				
			||||||
@ -2140,12 +2158,15 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
            "DELETE FROM state_groups WHERE id = ?",
 | 
					            "DELETE FROM state_groups WHERE id = ?",
 | 
				
			||||||
            state_rows
 | 
					            state_rows
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Delete all non-state
 | 
					        # Delete all non-state
 | 
				
			||||||
 | 
					        logger.debug("[purge] removing events from event_to_state_groups")
 | 
				
			||||||
        txn.executemany(
 | 
					        txn.executemany(
 | 
				
			||||||
            "DELETE FROM event_to_state_groups WHERE event_id = ?",
 | 
					            "DELETE FROM event_to_state_groups WHERE event_id = ?",
 | 
				
			||||||
            [(event_id,) for event_id, _ in event_rows]
 | 
					            [(event_id,) for event_id, _ in event_rows]
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] updating room_depth")
 | 
				
			||||||
        txn.execute(
 | 
					        txn.execute(
 | 
				
			||||||
            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
 | 
					            "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
 | 
				
			||||||
            (topological_ordering, room_id,)
 | 
					            (topological_ordering, room_id,)
 | 
				
			||||||
@ -2171,16 +2192,15 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
            "event_signatures",
 | 
					            "event_signatures",
 | 
				
			||||||
            "rejections",
 | 
					            "rejections",
 | 
				
			||||||
        ):
 | 
					        ):
 | 
				
			||||||
 | 
					            logger.debug("[purge] removing non-state events from %s", table)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
            txn.executemany(
 | 
					            txn.executemany(
 | 
				
			||||||
                "DELETE FROM %s WHERE event_id = ?" % (table,),
 | 
					                "DELETE FROM %s WHERE event_id = ?" % (table,),
 | 
				
			||||||
                to_delete
 | 
					                to_delete
 | 
				
			||||||
            )
 | 
					            )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        txn.executemany(
 | 
					 | 
				
			||||||
            "DELETE FROM events WHERE event_id = ?",
 | 
					 | 
				
			||||||
            to_delete
 | 
					 | 
				
			||||||
        )
 | 
					 | 
				
			||||||
        # Mark all state and own events as outliers
 | 
					        # Mark all state and own events as outliers
 | 
				
			||||||
 | 
					        logger.debug("[purge] marking events as outliers")
 | 
				
			||||||
        txn.executemany(
 | 
					        txn.executemany(
 | 
				
			||||||
            "UPDATE events SET outlier = ?"
 | 
					            "UPDATE events SET outlier = ?"
 | 
				
			||||||
            " WHERE event_id = ?",
 | 
					            " WHERE event_id = ?",
 | 
				
			||||||
@ -2190,6 +2210,8 @@ class EventsStore(SQLBaseStore):
 | 
				
			|||||||
            ]
 | 
					            ]
 | 
				
			||||||
        )
 | 
					        )
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					        logger.debug("[purge] done")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    @defer.inlineCallbacks
 | 
					    @defer.inlineCallbacks
 | 
				
			||||||
    def is_event_after(self, event_id1, event_id2):
 | 
					    def is_event_after(self, event_id1, event_id2):
 | 
				
			||||||
        """Returns True if event_id1 is after event_id2 in the stream
 | 
					        """Returns True if event_id1 is after event_id2 in the stream
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user