Skip to content

Commit

Permalink
Add javadocs
Browse files Browse the repository at this point in the history
  • Loading branch information
anmolanmol1234 committed Feb 21, 2025
1 parent f932c8d commit 6039e0e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.store.DataBlocks;

import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.COMMA;

/**
* Manages Azure Blob blocks for append operations.
*/
Expand Down Expand Up @@ -61,7 +63,7 @@ public AzureBlobBlockManager(AbfsOutputStream abfsOutputStream,
if (abfsOutputStream.getPosition() > 0 && !abfsOutputStream.isAppendBlob()) {
List<String> committedBlocks = getBlockList(abfsOutputStream.getTracingContext());
if (!committedBlocks.isEmpty()) {
committedBlockEntries.append(String.join(",", committedBlocks)).append(",");
committedBlockEntries.append(String.join(COMMA, committedBlocks));
}
}
LOG.debug("Created a new Blob Block Manager for AbfsOutputStream instance {} for path {}",
Expand Down Expand Up @@ -182,7 +184,7 @@ protected synchronized boolean hasBlocksToCommit() throws IOException {
}
// Append the current block's ID to the committedBlockBuilder
if (committedBlockEntries.length() > 0) {
committedBlockEntries.append(",");
committedBlockEntries.append(COMMA);
}
committedBlockEntries.append(current.getBlockId());
LOG.debug("Block {} added to committed entries.", current.getBlockId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ protected InvalidIngressServiceException getIngressHandlerSwitchException(
*
* @return the block manager
*/
public abstract AzureBlockManager getBlockManager();
protected abstract AzureBlockManager getBlockManager();

/**
* Gets the client associated with this handler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ void createRenamePendingJson(Path path, byte[] bytes)
abfsClient.append(path.toUri().getPath(), bytes,
appendRequestParameters, null, null, tracingContext);

//List<String> blockIdList = new ArrayList<>(Collections.singleton(blockId));
String blockList = generateBlockListXml(blockId);
String blockList = generateBlockListXml(blockId);
// PutBlockList on the path.
abfsClient.flush(blockList.getBytes(StandardCharsets.UTF_8),
path.toUri().getPath(), true, null, null, eTag, null, tracingContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@
import static org.apache.hadoop.fs.store.DataBlocks.DataBlock.DestState.Writing;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

/**
* Test append operations.
Expand Down Expand Up @@ -169,16 +166,16 @@ public void testCloseOfDataBlockOnAppendComplete() throws Exception {
for (String blockBufferType : blockBufferTypes) {
Configuration configuration = new Configuration(getRawConfiguration());
configuration.set(DATA_BLOCKS_BUFFER, blockBufferType);
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration))) {
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
doReturn(store).when(fs).getAbfsStore();
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(store).when(fs).getAbfsStore();
DataBlocks.DataBlock[] dataBlock = new DataBlocks.DataBlock[1];
Mockito.doAnswer(getBlobFactoryInvocation -> {
DataBlocks.BlockFactory factory = spy(
DataBlocks.BlockFactory factory = Mockito.spy(
(DataBlocks.BlockFactory) getBlobFactoryInvocation.callRealMethod());
Mockito.doAnswer(factoryCreateInvocation -> {
dataBlock[0] = spy(
dataBlock[0] = Mockito.spy(
(DataBlocks.DataBlock) factoryCreateInvocation.callRealMethod());
return dataBlock[0];
})
Expand Down Expand Up @@ -247,8 +244,11 @@ public void testCreateOverDfsAppendOverBlob() throws IOException {
.isInstanceOf(AbfsDfsClient.class);
}

/**
* This test verifies that if multiple appends qualify for switch, no appends should fail.
*/
@Test
public void testMultipleAppendSwitches() throws Exception {
public void testMultipleAppendsQualifyForSwitch() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
Expand Down Expand Up @@ -309,8 +309,11 @@ public void testMultipleAppendSwitches() throws Exception {
.isInstanceOf(AbfsDfsClient.class);
}

/**
* This test verifies that parallel writes on dfs and blob endpoint should not fail.
*/
@Test
public void testParallelDfsBlob() throws Exception {
public void testParallelWritesOnDfsAndBlob() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
Path testPath = path(TEST_FILE_PATH);
Expand Down Expand Up @@ -413,10 +416,10 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
conf.setBoolean(FS_AZURE_ENABLE_DFSTOBLOB_FALLBACK, true);
conf.set(FS_AZURE_INGRESS_SERVICE_TYPE,
String.valueOf(AbfsServiceType.DFS));
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(conf))) {
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
doReturn(true).when(store).isAppendBlobKey(anyString());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());

// Set abfsStore as our mocked value.
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
Expand Down Expand Up @@ -458,9 +461,9 @@ public void testCreateAppendBlobOverBlobEndpointAppendOverDfs()
public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
throws IOException, NoSuchFieldException, IllegalAccessException {
assumeHnsEnabled("FNS does not support append blob creation for DFS endpoint");
final AzureBlobFileSystem fs = spy(getFileSystem());
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
doReturn(true).when(store).isAppendBlobKey(anyString());
final AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
Mockito.doReturn(true).when(store).isAppendBlobKey(anyString());

// Set abfsStore as our mocked value.
Field privateField = AzureBlobFileSystem.class.getDeclaredField(
Expand Down Expand Up @@ -500,7 +503,6 @@ public void testCreateAppendBlobOverDfsEndpointAppendOverBlob()
}



/**
* Tests the correct retrieval of the AzureIngressHandler based on the configured ingress service type.
*
Expand Down Expand Up @@ -610,22 +612,6 @@ public void testRecreateAppendAndFlush() throws IOException {
}
}

@Test
public void testFlush() throws IOException {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
final AzureBlobFileSystem fs = getFileSystem();
final Path filePath = path(TEST_FILE_PATH);
fs.create(filePath);
Assume.assumeTrue(getIngressServiceType() == AbfsServiceType.BLOB);
FSDataOutputStream outputStream = fs.append(filePath);
outputStream.write(TEN);
outputStream.write(20);
outputStream.hsync();
outputStream.write(30);
outputStream.write(40);
outputStream.close();
}

/**
* Recreate directory between append and flush. Etag mismatch happens.
**/
Expand Down Expand Up @@ -834,7 +820,7 @@ public void testEtagMismatch() throws Exception {
public void testAppendWithLease() throws Exception {
final Path testFilePath = new Path(path(methodName.getMethodName()),
TEST_FILE_PATH);
final AzureBlobFileSystem fs = spy(
final AzureBlobFileSystem fs = Mockito.spy(
getCustomFileSystem(testFilePath.getParent(), 1));
FsPermission permission = new FsPermission(FsAction.ALL, FsAction.ALL,
FsAction.ALL);
Expand Down Expand Up @@ -876,18 +862,18 @@ public void testAppendImplicitDirectoryAzcopy() throws Exception {
@Test
public void testIntermittentAppendFailureToBeReported() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();

AbfsClientHandler clientHandler = spy(store.getClientHandler());
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

doReturn(clientHandler).when(store).getClientHandler();
doReturn(blobClient).when(clientHandler).getBlobClient();
doReturn(blobClient).when(clientHandler).getIngressClient();
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();

Mockito.doThrow(
new AbfsRestOperationException(HTTP_UNAVAILABLE, "", "", new Exception()))
Expand Down Expand Up @@ -956,14 +942,14 @@ public void testIntermittentAppendFailureToBeReported() throws Exception {
private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
Path path,
AbfsClient client) throws IOException {
AbfsOutputStream abfsOutputStream = spy(
AbfsOutputStream abfsOutputStream = Mockito.spy(
(AbfsOutputStream) fs.create(path).getWrappedStream());
AzureIngressHandler ingressHandler = spy(
AzureIngressHandler ingressHandler = Mockito.spy(
abfsOutputStream.getIngressHandler());
doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
doReturn(client).when(ingressHandler).getClient();
Mockito.doReturn(ingressHandler).when(abfsOutputStream).getIngressHandler();
Mockito.doReturn(client).when(ingressHandler).getClient();

FSDataOutputStream fsDataOutputStream = spy(
FSDataOutputStream fsDataOutputStream = Mockito.spy(
new FSDataOutputStream(abfsOutputStream, null));
return fsDataOutputStream;
}
Expand All @@ -976,22 +962,22 @@ private FSDataOutputStream createMockedOutputStream(AzureBlobFileSystem fs,
@Test
public void testWriteAsyncOpFailedAfterCloseCalled() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
AbfsClientHandler clientHandler = spy(store.getClientHandler());
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
AbfsDfsClient dfsClient = spy(clientHandler.getDfsClient());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());
AbfsDfsClient dfsClient = Mockito.spy(clientHandler.getDfsClient());

AbfsClient client = clientHandler.getIngressClient();
if (clientHandler.getIngressClient() instanceof AbfsBlobClient) {
doReturn(blobClient).when(clientHandler).getBlobClient();
doReturn(blobClient).when(clientHandler).getIngressClient();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
} else {
doReturn(dfsClient).when(clientHandler).getDfsClient();
doReturn(dfsClient).when(clientHandler).getIngressClient();
Mockito.doReturn(dfsClient).when(clientHandler).getDfsClient();
Mockito.doReturn(dfsClient).when(clientHandler).getIngressClient();
}
doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(clientHandler).when(store).getClientHandler();

byte[] bytes = new byte[1024 * 1024 * 8];
new Random().nextBytes(bytes);
Expand Down Expand Up @@ -1081,21 +1067,21 @@ private String generateBlockId(AbfsOutputStream os, long position) {
public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();
// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();

// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = spy(store.getClientHandler());
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

// Set up the spies to return the mocked objects
doReturn(clientHandler).when(store).getClientHandler();
doReturn(blobClient).when(clientHandler).getBlobClient();
doReturn(blobClient).when(clientHandler).getIngressClient();
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
AtomicInteger flushCount = new AtomicInteger(0);
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"), blobClient);
Expand All @@ -1119,10 +1105,10 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep

int currentCount = flushCount.incrementAndGet();
if (currentCount == 1) {
when(httpOperation.getStatusCode())
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
when(httpOperation.getStorageErrorMessage())
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
}
Expand Down Expand Up @@ -1177,22 +1163,22 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exception {
Assume.assumeFalse("Not valid for APPEND BLOB", isAppendBlobEnabled());
// Create a spy of AzureBlobFileSystem
try (AzureBlobFileSystem fs = spy(
try (AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(getRawConfiguration()))) {
assumeHnsDisabled();

// Create a spy of AzureBlobFileSystemStore
AzureBlobFileSystemStore store = spy(fs.getAbfsStore());
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
assumeBlobServiceType();

// Create spies for the client handler and blob client
AbfsClientHandler clientHandler = spy(store.getClientHandler());
AbfsBlobClient blobClient = spy(clientHandler.getBlobClient());
AbfsClientHandler clientHandler = Mockito.spy(store.getClientHandler());
AbfsBlobClient blobClient = Mockito.spy(clientHandler.getBlobClient());

// Set up the spies to return the mocked objects
doReturn(clientHandler).when(store).getClientHandler();
doReturn(blobClient).when(clientHandler).getBlobClient();
doReturn(blobClient).when(clientHandler).getIngressClient();
Mockito.doReturn(clientHandler).when(store).getClientHandler();
Mockito.doReturn(blobClient).when(clientHandler).getBlobClient();
Mockito.doReturn(blobClient).when(clientHandler).getIngressClient();
AtomicInteger flushCount = new AtomicInteger(0);
FSDataOutputStream os = createMockedOutputStream(fs,
new Path("/test/file"), blobClient);
Expand All @@ -1216,16 +1202,16 @@ public void testFlushSuccessWithConnectionResetOnResponseInvalidMd5() throws Exc

int currentCount = flushCount.incrementAndGet();
if (currentCount == 1) {
when(httpOperation.getStatusCode())
Mockito.when(httpOperation.getStatusCode())
.thenReturn(
HTTP_INTERNAL_ERROR); // Status code 500 for Internal Server Error
when(httpOperation.getStorageErrorMessage())
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("CONNECTION_RESET"); // Error message
throw new IOException("Connection Reset");
} else if (currentCount == 2) {
when(httpOperation.getStatusCode())
Mockito.when(httpOperation.getStatusCode())
.thenReturn(HTTP_OK);
when(httpOperation.getStorageErrorMessage())
Mockito.when(httpOperation.getStorageErrorMessage())
.thenReturn("HTTP_OK");
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ public void testGetAclCallOnHnsConfigAbsence() throws Exception {
AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(
getRawConfiguration()));
AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
AbfsClient client = Mockito.spy(fs.getAbfsClient());
Mockito.doReturn(client).when(store).getClient();
AbfsClient client = Mockito.spy(fs.getAbfsStore().getClient(AbfsServiceType.DFS));
Mockito.doReturn(client).when(store).getClient(AbfsServiceType.DFS);

Mockito.doThrow(TrileanConversionException.class)
.when(store)
Expand Down

0 comments on commit 6039e0e

Please sign in to comment.