Abstract
With the rapid increase in spatial data resolution, the huge volume of datasets makes geo-computation more time-consuming, especially for complex algorithms in digital terrain analysis such as viewshed analysis and drainage network extraction. Parallel computing, which brings more computing resources to bear, is regarded as an efficient solution. Stable and reliable services play an irreplaceable role in parallel computing, especially when an error occurs during large-scale scientific computing. In this paper, a master/slave approach to parallel re-computing based on a redundancy mechanism is proposed. Once errors are detected at the application layer, the original data block containing the computation errors is further partitioned into several sub-blocks, which are re-computed concurrently by the surviving processes to speed up failure recovery. A multi-thread strategy in the main process handles data block distribution, error detection, and the start of the re-computing procedure concurrently. Performance under different data distribution modes is evaluated by theoretical analysis. The experimental results show that the performance of fault-tolerant parallel computing varies with the data distribution mode adopted.
Keywords
Introduction
Digital terrain analysis (DTA) is a digital information processing technology that computes terrain attributes and extracts terrain features on the basis of a digital elevation model (DEM).1 With increased precision and accuracy, DEMs have gone from 1000-m resolutions 5–10 years ago to 1–5 m resolutions today in many areas. As a result of the increased precision and file sizes, many land surface parameters, such as slope, profile curvature, and hydrologic parameters, that were practical to compute for lower-resolution, smaller DEMs become prohibitively time-consuming when applied to high-resolution, large-volume data. Hence, parallel computing has become a fundamental tool for geographic information science.2,3
High-performance geo-computations run for a long time on distributed processes. At the same time, reliability becomes a primary concern, because the stability of a cluster with tens of thousands of processors is constantly threatened by a large number of hardware and software failures, such as failures of the network, memory, processors, and operating systems. Therefore, fault tolerance has become a necessary component of reliable computation.
Large-scale scientific computing involves more computation tasks, so its running time is longer than that of small-scale applications. Meanwhile, the risk of failure becomes higher, and a failure can waste a great deal of resources, including time and storage. The checkpointing technique 4 is widely used for fault tolerance. Its basic idea is to save the current state and data of a process to stable storage at key points of its execution. A large number of checkpoints are therefore stored periodically in the parallel system. Whenever a process fails, all processes related to it have to be rolled back to the last checkpoint to restart the computation. Writing and reading checkpoints causes a huge volume of I/O traffic, which becomes a performance bottleneck.
To speed up recovery, Yang et al. 5 put forward an application-level fault-tolerant scheme based on parallel re-computing, called the fault-tolerant parallel algorithm. However, the method operates mainly at the instruction level of a program, and this determines whether parallel re-computing can be carried out at all: if the failed part of the program cannot be repartitioned, parallel re-computing cannot be performed and its performance benefit is lost. Yang et al. also proposed an improved approach that addresses program division and workload redistribution. 6 These approaches focus on program partitioning rather than data partitioning. A parallel re-computing method for parallel DTA was first presented by Miao et al. to improve the efficiency of the redundancy computing mechanism.7–9 They also presented an improved error detection scheme that uses multiple threads to check whether errors exist in the computation of a data block, so as to reduce the overhead of error detection.8,10 Song et al. implemented a fault-tolerant parallel computing algorithm for DTA using the checkpointing technique. 11 The re-computing method based on the redundancy mechanism has been discussed in depth in theory. 12 An initial performance analysis of the parallel re-computation method is given in the literature. 13 This paper extends the analysis of system performance to different redundancy strategies, and designs and implements a parallel computing system in master/slave mode that uses a multi-thread mechanism to detect errors and start re-computation when an error appears. Our system supports different data distribution modes, whose performance is compared in several experiments.
The rest of this paper is organized as follows. The next section presents the framework of the fast parallel re-computing procedure. The section after that gives the algorithms for implementing parallel re-computing in master/slave mode, with a multi-thread mechanism in the main process, together with a performance analysis. Experimental results are analyzed in the "Experiments and analysis" section, and the last section presents the conclusions and future work.
Framework of parallel re-computing
In the field of DTA, DEMs generally contain a large amount of data, and processing DEM data takes considerable time and incurs substantial I/O cost. When a process fails, its work must be restarted and all computations are rolled back. To tolerate such failures, measures are needed to guarantee correct service of the system. When there is a large amount of communication and computation among processes, a multi-thread or multi-process mode is adopted to process the partitioned data blocks, which can effectively improve the efficiency of data processing. Processing sub-blocks has the advantage of using fewer computing resources and less storage space. Upon a failure, parallel re-computing (PR) does not require rolling back all surviving processes. Instead, it uses the surviving processes to compute the blocks of the failed process in parallel. If the master node fails, all data blocks must be recomputed; to avoid this, a backup mechanism for the master node can be adopted. During data distribution, a fast factor is adopted to regulate the number of partitioned data blocks. If a slave node fails, its data block is repartitioned and recomputed.
The tasks of the main process on the master node are data distribution, error detection, and result fusion. The framework of PR is shown in Figure 1. The main process on the master node reads the data from disk and creates threads according to the number of partitioned data blocks. Each thread distributes one data block to a computing process and its copy process on slave nodes, and then waits to receive the results for each sub-block. Once the results of a sub-block and its copy are received, the thread checks whether they are consistent. If they are consistent, the computation is considered correct and the thread submits the results to memory for fusion. Fusion stores the results according to the position of each grid cell, so that redundant (duplicated) results are stored only once. If the results are inconsistent, the corresponding logical sub-block is redistributed to a new process for re-computation. A thread is closed only when the results of all its logical sub-blocks have been received, verified as correct, and fused into the whole result set. Finally, the main process writes the whole result to a disk file.

Framework of parallel re-computing procedure.
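The master-side handling of one block can be sketched in a few lines of Python. This is a minimal single-machine sketch, not the authors' implementation: the computing processes are simulated by ordinary function calls, and the names `compute_block`, `results_consistent`, `fuse`, and `handle_block`, the doubling kernel, and the injected fault are all hypothetical. It shows the three duties described above: dual computation, value-by-value comparison, and fusion by grid-cell position.

```python
from typing import List

Grid = List[List[float]]


def compute_block(block: Grid, fault: bool = False) -> Grid:
    """Stand-in for a computing process on a slave node.

    The doubling kernel is hypothetical; fault=True corrupts one cell
    to mimic an undetected computation error.
    """
    out = [[2.0 * v for v in row] for row in block]
    if fault:
        out[0][0] += 1.0
    return out


def results_consistent(a: Grid, b: Grid) -> bool:
    """Compare the normal and copy results value by value."""
    return len(a) == len(b) and all(ra == rb for ra, rb in zip(a, b))


def fuse(result: Grid, part: Grid, row0: int, col0: int) -> None:
    """Write a block result into the global grid at its grid-cell position."""
    for i, row in enumerate(part):
        result[row0 + i][col0:col0 + len(row)] = row


def handle_block(block: Grid, row0: int, col0: int, result: Grid,
                 primary_fault: bool = False) -> bool:
    """Master-side handling of one block; returns True if re-computation ran."""
    r_normal = compute_block(block, fault=primary_fault)  # normal process
    r_copy = compute_block(block)                          # copy on another node
    if results_consistent(r_normal, r_copy):
        fuse(result, r_normal, row0, col0)
        return False
    # Two replicas disagree: the block is computed again
    # (PR would split it into sub-blocks at this point).
    fuse(result, compute_block(block), row0, col0)
    return True


dem = [[1.0, 2.0, 3.0, 4.0], [5.0, 6.0, 7.0, 8.0]]
result = [[0.0] * 4 for _ in range(2)]
left = [row[:2] for row in dem]
right = [row[2:] for row in dem]
print(handle_block(left, 0, 0, result))                       # False
print(handle_block(right, 0, 2, result, primary_fault=True))  # True
print(result)  # [[2.0, 4.0, 6.0, 8.0], [10.0, 12.0, 14.0, 16.0]]
```

With only two replicas, a mismatch reveals that an error occurred but not which copy is wrong, which is why the block is re-computed rather than one copy being trusted.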
A computing process on one slave node and its copy process on another slave node each receive the same data block, compute its result, and send the result back to the corresponding thread on the master node. The two processes send their results for the same logical data sub-block to that thread according to a mapping agreed in advance.
To improve the efficiency of data distribution and error detection, multi-thread technology is adopted in the main process. Each thread is responsible for data distribution and error detection. If comparing the two computation results reveals an error, the same thread redistributes the affected data block and starts a new process to re-compute it.
Parallel re-computing is based on a redundancy mechanism. The basic principle of PR is that a process holding a data block and a process holding a copy of that block first execute concurrently on two different slave nodes. After both processes finish, their results are sent to the main process running on the master node, which checks whether the two results are consistent. If there is no error, the main process saves the result to the output file. If the two results are inconsistent, the main process partitions the data block into a group of smaller sub-blocks, for instance, four sub-blocks. These sub-blocks are then distributed to the surviving computing processes and re-computed in parallel. Finally, the results of the sub-blocks are sent to the main process and fused into the result of the original data block.
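Splitting a faulty block into four sub-blocks and fusing the re-computed parts can be sketched as follows, assuming a cell-local kernel (each output cell depends only on the same input cell). A thread pool stands in for the surviving computing processes; `split_into_four`, `kernel`, and `parallel_recompute` are hypothetical names, not part of the original system.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

Grid = List[List[float]]


def split_into_four(block: Grid) -> List[Tuple[int, int, Grid]]:
    """Split a block into four quadrants as (row offset, col offset, sub-block)."""
    h, w = len(block), len(block[0])
    mh, mw = h // 2, w // 2
    quads = []
    for r0, r1 in ((0, mh), (mh, h)):
        for c0, c1 in ((0, mw), (mw, w)):
            quads.append((r0, c0, [row[c0:c1] for row in block[r0:r1]]))
    return quads


def kernel(sub: Grid) -> Grid:
    """Hypothetical cell-local computation (no neighbour cells needed)."""
    return [[v * v for v in row] for row in sub]


def parallel_recompute(block: Grid, workers: int = 4) -> Grid:
    """Re-compute a faulty block as four sub-blocks in parallel, then fuse."""
    h, w = len(block), len(block[0])
    fused = [[0.0] * w for _ in range(h)]
    quads = split_into_four(block)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        parts = pool.map(kernel, [sub for _, _, sub in quads])
        for (r0, c0, _), part in zip(quads, parts):
            for i, row in enumerate(part):
                fused[r0 + i][c0:c0 + len(row)] = row
    return fused


block = [[float(3 * i + j) for j in range(3)] for i in range(5)]  # odd 5 x 3 block
print(parallel_recompute(block) == kernel(block))  # True
```

For neighbourhood operations such as slope, each sub-block would additionally need a border of neighbouring cells; the sketch omits this.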
To simplify the analysis, we assume that there are no dependencies between data blocks. A collection of n computing processes is denoted by P = {P1, P2, …, Pn}, where Pi is the ith process. The set of data blocks is denoted by B = {B1, B2, …, Bn}, where Bi is the ith partitioned block to be handled. The set of results is denoted by R = {R1, R2, …, Rn}, where Ri is the result of computing Bi. The whole process of PR consists of five steps:
Figure 2 shows a quad-tree of partitioned data blocks. Layer k of the quad-tree contains $4^k$ data blocks. Apart from the original data in the first layer and the data blocks in the last layer, parallel re-computing can choose the data blocks in a suitable layer as the basic data blocks, according to the cluster environment and performance requirements. The data blocks below this layer can then be used to remedy errors in a basic data block. For example, when data block B0 in layer L1 is selected as a basic data block in parallel computing with the redundancy mechanism, the four sub-blocks B00, B01, B02, and B03 in layer L2 can be used as re-computation sub-blocks to correct the result of B0 if it contains errors. Alternatively, the 16 sub-blocks B000 to B033 in layer L3 can be used to obtain a faster correction of the result of B0.

Repartitioned data blocks according to quad-tree structure.
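The quad-tree naming in Figure 2 can be expressed as string labels in which each extra digit selects one quadrant. In this sketch the root label `B` for the whole DEM and the digit-to-quadrant convention (0 top-left, 1 top-right, 2 bottom-left, 3 bottom-right) are assumptions, since the text does not fix them; `children`, `descendants`, and `bounds` are hypothetical helpers.

```python
from typing import List, Tuple


def children(label: str) -> List[str]:
    """The four sub-blocks of a block, e.g. B0 -> B00, B01, B02, B03."""
    return [label + str(q) for q in range(4)]


def descendants(label: str, levels: int) -> List[str]:
    """All sub-blocks `levels` layers below `label` in the quad-tree."""
    layer = [label]
    for _ in range(levels):
        layer = [c for b in layer for c in children(b)]
    return layer


def bounds(label: str, height: int, width: int) -> Tuple[int, int, int, int]:
    """(row0, row1, col0, col1) of a block inside a height x width DEM.

    Assumed digit convention: 0 top-left, 1 top-right, 2 bottom-left, 3 bottom-right.
    """
    r0, r1, c0, c1 = 0, height, 0, width
    for digit in label[1:]:          # skip the leading "B"
        q = int(digit)
        rm, cm = (r0 + r1) // 2, (c0 + c1) // 2
        r0, r1 = (r0, rm) if q < 2 else (rm, r1)
        c0, c1 = (c0, cm) if q % 2 == 0 else (cm, c1)
    return r0, r1, c0, c1


print(descendants("B", 1))        # ['B0', 'B1', 'B2', 'B3']  (layer L1)
print(len(descendants("B0", 2)))  # 16: B000 ... B033
print(bounds("B03", 1024, 1024))  # (256, 512, 256, 512)
```

Choosing a deeper layer for re-computation only changes the `levels` argument: one level below B0 gives 4 sub-blocks, two levels give 16.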
Implementation of parallel re-computing in master/slave mode
The design of PR incorporates a parallel error-remedying scheme into parallel DTA. The major characteristic of PR is that the data block of a failed computing process is repartitioned into small sub-blocks by the main process, and these sub-blocks are then redistributed to idle processes and recomputed concurrently to obtain the correct result of the original data block.
Data distribution strategy in single process mode
To implement the fast parallel re-computing procedure, we adopt a master/slave structure in which the master node is responsible for data distribution, error detection, and result fusion, and the slave nodes are responsible for computing each data block. Scheduling tasks in a fixed sequence is the simplest approach to implement, so we first discuss a sequential data distribution strategy.
Figure 3 shows the execution time when tasks are scheduled in sequence. First, the main process on the master node reads the whole dataset from disk and distributes each data block to two computing processes on different slave nodes: one is the normal process and the other is its copy. Hence, there are 2n computing processes. Second, each of the 2n processes starts its computing task as soon as it receives its data block. Depending on the computing time of a task on a slave node and the data distribution time of the main process, either the main process finishes distributing all data blocks before the first computing process finishes its task, or the reverse. The first process receives its block after one distribution step and finishes at Td + Tc, whereas the master finishes distributing after 2n steps. Therefore, if $(2n-1)T_d > T_c$, the first computing process has to wait until the master finishes the data distribution before sending its result; otherwise, the master has to wait until the first computing process finishes its computing task before receiving the result.

Model of data scheduling in the process.
As seen in Figure 3, once the master process receives the results of a normal process and its copy process, it checks whether the two results are consistent by comparing every value. If they are consistent, the computation is correct and no error exists, and the master process moves on to the result of the next process. Otherwise, the master process repartitions the block into four sub-blocks and distributes them to four idle computing processes for re-computation. Third, the master process receives the re-computed results and fuses them into the result of the original data block. Finally, the master writes the whole result to disk.
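The waiting condition of the sequential scheme, $(2n-1)T_d > T_c$, can be checked with a small timing sketch. It assumes that each of the 2n sends takes the same time Td, that sends are strictly sequential, and that a computing process starts as soon as its block arrives; `first_result_wait` is a hypothetical helper, and the numbers in the example are arbitrary.

```python
from typing import Tuple


def first_result_wait(n: int, t_d: float, t_c: float) -> Tuple[str, float]:
    """Who waits when the first computed result is ready, in sequential mode.

    Assumes the master sends 2n blocks one after another, each send taking t_d,
    and that the first computing process starts as soon as its block arrives.
    """
    all_sent = 2 * n * t_d       # master finishes distributing
    first_done = t_d + t_c       # first process finishes computing
    gap = all_sent - first_done  # equals (2n - 1) * t_d - t_c
    if gap > 0:
        return ("computing process waits", gap)
    if gap < 0:
        return ("master waits", -gap)
    return ("no wait", 0.0)


print(first_result_wait(4, 0.5, 2.0))  # ('computing process waits', 1.5)
print(first_result_wait(2, 0.5, 3.0))  # ('master waits', 1.5)
```

Adding blocks (larger n) at a fixed per-send time shifts the balance toward the computing processes waiting on the master, which is the bottleneck later observed with independent data distribution.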
In equation (1), Tread is the time to read the whole dataset into memory, Td is the time to distribute one data block, Tc is the computing time of one data block, Tr is the time to receive the computing result of one data block, Tcp is the time to compare two results, Twrite is the time to write the result back to disk, K is the number of re-computing processes, and n is the number of data blocks partitioned in advance. According to the above analysis, the whole computing time is given as follows
Let Ts be the execution time in serial mode with the redundancy mechanism, that is, with n = 1; then
Since, in general, Tread = Twrite, the speedup ratio is given as follows
Figure 4 shows how the theoretical speedup ratio changes with the number of computing processes. The speedup ratio rises as the number of computing processes increases, but once the number of processes reaches a certain value (about 1024 in Figure 4), the speedup ratio increases only slowly or remains almost unchanged. As the number of processes grows further, the increase in the speedup ratio approaches 0 in theory: the computing time per block becomes so small that it can be ignored, and the whole time is dominated by reading data from and writing results back to the disk, which does not decrease with more processes. The speedup ratio therefore levels off.

Speedup ratio analysis in theory.
Data distribution strategy in multi-thread mode
To achieve high efficiency in data distribution and error detection, we adopt multi-thread technology based on the shared memory mode. Each thread is responsible for data distribution and error detection. When the comparison reveals an error, the thread also redistributes the logical data block on which the computing error occurred to a new process for re-computation.
The main process on the master node reads the whole dataset from disk into main memory and creates threads according to the number of partitioned data blocks. Each thread distributes one of the pre-partitioned data blocks to a process and its copy process on slave nodes. After the distribution, each thread waits to receive the computing results of its data block. Once the results of the data block and its copy are received, the thread checks whether they are consistent. If they are, the computation is correct and the thread submits the result to memory for fusion. Otherwise, the data block is repartitioned into K sub-blocks, which are redistributed to K new processes and re-computed in parallel. A thread is closed only when the results of all its data blocks have been received, verified as correct, and fused into the whole result set. Finally, the main process saves the whole result set to a disk file.
A process on one slave node and its copy process on another slave node each receive the data block, compute its result, and send the result back to the corresponding thread according to a mapping agreed in advance.
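The multi-thread master can be sketched with Python's `concurrent.futures.ThreadPoolExecutor`: one thread per data block obtains a normal and a copy result, compares them, and on a mismatch re-computes K sub-blocks in parallel before fusing under a lock. Everything runs in one Python process, so the slave "processes" are only function calls; blocks and their K sub-blocks are row strips rather than quad-tree quadrants, and `kernel`, `run_master`, and the fault injection are hypothetical simplifications rather than the authors' code.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import List, Set, Tuple

Grid = List[List[float]]


def kernel(rows: Grid, corrupt: bool = False) -> Grid:
    """Hypothetical cell-local computation run by a slave process."""
    out = [[v + 1.0 for v in row] for row in rows]
    if corrupt and out:
        out[0][0] = -1.0                       # injected silent error
    return out


def run_master(dem: Grid, n_blocks: int, k: int,
               faulty: Set[int]) -> Tuple[Grid, List[int]]:
    """One master thread per row-strip block; K-way re-computation on mismatch."""
    h = len(dem)
    step = -(-h // n_blocks)                   # rows per block (ceiling)
    result: Grid = [[] for _ in range(h)]
    recomputed: List[int] = []
    lock = threading.Lock()

    def handle(b: int) -> None:
        r0, r1 = b * step, min((b + 1) * step, h)
        if r0 >= h:
            return
        block = dem[r0:r1]
        normal = kernel(block, corrupt=b in faulty)  # normal process
        copy = kernel(block)                         # copy process
        if normal != copy:
            sub = -(-len(block) // k)                # rows per sub-block
            subs = [block[i:i + sub] for i in range(0, len(block), sub)]
            with ThreadPoolExecutor(max_workers=k) as pool:
                parts = list(pool.map(kernel, subs))
            normal = [row for part in parts for row in part]
            with lock:
                recomputed.append(b)
        with lock:                                   # fuse into shared memory
            result[r0:r1] = normal

    with ThreadPoolExecutor(max_workers=n_blocks) as master_threads:
        list(master_threads.map(handle, range(n_blocks)))
    return result, sorted(recomputed)


dem = [[float(3 * i + j) for j in range(3)] for i in range(8)]
fused, redone = run_master(dem, n_blocks=4, k=2, faulty={1})
print(redone)                # [1]
print(fused == kernel(dem))  # True
```

A separate executor is used for the re-computation so that a master thread waiting on its sub-blocks never occupies a worker slot that those sub-blocks need.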
When each process on a slave node reads its data block directly from disk, the computing time and speedup ratio are given as follows
For a high-performance parallel computing environment, ε is very small, that is, $n^2\varepsilon \ll 2T_{\mathrm{read}} + 3T_r + (1 + 1/K)T_c + T_{\mathrm{cp}}$, so the speedup ratio approaches n. Note that this analysis assumes no communication overhead between tasks on different data blocks.
Figure 5 shows how the speedup ratio changes with the number of computing processes in multi-thread mode. The speedup ratio rises as the number of processes increases, but once the number of processes reaches a certain value, the speedup ratio starts to drop. The reason is the overhead of creating processes, which consumes significant system time when the number of processes is large.

Speedup ratio analysis in theory with multi-thread mode.
The above analysis shows that, for a given parallel computing environment with the data size and algorithm fixed in advance, there is a suitable number of computing processes that achieves the optimum speedup ratio. However, I/O must also be considered: when a large number of computing processes access I/O concurrently, system performance inevitably drops.
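The existence of an optimum process count can be illustrated numerically. The time model below (a fixed I/O term, work that divides evenly among n processes, and a process-creation overhead growing as n squared) is a hypothetical stand-in, not the paper's equation, and the parameter values are arbitrary; `total_time` and `best_process_count` are illustrative names.

```python
from typing import Iterable


def total_time(n: int, t_io: float, work: float, eps: float) -> float:
    """Hypothetical model: fixed I/O + divisible work + n**2 process overhead."""
    return t_io + work / n + eps * n * n


def best_process_count(candidates: Iterable[int], t_io: float,
                       work: float, eps: float) -> int:
    """Candidate process count with the smallest modelled total time."""
    return min(candidates, key=lambda n: total_time(n, t_io, work, eps))


counts = [2 ** i for i in range(11)]  # 1, 2, 4, ..., 1024
n_opt = best_process_count(counts, t_io=10.0, work=1000.0, eps=0.001)
speedup = total_time(1, 10.0, 1000.0, 0.001) / total_time(n_opt, 10.0, 1000.0, 0.001)
print(n_opt)               # 64
print(round(speedup, 1))   # 34.0
```

In this model the speedup first rises because the divisible work shrinks as 1/n, then falls once the overhead term dominates, which matches the qualitative shape described for Figure 5.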
Experiments and analysis
The experiments were performed on a small-scale cluster system. Each node is equipped with an Intel Xeon E5645 quad-core processor at 2.8 GHz and 8 GB of memory, and the nodes are connected by Gigabit Ethernet. The master/slave parallel computing model is adopted, with a primary node responsible for distributing data and collecting results. The software environment consists of GDAL 1.6.1, OpenMP 1.5.4, GCC 4.4.7, and MPICH2. Two datasets are used as test data: the smaller is 1.61 GB and the larger is 6.9 GB. The data are floating-point values stored as TIFF images.
The experiments use the slope algorithm of DTA to validate the efficiency of the PR method proposed in this paper. Slope (gradient) measures the steepness of the surface at each grid cell; it is defined as the ratio of the vertical rise to the horizontal distance over the sloping surface.
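A minimal slope computation on a grid might look like the following. The paper does not state which finite-difference scheme it uses; this sketch assumes simple central differences on the four direct neighbours and returns slope as rise over run, which can be converted to degrees with atan. The function name `slope_grid` is hypothetical.

```python
import math
from typing import List, Optional

Grid = List[List[float]]


def slope_grid(dem: Grid, cell_size: float) -> List[List[Optional[float]]]:
    """Slope as rise over run, using central differences on interior cells.

    Border cells have no complete neighbourhood and are left as None.
    """
    h, w = len(dem), len(dem[0])
    out: List[List[Optional[float]]] = [[None] * w for _ in range(h)]
    for i in range(1, h - 1):
        for j in range(1, w - 1):
            dz_dx = (dem[i][j + 1] - dem[i][j - 1]) / (2.0 * cell_size)
            dz_dy = (dem[i + 1][j] - dem[i - 1][j]) / (2.0 * cell_size)
            out[i][j] = math.hypot(dz_dx, dz_dy)
    return out


ramp = [[float(j) for j in range(4)] for _ in range(3)]  # rises 1 m per 1 m cell
s = slope_grid(ramp, cell_size=1.0)
print(s[1][1])                                        # 1.0
print(round(math.degrees(math.atan(s[1][1])), 6))     # 45.0
```

Because every interior cell reads its neighbours, a DEM split into blocks needs a one-cell overlap between adjacent blocks for cells on block boundaries to be computed correctly.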
We implemented the parallel re-computing algorithm in master/slave mode, in which the master node is responsible for data distribution, result comparison, result fusion, and saving to disk, and each slave node computes a data block. Figure 6 shows the total computation time versus the number of processes under independent data distribution. We specify a single process failure during the whole computation, and the result of the failed process is re-computed by four re-computing processes after the original data block is partitioned into four sub-blocks.

Total computation time versus the number of processes under independent data distribution, with four re-computing processes for a failed process.
We adopted a dual redundancy mechanism in which the master process distributes each data block twice: once to a process and once to its copy process. Figure 6 shows the results for two DEM datasets of 1.6 GB and 6.9 GB. As shown in Figure 6, the total computation time declines as the number of processes (data blocks) increases. Since only one process fails, only four new processes are started for the re-computing procedure, so the system overhead is small. However, a single main process on the master node is responsible for data distribution, result comparison, result fusion, and so on; it is so busy that the computing processes on the slave nodes have to wait for communication. As a result, the total computation time does not decline quickly, and once the number of computing processes reaches a certain value, the total time increases slowly with the number of processes or data blocks. For example, for dataset 1, 32 processes is the critical point at which the smallest time is obtained; beyond 32 processes, the total time increases slowly. The critical point for dataset 2 is 64.
In the above experiment, the main process on the master node distributes data in sequence, and receives and compares the two results from each computing process and its copy. The advantage is that data are read from disk into memory in order, which avoids I/O contention; the disadvantage is that the computing processes on the slave nodes must wait for data distribution. The following experiments use multiple threads in the main process to distribute data blocks in parallel, compare the results, and decide whether to start the re-computing procedure.
Figure 7 shows the total computation time versus the number of threads in the main process. For a failed process, four processes are started to execute the re-computing procedure. As shown in Figure 7, the time declines as the number of threads increases for both datasets, because many threads handle the distribution of the data blocks, the comparison of results, and the start of the re-computing procedure in parallel. Compared with independent data distribution, parallel data distribution using the multi-thread technique thus provides higher efficiency in data distribution and error detection.

Total time versus the number of processes under parallel data distribution, with four re-computing processes for a failed process.
The total computation time under parallel data distribution is clearly lower than under independent data distribution. Furthermore, under parallel data distribution the time declines more quickly as the number of threads increases.
When each process reads its data block directly from disk through a parallel file system, the total computing time declines rapidly with the number of processes. Because all processes read data concurrently, the total time is determined mainly by the computing time, which declines rapidly as the data blocks become smaller with more processes. Figure 8 shows this case, in which each process reads data directly from disk.

Total time versus the number of processes under direct data reading from disk by each process, with four re-computing processes for a failed process.
Compared with traditional re-computation using checkpoints, our PR uses finer-grained sub-blocks to recover from errors concurrently, although the checkpoint technique might also adopt a fine-grained strategy for error recovery. Figure 9 compares the normal parallel re-computing strategy, a simple re-computing strategy, and our PR strategy on dataset 2. In the simple re-computing strategy, the block in which an error occurs is directly redistributed to one computing process and computed again, without being partitioned into sub-blocks. In the normal parallel re-computing strategy, the block with errors is partitioned into several sub-blocks that are distributed to several computing processes and computed concurrently. In our PR method, a data block does not need to be completely computed before it is checked: during the computation, partial results are sent to the main process for error detection, and each part of the data block can be treated as a logical sub-block. If an error occurs in a logical sub-block, re-computation is started immediately to correct the error in that sub-block. Because computation, error detection, and re-computation proceed almost concurrently in PR, the whole time is greatly reduced.

Comparison of three re-computing approaches.
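The idea of checking logical sub-blocks as they are produced can be sketched with Python generators: each computing process yields one row strip at a time, and the master compares the normal and copy strips as soon as both arrive, re-computing only a mismatching strip. The strip size, the doubling kernel, and the names `compute_rows`, `stream_strips`, and `master_check` are hypothetical; in this single-threaded sketch the overlap appears only as interleaving (strip s is checked before strip s + 1 is computed), not as true concurrency.

```python
from typing import Iterator, List, Optional, Tuple

Grid = List[List[float]]


def compute_rows(rows: Grid) -> Grid:
    """Hypothetical cell-local computation."""
    return [[2.0 * v for v in row] for row in rows]


def stream_strips(block: Grid, strip_rows: int,
                  faulty_strip: Optional[int] = None) -> Iterator[Tuple[int, Grid]]:
    """A computing process that emits each logical sub-block as soon as it is done."""
    for s, r0 in enumerate(range(0, len(block), strip_rows)):
        part = compute_rows(block[r0:r0 + strip_rows])
        if s == faulty_strip:
            part[0][0] += 0.5                  # injected error
        yield s, part


def master_check(block: Grid, strip_rows: int,
                 faulty_strip: Optional[int] = None) -> Tuple[Grid, List[int]]:
    """Check each logical sub-block on arrival; re-compute only the bad one."""
    fused: Grid = []
    redone: List[int] = []
    normal = stream_strips(block, strip_rows, faulty_strip)
    copy = stream_strips(block, strip_rows)
    for (s, a), (_, b) in zip(normal, copy):   # pulls one strip at a time
        if a != b:
            redone.append(s)
            r0 = s * strip_rows
            a = compute_rows(block[r0:r0 + strip_rows])
        fused.extend(a)
    return fused, redone


block = [[float(2 * i + j) for j in range(2)] for i in range(6)]
fused, redone = master_check(block, strip_rows=2, faulty_strip=1)
print(redone)                        # [1]
print(fused == compute_rows(block))  # True
```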
As the number of computing processes increases, the data blocks become small, and the difference in size between the sub-blocks partitioned for re-computation and the original data block becomes small in absolute terms. The re-computation time of the sub-blocks therefore approaches the direct computing time of the block, so the simple re-computing strategy is not noticeably slower than the normal parallel re-computing strategy. The PR algorithm achieves better efficiency, particularly when the number of processes is small: each data block is then large, so its sub-blocks are also large, and the overlapping strategy of PR greatly reduces the re-computation time, as shown in Figure 9.
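The effect described above can be made explicit with a simple cost model. Here W (total work), o (a fixed overhead per parallel re-computation for partitioning, distribution, and fusion), and the approximation $T_c(n) \approx W/n$ are assumptions introduced for illustration; they are not quantities measured in the experiments.

```latex
% Hypothetical model: W = total work, n = number of blocks, K = sub-blocks,
% o = fixed overhead of one parallel re-computation (split, send, fuse).
T_c(n) \approx \frac{W}{n}, \qquad
T_{\text{simple}} \approx T_c(n), \qquad
T_{\text{parallel}} \approx \frac{T_c(n)}{K} + o

\Delta T = T_{\text{simple}} - T_{\text{parallel}}
         \approx \left(1 - \frac{1}{K}\right)\frac{W}{n} - o

\Delta T \le 0 \iff n \ge \frac{(K-1)\,W}{K\,o}
```

With K = 4, for example, the saving is about 0.75W/n − o, which shrinks as n grows and vanishes once the blocks are small enough that the overhead o dominates.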
Conclusions
DTA is a data-intensive and computation-intensive application in geographic information systems (GIS). With the rapid growth of DEM data due to high-resolution radar, parallel computing techniques need to be applied to DTA to improve computational efficiency. However, fault-tolerant computing cannot be ignored, because reliability and correctness are also goals of parallel DTA. A fault tolerance mechanism is necessary for parallel computing, especially for big data processing, owing to the complicated real-time communication and data contention among tasks. More importantly, once an error occurs in parallel computing, it is difficult and time-consuming to recover the system and continue execution.
In this paper, a PR algorithm for parallel DTA is implemented to achieve fast self-recovery from erroneous results. Applying fault-tolerant parallel computing to the whole DEM dataset is time-consuming, because more time is required to detect and correct errors. Hence, a data partition strategy and a corresponding multi-thread scheme are adopted to solve these problems. Repartitioning the data block with errors into several smaller sub-blocks shortens the computing time and optimizes the overall performance of parallel computing. Moreover, the performance of PR is evaluated on datasets of different sizes with the slope algorithm on a cluster system. The experimental results show that PR does not incur much additional system overhead when a failure occurs.
Our work addresses only the case in which there are no dependencies between data blocks. In the future, our efforts will focus on two aspects. First, the performance of PR will be evaluated with different DTA algorithms. Second, fault tolerance with dependent data blocks will be studied. The I/O performance also needs to be considered when redundant computation is introduced.
Footnotes
Declaration of conflicting interests
The author(s) declared no potential conflicts of interest with respect to the research, authorship, and/or publication of this article.
Funding
The author(s) disclosed receipt of the following financial support for the research, authorship, and/or publication of this article: This work has been substantially supported by the National Natural Science Foundation of China (No. 41771411).
