範例輸出外掛程式可以在 PostgreSQL 原始程式碼樹的 contrib/test_decoding
子目錄中找到。
輸出外掛程式透過動態載入共用函式庫來載入,其中輸出外掛程式的名稱作為函式庫的基本名稱。會使用正常的函式庫搜尋路徑來尋找函式庫。為了提供必要的輸出外掛程式回呼,並表明該函式庫實際上是輸出外掛程式,它需要提供一個名為 _PG_output_plugin_init
的函數。此函數會傳遞一個結構,該結構需要填入個別動作的回呼函數指標。
typedef struct OutputPluginCallbacks { LogicalDecodeStartupCB startup_cb; LogicalDecodeBeginCB begin_cb; LogicalDecodeChangeCB change_cb; LogicalDecodeTruncateCB truncate_cb; LogicalDecodeCommitCB commit_cb; LogicalDecodeMessageCB message_cb; LogicalDecodeFilterByOriginCB filter_by_origin_cb; LogicalDecodeShutdownCB shutdown_cb; LogicalDecodeFilterPrepareCB filter_prepare_cb; LogicalDecodeBeginPrepareCB begin_prepare_cb; LogicalDecodePrepareCB prepare_cb; LogicalDecodeCommitPreparedCB commit_prepared_cb; LogicalDecodeRollbackPreparedCB rollback_prepared_cb; LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; LogicalDecodeStreamPrepareCB stream_prepare_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; LogicalDecodeStreamTruncateCB stream_truncate_cb; } OutputPluginCallbacks; typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
begin_cb
、change_cb
和 commit_cb
回呼是必要的,而 startup_cb
、truncate_cb
、message_cb
、filter_by_origin_cb
和 shutdown_cb
則是選用的。如果未設定 truncate_cb
,但要解碼 TRUNCATE
,則該動作將被忽略。
輸出外掛程式也可以定義函數來支援大型、進行中交易的串流。stream_start_cb
、stream_stop_cb
、stream_abort_cb
、stream_commit_cb
和 stream_change_cb
是必要的,而 stream_message_cb
和 stream_truncate_cb
則是選用的。如果輸出外掛程式也支援兩階段提交,則也需要 stream_prepare_cb
。
輸出外掛程式也可以定義函數來支援兩階段提交,這允許在 PREPARE TRANSACTION
上解碼動作。 begin_prepare_cb
、prepare_cb
、commit_prepared_cb
和 rollback_prepared_cb
回呼是必要的,而 filter_prepare_cb
則是選用的。如果輸出外掛程式也支援大型進行中交易的串流,則也需要 stream_prepare_cb
。
為了對變更進行解碼、格式化和輸出,輸出外掛程式可以使用後端的大部分正常基礎架構,包括呼叫輸出函數。 允許唯讀存取關係,只要僅存取由 initdb
在 pg_catalog
結構描述中建立的關係,或已使用下列標記為使用者提供的目錄資料表的關係:
ALTER TABLE user_catalog_table SET (user_catalog_table = true); CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
請注意,在輸出外掛程式中存取使用者目錄資料表或一般系統目錄資料表只能透過 systable_*
掃描 API 進行。透過 heap_*
掃描 API 進行存取將會產生錯誤。此外,禁止任何導致交易 ID 指派的動作。除其他外,這包括寫入資料表、執行 DDL 變更和呼叫 pg_current_xact_id()
。
輸出外掛程式回呼可以將資料以幾乎任意格式傳遞給消費者。對於某些使用案例,例如透過 SQL 查看變更,以可以包含任意資料的資料類型(例如 bytea
)傳回資料是很麻煩的。如果輸出外掛程式僅以伺服器的編碼輸出文字資料,則可以透過在 啟動回呼 中將 OutputPluginOptions.output_type
設定為 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
而不是 OUTPUT_PLUGIN_BINARY_OUTPUT
來宣告。在這種情況下,所有資料都必須採用伺服器的編碼,以便 text
資料可以包含它。這在啟用斷言的版本中進行檢查。
輸出外掛程式會透過它需要提供的各種回呼來收到有關正在發生的變更的通知。
並行交易會按照提交順序進行解碼,並且只有屬於特定交易的變更會在 begin
和 commit
回呼之間進行解碼。 已明確或隱含回滾的交易永遠不會被解碼。 成功的儲存點會按照它們在該交易中執行的順序合併到包含它們的交易中。 如果提供了對它們進行解碼所需的輸出外掛程式回呼,則也會解碼使用 PREPARE TRANSACTION
準備進行兩階段提交的交易。 可能會出現正在解碼的目前已準備交易透過 ROLLBACK PREPARED
命令同時中止的情況。 在這種情況下,此交易的邏輯解碼也會中止。 一旦偵測到中止並呼叫 prepare_cb
回呼,就會略過此類交易的所有變更。 因此,即使在同時中止的情況下,也為輸出外掛程式提供了足夠的資訊,以便在解碼 ROLLBACK PREPARED
後正確處理它。
僅會解碼已安全刷新到磁碟的交易。 當 synchronous_commit
設定為 off
時,這可能會導致在直接跟隨的 pg_logical_slot_get_changes()
中不會立即解碼 COMMIT
。
每當建立複寫槽或要求串流變更時,都會呼叫選用的 startup_cb
回呼,無論準備輸出的變更數量如何。
typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx, OutputPluginOptions *options, bool is_init);
當正在建立複寫槽時,is_init
參數將為 true,否則為 false。options
指向輸出外掛程式可以設定的選項結構
typedef struct OutputPluginOptions { OutputPluginOutputType output_type; bool receive_rewrites; } OutputPluginOptions;
output_type
必須設定為 OUTPUT_PLUGIN_TEXTUAL_OUTPUT
或 OUTPUT_PLUGIN_BINARY_OUTPUT
。另請參閱第 47.6.3 節。如果 receive_rewrites
為 true,則也會為某些 DDL 操作期間由堆積重寫所做的變更呼叫輸出外掛程式。這些對於處理 DDL 複寫的外掛程式很有用,但它們需要特殊處理。
啟動回呼應驗證 ctx->output_plugin_options
中存在的選項。如果輸出外掛程式需要有狀態,則可以使用 ctx->output_plugin_private
來儲存它。
可選的 shutdown_cb
回呼會在不再使用先前啟用的複寫槽時呼叫,可用於釋放輸出外掛程式私有的資源。該槽不一定會被刪除,只是停止串流。
typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
每當解碼已提交交易的開始時,都會呼叫所需的 begin_cb
回呼。已中止的交易及其內容永遠不會被解碼。
typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
txn
參數包含有關交易的中繼資訊,例如其提交時間戳記及其 XID。
每當解碼交易提交時,都會呼叫所需的 commit_cb
回呼。如果存在任何已修改的列,則在此之前將呼叫所有已修改列的 change_cb
回呼。
typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
對於交易中的每個個別列修改,無論是 INSERT
、UPDATE
或 DELETE
,都會呼叫所需的 change_cb
回呼。即使原始指令一次修改多個列,也會為每個列個別呼叫回呼。change_cb
回呼可以存取系統或使用者目錄表,以協助輸出列修改詳細資訊。如果解碼已準備(但尚未提交)的交易或解碼未提交的交易,則由於同時回滾相同的交易,此變更回呼也可能會出錯。在這種情況下,將會優雅地停止中止交易的邏輯解碼。
typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
ctx
和 txn
參數的內容與 begin_cb
和 commit_cb
回呼相同,但此外,關係描述符 relation
指向列所屬的關係,並且傳入一個描述列修改的結構 change
。
只有未記錄(請參閱 UNLOGGED
)且不是暫時的(請參閱 TEMPORARY
或 TEMP
)使用者定義表中的變更才能使用邏輯解碼來提取。
針對 TRUNCATE
指令呼叫可選的 truncate_cb
回呼。
typedef void (*LogicalDecodeTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
這些參數與 change_cb
回呼類似。但是,由於需要一起執行以外鍵連接的表上的 TRUNCATE
動作,因此此回呼會接收關係陣列,而不僅僅是一個關係。有關詳細資訊,請參閱 TRUNCATE 陳述式的描述。
呼叫可選的 filter_by_origin_cb
回呼以確定從 origin_id
重播的資料是否對輸出外掛程式有意義。
typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx, RepOriginId origin_id);
ctx
參數的內容與其他回呼相同。除了來源之外,沒有其他資訊可用。若要表示在傳入節點上發生的變更無關緊要,請傳回 true,這會導致將它們過濾掉;否則傳回 false。對於已過濾掉的交易和變更,將不會呼叫其他回呼。
這在實作串聯或多向複寫解決方案時很有用。依來源篩選可防止在這些設定中來回複寫相同的變更。雖然交易和變更也攜帶有關來源的資訊,但透過此回呼進行篩選的效率明顯更高。
每當解碼邏輯解碼訊息時,就會呼叫可選的 message_cb
回呼。
typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
txn
參數包含有關交易的中繼資訊,例如其提交時間戳記及其 XID。但請注意,當訊息是非交易的,並且 XID 尚未在記錄訊息的交易中指派時,它可以是 NULL。lsn
具有訊息的 WAL 位置。transactional
表示訊息是作為交易傳送的還是非交易傳送的。與變更回呼類似,如果解碼已準備(但尚未提交)的交易或解碼未提交的交易,則由於同時回滾相同的交易,此訊息回呼也可能會出錯。在這種情況下,將會優雅地停止中止交易的邏輯解碼。prefix
是任意以空值結尾的前置詞,可用於識別目前外掛程式的相關訊息。最後,message
參數保留了 message_size
大小的實際訊息。
應格外小心,以確保輸出外掛程式認為相關的前置詞是唯一的。通常,使用擴充功能或輸出外掛程式本身的名稱是個不錯的選擇。
呼叫可選的 filter_prepare_cb
回呼,以確定是否應在此準備階段考慮解碼目前兩階段提交交易的一部分資料,還是稍後在 COMMIT PREPARED
時將其作為常規單階段交易。若要表示應跳過解碼,請傳回 true
;否則傳回 false
。當未定義回呼時,會假設為 false
(即不進行篩選,也將分兩個階段解碼使用兩階段提交的所有交易)。
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx, TransactionId xid, const char *gid);
ctx
參數的內容與其他回呼相同。xid
和 gid
參數提供了兩種不同的方式來識別交易。稍後的 COMMIT PREPARED
或 ROLLBACK PREPARED
攜帶這兩個識別碼,讓輸出外掛程式可以選擇使用哪個。
每次交易可能會多次呼叫該回呼進行解碼,並且每次呼叫時都必須為給定的 xid
和 gid
組提供相同的靜態答案。
當已解碼準備好的交易開始時,會呼叫必要的 begin_prepare_cb
回呼函式。 txn
參數中的 gid
欄位可用於此回呼函式中,以檢查外掛程式是否已收到此 PREPARE
,在這種情況下,它可以發生錯誤或跳過交易的其餘變更。
typedef void (*LogicalDecodeBeginPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
當已解碼為兩階段提交準備的交易時,會呼叫必要的 prepare_cb
回呼函式。 如果有任何修改過的列,則在此之前會呼叫所有修改列的 change_cb
回呼函式。 txn
參數中的 gid
欄位可用於此回呼函式中。
typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
當已解碼 COMMIT PREPARED
交易時,會呼叫必要的 commit_prepared_cb
回呼函式。 txn
參數中的 gid
欄位可用於此回呼函式中。
typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
當已解碼 ROLLBACK PREPARED
交易時,會呼叫必要的 rollback_prepared_cb
回呼函式。 prepare_end_lsn
和 prepare_time
參數可用於檢查外掛程式是否已收到此 PREPARE TRANSACTION
,在這種情況下,它可以套用回滾,否則,它可以跳過回滾操作。單獨使用 gid
是不夠的,因為下游節點可能具有具有相同識別碼的已準備交易。
typedef void (*LogicalDecodeRollbackPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time);
當從進行中的交易開啟一組串流變更時,會呼叫必要的 stream_start_cb
回呼函式。
typedef void (*LogicalDecodeStreamStartCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
當從進行中的交易關閉一組串流變更時,會呼叫必要的 stream_stop_cb
回呼函式。
typedef void (*LogicalDecodeStreamStopCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn);
呼叫必要的 stream_abort_cb
回呼函式以中止先前串流的交易。
typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn);
呼叫 stream_prepare_cb
回呼函式以準備先前串流的交易,作為兩階段提交的一部分。當輸出外掛程式同時支援大型進行中交易的串流和兩階段提交時,才需要此回呼函式。
typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
呼叫必要的 stream_commit_cb
回呼函式以提交先前串流的交易。
typedef void (*LogicalDecodeStreamCommitCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
當在串流變更區塊中傳送變更時(由 stream_start_cb
和 stream_stop_cb
呼叫分隔),會呼叫必要的 stream_change_cb
回呼函式。實際變更不會顯示,因為交易稍後可能會中止,並且我們不會解碼已中止交易的變更。
typedef void (*LogicalDecodeStreamChangeCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change);
當在串流變更區塊中傳送一般訊息時(由 stream_start_cb
和 stream_stop_cb
呼叫分隔),會呼叫選用的 stream_message_cb
回呼函式。不會顯示交易訊息的訊息內容,因為交易稍後可能會中止,並且我們不會解碼已中止交易的變更。
typedef void (*LogicalDecodeStreamMessageCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message);
對於串流變更區塊中的 TRUNCATE
命令(由 stream_start_cb
和 stream_stop_cb
呼叫分隔),會呼叫選用的 stream_truncate_cb
回呼函式。
typedef void (*LogicalDecodeStreamTruncateCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change);
這些參數類似於 stream_change_cb
回呼函式。但是,由於需要一起執行通過外鍵連線的表上的 TRUNCATE
動作,因此此回呼函式接收關係陣列,而不僅僅是一個關係。有關詳細資訊,請參閱 TRUNCATE 語句的描述。
為了實際產生輸出,輸出外掛程式可以在 begin_cb
、commit_cb
或 change_cb
回呼函式內,將資料寫入 ctx->out
中的 StringInfo
輸出緩衝區。在寫入輸出緩衝區之前,必須呼叫 OutputPluginPrepareWrite(ctx, last_write)
,並且在完成寫入緩衝區後,必須呼叫 OutputPluginWrite(ctx, last_write)
以執行寫入。last_write
表示特定寫入是否為回呼函式的最後一次寫入。
以下範例顯示如何將資料輸出到輸出外掛程式的消費者
OutputPluginPrepareWrite(ctx, true); appendStringInfo(ctx->out, "BEGIN %u", txn->xid); OutputPluginWrite(ctx, true);
如果您在文件中看到任何不正確、與特定功能不符或需要進一步澄清的內容,請使用此表格回報文件問題。