Finished Chunk Data Transfere if Paylouad size exeeds maximum

This commit is contained in:
Frank 2021-08-23 10:09:38 +02:00
parent 63830a27d6
commit 5c6d6a4cb0
5 changed files with 145 additions and 46 deletions

View file

@ -7,6 +7,7 @@ import net.minecraft.resources.ResourceLocation;
import ru.bclib.api.TagAPI; import ru.bclib.api.TagAPI;
import ru.bclib.api.WorldDataAPI; import ru.bclib.api.WorldDataAPI;
import ru.bclib.api.dataexchange.DataExchangeAPI; import ru.bclib.api.dataexchange.DataExchangeAPI;
import ru.bclib.api.dataexchange.handler.autosync.Chunker;
import ru.bclib.api.dataexchange.handler.autosync.HelloClient; import ru.bclib.api.dataexchange.handler.autosync.HelloClient;
import ru.bclib.api.dataexchange.handler.autosync.HelloServer; import ru.bclib.api.dataexchange.handler.autosync.HelloServer;
import ru.bclib.api.dataexchange.handler.autosync.RequestFiles; import ru.bclib.api.dataexchange.handler.autosync.RequestFiles;
@ -47,7 +48,8 @@ public class BCLib implements ModInitializer {
HelloClient.DESCRIPTOR, HelloClient.DESCRIPTOR,
HelloServer.DESCRIPTOR, HelloServer.DESCRIPTOR,
RequestFiles.DESCRIPTOR, RequestFiles.DESCRIPTOR,
SendFiles.DESCRIPTOR SendFiles.DESCRIPTOR,
Chunker.DESCRIPTOR
)); ));
DataFixerAPI.registerPatch(() -> new BCLibPatch()); DataFixerAPI.registerPatch(() -> new BCLibPatch());

View file

@ -15,8 +15,10 @@ import net.minecraft.server.MinecraftServer;
import net.minecraft.server.level.ServerPlayer; import net.minecraft.server.level.ServerPlayer;
import net.minecraft.server.network.ServerGamePacketListenerImpl; import net.minecraft.server.network.ServerGamePacketListenerImpl;
import ru.bclib.BCLib; import ru.bclib.BCLib;
import ru.bclib.api.dataexchange.handler.autosync.Chunker.FileChunkSender; import ru.bclib.api.dataexchange.handler.autosync.Chunker;
import ru.bclib.api.dataexchange.handler.autosync.Chunker.PacketChunkSender;
import java.util.Collection;
import java.util.List; import java.util.List;
public abstract class DataHandler extends BaseDataHandler { public abstract class DataHandler extends BaseDataHandler {
@ -71,14 +73,7 @@ public abstract class DataHandler extends BaseDataHandler {
FriendlyByteBuf buf = PacketByteBufs.create(); FriendlyByteBuf buf = PacketByteBufs.create();
serializeData(buf, false); serializeData(buf, false);
if (buf.readableBytes()>1024*1024) { _sendToClient(getIdentifier(), server, PlayerLookup.all(server), buf);
final FileChunkSender sender = new FileChunkSender(buf);
sender.sendChunks(PlayerLookup.all(server));
} else {
for (ServerPlayer player : PlayerLookup.all(server)) {
ServerPlayNetworking.send(player, getIdentifier(), buf);
}
}
} }
} }
@ -87,11 +82,19 @@ public abstract class DataHandler extends BaseDataHandler {
if (prepareData(false)) { if (prepareData(false)) {
FriendlyByteBuf buf = PacketByteBufs.create(); FriendlyByteBuf buf = PacketByteBufs.create();
serializeData(buf, false); serializeData(buf, false);
if (buf.readableBytes()>1024*1024) {
final FileChunkSender sender = new FileChunkSender(buf); _sendToClient(getIdentifier(), server, List.of(player), buf);
sender.sendChunks(List.of(player)); }
} else { }
ServerPlayNetworking.send(player, getIdentifier(), buf);
public static void _sendToClient(ResourceLocation identifier, MinecraftServer server, Collection<ServerPlayer> players, FriendlyByteBuf buf) {
if (buf.readableBytes()> Chunker.MAX_PACKET_SIZE) {
final PacketChunkSender sender = new PacketChunkSender(buf, identifier);
sender.sendChunks(players);
} else {
for (ServerPlayer player : players) {
ServerPlayNetworking.send(player, identifier, buf);
} }
} }
} }
@ -226,15 +229,17 @@ public abstract class DataHandler extends BaseDataHandler {
BCLib.LOGGER.error("[Internal Error] The message '" + getIdentifier() + "' must originate from the server!"); BCLib.LOGGER.error("[Internal Error] The message '" + getIdentifier() + "' must originate from the server!");
} }
public void receiveFromMemory(FriendlyByteBuf buf){
receiveFromServer(Minecraft.getInstance(), null, buf, null);
}
@Override @Override
final void sendToClient(MinecraftServer server) { final void sendToClient(MinecraftServer server) {
if (prepareDataOnServer()) { if (prepareDataOnServer()) {
FriendlyByteBuf buf = PacketByteBufs.create(); FriendlyByteBuf buf = PacketByteBufs.create();
serializeDataOnServer(buf); serializeDataOnServer(buf);
for (ServerPlayer player : PlayerLookup.all(server)) { _sendToClient(getIdentifier(), server, PlayerLookup.all(server), buf);
ServerPlayNetworking.send(player, getIdentifier(), buf);
}
} }
} }
@ -243,7 +248,8 @@ public abstract class DataHandler extends BaseDataHandler {
if (prepareDataOnServer()) { if (prepareDataOnServer()) {
FriendlyByteBuf buf = PacketByteBufs.create(); FriendlyByteBuf buf = PacketByteBufs.create();
serializeDataOnServer(buf); serializeDataOnServer(buf);
ServerPlayNetworking.send(player, getIdentifier(), buf);
_sendToClient(getIdentifier(), server, List.of(player), buf);
} }
} }

View file

@ -1,18 +1,20 @@
package ru.bclib.api.dataexchange; package ru.bclib.api.dataexchange;
import net.minecraft.resources.ResourceLocation; import net.minecraft.resources.ResourceLocation;
import org.jetbrains.annotations.NotNull;
import java.util.Objects;
import java.util.function.Supplier; import java.util.function.Supplier;
public class DataHandlerDescriptor { public class DataHandlerDescriptor {
public DataHandlerDescriptor(ResourceLocation identifier, Supplier<BaseDataHandler> instancer){ public DataHandlerDescriptor(@NotNull ResourceLocation identifier, @NotNull Supplier<BaseDataHandler> instancer){
this(identifier, instancer, instancer, false, false); this(identifier, instancer, instancer, false, false);
} }
public DataHandlerDescriptor(ResourceLocation identifier, Supplier<BaseDataHandler> instancer, boolean sendOnJoin, boolean sendBeforeEnter){ public DataHandlerDescriptor(@NotNull ResourceLocation identifier,@NotNull Supplier<BaseDataHandler> instancer, boolean sendOnJoin, boolean sendBeforeEnter){
this(identifier, instancer, instancer, sendOnJoin, sendBeforeEnter); this(identifier, instancer, instancer, sendOnJoin, sendBeforeEnter);
} }
public DataHandlerDescriptor(ResourceLocation identifier, Supplier<BaseDataHandler> receiv_instancer, Supplier<BaseDataHandler> join_instancer, boolean sendOnJoin, boolean sendBeforeEnter){ public DataHandlerDescriptor(@NotNull ResourceLocation identifier, @NotNull Supplier<BaseDataHandler> receiv_instancer, @NotNull Supplier<BaseDataHandler> join_instancer, boolean sendOnJoin, boolean sendBeforeEnter){
this.INSTANCE = receiv_instancer; this.INSTANCE = receiv_instancer;
this.JOIN_INSTANCE = join_instancer; this.JOIN_INSTANCE = join_instancer;
this.IDENTIFIER = identifier; this.IDENTIFIER = identifier;
@ -23,7 +25,25 @@ public class DataHandlerDescriptor {
public final boolean sendOnJoin; public final boolean sendOnJoin;
public final boolean sendBeforeEnter; public final boolean sendBeforeEnter;
@NotNull
public final ResourceLocation IDENTIFIER; public final ResourceLocation IDENTIFIER;
@NotNull
public final Supplier<BaseDataHandler> INSTANCE; public final Supplier<BaseDataHandler> INSTANCE;
@NotNull
public final Supplier<BaseDataHandler> JOIN_INSTANCE; public final Supplier<BaseDataHandler> JOIN_INSTANCE;
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o instanceof ResourceLocation){
return o.equals(IDENTIFIER);
}
if (!(o instanceof DataHandlerDescriptor that)) return false;
return IDENTIFIER.equals(that.IDENTIFIER);
}
@Override
public int hashCode() {
return Objects.hash(IDENTIFIER);
}
} }

View file

@ -4,6 +4,7 @@ import net.fabricmc.api.EnvType;
import net.fabricmc.api.Environment; import net.fabricmc.api.Environment;
import net.fabricmc.fabric.api.client.networking.v1.ClientPlayConnectionEvents; import net.fabricmc.fabric.api.client.networking.v1.ClientPlayConnectionEvents;
import net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents; import net.fabricmc.fabric.api.networking.v1.ServerPlayConnectionEvents;
import net.minecraft.resources.ResourceLocation;
import ru.bclib.api.dataexchange.BaseDataHandler; import ru.bclib.api.dataexchange.BaseDataHandler;
import ru.bclib.api.dataexchange.ConnectorClientside; import ru.bclib.api.dataexchange.ConnectorClientside;
import ru.bclib.api.dataexchange.ConnectorServerside; import ru.bclib.api.dataexchange.ConnectorServerside;
@ -43,6 +44,9 @@ abstract public class DataExchange {
public Set<DataHandlerDescriptor> getDescriptors() { return descriptors; } public Set<DataHandlerDescriptor> getDescriptors() { return descriptors; }
public static DataHandlerDescriptor getDescriptor(ResourceLocation identifier){
return getInstance().descriptors.stream().filter(d -> d.equals(identifier)).findFirst().orElse(null);
}
@Environment(EnvType.CLIENT) @Environment(EnvType.CLIENT)
protected void initClientside() { protected void initClientside() {

View file

@ -8,50 +8,59 @@ import net.minecraft.network.FriendlyByteBuf;
import net.minecraft.resources.ResourceLocation; import net.minecraft.resources.ResourceLocation;
import net.minecraft.server.level.ServerPlayer; import net.minecraft.server.level.ServerPlayer;
import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import ru.bclib.BCLib; import ru.bclib.BCLib;
import ru.bclib.api.dataexchange.BaseDataHandler;
import ru.bclib.api.dataexchange.DataHandler; import ru.bclib.api.dataexchange.DataHandler;
import ru.bclib.api.dataexchange.DataHandlerDescriptor; import ru.bclib.api.dataexchange.DataHandlerDescriptor;
import ru.bclib.api.dataexchange.handler.DataExchange;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.UUID; import java.util.UUID;
public class Chunker extends DataHandler.FromServer { public class Chunker extends DataHandler.FromServer {
public static class FileChunkReceiver { public static class PacketChunkReceiver {
@NotNull @NotNull
public final UUID uuid; public final UUID uuid;
public final int chunkCount; public final int chunkCount;
@NotNull @NotNull
private final FriendlyByteBuf networkedBuf; private final FriendlyByteBuf networkedBuf;
@Nullable
private final DataHandlerDescriptor descriptor;
private static List<FileChunkReceiver> active = new ArrayList<>(1); private static List<PacketChunkReceiver> active = new ArrayList<>(1);
private static FileChunkReceiver newReceiver(@NotNull UUID uuid, int chunkCount){ private static PacketChunkReceiver newReceiver(@NotNull UUID uuid, int chunkCount, ResourceLocation origin){
final FileChunkReceiver r = new FileChunkReceiver(uuid, chunkCount); DataHandlerDescriptor desc = DataExchange.getDescriptor(origin);
final PacketChunkReceiver r = new PacketChunkReceiver(uuid, chunkCount, desc);
active.add(r); active.add(r);
return r; return r;
} }
private static FileChunkReceiver getOrCreate(@NotNull UUID uuid, int chunkCount){ private static PacketChunkReceiver getOrCreate(@NotNull UUID uuid, int chunkCount, ResourceLocation origin){
return active.stream().filter(r -> r.uuid.equals(uuid)).findFirst().orElse(newReceiver(uuid, chunkCount)); return active.stream().filter(r -> r.uuid.equals(uuid)).findFirst().orElse(newReceiver(uuid, chunkCount, origin));
} }
public static FileChunkReceiver get(@NotNull UUID uuid){ public static PacketChunkReceiver get(@NotNull UUID uuid){
return active.stream().filter(r -> r.uuid.equals(uuid)).findFirst().orElse(null); return active.stream().filter(r -> r.uuid.equals(uuid)).findFirst().orElse(null);
} }
private FileChunkReceiver(@NotNull UUID uuid, int chunkCount){ private PacketChunkReceiver(@NotNull UUID uuid, int chunkCount, @Nullable DataHandlerDescriptor descriptor){
this.uuid = uuid; this.uuid = uuid;
this.chunkCount = chunkCount; this.chunkCount = chunkCount;
networkedBuf = PacketByteBufs.create(); networkedBuf = PacketByteBufs.create();
this.descriptor = descriptor;
} }
@Override @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) return true; if (this == o) return true;
if (!(o instanceof FileChunkReceiver)) return false; if (!(o instanceof PacketChunkReceiver)) return false;
FileChunkReceiver that = (FileChunkReceiver) o; PacketChunkReceiver that = (PacketChunkReceiver) o;
return uuid.equals(that.uuid); return uuid.equals(that.uuid);
} }
@ -61,32 +70,86 @@ public class Chunker extends DataHandler.FromServer {
} }
public boolean testFinished(){ public boolean testFinished(){
if (incomingBuffer == null){
return true;
} if (lastReadSerial>=chunkCount-1){
onFinish();
return true;
}
return false; return false;
} }
public void processReceived(FriendlyByteBuf buf, int serialNo, int size){ protected void addBuffer(FriendlyByteBuf input){
final int size = input.readableBytes();
final int cap = networkedBuf.capacity()-networkedBuf.writerIndex();
if (cap < size){
networkedBuf.capacity(networkedBuf.writerIndex() + size);
}
input.readBytes(networkedBuf, size);
input.clear();
}
protected void onFinish(){
incomingBuffer.clear();
incomingBuffer = null;
final BaseDataHandler baseHandler = descriptor.INSTANCE.get();
if (baseHandler instanceof DataHandler.FromServer handler){
handler.receiveFromMemory(networkedBuf);
}
}
Map<Integer, FriendlyByteBuf> incomingBuffer = new HashMap<>();
int lastReadSerial = -1;
public void processReceived(FriendlyByteBuf buf, int serialNo, int size){
if (lastReadSerial == serialNo-1){
addBuffer(buf);
lastReadSerial = serialNo;
} else {
//not sure if order is guaranteed by the underlying system!
boolean haveAll = true;
for (int nr = lastReadSerial+1; nr < serialNo-1; nr++){
if (incomingBuffer.get(nr) == null){
haveAll = false;
break;
}
}
if (haveAll){
for (int nr = lastReadSerial+1; nr < serialNo-1; nr++){
addBuffer(incomingBuffer.get(nr));
incomingBuffer.put(nr, null);
}
addBuffer(buf);
lastReadSerial = serialNo;
} else {
incomingBuffer.put(serialNo, buf);
}
}
} }
} }
public static class FileChunkSender { public static class PacketChunkSender {
private final FriendlyByteBuf networkedBuf; private final FriendlyByteBuf networkedBuf;
public final UUID uuid; public final UUID uuid;
public final int chunkCount; public final int chunkCount;
public final int size; public final int size;
public final ResourceLocation origin;
public FileChunkSender(FriendlyByteBuf buf){ public PacketChunkSender(FriendlyByteBuf buf, ResourceLocation origin){
networkedBuf = buf; networkedBuf = buf;
size = buf.readableBytes(); size = buf.readableBytes();
chunkCount = (int)Math.ceil((double)size / MAX_PAYLOAD_SIZE); chunkCount = (int)Math.ceil((double)size / MAX_PAYLOAD_SIZE);
uuid = UUID.randomUUID(); uuid = UUID.randomUUID();
this.origin = origin;
} }
public void sendChunks(Collection<ServerPlayer> players){ public void sendChunks(Collection<ServerPlayer> players){
BCLib.LOGGER.info("Sending Request in " + chunkCount + " File-Chunks"); BCLib.LOGGER.info("Sending Request in " + chunkCount + " Packet-Chunks");
for (int i=0; i<chunkCount; i++){ for (int i=-1; i<chunkCount; i++){
Chunker c = new Chunker(i, uuid, networkedBuf, chunkCount); Chunker c = new Chunker(i, uuid, networkedBuf, chunkCount, origin);
FriendlyByteBuf buf = PacketByteBufs.create(); FriendlyByteBuf buf = PacketByteBufs.create();
c.serializeDataOnServer(buf); c.serializeDataOnServer(buf);
@ -107,13 +170,15 @@ public class Chunker extends DataHandler.FromServer {
private UUID uuid; private UUID uuid;
private int chunkCount; private int chunkCount;
private FriendlyByteBuf networkedBuf; private FriendlyByteBuf networkedBuf;
private ResourceLocation origin;
protected Chunker(int serialNo, UUID uuid, FriendlyByteBuf networkedBuf, int chunkCount) { protected Chunker(int serialNo, UUID uuid, FriendlyByteBuf networkedBuf, int chunkCount, ResourceLocation origin) {
super(DESCRIPTOR.IDENTIFIER); super(DESCRIPTOR.IDENTIFIER);
this.serialNo = serialNo; this.serialNo = serialNo;
this.uuid = uuid; this.uuid = uuid;
this.networkedBuf = networkedBuf; this.networkedBuf = networkedBuf;
this.chunkCount = chunkCount; this.chunkCount = chunkCount;
this.origin = origin;
} }
protected Chunker(){ protected Chunker(){
@ -131,16 +196,17 @@ public class Chunker extends DataHandler.FromServer {
if (serialNo == -1){ if (serialNo == -1){
//this is our header //this is our header
buf.writeInt(chunkCount); buf.writeInt(chunkCount);
writeString(buf, SendFiles.DESCRIPTOR.IDENTIFIER.getNamespace()); writeString(buf, origin.getNamespace());
writeString(buf, SendFiles.DESCRIPTOR.IDENTIFIER.getPath()); writeString(buf, origin.getPath());
} else { } else {
buf.capacity(MAX_PACKET_SIZE);
final int size = Math.min(MAX_PAYLOAD_SIZE, networkedBuf.readableBytes()); final int size = Math.min(MAX_PAYLOAD_SIZE, networkedBuf.readableBytes());
buf.writeInt(size); buf.writeInt(size);
networkedBuf.readBytes(buf, size); networkedBuf.readBytes(buf, size);
} }
} }
private FileChunkReceiver receiver; private PacketChunkReceiver receiver;
@Override @Override
protected void deserializeIncomingDataOnClient(FriendlyByteBuf buf, PacketSender responseSender) { protected void deserializeIncomingDataOnClient(FriendlyByteBuf buf, PacketSender responseSender) {
@ -152,16 +218,17 @@ public class Chunker extends DataHandler.FromServer {
chunkCount = buf.readInt(); chunkCount = buf.readInt();
final String namespace = readString(buf); final String namespace = readString(buf);
final String path = readString(buf); final String path = readString(buf);
BCLib.LOGGER.info("Receiving " + chunkCount + " + File-Chunks for " + namespace +"."+path); ResourceLocation ident = new ResourceLocation(namespace, path);
BCLib.LOGGER.info("Receiving " + chunkCount + " + Packet-Chunks for " + ident);
receiver = FileChunkReceiver.getOrCreate(uuid, chunkCount); receiver = PacketChunkReceiver.getOrCreate(uuid, chunkCount, ident);
} else { } else {
receiver = FileChunkReceiver.get(uuid); receiver = PacketChunkReceiver.get(uuid);
if (receiver!=null) { if (receiver!=null) {
final int size = buf.readInt(); final int size = buf.readInt();
receiver.processReceived(buf, serialNo, size); receiver.processReceived(buf, serialNo, size);
} else { } else {
BCLib.LOGGER.error("Unknown File-Chunk Transfer for " + uuid); BCLib.LOGGER.error("Unknown Packet-Chunk Transfer for " + uuid);
} }
} }
} }