The configuration of Debezium source connector has the following properties.
Name
Required
Default
Description
task.class
true
null
A source task class that implemented in Debezium.
database.hostname
true
null
The address of a database server.
database.port
true
null
The port number of a database server.
database.user
true
null
The name of a database user that has the required privileges.
database.password
true
null
The password for a database user that has the required privileges.
database.server.id
true
null
The connector’s identifier that must be unique within a database cluster and similar to the database’s server-id configuration property.
database.server.name
true
null
The logical name of a database server/cluster, which forms a namespace and it is used in all the names of Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
database.whitelist
false
null
A list of all databases hosted by this server which is monitored by the connector.
This is optional, and there are other properties for listing databases and tables to include or exclude from monitoring.
key.converter
true
null
The converter provided by Kafka Connect to convert record key.
value.converter
true
null
The converter provided by Kafka Connect to convert record value.
database.history
true
null
The name of the database history class.
database.history.pulsar.topic
true
null
The name of the database history topic where the connector writes and recovers DDL statements.
Note: this topic is for internal use only and should not be used by consumers.
database.history.pulsar.service.url
false
null
Pulsar cluster service URL for history topic.
Note: If database.history.pulsar.service.url is not set, then the database history Pulsar client will use the same client settings as those of the source connector, such as client_auth_plugin and client_auth_params.
pulsar.service.url
true
null
Pulsar cluster service URL.
offset.storage.topic
true
null
Record the last committed offsets that the connector successfully completes.
This config json-with-envelope is valid only for the JsonConverter. It's default value is false, the consumer use the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED),
and the message only consist of payload.
If the config json-with-envelope value is true, the consumer use the schema
Schema.KeyValue(Schema.BYTES, Schema.BYTES, the message consist of schema and payload.
If users select the AvroConverter, then the pulsar consumer should use the schema Schema.KeyValue(Schema.AUTO_CONSUME(), Schema.AUTO_CONSUME(), KeyValueEncodingType.SEPARATED), and the message consist of payload.
The comma-separated list of hostname and port pairs (in the form 'host' or 'host:port') of the MongoDB servers in the replica set. The list contains a single hostname and a port pair. If mongodb.members.auto.discover is set to false, the host and port pair are prefixed with the replica set name (e.g., rs0/localhost:27017).
mongodb.name
true
null
A unique name that identifies the connector and/or MongoDB replica set or shared cluster that this connector monitors. Each server should be monitored by at most one Debezium connector, since this server name prefixes all persisted Kafka topics emanating from the MongoDB replica set or cluster.
mongodb.user
true
null
Name of the database user to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
mongodb.password
true
null
Password to be used when connecting to MongoDB. This is required only when MongoDB is configured to use authentication.
mongodb.task.id
true
null
The taskId of the MongoDB connector that attempts to use a separate task for each replica set.
Currently, the destination topic (specified by the destination-topic-name option ) is a required configuration but it is not used for the Debezium connector to save data. The Debezium connector saves data in the following 4 types of topics:
One topic named with the database server name ( database.server.name) for storing the database metadata messages, such as public/default/database.server.name.
One topic (database.history.pulsar.topic) for storing the database history information. The connector writes and recovers DDL statements on this topic.
One topic (offset.storage.topic) for storing the offset metadata messages. The connector saves the last successfully-committed offsets on this topic.
One per-table topic. The connector writes change events for all operations that occur in a table to a single Pulsar topic that is specific to that table.
If the automatic topic creation is disabled on your broker, you need to manually create the above 4 types of topics and the destination topic.
Use the YAML configuration file as shown previously.
$ docker run -it--rm\ --name mysqlterm \ --link mysql \ --rm mysql:5.7 sh\ -c'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
A MySQL client pops out.
Change the connection mode to mysql_native_password.
mysql> show variables like "caching_sha2_password_auto_generate_rsa_keys"; +----------------------------------------------+-------+ | Variable_name | Value | +----------------------------------------------+-------+ | caching_sha2_password_auto_generate_rsa_keys | ON | +----------------------------------------------+-------+ # If the value of "caching_sha2_password_auto_generate_rsa_keys" is ON, ensure the plugin of mysql.user is "mysql_native_password". mysql> SELECT Host, User, plugin from mysql.user where user={user}; +-----------+------+-----------------------+ | Host | User | plugin | +-----------+------+-----------------------+ | localhost | root | caching_sha2_password | +-----------+------+-----------------------+ # If the plugin of mysql.user is is "caching_sha2_password", set it to "mysql_native_password". alter user '{user}'@'{host}' identified with mysql_native_password by {password}; # Check the plugin of mysql.user. mysql> SELECT Host, User, plugin from mysql.user where user={user}; +-----------+------+-----------------------+ | Host | User | plugin | +-----------+------+-----------------------+ | localhost | root | mysql_native_password | +-----------+------+-----------------------+
Use the following commands to change the data of the table products.
mysql> use inventory; mysql> show tables; mysql> SELECT * FROM products; mysql> UPDATE products SET name='1111111111' WHERE id=101; mysql> UPDATE products SET name='1111111111' WHERE id=107;
In the terminal window of subscribing topic, you can find the data changes have been kept in the sub-products topic.
You can create a debezium-postgres-source-config.yaml file and copy the contents below to the debezium-postgres-source-config.yaml file.
tenant:"public" namespace:"default" name:"debezium-postgres-source" topicName:"debezium-postgres-topic" archive:"connectors/pulsar-io-debezium-postgres-2.8.4.nar" parallelism:1 configs: ## config for postgres version 10+, official docker image: postgres:<10+> database.hostname:"localhost" database.port:"5432" database.user:"postgres" database.password:"changeme" database.dbname:"postgres" database.server.name:"dbserver1" plugin.name:"pgoutput" schema.whitelist:"public" table.whitelist:"public.users" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url:"pulsar://127.0.0.1:6650"
Notice that pgoutput is a standard plugin of Postgres introduced in version 10 - see Postgres architecture docu. You don't need to install anything, just make sure the WAL level is set to logical (see docker command below and Postgres docu).
Currently, the destination topic (specified by the destination-topic-name option ) is a required configuration but it is not used for the Debezium connector to save data. The Debezium connector saves data in the following 4 types of topics:
One topic named with the database server name ( database.server.name) for storing the database metadata messages, such as public/default/database.server.name.
One topic (database.history.pulsar.topic) for storing the database history information. The connector writes and recovers DDL statements on this topic.
One topic (offset.storage.topic) for storing the offset metadata messages. The connector saves the last successfully-committed offsets on this topic.
One per-table topic. The connector writes change events for all operations that occur in a table to a single Pulsar topic that is specific to that table.
If the automatic topic creation is disabled on your broker, you need to manually create the above 4 types of topics and the destination topic.
Use the YAML configuration file as shown previously.
Use the following commands to initialize the data.
./usr/local/bin/init-inventory.sh
If the local host cannot access the container network, you can update the file /etc/hosts and add a rule 127.0.0.1 6 f114527a95f. f114527a95f is container id, you can try to get by docker ps -a
Start a Pulsar service locally in standalone mode.
$ bin/pulsar standalone
Start the Pulsar Debezium connector in local run mode using one of the following methods.
Use the JSON configuration file as shown previously.
Make sure the nar file is available at connectors/pulsar-io-mongodb-2.8.4.nar.
Currently, the destination topic (specified by the destination-topic-name option ) is a required configuration but it is not used for the Debezium connector to save data. The Debezium connector saves data in the following 4 types of topics:
One topic named with the database server name ( database.server.name) for storing the database metadata messages, such as public/default/database.server.name.
One topic (database.history.pulsar.topic) for storing the database history information. The connector writes and recovers DDL statements on this topic.
One topic (offset.storage.topic) for storing the offset metadata messages. The connector saves the last successfully-committed offsets on this topic.
One per-table topic. The connector writes change events for all operations that occur in a table to a single Pulsar topic that is specific to that table.
If the automatic topic creation is disabled on your broker, you need to manually create the above 4 types of topics and the destination topic.
Use the YAML configuration file as shown previously.
Debezium postgres connector will hang when create snap
#18 prio=5 os_prio=31 tid=0x00007fd83096f800 nid=0xa403 waiting on condition [0x000070000f534000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000007ab025a58> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.LinkedBlockingDeque.putLast(LinkedBlockingDeque.java:396) at java.util.concurrent.LinkedBlockingDeque.put(LinkedBlockingDeque.java:649) at io.debezium.connector.base.ChangeEventQueue.enqueue(ChangeEventQueue.java:132) at io.debezium.connector.postgresql.PostgresConnectorTask$Lambda$203/385424085.accept(Unknown Source) at io.debezium.connector.postgresql.RecordsSnapshotProducer.sendCurrentRecord(RecordsSnapshotProducer.java:402) at io.debezium.connector.postgresql.RecordsSnapshotProducer.readTable(RecordsSnapshotProducer.java:321) at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$takeSnapshot$6(RecordsSnapshotProducer.java:226) at io.debezium.connector.postgresql.RecordsSnapshotProducer$Lambda$240/1347039967.accept(Unknown Source) at io.debezium.jdbc.JdbcConnection.queryWithBlockingConsumer(JdbcConnection.java:535) at io.debezium.connector.postgresql.RecordsSnapshotProducer.takeSnapshot(RecordsSnapshotProducer.java:224) at io.debezium.connector.postgresql.RecordsSnapshotProducer.lambda$start$0(RecordsSnapshotProducer.java:87) at io.debezium.connector.postgresql.RecordsSnapshotProducer$Lambda$206/589332928.run(Unknown Source) at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:705) at java.util.concurrent.CompletableFuture.uniRunStage(CompletableFuture.java:717) at java.util.concurrent.CompletableFuture.thenRun(CompletableFuture.java:2010) at io.debezium.connector.postgresql.RecordsSnapshotProducer.start(RecordsSnapshotProducer.java:87) at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:126) at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:47) at org.apache.pulsar.io.kafka.connect.KafkaConnectSource.open(KafkaConnectSource.java:127) at org.apache.pulsar.io.debezium.DebeziumSource.open(DebeziumSource.java:100) at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupInput(JavaInstanceRunnable.java:690) at org.apache.pulsar.functions.instance.JavaInstanceRunnable.setupJavaInstance(JavaInstanceRunnable.java:200) at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:230) at java.lang.Thread.run(Thread.java:748)
If you encounter the above problems in synchronizing data, please refer to this and add the following configuration to the configuration file: