mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-02-21 23:23:46 +02:00
Within Codeberg we are looking into distributing the database queries, we tried forgejo/forgejo!7212 on several occasions but never got it to work. After a long debugging session in a staging environment I was able to find two bugs that made it impossible for this feature to work: forgejo/docs!1587 which resulted in replica engines never being configured and used if you followed the documentation. The other bug is what this patch intends to fix. In order to do some database operation, you need the database engine - it will first look if one is set for the context (only useful for transactions) and otherwise create a new session of the engine from the master engine `x`. The problem is that `x` is explicitly set to be the master engine and not the engine group (that includes the replica engines) - Unless the code uses `DefaultContext`, which is almost nowhere used after some great refactoring in Gitea to use the passed context, it did not use the replica engines. Get engine from the `DefaultContext` (which is set to the enginegroup) and create a new session from that.20f8572b92/models/db/engine.go (L220-L231)And `SetDefaultEngine` is called from20f8572b92/models/db/engine.go (L212)Where `eng` is the engine group. ## Test 1. Configure database replicas. 2. Start Forgejo. 3. Verify Forgejo loads. 4. Stop the database replicas. 5. Verify Forgejo shows 500 errors. Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/10140 Reviewed-by: Mathieu Fenniak <mfenniak@noreply.codeberg.org> Co-authored-by: Gusted <postmaster@gusted.xyz> Co-committed-by: Gusted <postmaster@gusted.xyz>
419 lines
12 KiB
Go
419 lines
12 KiB
Go
// Copyright 2019 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package db
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
|
|
"xorm.io/builder"
|
|
"xorm.io/xorm"
|
|
)
|
|
|
|
// DefaultContext is the default context to run xorm queries in
|
|
// will be overwritten by Init with HammerContext
|
|
var DefaultContext context.Context
|
|
|
|
// contextKey is a value for use with context.WithValue.
|
|
type contextKey struct {
|
|
name string
|
|
}
|
|
|
|
// enginedContextKey is a context key. It is used with context.Value() to get the current Engined for the context
|
|
var (
|
|
enginedContextKey = &contextKey{"engined"}
|
|
_ Engined = &Context{}
|
|
)
|
|
|
|
// Context represents a db context
|
|
type Context struct {
|
|
context.Context
|
|
e Engine
|
|
transaction bool
|
|
afterCommitHooks []func()
|
|
}
|
|
|
|
func newContext(ctx context.Context, e Engine, transaction bool) *Context {
|
|
return &Context{
|
|
Context: ctx,
|
|
e: e,
|
|
transaction: transaction,
|
|
}
|
|
}
|
|
|
|
// InTransaction if context is in a transaction
|
|
func (ctx *Context) InTransaction() bool {
|
|
return ctx.transaction
|
|
}
|
|
|
|
// Engine returns db engine
|
|
func (ctx *Context) Engine() Engine {
|
|
return ctx.e
|
|
}
|
|
|
|
// Value shadows Value for context.Context but allows us to get ourselves and an Engined object
|
|
func (ctx *Context) Value(key any) any {
|
|
if key == enginedContextKey {
|
|
return ctx
|
|
}
|
|
return ctx.Context.Value(key)
|
|
}
|
|
|
|
// WithContext returns this engine tied to this context
|
|
func (ctx *Context) WithContext(other context.Context) *Context {
|
|
return newContext(ctx, ctx.e.Context(other), ctx.transaction)
|
|
}
|
|
|
|
// Engined structs provide an Engine
|
|
type Engined interface {
|
|
Engine() Engine
|
|
}
|
|
|
|
// GetEngine will get a db Engine from this context or return an Engine restricted to this context
|
|
func GetEngine(ctx context.Context) Engine {
|
|
if e := getEngine(ctx); e != nil {
|
|
return e
|
|
}
|
|
return DefaultContext.(Engined).Engine().Context(ctx)
|
|
}
|
|
|
|
// getEngine will get a db Engine from this context or return nil
|
|
func getEngine(ctx context.Context) Engine {
|
|
if engined, ok := ctx.(Engined); ok {
|
|
return engined.Engine()
|
|
}
|
|
enginedInterface := ctx.Value(enginedContextKey)
|
|
if enginedInterface != nil {
|
|
return enginedInterface.(Engined).Engine()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Committer represents an interface to Commit or Close the Context
|
|
type Committer interface {
|
|
Commit() error
|
|
Close() error
|
|
}
|
|
|
|
// halfCommitter is a wrapper of Committer.
|
|
// It can be closed early, but can't be committed early, it is useful for reusing a transaction.
|
|
type halfCommitter struct {
|
|
committer Committer
|
|
parentCtx context.Context
|
|
txCtx *Context
|
|
committed bool
|
|
}
|
|
|
|
func (c *halfCommitter) Commit() error {
|
|
c.committed = true
|
|
|
|
// Pass hooks installed into txCtx up to parentCtx
|
|
for _, hook := range c.txCtx.afterCommitHooks {
|
|
AfterTx(c.parentCtx, hook)
|
|
}
|
|
|
|
// should do nothing, and the parent committer will commit later
|
|
return nil
|
|
}
|
|
|
|
func (c *halfCommitter) Close() error {
|
|
if c.committed {
|
|
// it's "commit and close", should do nothing, and the parent committer will commit later
|
|
return nil
|
|
}
|
|
|
|
// it's "rollback and close", let the parent committer rollback right now
|
|
return c.committer.Close()
|
|
}
|
|
|
|
// Wraps an xorm.Session with execution of AfterTx hooks
|
|
type hookCommitter struct {
|
|
sess *xorm.Session
|
|
txCtx *Context
|
|
}
|
|
|
|
func (c *hookCommitter) Commit() error {
|
|
err := c.sess.Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, hook := range c.txCtx.afterCommitHooks {
|
|
hook()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *hookCommitter) Close() error {
|
|
return c.sess.Close()
|
|
}
|
|
|
|
// TxContext represents a transaction Context,
|
|
// it will reuse the existing transaction in the parent context or create a new one.
|
|
// Some tips to use:
|
|
//
|
|
// 1 It's always recommended to use `WithTx` in new code instead of `TxContext`, since `WithTx` will handle the transaction automatically.
|
|
// 2. To maintain the old code which uses `TxContext`:
|
|
// a. Always call `Close()` before returning regardless of whether `Commit()` has been called.
|
|
// b. Always call `Commit()` before returning if there are no errors, even if the code did not change any data.
|
|
// c. Remember the `Committer` will be a halfCommitter when a transaction is being reused.
|
|
// So calling `Commit()` will do nothing, but calling `Close()` without calling `Commit()` will rollback the transaction.
|
|
// And all operations submitted by the caller stack will be rollbacked as well, not only the operations in the current function.
|
|
// d. It doesn't mean rollback is forbidden, but always do it only when there is an error, and you do want to rollback.
|
|
func TxContext(parentCtx context.Context) (*Context, Committer, error) {
|
|
if sess, ok := inTransaction(parentCtx); ok {
|
|
txCtx := newContext(parentCtx, sess, true)
|
|
return txCtx, &halfCommitter{committer: sess, parentCtx: parentCtx, txCtx: txCtx}, nil
|
|
}
|
|
|
|
sess := x.NewSession()
|
|
if err := sess.Begin(); err != nil {
|
|
sess.Close()
|
|
return nil, nil, err
|
|
}
|
|
|
|
txCtx := newContext(parentCtx, sess, true)
|
|
return txCtx, &hookCommitter{sess, txCtx}, nil
|
|
}
|
|
|
|
// WithTx represents executing database operations on a transaction, if the transaction exist,
|
|
// this function will reuse it otherwise will create a new one and close it when finished.
|
|
func WithTx(parentCtx context.Context, f func(ctx context.Context) error) error {
|
|
if sess, ok := inTransaction(parentCtx); ok {
|
|
txCtx := newContext(parentCtx, sess, true)
|
|
err := f(txCtx)
|
|
if err != nil {
|
|
// rollback immediately, in case the caller ignores returned error and tries to commit the transaction.
|
|
_ = sess.Close()
|
|
}
|
|
// Pass hooks installed into txCtx up to parentCtx
|
|
for _, hook := range txCtx.afterCommitHooks {
|
|
AfterTx(parentCtx, hook)
|
|
}
|
|
return err
|
|
}
|
|
return txWithNoCheck(parentCtx, f)
|
|
}
|
|
|
|
func txWithNoCheck(parentCtx context.Context, f func(ctx context.Context) error) error {
|
|
sess := x.NewSession()
|
|
defer sess.Close()
|
|
if err := sess.Begin(); err != nil {
|
|
return err
|
|
}
|
|
|
|
txCtx := newContext(parentCtx, sess, true)
|
|
if err := f(txCtx); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := sess.Commit(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, hook := range txCtx.afterCommitHooks {
|
|
hook()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// AfterTx registers a function to be called after the current transaction commits. If not in a transaction, the
|
|
// function is called immediately. The hook will only be called if the transaction commits successfully; if the
|
|
// transaction rolls back, the hook is discarded.
|
|
func AfterTx(ctx context.Context, hook func()) {
|
|
dbCtx, ok := ctx.(*Context)
|
|
if !ok || !dbCtx.transaction {
|
|
// Not in a db transaction context, run immediately
|
|
hook()
|
|
return
|
|
}
|
|
dbCtx.afterCommitHooks = append(dbCtx.afterCommitHooks, hook)
|
|
}
|
|
|
|
// Insert inserts records into database
|
|
func Insert(ctx context.Context, beans ...any) error {
|
|
_, err := GetEngine(ctx).Insert(beans...)
|
|
return err
|
|
}
|
|
|
|
// Exec executes a sql with args
|
|
func Exec(ctx context.Context, sqlAndArgs ...any) (sql.Result, error) {
|
|
return GetEngine(ctx).Exec(sqlAndArgs...)
|
|
}
|
|
|
|
func Get[T any](ctx context.Context, cond builder.Cond) (object *T, exist bool, err error) {
|
|
if !cond.IsValid() {
|
|
panic("cond is invalid in db.Get(ctx, cond). This should not be possible.")
|
|
}
|
|
|
|
var bean T
|
|
has, err := GetEngine(ctx).Where(cond).NoAutoCondition().Get(&bean)
|
|
if err != nil {
|
|
return nil, false, err
|
|
} else if !has {
|
|
return nil, false, nil
|
|
}
|
|
return &bean, true, nil
|
|
}
|
|
|
|
func GetByID[T any](ctx context.Context, id int64) (object *T, exist bool, err error) {
|
|
var bean T
|
|
has, err := GetEngine(ctx).ID(id).NoAutoCondition().Get(&bean)
|
|
if err != nil {
|
|
return nil, false, err
|
|
} else if !has {
|
|
return nil, false, nil
|
|
}
|
|
return &bean, true, nil
|
|
}
|
|
|
|
func Exist[T any](ctx context.Context, cond builder.Cond) (bool, error) {
|
|
if !cond.IsValid() {
|
|
panic("cond is invalid in db.Exist(ctx, cond). This should not be possible.")
|
|
}
|
|
|
|
var bean T
|
|
return GetEngine(ctx).Where(cond).NoAutoCondition().Exist(&bean)
|
|
}
|
|
|
|
func ExistByID[T any](ctx context.Context, id int64) (bool, error) {
|
|
var bean T
|
|
return GetEngine(ctx).ID(id).NoAutoCondition().Exist(&bean)
|
|
}
|
|
|
|
// DeleteByID deletes the given bean with the given ID
|
|
func DeleteByID[T any](ctx context.Context, id int64) (int64, error) {
|
|
var bean T
|
|
return GetEngine(ctx).ID(id).NoAutoCondition().NoAutoTime().Delete(&bean)
|
|
}
|
|
|
|
func DeleteByIDs[T any](ctx context.Context, ids ...int64) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var bean T
|
|
_, err := GetEngine(ctx).In("id", ids).NoAutoCondition().NoAutoTime().Delete(&bean)
|
|
return err
|
|
}
|
|
|
|
func Delete[T any](ctx context.Context, opts FindOptions) (int64, error) {
|
|
if opts == nil || !opts.ToConds().IsValid() {
|
|
panic("opts are empty or invalid in db.Delete(ctx, opts). This should not be possible.")
|
|
}
|
|
|
|
var bean T
|
|
return GetEngine(ctx).Where(opts.ToConds()).NoAutoCondition().NoAutoTime().Delete(&bean)
|
|
}
|
|
|
|
// DeleteByBean deletes all records according non-empty fields of the bean as conditions.
|
|
func DeleteByBean(ctx context.Context, bean any) (int64, error) {
|
|
return GetEngine(ctx).Delete(bean)
|
|
}
|
|
|
|
// FindIDs finds the IDs for the given table name satisfying the given condition
|
|
// By passing a different value than "id" for "idCol", you can query for foreign IDs, i.e. the repo IDs which satisfy the condition
|
|
func FindIDs(ctx context.Context, tableName, idCol string, cond builder.Cond) ([]int64, error) {
|
|
ids := make([]int64, 0, 10)
|
|
if err := GetEngine(ctx).Table(tableName).
|
|
Cols(idCol).
|
|
Where(cond).
|
|
Find(&ids); err != nil {
|
|
return nil, err
|
|
}
|
|
return ids, nil
|
|
}
|
|
|
|
// DecrByIDs decreases the given column for entities of the "bean" type with one of the given ids by one
|
|
// Timestamps of the entities won't be updated
|
|
func DecrByIDs(ctx context.Context, ids []int64, decrCol string, bean any) error {
|
|
if len(ids) == 0 {
|
|
return nil
|
|
}
|
|
_, err := GetEngine(ctx).Decr(decrCol).In("id", ids).NoAutoCondition().NoAutoTime().Update(bean)
|
|
return err
|
|
}
|
|
|
|
// DeleteBeans deletes all given beans, beans must contain delete conditions.
|
|
func DeleteBeans(ctx context.Context, beans ...any) (err error) {
|
|
e := GetEngine(ctx)
|
|
for i := range beans {
|
|
if _, err = e.Delete(beans[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TruncateBeans deletes all given beans, beans may contain delete conditions.
|
|
func TruncateBeans(ctx context.Context, beans ...any) (err error) {
|
|
e := GetEngine(ctx)
|
|
for i := range beans {
|
|
if _, err = e.Truncate(beans[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TruncateBeansCascade deletes all given beans. Beans MUST NOT contain delete conditions, as tables related by foreign
|
|
// keys will also be truncated.
|
|
func TruncateBeansCascade(ctx context.Context, beans ...any) (err error) {
|
|
// Expand the list of beans to any other table with a foreign key reference to the beans
|
|
cascadeTables, err := extendBeansForCascade(beans)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Sort the beans in inverse foreign key delete order
|
|
cascadeSorted, err := sortBeans(cascadeTables, foreignKeySortDelete)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Execute the truncate
|
|
e := GetEngine(ctx)
|
|
for i := range cascadeSorted {
|
|
if _, err = e.Truncate(cascadeSorted[i]); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CountByBean counts the number of database records according non-empty fields of the bean as conditions.
|
|
func CountByBean(ctx context.Context, bean any) (int64, error) {
|
|
return GetEngine(ctx).Count(bean)
|
|
}
|
|
|
|
// TableName returns the table name according a bean object
|
|
func TableName(bean any) string {
|
|
return x.TableName(bean)
|
|
}
|
|
|
|
// InTransaction returns true if the engine is in a transaction otherwise return false
|
|
func InTransaction(ctx context.Context) bool {
|
|
_, ok := inTransaction(ctx)
|
|
return ok
|
|
}
|
|
|
|
func inTransaction(ctx context.Context) (*xorm.Session, bool) {
|
|
e := getEngine(ctx)
|
|
if e == nil {
|
|
return nil, false
|
|
}
|
|
|
|
switch t := e.(type) {
|
|
case *xorm.Engine:
|
|
return nil, false
|
|
case *xorm.Session:
|
|
if t.IsInTx() {
|
|
return t, true
|
|
}
|
|
return nil, false
|
|
default:
|
|
return nil, false
|
|
}
|
|
}
|