mongodb使用debezium

前置

服务器上需要安装jdk11
jdk下载地址

kafka安装

官网下载地址

安装教程

debezium 安装

运行 Debezium 连接器需要 Java 11 或更高版本
Debezium 并不是一个独立的软件,而是很多个 Kafka 连接器的总称。这些 Kafka 连接器分别对应不同的数据库,比如 MySQL、Oracle 等。按 Kafka 连接器的常见命名规则,可能我们会把它们叫做 MySQL Kafka Source Connector 之类。

部署

1.下载对应版本的debezium插件

插件地址
在这里插入图片描述
在这里插入图片描述

2.文件解压

将下载的文件解压,将解压后的文件放到kafka的plugin文件夹下(该plugin文件夹为自己创建的plugin文件夹)*,例如
在这里插入图片描述

3. 通过 kafka connect部署

kafka connect有两种部署方式,一是单机部署,二是分布式部署。单机部署配置kafka/config/connect-standalone.properties 文件,分布式部署则配置kafka/config/connect-distributed.properties。分布式部署支持通过rest api管理connector

此处是单机部署,配置文件为kafka/config/connect-standalone.properties,主要修改以下内容:

# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
plugin.path=/usr/local/kafka/plugin

4.启动kafka-connect

需要先启动kafka

bin/connect-standalone.sh config/connect-standalone.properties

5.创建对应的debezium配置文件

在这里插入图片描述

curl -X POST http://${debezium所在服务器}:8083/connectors

{
	"name": "cdc-connector",
	"config": {
		"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
		"mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin",
		"collection.include.list": "db_cdc_1.c_cdc_2",
		"topic.prefix": "mycdc",
		"capture.mode":"change_streams"
	}
}
  • 如果需要在cdc输出的语句上显示before信息,需要开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image
  • 如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
    在这里插入图片描述
db.runCommand({
  collMod: "对应的controllerName", 
  changeStreamPreAndPostImages: {
    enabled: true
  } 
})
例如:
use db_cdc_1
db.runCommand({
  collMod: "c_cdc_2", 
  changeStreamPreAndPostImages: {
    enabled: true
  } 
})

{
	"name": "cdc-connector",
	"config": {
		"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
		"mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin",
		"collection.include.list": "db_cdc_1.c_cdc_2",
		"topic.prefix": "mycdc",
		"capture.mode":"change_streams_with_pre_image"
	}
}

在这里插入图片描述

重点参数

参数描述
connector.class固定值io.debezium.connector.mongodb.MongoDbConnector
mongodb.connection.stringmongodb连接信息
collection.include.list需要监听的具体collection
topic.prefixkafkaTopic前缀
capture.mode输出模式

capture.mode

模式描述
change_streams输出变化流,但是在进行update操作时,不输出after字段
change_streams_update_full在change_streams的基础上,增加after字段,用于输出现在变化后的数据的内容
change_streams_with_pre_image在change_streams的基础上,增加before字段的输出,但需要进行配置
change_streams_update_full_with_pre_image在change_streams_with_pre_image的基础上增加,增加after字段,用于输出现在变化后的数据的内容

其他未使用参数

参数描述
database.include.list需要监听的具体database
database.exclude.list不监听的database(不要与database.include.list填写相同的db)
collection.exclude.list不监听的collection(不要与collection.include.list填写相同的collection)

更多参数请参考
在这里插入图片描述

cdc结果

原数据

{
  "userId": "1000000",
  "allPoints": 190,
  "createTime": {
    "$date": "2024-04-25T13:31:59.678Z"
  },
  "updateTime": {
    "$date": "2024-04-25T13:31:59.678Z"
  }
}

添加数据

在这里插入图片描述
在这里插入图片描述
capture.mode两种模式输出结果一样

push数据

{
  $push: {
	"history":{
      "historyId": "1",
      "changerPoints": 0,
      "beforePoints": 0,
      "afterPoints": 0,
      "status": "0",
      "createTime": {
        "$date": "2024-01-01T16:00:00.000Z"
      },
      "comment": "测试数据",
      "versionNo": 0
    }
  },
}

在这里插入图片描述

第一次添加

{"payload": {
    "before": null,
    "after": null,
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}",
      "truncatedArrays": null
    }}

在这里插入图片描述

第二次添加

在这里插入图片描述

{"payload": {
    "before": null,
    "after": null,
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"history.1\": {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}}",
      "truncatedArrays": null
    }}

在这里插入图片描述

如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据的完整数据,例如
在这里插入图片描述

修改数组中的值

在这里插入图片描述

{"payload": {
    "before": null,
    "after": null,
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"history.1.historyId\": \"100\"}",
      "truncatedArrays": null
    }}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如
在这里插入图片描述

pull操作

在这里插入图片描述

{
  $pull: {
    history: {
      historyId: "100",
    },
  },
}

在这里插入图片描述
此时会把现有的所有数据都返回

{"payload": {
    "before": null,
    "after": null,
    "updateDescription": {
      "removedFields": null,
      "updatedFields": "{\"history\": [{\"historyId\": \"1\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"2\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"3\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}, {\"historyId\": \"4\", \"changerPoints\": 0, \"beforePoints\": 0, \"afterPoints\": 0, \"status\": \"0\", \"createTime\": {\"$date\": \"2024-01-01T16:00:00.000Z\"}, \"comment\": \"测试数据\", \"versionNo\": 0}]}",
      "truncatedArrays": null
    }}

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据,例如在这里插入图片描述

删除字段

在这里插入图片描述

{  "payload": {
    "before": null,
    "after": null,
    "updateDescription": {
      "removedFields": [
        "updateTime"
      ],
      "updatedFields": "{}",
      "truncatedArrays": null
    }
  }

在这里插入图片描述
如果capture.modechange_streams_update_full,则会在after字段上显示现在修改的这条数据现有的完整数据(此处删除的是另外一个字段),例如
在这里插入图片描述

删除数据

在这里插入图片描述
在这里插入图片描述
如果capture.mode未设置成change_streams_with_pre_imagechange_streams_update_full_with_pre_image的话,在进行删除时cdc输出会没有before信息
在这里插入图片描述
通过开启mongodb版本 6.0 中的新增功能changeStreamPreAndPostImages,并且在capture.mode上使用change_streams_with_pre_image或change_streams_update_full_with_pre_image即可解决

db.runCommand({
  collMod: "对应的controllerName", 
  changeStreamPreAndPostImages: {
    enabled: true
  } 
})
例如:
use db_cdc_1
db.runCommand({
  collMod: "c_cdc_2", 
  changeStreamPreAndPostImages: {
    enabled: true
  } 
})

{
	"name": "cdc-connector",
	"config": {
		"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
		"mongodb.connection.string": "mongodb://root:123456@192.168.2.18:27017,192.168.2.19:27017/?authSource=admin",
		"collection.include.list": "db_cdc_1.c_cdc_2",
		"topic.prefix": "mycdc",
		"capture.mode":"change_streams_with_pre_image"
	}
}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/580944.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

使用Cesium ion将 Sketchfab 3D 模型添加到您的GIS应用中

您现在可以将 Sketchfab 中的 3D 模型导入 Cesium ion 中以创建 3D 块,从而更轻松地为地理空间体验创建上下文和内容。 Sketchfab 是 Epic Games 的一部分,也是使用最广泛的 3D 资产市场之一。自 2012 年推出以来,已有超过 1000 万用户使用 …

2024/4/28 C++day5

有以下类&#xff0c;完成特殊成员函数 class Person { string name; int *age; } class Stu:public Person { const double score; } #include <iostream> #include <string> using namespace std; class Person { string name; int *age ; publi…

Kafka报错ERROR Exiting Kafka due to fatal exception during startup

报错&#xff1a; ERROR Exiting Kafka due to fatal exception during startup. (kafka.Kafka$) kafka.common.InconsistentClusterIdException: The Cluster ID FSzSO50oTLCRhRnRylihcg doesnt match stored clusterId Some(0oSLohwtQZWbIi73YUMs8g) in meta.properties. Th…

手撕红黑树(kv模型模拟)

目录 前言 一、相关概念 二、性质介绍 红黑树平衡说明 三、红黑树模拟&#xff08;kv结构&#xff09; 1、红黑树节点 2、红黑树插入 2、特殊处理情况 声明&#xff1a; 情况一&#xff1a;cur为红&#xff0c;p为红&#xff0c;g为黑&#xff0c;u存在&#xff0c;且…

MyBatis 核心配置讲解(下)

大家好&#xff0c;我是王有志&#xff0c;一个分享硬核 Java 技术的互金摸鱼侠。 我们书接上回&#xff0c;继续聊 MyBatis 的核心配置&#xff0c;我们今天分享剩下的 5 项核心配置。 不过正式开始前&#xff0c;我会先纠正上一篇文章 MyBatis 核心配置讲解&#xff08;上&…

QAnything知识库问答系统离线部署(LLM+RAG)

一、QAnything介绍 &#xff08;一&#xff09;简介 QAnything 是网易有道开源的一个问答系统框架&#xff0c;支持私有化部署和SaaS服务两种调用形式。它能够支持多种格式的文件或数据库&#xff0c;提供准确、快速和可靠的问答体验。目前已支持的文件格式包括PDF、Word、PP…

防火墙对要保护的服务器做端口映射的好处是几个

防火墙对要保护的服务器进行端口映射具有多重好处&#xff0c;这些好处主要围绕网络安全性、灵活性和可管理性展开。以下是对这些好处的专业分析&#xff1a; 1. 增强网络安全性&#xff1a;端口映射允许防火墙对进入服务器的流量进行精确控制。通过映射特定端口&#xff0c;防…

FPGA秋招-笔记整理(3)无符号数、有符号数

参考&#xff1a;Verilog学习笔记——有符号数的乘法和加法 一、无符号数、有符号数 将输入输出全部定义为有符号数 &#xff08;1&#xff09;无符号数的读取按照原码进行&#xff0c;有符号数的读取应该按照补码读取&#xff0c;计算规则为去掉符号位后取反、加1在计算数值…

Flink学习(九)-jar 包提交给 flink 集群执行

一、界面执行 1&#xff0c;点击左侧的 submit new job&#xff0c;然后点击add New 2&#xff0c;粘贴程序入口&#xff0c;设置并行度 3&#xff0c;执行后&#xff0c;就可以在 taskManager 中找到相关任务了 二、控制台执行 在命令行中&#xff0c;在flink 的安装目录下&…

【Java】关于异常你需要知道的事情

文章目录 异常体系异常声明捕获多个异常Java中的哪些异常&#xff0c;程序不用捕获处理&#xff1f;【重要】try with resource 异常处理流程foreach中遇到异常面试题try和finally中都由return 异常体系 异常声明 如果声明的是Exception&#xff0c;那么必须要处理如果声明的是…

基于SpringBoot的合家云社区物业管理平台 - 项目介绍

合家云社区物业管理平台 2.合家云需求&设计 2.1 项目概述 2.1.1 项目介绍 合家云社区物业管理平台是一个全新的 ”智慧物业解决方案“&#xff0c;是一款互联网的专业社区物业管理系统。平台通过社区资产管理、小区管理、访客管理、在线报修、意见投诉等多种功能模块&a…

CSS详解(一)

1、css工作中使用场景 美化网页&#xff08;文字样式、背景样式、边框样式、盒子模型、定位、动画、&#xff09;&#xff0c;布局页面&#xff08;flex布局、响应式布局、媒体查询&#xff09; 2、CSS 规则 通常由两个主要部分组成选择器和样式声明 2.1选择器 选择器指定了…

Opencv | 边缘提取

目录 一. 边缘检测1. 边缘的定义2. Sobel算子 边缘提取3. Scharr算子 边缘提取4. Laplacian算子 边缘提取5. Canny 边缘检测算法5.1 计算梯度的强度及方向5.2 非极大值抑制5.3 双阈值检测5.4 抑制孤立弱边缘 二. 轮廓信息1. 获取轮廓信息2. 画轮廓 一. 边缘检测 1. 边缘的定义…

号卡流量卡分销推广系统源码

这是一个多功能的流量卡推广分销系统PHP源码&#xff0c;它是一套完善的、功能丰富的号卡分销系统&#xff0c;拥有多个接口&#xff0c;包括运营商接口&#xff0c;以及无限三级代理。这是目前市面上最优雅的号卡系统&#xff0c;没有之一。 软件架构说明&#xff1a; 环境要求…

网络原理(qq消息发送原理)

1.网络初识 IP地址 概念&#xff1a; IP地址主要⽤于标识⽹络主机、其他⽹络设备&#xff08;如路由器&#xff09;的⽹络地址。简单说&#xff0c;IP地址⽤于定位主机的⽹络地址。 就像我们发送快递⼀样&#xff0c;需要知道对⽅的收货地址&#xff0c;快递员才能将包裹送到…

多模态视觉大模型(2): 常用模型介绍(CLIP和LLAVA)

文章目录 1.CLIP 讲解1.1 clip 预训练过程1.2 利用clip进行图像分类1.3 CLIP代码详解1.3.1 Image Encoder 和 Text Encoder的实现1.3.2 搭建CLIP模型1.3.3 准备数据1.3.4 Loss的定义1.4 完整代码2.GLIP 讲解2.1 GLIP 介绍2.2 GLIP 网络结构3.Flamingo3.1 模型介绍3.2 Loss 定义…

远程控制软件优化(1)

远程控制软件优化&#xff08;1&#xff09; 第一版存在以下缺点&#xff1a; 1、四大部分中 Robot States 部分过于简陋&#xff0c;不适合放到论文中 2、Lidar BEV 图像显示效果非常差&#xff0c;显示不全且很稀疏 3、视频流传输延时过高&#xff0c;无法实现远程控制 以…

基于OpenMV 双轴机械臂 机器学习

文章目录 一、项目简要二、目标追踪1. 色块识别与最大色块筛选2. PID位置闭环 三、机器学习1. Device12. Device2 四、效果演示 一、项目简要 两套二维云台设备&#xff0c;Device1通过摄像头捕捉目标物块点位进行实时追踪&#xff0c;再将自身点位传到Device2&#xff0c;Dev…

【力扣周赛】第394场周赛

文章目录 1.统计特殊字母的数量2.使矩阵满足条件的最少操作次数 1.统计特殊字母的数量 题目链接 &#x1f34e;该题涉及的小技巧&#xff1a;&#x1f425; &#x1f427;①大写字母和对应的小写字母低5位都是相等的&#xff1b; &#x1f427;②大写字母ASCII二进制第 6 位…

node.js + @elastic/elasticsearch 操作elasticsearch数据库

我这边node.js 使用的是 koa2&#xff0c;elasticsearch是8.11.1版本 官网&#xff1a;https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/getting-started-js.html 一、elastic/elasticsearch 连接 elasticsearch数据库 如果elasticsearch没有设…