HA Implementation Details

Synchronous vs. Asynchronous Replication

eXtremeDB High Availability allows both Synchronous and Asynchronous replication between master and replica applications. Because synchronous replication requires a successful transaction commit and ACK response from the replica application, the overall transaction rate is mostly defined by network round-trip delay time (RTT). For this reason it may be preferable for some applications to use asynchronous replication.

Asynchronous replication uses a buffer for passing processing transactions at a time; where the master puts transaction data to this buffer and a separate application thread sends the buffer to replicas (e.g. with C API mco_HA_async_send_data_to_replicas()) . In the asynchronous mode, a replica does not confirm the transactions at all. Asynchronous replication is very fast, because it doesn't depend on the network latency (Round Trip Time), only on the bandwidth. But the replica will lag behind the master, because some transactions are committed on the master but not yet sent to the replica. So the maximum lag is defined by the size of the asynchronous buffer.

Transaction Window

A transaction window can be specified for synchronous replication (e.g. using C APIs mco_HA_set_trans_window_size() and mco_HA_commit_window()). The transaction window is a kind of сompromise between synchronous and asynchronous replication. If the size of the window is N, the replica sends an ACK for each N-th transaction. In this case, the RTT becomes less restrictive, but the replica can lag behind the master up to N transactions. (Note that the transaction window has no effect when using asynchronous replication; the asynchronous buffer is a separate memory device allocated by the application and its size is specified to optimize transaction throughput.)

Mixing Synchronous and Asynchronous Replication

A single master application can implement synchronous replication with one or more replicas, and asynchronous replication with other replicas.

When the flag MCO_HAMODE_FORCE_SYNC is set, it turns on synchronous replication for a given replica. Typically the flag is set as follows:

     
    mco_HA_replica_params_t replica_p;
    ...
    mco_HA_replica_params_init(&replica_p);
    ...
    replica_p.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION | MCO_HAMODE_FORCE_SYNC;
     

Note that if the master is run in synchronous mode, the setting has no effect. (See sample haasync component rplsync for implementation details.)

Replication with a Persistent Database

It may be desirable for a replica to store data in a persistent database. In this case there are performance issues to be considered as the write processes on the replica will block the master if synchronous replication is used. To tune performance the transaction commit policy and timeout parameters on the replica must be adjusted according to application requirements. (For details on the transaction commit policies see the Persistent Database I/O page.)

If the MCO_COMMIT_DELAYED policy is used, the transactions are not written to disk upon the mco_trans_commit() call. The data is written to disk only if one of the thresholds is reached. The thresholds are:

log_params.delayed_commit_threshold - size of uncommitted data (unwritten to disk)

log_params.max_delayed_transactions - the number of unwritten transactions

log_params.max_commit_delay – the maximum delay between commit() and writing to disk (in milliseconds)

For example, with the following parameter values:

     
    db_params.log_params.delayed_commit_threshold = 64*1024;
    db_params.log_params.max_delayed_transactions = 10;
    db_params.log_params.max_commit_delay = 1000;
     

the data will be written to disk if any of the thresholds is reached - every 1 second, or every 10 transactions, or every 64K of changed data - whichever comes sooner.

Note that these thresholds are checked only in the mco_trans_commit() call. For example, if you set max_commit_delay to 1000 (1 second) and perform a single small transaction, the data will not be written to disk until the next mco_trans_commit() even if it occurs after a long time (say, 10 seconds). Thus, if you insert 1000 objects, if the total size of the data (changed pages) doesn't exceed the delayed_commit_threshold then the changes are not written to disk.

The only way to cause data to be written to disk on each transaction commit is to use the MCO_COMMIT_SYNC_FLUSH policy. This is the most “durable” transaction commit policy. However, not that when using MCO_COMMIT_SYNC_FLUSH (with synchronous replication) the mco_trans_commit() on the master returns only after the data was written to the replica's persistent media. This can be very slow, so in this case it is advisable to use as long transactions as possible (eg. insert 1000 objects in one transaction, not in 1000 small transactions). (Note that if the master database is on persistent media, the application can call to mco_disk_flush() to flush all changes made by committed transactions. But there is currently no way to force the replica database to be flushed to disk.)

It is possible to use asynchronous replication with the MCO_COMMIT_SYNC_FLUSH policy on the replica to avoid blocking the master. But note that in asynchronous mode the transaction data is not sent to replica immediately. Instead the data is placed in the async buffer and is sent to the replica after the mco_trans_commit() finishes. If the replica spends a lot of time committing (and the size of the data in the master's async buffer can be significant), then if the master dies for some reason, this data will be lost.

Multiple Communication Channels

High Availability applications can communicate on more than one channel. The functions mco_HA_attach_master() and mco_HA_attach_replica() choose the channel implementation based on connection string content. The connection string is passed to the first registered channel. If the channel implementation recognizes the string (i.e. the string has the right format for the channel implementation), it will be used. Otherwise the string is passed to the next implementation, etc., Each channel has its own unique prefix that identifies the string for the channel (the standard channels have tcp, udp and pipe prefixes). When the prefix is present in the connection string, the string parsing is suspended and the appropriate channel is used.

For example:

1. :tcp:20000 - master side, TCP channel, listen port – 20000

2. :udp:127.0.0.1:30000 - replica or master, UDP channel, host - 127.0.0.1, port - 30000

Each communication channel must be registered in the master and replica by calling mco_HA_channel_implementation_add()after mco_HA_start() and prior to any other HA function call. For example:

     
    mco_HA_start();
    …
    mco_HA_channel_implementation_add( mco_nw_tcpip_vt() );
    mco_HA_channel_implementation_add( mco_nw_udpip_vt() );
    mco_HA_channel_implementation_add( mco_nw_pipe_vt() );
     

And the master must create listener threads (the thread that calls mco_HA_attach_replica()) for each registered channel implementation. These listener threads differ only in the connection string that is passed to mco_HA_attach_replica().

(See sample hamultichan for implementation details.)

Replica taking over as Master

In most mission critical HA applications a replica should be able to take over for a failed master. In this case the master and replica will be copies of the same application, aware that they are operating as master or replica, with the replica able to switch roles if necessary.

The essential feature of this type of application is the implementation of code to detect if the master is running, and if so connect to it and run in replica mode. Otherwise, the master node has failed for some reason and the application must take over as master.

The following example code illustrates how a C/C++ application might implement a switch-over from replica to master:

     
    for (i = 0; i < max_retries; ++i) 
    {
        printf("Try to attach master...");
        master_mode = 0;
        ReplicaParams.mode_flags = MCO_HAMODE_REPLICA_NOTIFICATION;
        rc = mco_HA_attach_master(&attach_p.db, &ReplicaParams);
     
        if (rc != MCO_S_OK) 
        {
            printf("attempt #%d failed (rc=%d)\n", i + 1, rc);
            sample_sleep(500);
        } 
        else
         {
            break;
        }
    }
     
    /* switch to MASTER mode */
    /******* setup HA instance *********/
    master_mode = 1;
    stop_flag = exit_flag;
    init_db = (attach_p.db == 0);
    if (init_db) /* attach_master not detect "old" master */
    { 
     
        /* create the master database */
        printf("Create database for the first time\n");
        rc = sample_open_database( db_name, switchdb_get_dictionary(),
                        DATABASE_SIZE, CACHE_SIZE,
                        MEMORY_PAGE_SIZE, PSTORAGE_PAGE_SIZE, 5,
                        &attach_p.memory);
     
        if (rc != MCO_S_OK) 
        {
            printf("Can't open database, error %d\n", rc);
            return 1;
        }
     
        /* connect to the database, obtain a database handle */
        rc = mco_db_connect(db_name, &attach_p.db);
     
        if (rc != MCO_S_OK) 
        {
            printf("Can't connect to database, error %d\n", rc);
            return 1;
        }
    }
     
    MasterParams.mode_flags = MCO_MASTER_MODE;
    mco_HA_set_master_params(attach_p.db, &MasterParams); // set MASTER mode
     
    sample_start_connected_task(&listen_task, ListenToReplicas, db_name,
                        &ha ); /* start the ListenToReplicas thread*/
                         
    sample_start_connected_task(&master_task, Master, db_name, (void*)
                        init_db); /* start the Master thread*/
                         
    sample_join_task (&listen_task);
    sample_join_task (&master_task);
     
    /****************** master clean up *****************/
    mco_HA_stop(attach_p.db); /* detach all replicas */
    }
     

Similarly, the following example code illustrates how a Java application might implement a switch-over from replica to master:

     
    HASwitch() throws Exception
    {
        …
        // prepare replica parameters
        ReplicaConnection rpl_con = new ReplicaConnection(db);
        ReplicaConnection.Parameters replParams = new
        ReplicaConnection.Parameters();
         
        // attach to master. Analog of mco_nw_attach_master
        try
         {
            System.out.println("Try to connect master...");
            if (!rpl_con.attachMaster("localhost:" + PORT, replParams,
                            CONNECT_TIMEOUT))
            {
                System.err.println("Failed to connect to master : timeout");
            }
        } 
        catch (DatabaseError de)
         {
            System.err.println("Failed to connect to master : error " +
                            de.errorCode);
        }
             
        //stop & wait working thread
        running = false;
        inspectThread.join();
        System.out.println("Replica is terminated, switch to MASTER mode");
        rpl_con.disconnect();
         
        /************* Master mode **************/
        MasterConnection mst_con = new MasterConnection(db);
        MasterConnection.Parameters mst_params = new
        MasterConnection.Parameters( MasterConnection.MCO_HAMODE_ASYNCH );
        mst_params.maxReplicas = MAX_REPLICAS;
        mst_params.asyncBuf = new
        Database.PrivateMemoryDevice( Database.Device.Kind.AsyncBuffer,
                        ASUNC_BUF_SIZE );
        mst_con.setReplicationMode(mst_params);
        listening = true;
         
        // start listen and async. commit threads
        Thread listenThread = new Thread( new Runnable() {
                            public void run() {
                                listen(); } } );
                                 
        Thread replicateThread = new Thread( new Runnable() {
                            public void run() {
                                replicate(); } });
                                 
        listenThread.start();
        replicateThread.start();
        //////////////////////////////////////////////////////
        // Insert some data in the database if needed
        //////////////////////////////////////////////////////
        mst_con.startTransaction(Database.TransactionType.ReadWrite);
        …
        mst_con.commitTransaction();
     
        // stop & wait threads
        listening = false;
        listenThread.join();
        mst_con.stopReplication();
        replicateThread.join();
     
        mst_con.disconnect();
        db.close();
        System.out.println("Master completes it work");
    }
     

High Availability for Distributed Databases

Replication forms the foundation of high availability in a sharded eXtremeDB database. Sharding is supported though eXtremeSQL SQL by the DistributedSqlEngine. Each shard consists of an HA master and a certain number of HA replicas. (Please refer to the eXtremeSQL User Guide for further details.)

Partial Replication

It may be desirable for an HA application to replicate a part but not all of the objects in a database. For this purpose, the keyword local in a C/C++ application schema indicates what classes are subject to the partial replication. Local class objects from the master database don't get replicated to replica nodes; and the content of local class objects from the replica database does not get written into the database when it's received from the master. Local classes on the master and replicas can be different and it is not necessary to turn on binary evolution to enforce the partial replication.

C/C++ APIs

The function mco_HA_enable_filter(mco_db_h db, mco_bool enabled) turns on and off filtering at runtime. It should be called after mco_db_connect() but before calling mco_HA_attach_replica() or mco_HA_attach_master(). If local tables are defined, the filtering is on by default. Consequently mco_HA_enable_filter(db, MCO_NO); turns filtering off – effectively ignoring any local class definitions in the schema. If the schema does not define local tables, the function has no effect.

(See sample samples/native/ha/hafilter for a C/C++ example of partial replication.)

Java and C#

With the C# and Java APIs local classes are specified using the attribute local and at runtime partial replication can be turned on/off by calling the enableFilter() method of either the MasterConnection or ReplicaConnection:

 
    MasterConnection::enableFilter(boolean enabled);
    ReplicaConnection::enableFilter(boolean enabled);
     

Python

The Python wrapper supports partial replication by declaring a class as local in the schema definition, as for C/C++ applications, then calling the load_dictionary() method. For example:

 
    schema = '''
        #define uint4     unsigned<4>
        #define uint4   unsigned<4>
     
        declare database filtermstdb;
        declare auto_oid [2000];
     
        class T1
        {
            uint4 key;
            unique tree<key> tkey;
        };
        class T2
        {
            uint4 key;
            unique tree<key> tkey;
        };
        class T3
        {
            uint4 key;
            unique tree<key> tkey;
        };
        local class T4
        {
            uint4 key;
            unique tree<key> tkey;
        };
    '''
     dict = exdb.load_dictionary(schema)
     

Also, as with the Java and C# APIs, at runtime partial replication can be turned on/off by calling the enableFilter() method of either the MasterConnection or ReplicaConnection:

 
    MasterConnection::enableFilter(boolean enabled);
    ReplicaConnection::enableFilter(boolean enabled);
     

Setting the Quorum

Under some circumstances it is necessary for the master to perform updates in the database only if some minimal number of replicas (quorum) is connected. If the number of active replicas is less than the quorum, the mco_trans_start() or mco_trans_commit() will return error code MCO_E_HA_NO_QUORUM. This minimal number of replicas can be set via the mco_HA_master_params::quorum parameter (the default value is 0).

In some cases (e.g. if the network goes down during the commit) it is not possible to determine if the last transaction was received by the replica or not. In this case the commit() applies changes to master's database and returns status code MCO_S_HA_REPLICA_DETACH. This code means that if application switches to the replica, this transaction can be missed. (Note that MCO_S_HA_REPLICA_DETACH is only possible if mco_HA_master_params::quorum is greater than 0.)

The master parameter mco_HA_master_params::quorum sets the initial value of the HA quorum, and the mco_HA_set_quorum() C API, or the equivalent Java setQuorum(), C# SetQuorum() or Python setQuorum() API, is used to change the HA quorum at runtime. The default quorum value is 0, which means that any number of active replicas is acceptable (including 0).

Note that a quorum value greater than 0 is allowed only for synchronous replication mode (i.e. flag MCO_HAMODE_ASYNCH is not set) and if transaction window size is 1 (see mco_HA_set_trans_window_size()).

Extending Database memory

Extending the memory size for an eXtremeDB database is done by calling the core API function mco_db_extend(). However, because this call causes an internal write transaction, it cannot be called from the replica. So the procedure for extending database memory is to first call mco_db_extend()from the master, then in a notifying_callback function the replica will respond to a MCO_REPL_NOTIFY_MASTER_DB_EXTENDED notification code by calling mco_db_extend_dev(). The following code snippet illustrates how such the notifying_callback function is implemented in the replica:

 
    /* replica notification callback function */
    void replica_notifying(  uint2 notification_code,  /* notification code */
                    uint4 param1,  /* reserved for special cases */
                    void* param2,  /* reserved for special cases */
                    void* context) /* pointer to the user-defined context */
    {
        char *context_str = (char*)context; /* get context */
     
        switch (notification_code)  
        {
            case MCO_REPL_NOTIFY_CONNECTED:
                printf("\n** Notification ** Replica's been connected, “
                “context = %s\n", context_str);
                break;
            case MCO_REPL_NOTIFY_DB_LOAD_OK:
                printf("\n** Notification ** Database's been loaded “
                    “successfully, context = %s\n", context_str);
                break;
            case MCO_REPL_NOTIFY_MASTER_DB_EXTENDED:
                {
                    MCO_RET rc;
                    /* Get the device size passed in param2 */
                    mco_size_t size = *((mco_size_t*) param2);
     
                    printf("\n** Notification ** Master's database was extended, “
                            “extend size %d bytes, context = %s\n",
                    (int)size, context_str);
                    extend_dev.type       = MCO_MEMORY_CONV;
                    extend_dev.assignment = MCO_MEMORY_ASSIGN_DATABASE;                     extend_dev.size       = size;
                     
                    /* allocate memory and set device pointer */
                    extend_dev.dev.conv.ptr = (void*)malloc( extend_dev.size );
                     
                    if (extend_dev.dev.conv.ptr) 
                    {
                        rc = mco_db_extend_dev(db_name, &extend_dev);
                        printf("\nmco_db_extend_dev(), size %d : %s\n",
                            DATABASE_SIZE, (rc == MCO_S_OK) ? "OK" : "FAILED" );
                    }
                    break;
                }
            case MCO_REPL_NOTIFY_REPLICA_STOPPED:
                {
                    const char* reason = "";
     
                    if (param1 == MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT) {
                    reason = "MCO_HA_REPLICA_MASTER_REQUESTED_DISCONNECT";
                }
                printf("\n** Notification ** Replica stopped with the reason: “
                        “%d (%s), context = %s\n", param1, reason, context_str);
                break;
                }
            default:
                printf("\n** Notification ** Replica's been notified code = “
                    “%d, param1 = %u, param2 = %p, context = %s\n",
                    notification_code, param1, param2, context_str);
                break;
        }
    }
     

Note that the size of the device is passed into the notification callback through the third parameter (param2) as a pointer to mco_size_t and the replica extends its storage size by calling mco_db_extend_dev().

 

HA sequencer API

Sometimes it may be necessary to determine which of the replica's databases is the most relevant. Suppose we have configuration with the master and two replicas R1 and R2. Then R1 dies at some moment T1. Later at moment T2 (> T1) the master and R2 stop processing due to a power failure. After the cold restart of the R1 and R2 applications, it must be determined which of the nodes will be the new master. To resolve this issue, mco_HA_get_sequencer()API can be called to return the number of the "db version" (or "current-ness" in the sense of HA). This number can be used to determine which database is the latest.

 

A Note on setting detach_timeout

When the master detaches the replica (explicitly or inside mco_HA_stop()), it sends the DETACH message indicating the end of replication. The detach_timeout is used for this send() operation. After sending the message (regardless of the result), the master closes the channel to the replica. Changing the timeout does not affect how quickly the replica will be disconnected; it affects the maximum amount of time inside the mco_HA_detach() call. Setting the detach_timeout to 0 could lead to the DETACH message not being sent at all, and thus the replica will not receive the notification about detaching. Consequently the replica will run into the unexpected closing of the channel (and will return error code MCO_E_NW_RECVERR). So setting the detach_timeout to 0 is not recommended.