diff --git a/db/commitlog/AbstractCommitLogService.java b/db/commitlog/AbstractCommitLogService.java new file mode 100644 index 0000000000..59bf691295 --- /dev/null +++ b/db/commitlog/AbstractCommitLogService.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import org.apache.cassandra.utils.concurrent.WaitQueue; +import org.slf4j.*; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +public abstract class AbstractCommitLogService +{ + // how often should we log syngs that lag behind our desired period + private static final long LAG_REPORT_INTERVAL = TimeUnit.MINUTES.toMillis(5); + + private final Thread thread; + private volatile boolean shutdown = false; + + // all Allocations written before this time will be synced + protected volatile long lastSyncedAt = System.currentTimeMillis(); + + // counts of total written, and pending, log messages + private final AtomicLong written = new AtomicLong(0); + protected final AtomicLong pending = new AtomicLong(0); + + // signal that writers can wait on to be notified of a completed sync + protected final WaitQueue syncComplete = new WaitQueue(); + private final Semaphore haveWork = new Semaphore(1); + + private static final Logger logger = LoggerFactory.getLogger(AbstractCommitLogService.class); + + /** + * CommitLogService provides a fsync service for Allocations, fulfilling either the + * Batch or Periodic contract. + * + * Subclasses may be notified when a sync finishes by using the syncComplete WaitQueue. 
+ */ + AbstractCommitLogService(final CommitLog commitLog, final String name, final long pollIntervalMillis) + { + if (pollIntervalMillis < 1) + throw new IllegalArgumentException(String.format("Commit log flush interval must be positive: %dms", pollIntervalMillis)); + + Runnable runnable = new Runnable() + { + public void run() + { + long firstLagAt = 0; + long totalSyncDuration = 0; // total time spent syncing since firstLagAt + long syncExceededIntervalBy = 0; // time that syncs exceeded pollInterval since firstLagAt + int lagCount = 0; + int syncCount = 0; + + boolean run = true; + while (run) + { + try + { + // always run once after shutdown signalled + run = !shutdown; + + // sync and signal + long syncStarted = System.currentTimeMillis(); + commitLog.sync(shutdown); + lastSyncedAt = syncStarted; + syncComplete.signalAll(); + + + // sleep any time we have left before the next one is due + long now = System.currentTimeMillis(); + long sleep = syncStarted + pollIntervalMillis - now; + if (sleep < 0) + { + // if we have lagged noticeably, update our lag counter + if (firstLagAt == 0) + { + firstLagAt = now; + totalSyncDuration = syncExceededIntervalBy = syncCount = lagCount = 0; + } + syncExceededIntervalBy -= sleep; + lagCount++; + } + syncCount++; + totalSyncDuration += now - syncStarted; + + if (firstLagAt > 0 && now - firstLagAt >= LAG_REPORT_INTERVAL) + { + logger.warn(String.format("Out of %d commit log syncs over the past %ds with average duration of %.2fms, %d have exceeded the configured commit interval by an average of %.2fms", + syncCount, (now - firstLagAt) / 1000, (double) totalSyncDuration / syncCount, lagCount, (double) syncExceededIntervalBy / lagCount)); + firstLagAt = 0; + } + + // if we have lagged this round, we probably have work to do already so we don't sleep + if (sleep < 0 || !run) + continue; + + try + { + haveWork.tryAcquire(sleep, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + 
catch (Throwable t) + { + if (!CommitLog.handleCommitError("Failed to persist commits to disk", t)) + break; + + // sleep for full poll-interval after an error, so we don't spam the log file + try + { + haveWork.tryAcquire(pollIntervalMillis, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new AssertionError(); + } + } + } + } + }; + + thread = new Thread(runnable, name); + thread.start(); + } + + /** + * Block for @param alloc to be sync'd as necessary, and handle bookkeeping + */ + public void finishWriteFor(Allocation alloc) + { + maybeWaitForSync(alloc); + written.incrementAndGet(); + } + + protected abstract void maybeWaitForSync(Allocation alloc); + + /** + * Sync immediately, but don't block for the sync to cmplete + */ + public WaitQueue.Signal requestExtraSync() + { + WaitQueue.Signal signal = syncComplete.register(); + haveWork.release(1); + return signal; + } + + public void shutdown() + { + shutdown = true; + haveWork.release(1); + } + + public void awaitTermination() throws InterruptedException + { + thread.join(); + } + + public long getCompletedTasks() + { + return written.incrementAndGet(); + } + + public long getPendingTasks() + { + return pending.incrementAndGet(); + } +} \ No newline at end of file diff --git a/db/commitlog/BatchCommitLogService.java b/db/commitlog/BatchCommitLogService.java new file mode 100644 index 0000000000..65bee406d6 --- /dev/null +++ b/db/commitlog/BatchCommitLogService.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import org.apache.cassandra.config.DatabaseDescriptor; + +class BatchCommitLogService extends AbstractCommitLogService +{ + public BatchCommitLogService(CommitLog commitLog) + { + super(commitLog, "COMMIT-LOG-WRITER", (int) DatabaseDescriptor.getCommitLogSyncBatchWindow()); + } + + protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) + { + // wait until record has been safely persisted to disk + pending.incrementAndGet(); + alloc.awaitDiskSync(); + pending.decrementAndGet(); + } +} diff --git a/db/commitlog/CommitLog.java b/db/commitlog/CommitLog.java new file mode 100644 index 0000000000..416b51de3c --- /dev/null +++ b/db/commitlog/CommitLog.java @@ -0,0 +1,377 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.*; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; +import java.util.*; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; + +import org.apache.cassandra.config.Config; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.util.DataOutputByteBuffer; +import org.apache.cassandra.metrics.CommitLogMetrics; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.PureJavaCrc32; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.*; + +/* + * Commit Log tracks every write operation into the system. The aim of the commit log is to be able to + * successfully recover data that was not stored to disk via the Memtable. 
+ */ +public class CommitLog implements CommitLogMBean +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLog.class); + + public static final CommitLog instance = new CommitLog(); + + // we only permit records HALF the size of a commit log, to ensure we don't spin allocating many mostly + // empty segments when writing large records + private static final long MAX_MUTATION_SIZE = DatabaseDescriptor.getCommitLogSegmentSize() >> 1; + + public final CommitLogSegmentManager allocator; + public final CommitLogArchiver archiver = new CommitLogArchiver(); + final CommitLogMetrics metrics; + final AbstractCommitLogService executor; + + private CommitLog() + { + DatabaseDescriptor.createAllDirectories(); + + allocator = new CommitLogSegmentManager(); + + executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch + ? new BatchCommitLogService(this) + : new PeriodicCommitLogService(this); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + try + { + mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog")); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + + // register metrics + metrics = new CommitLogMetrics(executor, allocator); + } + + /** + * Perform recovery on commit logs located in the directory specified by the config file. + * + * @return the number of mutations replayed + */ + public int recover() throws IOException + { + FilenameFilter unmanagedFilesFilter = new FilenameFilter() + { + public boolean accept(File dir, String name) + { + // we used to try to avoid instantiating commitlog (thus creating an empty segment ready for writes) + // until after recover was finished. this turns out to be fragile; it is less error-prone to go + // ahead and allow writes before recover(), and just skip active segments when we do. 
+ return CommitLogDescriptor.isValid(name) && !instance.allocator.manages(name); + } + }; + + // submit all existing files in the commit log dir for archiving prior to recovery - CASSANDRA-6904 + for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter)) + { + archiver.maybeArchive(file.getPath(), file.getName()); + archiver.maybeWaitForArchiving(file.getName()); + } + + assert archiver.archivePending.isEmpty() : "Not all commit log archive tasks were completed before restore"; + archiver.maybeRestoreArchive(); + + File[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(unmanagedFilesFilter); + int replayed = 0; + if (files.length == 0) + { + logger.info("No commitlog files found; skipping replay"); + } + else + { + Arrays.sort(files, new CommitLogSegmentFileComparator()); + logger.info("Replaying {}", StringUtils.join(files, ", ")); + replayed = recover(files); + logger.info("Log replay complete, {} replayed mutations", replayed); + + for (File f : files) + CommitLog.instance.allocator.recycleSegment(f); + } + + allocator.enableReserveSegmentCreation(); + return replayed; + } + + /** + * Perform recovery on a list of commit log files. + * + * @param clogs the list of commit log files to replay + * @return the number of mutations replayed + */ + public int recover(File... clogs) throws IOException + { + CommitLogReplayer recovery = new CommitLogReplayer(); + recovery.recover(clogs); + return recovery.blockForWrites(); + } + + /** + * Perform recovery on a single commit log. 
+ */ + public void recover(String path) throws IOException + { + recover(new File(path)); + } + + /** + * @return a ReplayPosition which, if >= one returned from add(), implies add() was started + * (but not necessarily finished) prior to this call + */ + public ReplayPosition getContext() + { + return allocator.allocatingFrom().getContext(); + } + + /** + * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining + */ + public void forceRecycleAllSegments(Iterable droppedCfs) + { + allocator.forceRecycleAll(droppedCfs); + } + + /** + * Flushes all dirty CFs, waiting for them to free and recycle any segments they were retaining + */ + public void forceRecycleAllSegments() + { + allocator.forceRecycleAll(Collections.emptyList()); + } + + /** + * Forces a disk flush on the commit log files that need it. Blocking. + */ + public void sync(boolean syncAllSegments) + { + CommitLogSegment current = allocator.allocatingFrom(); + for (CommitLogSegment segment : allocator.getActiveSegments()) + { + if (!syncAllSegments && segment.id > current.id) + return; + segment.sync(); + } + } + + /** + * Preempts the CLExecutor, telling to to sync immediately + */ + public void requestExtraSync() + { + executor.requestExtraSync(); + } + + /** + * Add a Mutation to the commit log. 
+ * + * @param mutation the Mutation to add to the log + */ + public ReplayPosition add(Mutation mutation) + { + assert mutation != null; + + long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version); + + long totalSize = size + ENTRY_OVERHEAD_SIZE; + if (totalSize > MAX_MUTATION_SIZE) + { + throw new IllegalArgumentException(String.format("Mutation of %s bytes is too large for the maxiumum size of %s", + totalSize, MAX_MUTATION_SIZE)); + } + + Allocation alloc = allocator.allocate(mutation, (int) totalSize); + try + { + PureJavaCrc32 checksum = new PureJavaCrc32(); + final ByteBuffer buffer = alloc.getBuffer(); + DataOutputByteBuffer dos = new DataOutputByteBuffer(buffer); + + // checksummed length + dos.writeInt((int) size); + checksum.update(buffer, buffer.position() - 4, 4); + buffer.putInt(checksum.getCrc()); + + int start = buffer.position(); + // checksummed mutation + Mutation.serializer.serialize(mutation, dos, MessagingService.current_version); + checksum.update(buffer, start, (int) size); + buffer.putInt(checksum.getCrc()); + } + catch (IOException e) + { + throw new FSWriteError(e, alloc.getSegment().getPath()); + } + finally + { + alloc.markWritten(); + } + + executor.finishWriteFor(alloc); + return alloc.getReplayPosition(); + } + + /** + * Modifies the per-CF dirty cursors of any commit log segments for the column family according to the position + * given. Discards any commit log segments that are no longer used. + * + * @param cfId the column family ID that was flushed + * @param context the replay position of the flush + */ + public void discardCompletedSegments(final UUID cfId, final ReplayPosition context) + { + logger.debug("discard completed log segments for {}, table {}", context, cfId); + + // Go thru the active segment files, which are ordered oldest to newest, marking the + // flushed CF as clean, until we reach the segment file containing the ReplayPosition passed + // in the arguments. 
Any segments that become unused after they are marked clean will be + // recycled or discarded. + for (Iterator iter = allocator.getActiveSegments().iterator(); iter.hasNext();) + { + CommitLogSegment segment = iter.next(); + segment.markClean(cfId, context); + + if (segment.isUnused()) + { + logger.debug("Commit log segment {} is unused", segment); + allocator.recycleSegment(segment); + } + else + { + logger.debug("Not safe to delete{} commit log segment {}; dirty is {}", + (iter.hasNext() ? "" : " active"), segment, segment.dirtyString()); + } + + // Don't mark or try to delete any newer segments once we've reached the one containing the + // position of the flush. + if (segment.contains(context)) + break; + } + } + + @Override + public long getCompletedTasks() + { + return metrics.completedTasks.value(); + } + + @Override + public long getPendingTasks() + { + return metrics.pendingTasks.value(); + } + + /** + * @return the total size occupied by commitlog segments expressed in bytes. (used by MBean) + */ + public long getTotalCommitlogSize() + { + return metrics.totalCommitLogSize.value(); + } + + public List getActiveSegmentNames() + { + List segmentNames = new ArrayList<>(); + for (CommitLogSegment segment : allocator.getActiveSegments()) + segmentNames.add(segment.getName()); + return segmentNames; + } + + public List getArchivingSegmentNames() + { + return new ArrayList<>(archiver.archivePending.keySet()); + } + + /** + * Shuts down the threads used by the commit log, blocking until completion. + */ + public void shutdownBlocking() throws InterruptedException + { + executor.shutdown(); + executor.awaitTermination(); + allocator.shutdown(); + allocator.awaitTermination(); + } + + /** + * FOR TESTING PURPOSES. See CommitLogAllocator. + */ + public void resetUnsafe() + { + allocator.resetUnsafe(); + } + + /** + * Used by tests. 
+ * + * @return the number of active segments (segments with unflushed data in them) + */ + public int activeSegments() + { + return allocator.getActiveSegments().size(); + } + + @VisibleForTesting + public static boolean handleCommitError(String message, Throwable t) + { + JVMStabilityInspector.inspectCommitLogThrowable(t); + switch (DatabaseDescriptor.getCommitFailurePolicy()) + { + // Needed here for unit tests to not fail on default assertion + case die: + case stop: + StorageService.instance.stopTransports(); + case stop_commit: + logger.error(String.format("%s. Commit disk failure policy is %s; terminating thread", message, DatabaseDescriptor.getCommitFailurePolicy()), t); + return false; + case ignore: + logger.error(message, t); + return true; + default: + throw new AssertionError(DatabaseDescriptor.getCommitFailurePolicy()); + } + } + +} diff --git a/db/commitlog/CommitLogArchiver.java b/db/commitlog/CommitLogArchiver.java new file mode 100644 index 0000000000..6cba603773 --- /dev/null +++ b/db/commitlog/CommitLogArchiver.java @@ -0,0 +1,245 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Map; +import java.util.Properties; +import java.util.TimeZone; +import java.util.concurrent.*; + +import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.WrappedRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +public class CommitLogArchiver +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogArchiver.class); + public static final SimpleDateFormat format = new SimpleDateFormat("yyyy:MM:dd HH:mm:ss"); + private static final String DELIMITER = ","; + static + { + format.setTimeZone(TimeZone.getTimeZone("GMT")); + } + + public final Map> archivePending = new ConcurrentHashMap>(); + private final ExecutorService executor = new JMXEnabledThreadPoolExecutor("CommitLogArchiver"); + private final String archiveCommand; + private final String restoreCommand; + private final String restoreDirectories; + public final long restorePointInTime; + public final TimeUnit precision; + + public CommitLogArchiver() + { + Properties commitlog_commands = new Properties(); + InputStream stream = null; + try + { + stream = getClass().getClassLoader().getResourceAsStream("commitlog_archiving.properties"); + + if (stream == null) + { + logger.debug("No commitlog_archiving properties found; archive + pitr will be disabled"); + archiveCommand = null; + restoreCommand = null; + restoreDirectories = null; + restorePointInTime = Long.MAX_VALUE; + precision = TimeUnit.MICROSECONDS; + } + else + { + commitlog_commands.load(stream); + archiveCommand = 
commitlog_commands.getProperty("archive_command"); + restoreCommand = commitlog_commands.getProperty("restore_command"); + restoreDirectories = commitlog_commands.getProperty("restore_directories"); + if (restoreDirectories != null && !restoreDirectories.isEmpty()) + { + for (String dir : restoreDirectories.split(DELIMITER)) + { + File directory = new File(dir); + if (!directory.exists()) + { + if (!directory.mkdir()) + { + throw new RuntimeException("Unable to create directory: " + dir); + } + } + } + } + String targetTime = commitlog_commands.getProperty("restore_point_in_time"); + precision = TimeUnit.valueOf(commitlog_commands.getProperty("precision", "MICROSECONDS")); + try + { + restorePointInTime = Strings.isNullOrEmpty(targetTime) ? Long.MAX_VALUE : format.parse(targetTime).getTime(); + } + catch (ParseException e) + { + throw new RuntimeException("Unable to parse restore target time", e); + } + } + } + catch (IOException e) + { + throw new RuntimeException("Unable to load commitlog_archiving.properties", e); + } + finally + { + FileUtils.closeQuietly(stream); + } + } + + public void maybeArchive(final CommitLogSegment segment) + { + if (Strings.isNullOrEmpty(archiveCommand)) + return; + + archivePending.put(segment.getName(), executor.submit(new WrappedRunnable() + { + protected void runMayThrow() throws IOException + { + segment.waitForFinalSync(); + String command = archiveCommand.replace("%name", segment.getName()); + command = command.replace("%path", segment.getPath()); + exec(command); + } + })); + } + + /** + * Differs from the above because it can be used on any file, rather than only + * managed commit log segments (and thus cannot call waitForFinalSync). 
+ * + * Used to archive files present in the commit log directory at startup (CASSANDRA-6904) + */ + public void maybeArchive(final String path, final String name) + { + if (Strings.isNullOrEmpty(archiveCommand)) + return; + + archivePending.put(name, executor.submit(new WrappedRunnable() + { + protected void runMayThrow() throws IOException + { + String command = archiveCommand.replace("%name", name); + command = command.replace("%path", path); + exec(command); + } + })); + } + + public boolean maybeWaitForArchiving(String name) + { + Future f = archivePending.remove(name); + if (f == null) + return true; // archiving disabled + + try + { + f.get(); + } + catch (InterruptedException e) + { + throw new AssertionError(e); + } + catch (ExecutionException e) + { + if (e.getCause() instanceof IOException) + { + logger.error("Looks like the archiving of file {} failed earlier, cassandra is going to ignore this segment for now.", name); + return false; + } + throw new RuntimeException(e); + } + + return true; + } + + public void maybeRestoreArchive() + { + if (Strings.isNullOrEmpty(restoreDirectories)) + return; + + for (String dir : restoreDirectories.split(DELIMITER)) + { + File[] files = new File(dir).listFiles(); + if (files == null) + { + throw new RuntimeException("Unable to list directory " + dir); + } + for (File fromFile : files) + { + CommitLogDescriptor fromHeader = CommitLogDescriptor.fromHeader(fromFile); + CommitLogDescriptor fromName = CommitLogDescriptor.isValid(fromFile.getName()) ? 
CommitLogDescriptor.fromFileName(fromFile.getName()) : null; + CommitLogDescriptor descriptor; + if (fromHeader == null && fromName == null) + throw new IllegalStateException("Cannot safely construct descriptor for segment, either from its name or its header: " + fromFile.getPath()); + else if (fromHeader != null && fromName != null && !fromHeader.equals(fromName)) + throw new IllegalStateException(String.format("Cannot safely construct descriptor for segment, as name and header descriptors do not match (%s vs %s): %s", fromHeader, fromName, fromFile.getPath())); + else if (fromName != null && fromHeader == null && fromName.version >= CommitLogDescriptor.VERSION_21) + throw new IllegalStateException("Cannot safely construct descriptor for segment, as name descriptor implies a version that should contain a header descriptor, but that descriptor could not be read: " + fromFile.getPath()); + else if (fromHeader != null) + descriptor = fromHeader; + else descriptor = fromName; + + if (descriptor.version > CommitLogDescriptor.VERSION_21) + throw new IllegalStateException("Unsupported commit log version: " + descriptor.version); + + File toFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); + if (toFile.exists()) + { + logger.debug("Skipping restore of archive {} as the segment already exists in the restore location {}", + fromFile.getPath(), toFile.getPath()); + continue; + } + + String command = restoreCommand.replace("%from", fromFile.getPath()); + command = command.replace("%to", toFile.getPath()); + try + { + exec(command); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + } + + private void exec(String command) throws IOException + { + ProcessBuilder pb = new ProcessBuilder(command.split(" ")); + pb.redirectErrorStream(true); + FBUtilities.exec(pb); + } +} diff --git a/db/commitlog/CommitLogDescriptor.java b/db/commitlog/CommitLogDescriptor.java new file mode 100644 index 0000000000..f914c2c884 --- 
/dev/null +++ b/db/commitlog/CommitLogDescriptor.java @@ -0,0 +1,168 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.cassandra.db.commitlog; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.cassandra.io.FSReadError; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.PureJavaCrc32; + +public class CommitLogDescriptor +{ + private static final String SEPARATOR = "-"; + private static final String FILENAME_PREFIX = "CommitLog" + SEPARATOR; + private static final String FILENAME_EXTENSION = ".log"; + // match both legacy and new version of commitlogs Ex: CommitLog-12345.log and CommitLog-4-12345.log. 
+ private static final Pattern COMMIT_LOG_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); + + public static final int VERSION_12 = 2; + public static final int VERSION_20 = 3; + public static final int VERSION_21 = 4; + public static final int VERSION_30 = 5; + /** + * Increment this number if there is a changes in the commit log disc layout or MessagingVersion changes. + * Note: make sure to handle {@link #getMessagingVersion()} + */ + @VisibleForTesting + public static final int current_version = VERSION_30; + + // [version, id, checksum] + static final int HEADER_SIZE = 4 + 8 + 4; + + final int version; + public final long id; + + public CommitLogDescriptor(int version, long id) + { + this.version = version; + this.id = id; + } + + public CommitLogDescriptor(long id) + { + this(current_version, id); + } + + static void writeHeader(ByteBuffer out, CommitLogDescriptor descriptor) + { + out.putInt(0, descriptor.version); + out.putLong(4, descriptor.id); + PureJavaCrc32 crc = new PureJavaCrc32(); + crc.updateInt(descriptor.version); + crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); + crc.updateInt((int) (descriptor.id >>> 32)); + out.putInt(12, crc.getCrc()); + } + + public static CommitLogDescriptor fromHeader(File file) + { + try (RandomAccessFile raf = new RandomAccessFile(file, "r")) + { + assert raf.getFilePointer() == 0; + int version = raf.readInt(); + long id = raf.readLong(); + int crc = raf.readInt(); + PureJavaCrc32 checkcrc = new PureJavaCrc32(); + checkcrc.updateInt(version); + checkcrc.updateInt((int) (id & 0xFFFFFFFFL)); + checkcrc.updateInt((int) (id >>> 32)); + if (crc == checkcrc.getCrc()) + return new CommitLogDescriptor(version, id); + return null; + } + catch (EOFException e) + { + throw new RuntimeException(e); + } + catch (IOException e) + { + throw new FSReadError(e, file); + } + } + + public static CommitLogDescriptor fromFileName(String name) + { + Matcher matcher; + if 
(!(matcher = COMMIT_LOG_FILE_PATTERN.matcher(name)).matches()) + throw new RuntimeException("Cannot parse the version of the file: " + name); + + if (matcher.group(3) == null) + throw new UnsupportedOperationException("Commitlog segment is too old to open; upgrade to 1.2.5+ first"); + + long id = Long.parseLong(matcher.group(3).split(SEPARATOR)[1]); + return new CommitLogDescriptor(Integer.parseInt(matcher.group(2)), id); + } + + public int getMessagingVersion() + { + switch (version) + { + case VERSION_12: + return MessagingService.VERSION_12; + case VERSION_20: + return MessagingService.VERSION_20; + case VERSION_21: + return MessagingService.VERSION_21; + case VERSION_30: + return MessagingService.VERSION_30; + default: + throw new IllegalStateException("Unknown commitlog version " + version); + } + } + + public String fileName() + { + return FILENAME_PREFIX + version + SEPARATOR + id + FILENAME_EXTENSION; + } + + /** + * @param filename the filename to check + * @return true if filename could be a commit log based on it's filename + */ + public static boolean isValid(String filename) + { + return COMMIT_LOG_FILE_PATTERN.matcher(filename).matches(); + } + + public String toString() + { + return "(" + version + "," + id + ")"; + } + + public boolean equals(Object that) + { + return that instanceof CommitLogDescriptor && equals((CommitLogDescriptor) that); + } + + public boolean equals(CommitLogDescriptor that) + { + return this.version == that.version && this.id == that.id; + } + +} diff --git a/db/commitlog/CommitLogMBean.java b/db/commitlog/CommitLogMBean.java new file mode 100644 index 0000000000..6c0d8d75ee --- /dev/null +++ b/db/commitlog/CommitLogMBean.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + + +import java.io.IOException; +import java.util.List; + +public interface CommitLogMBean +{ + /** + * Get the number of completed tasks + * @see org.apache.cassandra.metrics.CommitLogMetrics#completedTasks + */ + @Deprecated + public long getCompletedTasks(); + + /** + * Get the number of tasks waiting to be executed + * @see org.apache.cassandra.metrics.CommitLogMetrics#pendingTasks + */ + @Deprecated + public long getPendingTasks(); + + /** + * Get the current size used by all the commitlog segments. + * @see org.apache.cassandra.metrics.CommitLogMetrics#totalCommitLogSize + */ + @Deprecated + public long getTotalCommitlogSize(); + + /** + * Recover a single file. + */ + public void recover(String path) throws IOException; + + /** + * @return file names (not full paths) of active commit log segments (segments containing unflushed data) + */ + public List getActiveSegmentNames(); + + /** + * @return Files which are pending for archival attempt. Does NOT include failed archive attempts. 
+ */ + public List getArchivingSegmentNames(); +} diff --git a/db/commitlog/CommitLogReplayer.java b/db/commitlog/CommitLogReplayer.java new file mode 100644 index 0000000000..e89338ae02 --- /dev/null +++ b/db/commitlog/CommitLogReplayer.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.*; +import java.util.*; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.base.Predicate; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import com.google.common.collect.Ordering; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.*; +import org.apache.cassandra.io.util.FastByteArrayInputStream; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.util.RandomAccessReader; +import org.apache.cassandra.utils.*; + +import org.cliffc.high_scale_lib.NonBlockingHashSet; + +public class CommitLogReplayer +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogReplayer.class); + private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024; + private static final int LEGACY_END_OF_SEGMENT_MARKER = 0; + + private final Set keyspacesRecovered; + private final List> futures; + private final Map invalidMutations; + private final AtomicInteger replayedCount; + private final Map cfPositions; + private final ReplayPosition globalPosition; + private final PureJavaCrc32 checksum; + private byte[] buffer; + + public CommitLogReplayer() + { + this.keyspacesRecovered = new NonBlockingHashSet(); + this.futures = new ArrayList>(); + this.buffer = new byte[4096]; + this.invalidMutations = new HashMap(); + // count the number of replayed mutation. We don't really care about atomicity, but we need it to be a reference. 
+ this.replayedCount = new AtomicInteger(); + this.checksum = new PureJavaCrc32(); + + // compute per-CF and global replay positions + cfPositions = new HashMap(); + Ordering replayPositionOrdering = Ordering.from(ReplayPosition.comparator); + for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) + { + // it's important to call RP.gRP per-cf, before aggregating all the positions w/ the Ordering.min call + // below: gRP will return NONE if there are no flushed sstables, which is important to have in the + // list (otherwise we'll just start replay from the first flush position that we do have, which is not correct). + ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables()); + + // but, if we've truncated the cf in question, then we need to start replay after the truncation + ReplayPosition truncatedAt = SystemKeyspace.getTruncatedPosition(cfs.metadata.cfId); + if (truncatedAt != null) + rp = replayPositionOrdering.max(Arrays.asList(rp, truncatedAt)); + + cfPositions.put(cfs.metadata.cfId, rp); + } + globalPosition = replayPositionOrdering.min(cfPositions.values()); + logger.debug("Global replay position is {} from columnfamilies {}", globalPosition, FBUtilities.toString(cfPositions)); + } + + public void recover(File[] clogs) throws IOException + { + for (final File file : clogs) + recover(file); + } + + public int blockForWrites() + { + for (Map.Entry entry : invalidMutations.entrySet()) + logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %s", entry.getValue().intValue(), entry.getKey())); + + // wait for all the writes to finish on the mutation stage + FBUtilities.waitOnFutures(futures); + logger.debug("Finished waiting on mutations from recovery"); + + // flush replayed keyspaces + futures.clear(); + for (Keyspace keyspace : keyspacesRecovered) + futures.addAll(keyspace.flush()); + FBUtilities.waitOnFutures(futures); + return replayedCount.get(); + } + + private int readSyncMarker(CommitLogDescriptor 
descriptor, int offset, RandomAccessReader reader) throws IOException + { + if (offset > reader.length() - CommitLogSegment.SYNC_MARKER_SIZE) + { + if (offset != reader.length() && offset != Integer.MAX_VALUE) + logger.warn("Encountered bad header at position {} of Commit log {}; not enough room for a header", offset, reader.getPath()); + // cannot possibly be a header here. if we're == length(), assume it's a correctly written final segment + return -1; + } + reader.seek(offset); + PureJavaCrc32 crc = new PureJavaCrc32(); + crc.updateInt((int) (descriptor.id & 0xFFFFFFFFL)); + crc.updateInt((int) (descriptor.id >>> 32)); + crc.updateInt((int) reader.getPosition()); + int end = reader.readInt(); + long filecrc; + if (descriptor.version < CommitLogDescriptor.VERSION_21) + filecrc = reader.readLong(); + else + filecrc = reader.readInt() & 0xffffffffL; + if (crc.getValue() != filecrc) + { + if (end != 0 || filecrc != 0) + { + logger.warn("Encountered bad header at position {} of commit log {}, with invalid CRC. The end of segment marker should be zero.", offset, reader.getPath()); + } + return -1; + } + else if (end < offset || end > reader.length()) + { + logger.warn("Encountered bad header at position {} of commit log {}, with bad position but valid CRC", offset, reader.getPath()); + return -1; + } + return end; + } + + private int getStartOffset(long segmentId, int version) + { + if (globalPosition.segment < segmentId) + { + if (version >= CommitLogDescriptor.VERSION_21) + return CommitLogDescriptor.HEADER_SIZE + CommitLogSegment.SYNC_MARKER_SIZE; + else + return 0; + } + else if (globalPosition.segment == segmentId) + return globalPosition.position; + else + return -1; + } + + private abstract static class ReplayFilter + { + public abstract Iterable filter(Mutation mutation); + + public static ReplayFilter create() + { + // If no replaylist is supplied an empty array of strings is used to replay everything. 
+ if (System.getProperty("cassandra.replayList") == null) + return new AlwaysReplayFilter(); + + Multimap toReplay = HashMultimap.create(); + for (String rawPair : System.getProperty("cassandra.replayList").split(",")) + { + String[] pair = rawPair.trim().split("\\."); + if (pair.length != 2) + throw new IllegalArgumentException("Each table to be replayed must be fully qualified with keyspace name, e.g., 'system.peers'"); + + Keyspace ks = Schema.instance.getKeyspaceInstance(pair[0]); + if (ks == null) + throw new IllegalArgumentException("Unknown keyspace " + pair[0]); + if (ks.getColumnFamilyStore(pair[1]) == null) + throw new IllegalArgumentException(String.format("Unknown table %s.%s", pair[0], pair[1])); + + toReplay.put(pair[0], pair[1]); + } + return new CustomReplayFilter(toReplay); + } + } + + private static class AlwaysReplayFilter extends ReplayFilter + { + public Iterable filter(Mutation mutation) + { + return mutation.getColumnFamilies(); + } + } + + private static class CustomReplayFilter extends ReplayFilter + { + private Multimap toReplay; + + public CustomReplayFilter(Multimap toReplay) + { + this.toReplay = toReplay; + } + + public Iterable filter(Mutation mutation) + { + final Collection cfNames = toReplay.get(mutation.getKeyspaceName()); + if (cfNames == null) + return Collections.emptySet(); + + return Iterables.filter(mutation.getColumnFamilies(), new Predicate() + { + public boolean apply(ColumnFamily cf) + { + return cfNames.contains(cf.metadata().cfName); + } + }); + } + } + + public void recover(File file) throws IOException + { + final ReplayFilter replayFilter = ReplayFilter.create(); + logger.info("Replaying {}", file.getPath()); + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName()); + final long segmentId = desc.id; + logger.info("Replaying {} (CL version {}, messaging version {})", + file.getPath(), + desc.version, + desc.getMessagingVersion()); + RandomAccessReader reader = RandomAccessReader.open(new 
File(file.getAbsolutePath())); + + try + { + assert reader.length() <= Integer.MAX_VALUE; + int offset = getStartOffset(segmentId, desc.version); + if (offset < 0) + { + logger.debug("skipping replay of fully-flushed {}", file); + return; + } + + int prevEnd = CommitLogDescriptor.HEADER_SIZE; + main: while (true) + { + + int end = prevEnd; + if (desc.version < CommitLogDescriptor.VERSION_21) + end = Integer.MAX_VALUE; + else + { + do { end = readSyncMarker(desc, end, reader); } + while (end < offset && end > prevEnd); + } + + if (end < prevEnd) + break; + + if (logger.isDebugEnabled()) + logger.debug("Replaying {} between {} and {}", file, offset, end); + + reader.seek(offset); + + /* read the logs populate Mutation and apply */ + while (reader.getPosition() < end && !reader.isEOF()) + { + if (logger.isDebugEnabled()) + logger.debug("Reading mutation at {}", reader.getFilePointer()); + + long claimedCRC32; + int serializedSize; + try + { + // any of the reads may hit EOF + serializedSize = reader.readInt(); + if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER) + { + logger.debug("Encountered end of segment marker at {}", reader.getFilePointer()); + break main; + } + + // Mutation must be at LEAST 10 bytes: + // 3 each for a non-empty Keyspace and Key (including the + // 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count. + // This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128 + if (serializedSize < 10) + break main; + + long claimedSizeChecksum; + if (desc.version < CommitLogDescriptor.VERSION_21) + claimedSizeChecksum = reader.readLong(); + else + claimedSizeChecksum = reader.readInt() & 0xffffffffL; + checksum.reset(); + if (desc.version < CommitLogDescriptor.VERSION_20) + checksum.update(serializedSize); + else + checksum.updateInt(serializedSize); + + if (checksum.getValue() != claimedSizeChecksum) + break main; // entry wasn't synced correctly/fully. that's + // ok. 
+ + if (serializedSize > buffer.length) + buffer = new byte[(int) (1.2 * serializedSize)]; + reader.readFully(buffer, 0, serializedSize); + if (desc.version < CommitLogDescriptor.VERSION_21) + claimedCRC32 = reader.readLong(); + else + claimedCRC32 = reader.readInt() & 0xffffffffL; + } + catch (EOFException eof) + { + break main; // last CL entry didn't get completely written. that's ok. + } + + checksum.update(buffer, 0, serializedSize); + if (claimedCRC32 != checksum.getValue()) + { + // this entry must not have been fsynced. probably the rest is bad too, + // but just in case there is no harm in trying them (since we still read on an entry boundary) + continue; + } + + /* deserialize the commit log entry */ + FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize); + final Mutation mutation; + try + { + mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), + desc.getMessagingVersion(), + ColumnSerializer.Flag.LOCAL); + // doublecheck that what we read is [still] valid for the current schema + for (ColumnFamily cf : mutation.getColumnFamilies()) + for (Cell cell : cf) + cf.getComparator().validate(cell.name()); + } + catch (UnknownColumnFamilyException ex) + { + if (ex.cfId == null) + continue; + AtomicInteger i = invalidMutations.get(ex.cfId); + if (i == null) + { + i = new AtomicInteger(1); + invalidMutations.put(ex.cfId, i); + } + else + i.incrementAndGet(); + continue; + } + catch (Throwable t) + { + JVMStabilityInspector.inspectThrowable(t); + File f = File.createTempFile("mutation", "dat"); + DataOutputStream out = new DataOutputStream(new FileOutputStream(f)); + try + { + out.write(buffer, 0, serializedSize); + } + finally + { + out.close(); + } + String st = String.format("Unexpected error deserializing mutation; saved to %s and ignored. This may be caused by replaying a mutation against a table with the same name but incompatible schema. 
Exception follows: ", + f.getAbsolutePath()); + logger.error(st, t); + continue; + } + + if (logger.isDebugEnabled()) + logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}"); + + final long entryLocation = reader.getFilePointer(); + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws IOException + { + if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null) + return; + if (pointInTimeExceeded(mutation)) + return; + + final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); + + // Rebuild the mutation, omitting column families that + // a) the user has requested that we ignore, + // b) have already been flushed, + // or c) are part of a cf that was dropped. + // Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead. + Mutation newMutation = null; + for (ColumnFamily columnFamily : replayFilter.filter(mutation)) + { + if (Schema.instance.getCF(columnFamily.id()) == null) + continue; // dropped + + ReplayPosition rp = cfPositions.get(columnFamily.id()); + + // replay if current segment is newer than last flushed one or, + // if it is the last known segment, if we are after the replay position + if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position)) + { + if (newMutation == null) + newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key()); + newMutation.add(columnFamily); + replayedCount.incrementAndGet(); + } + } + if (newMutation != null) + { + assert !newMutation.isEmpty(); + Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false); + keyspacesRecovered.add(keyspace); + } + } + }; + futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable)); + if (futures.size() > MAX_OUTSTANDING_REPLAY_COUNT) + { + FBUtilities.waitOnFutures(futures); + futures.clear(); + } + } + + if 
(desc.version < CommitLogDescriptor.VERSION_21) + break; + + offset = end + CommitLogSegment.SYNC_MARKER_SIZE; + prevEnd = end; + } + } + finally + { + FileUtils.closeQuietly(reader); + logger.info("Finished reading {}", file); + } + } + + protected boolean pointInTimeExceeded(Mutation fm) + { + long restoreTarget = CommitLog.instance.archiver.restorePointInTime; + + for (ColumnFamily families : fm.getColumnFamilies()) + { + if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget) + return true; + } + return false; + } +} diff --git a/db/commitlog/CommitLogSegment.java b/db/commitlog/CommitLogSegment.java new file mode 100644 index 0000000000..185f57a03b --- /dev/null +++ b/db/commitlog/CommitLogSegment.java @@ -0,0 +1,632 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.cliffc.high_scale_lib.NonBlockingHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.io.FSWriteError; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.CLibrary; +import org.apache.cassandra.utils.PureJavaCrc32; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +/* + * A single commit log file on disk. Manages creation of the file and writing mutations to disk, + * as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment + * files are initially allocated to a fixed size and can grow to accommodate a larger value if necessary. 
+ */ +public class CommitLogSegment +{ + private static final Logger logger = LoggerFactory.getLogger(CommitLogSegment.class); + + private final static long idBase; + private final static AtomicInteger nextId = new AtomicInteger(1); + static + { + long maxId = Long.MIN_VALUE; + for (File file : new File(DatabaseDescriptor.getCommitLogLocation()).listFiles()) + { + if (CommitLogDescriptor.isValid(file.getName())) + maxId = Math.max(CommitLogDescriptor.fromFileName(file.getName()).id, maxId); + } + idBase = Math.max(System.currentTimeMillis(), maxId + 1); + } + + // The commit log entry overhead in bytes (int: length + int: head checksum + int: tail checksum) + public static final int ENTRY_OVERHEAD_SIZE = 4 + 4 + 4; + + // The commit log (chained) sync marker/header size in bytes (int: length + int: checksum [segmentId, position]) + static final int SYNC_MARKER_SIZE = 4 + 4; + + // The OpOrder used to order appends wrt sync + private final OpOrder appendOrder = new OpOrder(); + + private final AtomicInteger allocatePosition = new AtomicInteger(); + + // Everything before this offset has been synced and written. The SYNC_MARKER_SIZE bytes after + // each sync are reserved, and point forwards to the next such offset. The final + // sync marker in a segment will be zeroed out, or point to EOF. 
+ private volatile int lastSyncedOffset; + + // the amount of the tail of the file we have allocated but not used - this is used when we discard a log segment + // to ensure nobody writes to it after we've decided we're done with it + private int discardedTailFrom; + + // a signal for writers to wait on to confirm the log message they provided has been written to disk + private final WaitQueue syncComplete = new WaitQueue(); + + // a map of Cf->dirty position; this is used to permit marking Cfs clean whilst the log is still in use + private final NonBlockingHashMap cfDirty = new NonBlockingHashMap<>(1024); + + // a map of Cf->clean position; this is used to permit marking Cfs clean whilst the log is still in use + private final ConcurrentHashMap cfClean = new ConcurrentHashMap<>(); + + public final long id; + + private final File logFile; + private final RandomAccessFile logFileAccessor; + private final int fd; + + private final MappedByteBuffer buffer; + + public final CommitLogDescriptor descriptor; + + /** + * @return a newly minted segment file + */ + static CommitLogSegment freshSegment() + { + return new CommitLogSegment(null); + } + + static long getNextId() + { + return idBase + nextId.getAndIncrement(); + } + + /** + * Constructs a new segment file. + * + * @param filePath if not null, recycles the existing file by renaming it and truncating it to CommitLog.SEGMENT_SIZE. 
+ */ + CommitLogSegment(String filePath) + { + id = getNextId(); + descriptor = new CommitLogDescriptor(id); + logFile = new File(DatabaseDescriptor.getCommitLogLocation(), descriptor.fileName()); + boolean isCreating = true; + + try + { + if (filePath != null) + { + File oldFile = new File(filePath); + + if (oldFile.exists()) + { + logger.debug("Re-using discarded CommitLog segment for {} from {}", id, filePath); + if (!oldFile.renameTo(logFile)) + throw new IOException("Rename from " + filePath + " to " + id + " failed"); + isCreating = false; + } + } + + // Open the initial the segment file + logFileAccessor = new RandomAccessFile(logFile, "rw"); + + if (isCreating) + logger.debug("Creating new commit log segment {}", logFile.getPath()); + + // Map the segment, extending or truncating it to the standard segment size. + // (We may have restarted after a segment size configuration change, leaving "incorrectly" + // sized segments on disk.) + logFileAccessor.setLength(DatabaseDescriptor.getCommitLogSegmentSize()); + fd = CLibrary.getfd(logFileAccessor.getFD()); + + buffer = logFileAccessor.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, DatabaseDescriptor.getCommitLogSegmentSize()); + // write the header + CommitLogDescriptor.writeHeader(buffer, descriptor); + // mark the initial sync marker as uninitialised + buffer.putInt(CommitLogDescriptor.HEADER_SIZE, 0); + buffer.putLong(CommitLogDescriptor.HEADER_SIZE + 4, 0); + allocatePosition.set(CommitLogDescriptor.HEADER_SIZE + SYNC_MARKER_SIZE); + lastSyncedOffset = CommitLogDescriptor.HEADER_SIZE; + } + catch (IOException e) + { + throw new FSWriteError(e, logFile); + } + } + + /** + * Allocate space in this buffer for the provided mutation, and return the allocated Allocation object. + * Returns null if there is not enough space in this segment, and a new segment is needed. 
+ */ + Allocation allocate(Mutation mutation, int size) + { + final OpOrder.Group opGroup = appendOrder.start(); + try + { + int position = allocate(size); + if (position < 0) + { + opGroup.close(); + return null; + } + markDirty(mutation, position); + return new Allocation(this, opGroup, position, (ByteBuffer) buffer.duplicate().position(position).limit(position + size)); + } + catch (Throwable t) + { + opGroup.close(); + throw t; + } + } + + // allocate bytes in the segment, or return -1 if not enough space + private int allocate(int size) + { + while (true) + { + int prev = allocatePosition.get(); + int next = prev + size; + if (next >= buffer.capacity()) + return -1; + if (allocatePosition.compareAndSet(prev, next)) + return prev; + } + } + + // ensures no more of this segment is writeable, by allocating any unused section at the end and marking it discarded + void discardUnusedTail() + { + // we guard this with the OpOrdering instead of synchronised due to potential dead-lock with CLSM.advanceAllocatingFrom() + // this actually isn't strictly necessary, as currently all calls to discardUnusedTail occur within a block + // already protected by this OpOrdering, but to prevent future potential mistakes, we duplicate the protection here + // so that the contract between discardUnusedTail() and sync() is more explicit. + try (OpOrder.Group group = appendOrder.start()) + { + while (true) + { + int prev = allocatePosition.get(); + // we set allocatePosition past buffer.capacity() to make sure we always set discardedTailFrom + int next = buffer.capacity() + 1; + if (prev == next) + return; + if (allocatePosition.compareAndSet(prev, next)) + { + discardedTailFrom = prev; + return; + } + } + } + } + + /** + * Wait for any appends or discardUnusedTail() operations started before this method was called + */ + void waitForModifications() + { + // issue a barrier and wait for it + appendOrder.awaitNewBarrier(); + } + + /** + * Forces a disk flush for this segment file. 
+ */ + synchronized void sync() + { + try + { + // check we have more work to do + if (allocatePosition.get() <= lastSyncedOffset + SYNC_MARKER_SIZE) + return; + + // allocate a new sync marker; this is both necessary in itself, but also serves to demarcate + // the point at which we can safely consider records to have been completely written to + int nextMarker; + nextMarker = allocate(SYNC_MARKER_SIZE); + boolean close = false; + if (nextMarker < 0) + { + // ensure no more of this CLS is writeable, and mark ourselves for closing + discardUnusedTail(); + close = true; + + // wait for modifications guards both discardedTailFrom, and any outstanding appends + waitForModifications(); + + if (discardedTailFrom < buffer.capacity() - SYNC_MARKER_SIZE) + { + // if there's room in the discard section to write an empty header, use that as the nextMarker + nextMarker = discardedTailFrom; + } + else + { + // not enough space left in the buffer, so mark the next sync marker as the EOF position + nextMarker = buffer.capacity(); + } + } + else + { + waitForModifications(); + } + + assert nextMarker > lastSyncedOffset; + + // write previous sync marker to point to next sync marker + // we don't chain the crcs here to ensure this method is idempotent if it fails + int offset = lastSyncedOffset; + final PureJavaCrc32 crc = new PureJavaCrc32(); + crc.updateInt((int) (id & 0xFFFFFFFFL)); + crc.updateInt((int) (id >>> 32)); + crc.updateInt(offset); + buffer.putInt(offset, nextMarker); + buffer.putInt(offset + 4, crc.getCrc()); + + // zero out the next sync marker so replayer can cleanly exit + if (nextMarker < buffer.capacity()) + { + buffer.putInt(nextMarker, 0); + buffer.putInt(nextMarker + 4, 0); + } + + // actually perform the sync and signal those waiting for it + buffer.force(); + + if (close) + nextMarker = buffer.capacity(); + + lastSyncedOffset = nextMarker; + syncComplete.signalAll(); + + CLibrary.trySkipCache(fd, offset, nextMarker); + if (close) + close(); + } + catch 
(Exception e) // MappedByteBuffer.force() does not declare IOException but can actually throw it + { + throw new FSWriteError(e, getPath()); + } + } + + public boolean isStillAllocating() + { + return allocatePosition.get() < buffer.capacity(); + } + + /** + * Completely discards a segment file by deleting it. (Potentially blocking operation) + */ + void delete() + { + FileUtils.deleteWithConfirm(logFile); + } + + /** + * Recycle processes an unneeded segment file for reuse. + * + * @return a new CommitLogSegment representing the newly reusable segment. + */ + CommitLogSegment recycle() + { + try + { + sync(); + } + catch (FSWriteError e) + { + logger.error("I/O error flushing {} {}", this, e.getMessage()); + throw e; + } + + close(); + + return new CommitLogSegment(getPath()); + } + + /** + * @return the current ReplayPosition for this log segment + */ + public ReplayPosition getContext() + { + return new ReplayPosition(id, allocatePosition.get()); + } + + /** + * @return the file path to this segment + */ + public String getPath() + { + return logFile.getPath(); + } + + /** + * @return the file name of this segment + */ + public String getName() + { + return logFile.getName(); + } + + void waitForFinalSync() + { + while (true) + { + WaitQueue.Signal signal = syncComplete.register(); + if (lastSyncedOffset < buffer.capacity()) + { + signal.awaitUninterruptibly(); + } + else + { + signal.cancel(); + break; + } + } + } + + /** + * Close the segment file. 
+ */ + void close() + { + try + { + if (FileUtils.isCleanerAvailable()) + FileUtils.clean(buffer); + logFileAccessor.close(); + } + catch (IOException e) + { + throw new FSWriteError(e, getPath()); + } + } + + void markDirty(Mutation mutation, int allocatedPosition) + { + for (ColumnFamily columnFamily : mutation.getColumnFamilies()) + { + // check for deleted CFS + CFMetaData cfm = columnFamily.metadata(); + if (cfm.isPurged()) + logger.error("Attempted to write commit log entry for unrecognized table: {}", columnFamily.id()); + else + ensureAtleast(cfDirty, cfm.cfId, allocatedPosition); + } + } + + /** + * Marks the ColumnFamily specified by cfId as clean for this log segment. If the + * given context argument is contained in this file, it will only mark the CF as + * clean if no newer writes have taken place. + * + * @param cfId the column family ID that is now clean + * @param context the optional clean offset + */ + public synchronized void markClean(UUID cfId, ReplayPosition context) + { + if (!cfDirty.containsKey(cfId)) + return; + if (context.segment == id) + markClean(cfId, context.position); + else if (context.segment > id) + markClean(cfId, Integer.MAX_VALUE); + } + + private void markClean(UUID cfId, int position) + { + ensureAtleast(cfClean, cfId, position); + removeCleanFromDirty(); + } + + private static void ensureAtleast(ConcurrentMap map, UUID cfId, int value) + { + AtomicInteger i = map.get(cfId); + if (i == null) + { + AtomicInteger i2 = map.putIfAbsent(cfId, i = new AtomicInteger()); + if (i2 != null) + i = i2; + } + while (true) + { + int cur = i.get(); + if (cur > value) + break; + if (i.compareAndSet(cur, value)) + break; + } + } + + private void removeCleanFromDirty() + { + // if we're still allocating from this segment, don't touch anything since it can't be done thread-safely + if (isStillAllocating()) + return; + + Iterator> iter = cfClean.entrySet().iterator(); + while (iter.hasNext()) + { + Map.Entry clean = iter.next(); + UUID cfId = 
clean.getKey(); + AtomicInteger cleanPos = clean.getValue(); + AtomicInteger dirtyPos = cfDirty.get(cfId); + if (dirtyPos != null && dirtyPos.intValue() <= cleanPos.intValue()) + { + cfDirty.remove(cfId); + iter.remove(); + } + } + } + + /** + * @return a collection of dirty CFIDs for this segment file. + */ + public synchronized Collection getDirtyCFIDs() + { + if (cfClean.isEmpty() || cfDirty.isEmpty()) + return cfDirty.keySet(); + + List r = new ArrayList<>(cfDirty.size()); + for (Map.Entry dirty : cfDirty.entrySet()) + { + UUID cfId = dirty.getKey(); + AtomicInteger dirtyPos = dirty.getValue(); + AtomicInteger cleanPos = cfClean.get(cfId); + if (cleanPos == null || cleanPos.intValue() < dirtyPos.intValue()) + r.add(dirty.getKey()); + } + return r; + } + + /** + * @return true if this segment is unused and safe to recycle or delete + */ + public synchronized boolean isUnused() + { + // if room to allocate, we're still in use as the active allocatingFrom, + // so we don't want to race with updates to cfClean with removeCleanFromDirty + if (isStillAllocating()) + return false; + + removeCleanFromDirty(); + return cfDirty.isEmpty(); + } + + /** + * Check to see if a certain ReplayPosition is contained by this segment file. + * + * @param context the replay position to be checked + * @return true if the replay position is contained by this segment file. + */ + public boolean contains(ReplayPosition context) + { + return context.segment == id; + } + + // For debugging, not fast + public String dirtyString() + { + StringBuilder sb = new StringBuilder(); + for (UUID cfId : getDirtyCFIDs()) + { + CFMetaData m = Schema.instance.getCFMetaData(cfId); + sb.append(m == null ? 
"" : m.cfName).append(" (").append(cfId).append("), "); + } + return sb.toString(); + } + + @Override + public String toString() + { + return "CommitLogSegment(" + getPath() + ')'; + } + + public static class CommitLogSegmentFileComparator implements Comparator + { + public int compare(File f, File f2) + { + CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(f.getName()); + CommitLogDescriptor desc2 = CommitLogDescriptor.fromFileName(f2.getName()); + return Long.compare(desc.id, desc2.id); + } + } + + /** + * A simple class for tracking information about the portion of a segment that has been allocated to a log write. + * The constructor leaves the fields uninitialized for population by CommitlogManager, so that it can be + * stack-allocated by escape analysis in CommitLog.add. + */ + static class Allocation + { + + private final CommitLogSegment segment; + private final OpOrder.Group appendOp; + private final int position; + private final ByteBuffer buffer; + + Allocation(CommitLogSegment segment, OpOrder.Group appendOp, int position, ByteBuffer buffer) + { + this.segment = segment; + this.appendOp = appendOp; + this.position = position; + this.buffer = buffer; + } + + CommitLogSegment getSegment() + { + return segment; + } + + ByteBuffer getBuffer() + { + return buffer; + } + + // markWritten() MUST be called once we are done with the segment or the CL will never flush + // but must not be called more than once + void markWritten() + { + appendOp.close(); + } + + void awaitDiskSync() + { + while (segment.lastSyncedOffset < position) + { + WaitQueue.Signal signal = segment.syncComplete.register(CommitLog.instance.metrics.waitingOnCommit.time()); + if (segment.lastSyncedOffset < position) + signal.awaitUninterruptibly(); + else + signal.cancel(); + } + } + + public ReplayPosition getReplayPosition() + { + return new ReplayPosition(segment.id, buffer.limit()); + } + + } +} diff --git a/db/commitlog/CommitLogSegmentManager.java 
b/db/commitlog/CommitLogSegmentManager.java new file mode 100644 index 0000000000..0771b7a927 --- /dev/null +++ b/db/commitlog/CommitLogSegmentManager.java @@ -0,0 +1,558 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.Mutation; 
+import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.Pair; +import org.apache.cassandra.utils.concurrent.WaitQueue; +import org.apache.cassandra.utils.JVMStabilityInspector; +import org.apache.cassandra.utils.WrappedRunnable; + +import static org.apache.cassandra.db.commitlog.CommitLogSegment.Allocation; + +/** + * Performs eager-creation of commit log segments in a background thread. All the + * public methods are thread safe. + */ +public class CommitLogSegmentManager +{ + static final Logger logger = LoggerFactory.getLogger(CommitLogSegmentManager.class); + + /** + * Queue of work to be done by the manager thread. This is usually a recycle operation, which returns + * a CommitLogSegment, or a delete operation, which returns null. + */ + private final BlockingQueue> segmentManagementTasks = new LinkedBlockingQueue<>(); + + /** Segments that are ready to be used. Head of the queue is the one we allocate writes to */ + private final ConcurrentLinkedQueue availableSegments = new ConcurrentLinkedQueue<>(); + + /** Active segments, containing unflushed data */ + private final ConcurrentLinkedQueue activeSegments = new ConcurrentLinkedQueue<>(); + + /** The segment we are currently allocating commit log records to */ + private volatile CommitLogSegment allocatingFrom = null; + + private final WaitQueue hasAvailableSegments = new WaitQueue(); + + /** + * Tracks commitlog size, in multiples of the segment size. We need to do this so we can "promise" size + * adjustments ahead of actually adding/freeing segments on disk, so that the "evict oldest segment" logic + * can see the effect of recycling segments immediately (even though they're really happening asynchronously + * on the manager thread, which will take a ms or two). 
+ */ + private final AtomicLong size = new AtomicLong(); + + /** + * New segment creation is initially disabled because we'll typically get some "free" segments + * recycled after log replay. + */ + private volatile boolean createReserveSegments = false; + + private final Thread managerThread; + private volatile boolean run = true; + + public CommitLogSegmentManager() + { + // The run loop for the manager thread + Runnable runnable = new WrappedRunnable() + { + public void runMayThrow() throws Exception + { + while (run) + { + try + { + Callable task = segmentManagementTasks.poll(); + if (task == null) + { + // if we have no more work to do, check if we should create a new segment + if (availableSegments.isEmpty() && (activeSegments.isEmpty() || createReserveSegments)) + { + logger.debug("No segments in reserve; creating a fresh one"); + size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize()); + // TODO : some error handling in case we fail to create a new segment + availableSegments.add(CommitLogSegment.freshSegment()); + hasAvailableSegments.signalAll(); + } + + // flush old Cfs if we're full + long unused = unusedCapacity(); + if (unused < 0) + { + List segmentsToRecycle = new ArrayList<>(); + long spaceToReclaim = 0; + for (CommitLogSegment segment : activeSegments) + { + if (segment == allocatingFrom) + break; + segmentsToRecycle.add(segment); + spaceToReclaim += DatabaseDescriptor.getCommitLogSegmentSize(); + if (spaceToReclaim + unused >= 0) + break; + } + flushDataFrom(segmentsToRecycle, false); + } + + try + { + // wait for new work to be provided + task = segmentManagementTasks.take(); + } + catch (InterruptedException e) + { + // shutdown signal; exit cleanly + continue; + } + } + + CommitLogSegment recycled = task.call(); + if (recycled != null) + { + // if the work resulted in a segment to recycle, publish it + availableSegments.add(recycled); + hasAvailableSegments.signalAll(); + } + } + catch (Throwable t) + { + 
JVMStabilityInspector.inspectThrowable(t); + if (!CommitLog.handleCommitError("Failed managing commit log segments", t)) + return; + // sleep some arbitrary period to avoid spamming CL + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + } + } + }; + + managerThread = new Thread(runnable, "COMMIT-LOG-ALLOCATOR"); + managerThread.start(); + } + + /** + * Reserve space in the current segment for the provided mutation or, if there isn't space available, + * create a new segment. + * + * @return the provided Allocation object + */ + public Allocation allocate(Mutation mutation, int size) + { + CommitLogSegment segment = allocatingFrom(); + + Allocation alloc; + while ( null == (alloc = segment.allocate(mutation, size)) ) + { + // failed to allocate, so move to a new segment with enough room + advanceAllocatingFrom(segment); + segment = allocatingFrom; + } + + return alloc; + } + + // simple wrapper to ensure non-null value for allocatingFrom; only necessary on first call + CommitLogSegment allocatingFrom() + { + CommitLogSegment r = allocatingFrom; + if (r == null) + { + advanceAllocatingFrom(null); + r = allocatingFrom; + } + return r; + } + + /** + * Fetches a new segment from the queue, creating a new one if necessary, and activates it + */ + private void advanceAllocatingFrom(CommitLogSegment old) + { + while (true) + { + CommitLogSegment next; + synchronized (this) + { + // do this in a critical section so we can atomically remove from availableSegments and add to allocatingFrom/activeSegments + // see https://issues.apache.org/jira/browse/CASSANDRA-6557?focusedCommentId=13874432&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13874432 + if (allocatingFrom != old) + return; + next = availableSegments.poll(); + if (next != null) + { + allocatingFrom = next; + activeSegments.add(next); + } + } + + if (next != null) + { + if (old != null) + { + // Now we can run the user defined command just after switching to the new 
commit log. + // (Do this here instead of in the recycle call so we can get a head start on the archive.) + CommitLog.instance.archiver.maybeArchive(old); + + // ensure we don't continue to use the old file; not strictly necessary, but cleaner to enforce it + old.discardUnusedTail(); + } + + // request that the CL be synced out-of-band, as we've finished a segment + CommitLog.instance.requestExtraSync(); + return; + } + + // no more segments, so register to receive a signal when not empty + WaitQueue.Signal signal = hasAvailableSegments.register(CommitLog.instance.metrics.waitingOnSegmentAllocation.time()); + + // trigger the management thread; this must occur after registering + // the signal to ensure we are woken by any new segment creation + wakeManager(); + + // check if the queue has already been added to before waiting on the signal, to catch modifications + // that happened prior to registering the signal; *then* check to see if we've been beaten to making the change + if (!availableSegments.isEmpty() || allocatingFrom != old) + { + signal.cancel(); + // if we've been beaten, just stop immediately + if (allocatingFrom != old) + return; + // otherwise try again, as there should be an available segment + continue; + } + + // can only reach here if the queue hasn't been inserted into + // before we registered the signal, as we only remove items from the queue + // after updating allocatingFrom. Can safely block until we are signalled + // by the allocator that new segments have been published + signal.awaitUninterruptibly(); + } + } + + private void wakeManager() + { + // put a NO-OP on the queue, to trigger management thread (and create a new segment if necessary) + segmentManagementTasks.add(new Callable() + { + public CommitLogSegment call() + { + return null; + } + }); + } + + /** + * Switch to a new segment, regardless of how much is left in the current one. 
+ * + * Flushes any dirty CFs for this segment and any older segments, and then recycles + * the segments + */ + void forceRecycleAll(Iterable droppedCfs) + { + List segmentsToRecycle = new ArrayList<>(activeSegments); + CommitLogSegment last = segmentsToRecycle.get(segmentsToRecycle.size() - 1); + advanceAllocatingFrom(last); + + // wait for the commit log modifications + last.waitForModifications(); + + // make sure the writes have materialized inside of the memtables by waiting for all outstanding writes + // on the relevant keyspaces to complete + Set keyspaces = new HashSet<>(); + for (UUID cfId : last.getDirtyCFIDs()) + { + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(cfId); + if (cfs != null) + keyspaces.add(cfs.keyspace); + } + for (Keyspace keyspace : keyspaces) + keyspace.writeOrder.awaitNewBarrier(); + + // flush and wait for all CFs that are dirty in segments up-to and including 'last' + Future future = flushDataFrom(segmentsToRecycle, true); + try + { + future.get(); + + for (CommitLogSegment segment : activeSegments) + for (UUID cfId : droppedCfs) + segment.markClean(cfId, segment.getContext()); + + // now recycle segments that are unused, as we may not have triggered a discardCompletedSegments() + // if the previous active segment was the only one to recycle (since an active segment isn't + // necessarily dirty, and we only call dCS after a flush). 
+ for (CommitLogSegment segment : activeSegments) + if (segment.isUnused()) + recycleSegment(segment); + + CommitLogSegment first; + if ((first = activeSegments.peek()) != null && first.id <= last.id) + logger.error("Failed to force-recycle all segments; at least one segment is still in use with dirty CFs."); + } + catch (Throwable t) + { + // for now just log the error and return false, indicating that we failed + logger.error("Failed waiting for a forced recycle of in-use commit log segments", t); + } + } + + /** + * Indicates that a segment is no longer in use and that it should be recycled. + * + * @param segment segment that is no longer in use + */ + void recycleSegment(final CommitLogSegment segment) + { + boolean archiveSuccess = CommitLog.instance.archiver.maybeWaitForArchiving(segment.getName()); + activeSegments.remove(segment); + if (!archiveSuccess) + { + // if archiving (command) was not successful then leave the file alone. don't delete or recycle. + discardSegment(segment, false); + return; + } + if (isCapExceeded()) + { + discardSegment(segment, true); + return; + } + + logger.debug("Recycling {}", segment); + segmentManagementTasks.add(new Callable() + { + public CommitLogSegment call() + { + return segment.recycle(); + } + }); + } + + /** + * Differs from the above because it can work on any file instead of just existing + * commit log segments managed by this manager. + * + * @param file segment file that is no longer in use. 
+ */ + void recycleSegment(final File file) + { + if (isCapExceeded() + || CommitLogDescriptor.fromFileName(file.getName()).getMessagingVersion() != MessagingService.current_version) + { + // (don't decrease managed size, since this was never a "live" segment) + logger.debug("(Unopened) segment {} is no longer needed and will be deleted now", file); + FileUtils.deleteWithConfirm(file); + return; + } + + logger.debug("Recycling {}", file); + // this wasn't previously a live segment, so add it to the managed size when we make it live + size.addAndGet(DatabaseDescriptor.getCommitLogSegmentSize()); + segmentManagementTasks.add(new Callable() + { + public CommitLogSegment call() + { + return new CommitLogSegment(file.getPath()); + } + }); + } + + /** + * Indicates that a segment file should be deleted. + * + * @param segment segment to be discarded + */ + private void discardSegment(final CommitLogSegment segment, final boolean deleteFile) + { + logger.debug("Segment {} is no longer active and will be deleted {}", segment, deleteFile ? "now" : "by the archive script"); + size.addAndGet(-DatabaseDescriptor.getCommitLogSegmentSize()); + + segmentManagementTasks.add(new Callable() + { + public CommitLogSegment call() + { + segment.close(); + if (deleteFile) + segment.delete(); + return null; + } + }); + } + + /** + * @return the space (in bytes) used by all segment files. + */ + public long bytesUsed() + { + return size.get(); + } + + /** + * @param name the filename to check + * @return true if file is managed by this manager. + */ + public boolean manages(String name) + { + for (CommitLogSegment segment : Iterables.concat(activeSegments, availableSegments)) + if (segment.getName().equals(name)) + return true; + return false; + } + + /** + * Check to see if the speculative current size exceeds the cap. 
+ * + * @return true if cap is exceeded + */ + private boolean isCapExceeded() + { + return unusedCapacity() < 0; + } + + private long unusedCapacity() + { + long currentSize = size.get(); + logger.debug("Total active commitlog segment space used is {}", currentSize); + return DatabaseDescriptor.getTotalCommitlogSpaceInMB() * 1024 * 1024 - currentSize; + } + + /** + * Throws a flag that enables the behavior of keeping at least one spare segment + * available at all times. + */ + public void enableReserveSegmentCreation() + { + createReserveSegments = true; + wakeManager(); + } + + /** + * Force a flush on all CFs that are still dirty in @param segments. + * + * @return a Future that will finish when all the flushes are complete. + */ + private Future flushDataFrom(List segments, boolean force) + { + if (segments.isEmpty()) + return Futures.immediateFuture(null); + final ReplayPosition maxReplayPosition = segments.get(segments.size() - 1).getContext(); + + // a map of CfId -> forceFlush() to ensure we only queue one flush per cf + final Map> flushes = new LinkedHashMap<>(); + + for (CommitLogSegment segment : segments) + { + for (UUID dirtyCFId : segment.getDirtyCFIDs()) + { + Pair pair = Schema.instance.getCF(dirtyCFId); + if (pair == null) + { + // even though we remove the schema entry before a final flush when dropping a CF, + // it's still possible for a writer to race and finish his append after the flush. + logger.debug("Marking clean CF {} that doesn't exist anymore", dirtyCFId); + segment.markClean(dirtyCFId, segment.getContext()); + } + else if (!flushes.containsKey(dirtyCFId)) + { + String keyspace = pair.left; + final ColumnFamilyStore cfs = Keyspace.open(keyspace).getColumnFamilyStore(dirtyCFId); + // can safely call forceFlush here as we will only ever block (briefly) for other attempts to flush, + // no deadlock possibility since switchLock removal + flushes.put(dirtyCFId, force ? 
cfs.forceFlush() : cfs.forceFlush(maxReplayPosition)); + } + } + } + + return Futures.allAsList(flushes.values()); + } + + /** + * Resets all the segments, for testing purposes. DO NOT USE THIS OUTSIDE OF TESTS. + */ + public void resetUnsafe() + { + logger.debug("Closing and clearing existing commit log segments..."); + + while (!segmentManagementTasks.isEmpty()) + Thread.yield(); + + for (CommitLogSegment segment : activeSegments) + segment.close(); + activeSegments.clear(); + + for (CommitLogSegment segment : availableSegments) + segment.close(); + availableSegments.clear(); + + allocatingFrom = null; + } + + /** + * Initiates the shutdown process for the management thread. + */ + public void shutdown() + { + run = false; + managerThread.interrupt(); + } + + /** + * Returns when the management thread terminates. + */ + public void awaitTermination() throws InterruptedException + { + managerThread.join(); + } + + /** + * @return a read-only collection of the active commit log segments + */ + Collection getActiveSegments() + { + return Collections.unmodifiableCollection(activeSegments); + } +} + diff --git a/db/commitlog/PeriodicCommitLogService.java b/db/commitlog/PeriodicCommitLogService.java new file mode 100644 index 0000000000..14bb367081 --- /dev/null +++ b/db/commitlog/PeriodicCommitLogService.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.commitlog; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +class PeriodicCommitLogService extends AbstractCommitLogService +{ + + private static final int blockWhenSyncLagsMillis = (int) (DatabaseDescriptor.getCommitLogSyncPeriod() * 1.5); + + public PeriodicCommitLogService(final CommitLog commitLog) + { + super(commitLog, "PERIODIC-COMMIT-LOG-SYNCER", DatabaseDescriptor.getCommitLogSyncPeriod()); + } + + protected void maybeWaitForSync(CommitLogSegment.Allocation alloc) + { + if (waitForSyncToCatchUp(Long.MAX_VALUE)) + { + // wait until periodic sync() catches up with its schedule + long started = System.currentTimeMillis(); + pending.incrementAndGet(); + while (waitForSyncToCatchUp(started)) + { + WaitQueue.Signal signal = syncComplete.register(CommitLog.instance.metrics.waitingOnCommit.time()); + if (waitForSyncToCatchUp(started)) + signal.awaitUninterruptibly(); + else + signal.cancel(); + } + pending.decrementAndGet(); + } + } + + /** + * @return true if sync is currently lagging behind inserts + */ + private boolean waitForSyncToCatchUp(long started) + { + return started > lastSyncedAt + blockWhenSyncLagsMillis; + } +} \ No newline at end of file diff --git a/db/commitlog/ReplayPosition.java b/db/commitlog/ReplayPosition.java new file mode 100644 index 0000000000..ca1969feb5 --- /dev/null +++ b/db/commitlog/ReplayPosition.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.cassandra.db.commitlog;

import java.io.DataInput;
import java.io.IOException;
import java.util.Comparator;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Ordering;

import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.ISerializer;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataOutputPlus;

/**
 * An immutable (segment id, offset) pair identifying a point in the commit log.
 * NOTE(review): generic type parameters in this class were stripped in the
 * transported copy and have been restored from the surrounding usage.
 */
public class ReplayPosition implements Comparable<ReplayPosition>
{
    public static final ReplayPositionSerializer serializer = new ReplayPositionSerializer();

    // NONE is used for SSTables that are streamed from other nodes and thus have no relationship
    // with our local commitlog. The values satisfy the critera that
    //  - no real commitlog segment will have the given id
    //  - it will sort before any real replayposition, so it will be effectively ignored by getReplayPosition
    public static final ReplayPosition NONE = new ReplayPosition(-1, 0);

    /**
     * Convenience method to compute the replay position for a group of SSTables.
     * @param sstables
     * @return the most recent (highest) replay position
     */
    public static ReplayPosition getReplayPosition(Iterable<? extends SSTableReader> sstables)
    {
        if (Iterables.isEmpty(sstables))
            return NONE;

        Function<SSTableReader, ReplayPosition> f = new Function<SSTableReader, ReplayPosition>()
        {
            public ReplayPosition apply(SSTableReader sstable)
            {
                return sstable.getReplayPosition();
            }
        };
        Ordering<ReplayPosition> ordering = Ordering.from(ReplayPosition.comparator);
        return ordering.max(Iterables.transform(sstables, f));
    }


    public final long segment;
    public final int position;

    public static final Comparator<ReplayPosition> comparator = new Comparator<ReplayPosition>()
    {
        public int compare(ReplayPosition o1, ReplayPosition o2)
        {
            // compare segment first, then offset within the segment;
            // use the primitive compare helpers to avoid needless boxing
            if (o1.segment != o2.segment)
                return Long.compare(o1.segment, o2.segment);

            return Integer.compare(o1.position, o2.position);
        }
    };

    public ReplayPosition(long segment, int position)
    {
        this.segment = segment;
        assert position >= 0;
        this.position = position;
    }

    public int compareTo(ReplayPosition other)
    {
        return comparator.compare(this, other);
    }

    @Override
    public boolean equals(Object o)
    {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ReplayPosition that = (ReplayPosition) o;

        if (position != that.position) return false;
        return segment == that.segment;
    }

    @Override
    public int hashCode()
    {
        int result = (int) (segment ^ (segment >>> 32));
        result = 31 * result + position;
        return result;
    }

    @Override
    public String toString()
    {
        return "ReplayPosition(" +
               "segmentId=" + segment +
               ", position=" + position +
               ')';
    }

    // acts as a copy factory; the class is immutable, and deliberately does not
    // implement Cloneable / call super.clone()
    public ReplayPosition clone()
    {
        return new ReplayPosition(segment, position);
    }

    public static class ReplayPositionSerializer implements ISerializer<ReplayPosition>
    {
        public void serialize(ReplayPosition rp, DataOutputPlus out) throws IOException
        {
            out.writeLong(rp.segment);
            out.writeInt(rp.position);
        }

        public ReplayPosition deserialize(DataInput in) throws IOException
        {
            return new ReplayPosition(in.readLong(), in.readInt());
        }

        public long serializedSize(ReplayPosition rp, TypeSizes typeSizes)
        {
            return typeSizes.sizeof(rp.segment) + typeSizes.sizeof(rp.position);
        }
    }
}